In [None]:
# Task type: 'binary' for two-class classification, 'categorical' for
# multi-class classification, 'raw' for regression.
mode = "binary"

# Name of the CSV column that holds the target (label) values.
class_label = "class"

# Image size: used below both as the resize target (height and width) of the
# generators and as the input dimension of the model.
img_size = 100

# Batch size
batch_size = 16

# Number of workers passed to fit/predict (depends on the machine).
worker = 3

In [None]:
# Load libraries
import os
import time
import datetime
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import cv2
from tqdm import tqdm
from scipy.stats import pearsonr

# TensorFlow / Keras imports
import keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model, model_from_json
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Input, Dropout, Flatten, Dense, BatchNormalization
from tensorflow.keras.activations import softmax, relu
from tensorflow.keras.optimizers import Adam, Nadam, RMSprop
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.preprocessing import image
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# Evaluation metrics from scikit-learn
from sklearn.metrics import confusion_matrix, mean_squared_error, r2_score


In [None]:
# csvの読み込み
class dataLoad():
    """Remembers where the train / validation / test CSV files live and reads them.

    Args:
        train_csv_path: Path of the training CSV.
        val_csv_path: Path of the validation CSV.
        test_csv_path: Path of the test CSV.
        class_label: Name of the label column; only stored on the instance.
    """

    def __init__(self, train_csv_path, val_csv_path, test_csv_path, class_label="class2"):
        self.train_csv_path, self.val_csv_path, self.test_csv_path = (
            train_csv_path, val_csv_path, test_csv_path)
        self.class_label = class_label

    def setup(self):
        """Read the three CSV files.

        Returns:
            A ``(df_train, df_val, df_test)`` tuple of DataFrames.
        """
        csv_paths = (self.train_csv_path, self.val_csv_path, self.test_csv_path)
        df_train, df_val, df_test = (pd.read_csv(csv_path) for csv_path in csv_paths)
        return df_train, df_val, df_test

# Load the three splits. The training path previously read 'csv/traincsv'
# (missing the dot before the extension), which does not match the naming of
# the validation and test files.
DL = dataLoad('csv/train.csv',
              'csv/val.csv',
              'csv/test.csv',
              class_label=class_label)

train, val, test = DL.setup()

In [None]:
# 欠損値の削除
def cl_str(train, val, test):
    """Remove every row that contains a missing value from each split.

    Args:
        train: Training DataFrame.
        val: Validation DataFrame.
        test: Test DataFrame.

    Returns:
        A ``(train, val, test)`` tuple of new DataFrames without NaN rows;
        the inputs themselves are not modified.
    """
    cleaned = tuple(frame.dropna() for frame in (train, val, test))
    return cleaned

# Rebind train / val / test to the versions returned by cl_str (rows with
# missing values dropped).
train, val, test = cl_str(train, val, test)

In [None]:
# csvを使用して、画像を読み込みます。

class generateGen():
    """Builds Keras image generators for the train / validation / test splits.

    Every generator rescales pixel values by 1/255, resizes images to
    ``img_size`` x ``img_size`` and reads file names from the ``img_path``
    column of the given DataFrame. No augmentation is applied.

    Args:
        train_csv, val_csv, test_csv: DataFrames with an ``img_path`` column
            and the ``class_label`` column.
        train_img_path, val_img_path, test_img_path: Values passed as
            ``directory`` to ``flow_from_dataframe`` for each split.
        img_size: Height and width the images are resized to.
        batch_size: Number of images per batch.
        class_label: Name of the target column.
        mode: Passed as ``class_mode`` ('binary', 'categorical' or 'raw').
    """

    def __init__(self, train_csv, val_csv, test_csv,
                 train_img_path, val_img_path, test_img_path,
                 img_size=200, batch_size=64, class_label="class2", mode="binary"):
        self.train_csv = train_csv
        self.val_csv = val_csv
        self.test_csv = test_csv
        self.train_img_path = train_img_path
        self.val_img_path = val_img_path
        self.test_img_path = test_img_path
        self.img_size = img_size
        self.batch_size = batch_size
        self.class_label = class_label
        self.mode = mode

    def _flow(self, df, path, shuffle):
        """Create the rescaling generator shared by all three splits."""
        # Work on a copy so the caller's DataFrame keeps its original dtypes.
        df = df.copy()
        if self.mode in ("binary", "categorical"):
            # flow_from_dataframe expects string labels for these class modes.
            df[self.class_label] = df[self.class_label].astype('str')

        datagen = image.ImageDataGenerator(rescale=1./255)
        return datagen.flow_from_dataframe(dataframe=df,
                                           directory=path,
                                           x_col="img_path",
                                           y_col=self.class_label,
                                           batch_size=self.batch_size,
                                           class_mode=self.mode,
                                           target_size=(self.img_size, self.img_size),
                                           shuffle=shuffle,
                                           seed=2525,
                                           )

    def datagen(self, df, path, shuffle=False):
        """Build a generator for a training or validation split.

        Args:
            df: DataFrame describing the split.
            path: Image directory of the split.
            shuffle: Whether the generator reshuffles the samples. Defaults to
                False, which is the behaviour this method always had.

        Returns:
            ``(generator, steps)`` where ``steps`` is the number of complete
            batches (a trailing partial batch is not counted).
        """
        generator = self._flow(df, path, shuffle)
        NUB_STEPS = generator.n // generator.batch_size
        return generator, NUB_STEPS

    def test_datagen(self, df, path):
        """Build the (never shuffled) generator for the test split.

        Returns:
            ``(generator, steps)`` where ``steps`` is rounded up so that
            predicting for ``steps`` batches covers every test row, including
            the last partial batch.
        """
        test_generator = self._flow(df, path, shuffle=False)
        # Ceiling division: flooring here silently dropped the final partial
        # batch, leaving fewer predictions than test rows.
        testNUB_STEPS = (test_generator.n + test_generator.batch_size - 1) // test_generator.batch_size
        return test_generator, testNUB_STEPS

    def setup(self):
        """Create all generators.

        Returns:
            ``(train_generator, train_steps, val_generator, val_steps,
            test_generator, test_steps)``.
        """
        # Shuffle only the training samples so batches are not built from
        # consecutive CSV rows; validation and test keep the CSV order so that
        # predictions can be matched back to the rows.
        train_generator, train_steps = self.datagen(self.train_csv, self.train_img_path, shuffle=True)
        val_generator, val_steps = self.datagen(self.val_csv, self.val_img_path)
        test_generator, test_steps = self.test_datagen(self.test_csv, self.test_img_path)

        return train_generator, train_steps, val_generator, val_steps, test_generator, test_steps

# Build the generators and step counts for the three splits from the cleaned
# DataFrames and the settings defined in the configuration cell.
# NOTE(review): train_img_path, val_img_path and test_img_path are not assigned
# in any cell visible above; they must be defined before this cell runs —
# confirm where they are supposed to come from.
gen = generateGen(train , val, test,
                  train_img_path,
                  val_img_path,
                  test_img_path,
                  img_size=img_size, 
                  batch_size=batch_size, 
                  class_label=class_label,
                  mode=mode)


train_generator, train_steps, val_generator, val_steps, test_generator, test_steps = gen.setup()

In [None]:
# モデルの構築

class CreateModel():
    """Factory for ImageNet-pretrained backbones topped with a small dense head.

    The head is GlobalAveragePooling -> Dropout -> Dense(512) -> Dropout ->
    Dense(128) -> Dropout -> BatchNormalization -> Dense(n_out).

    Args:
        n_out: Number of units in the output layer.
        img_dim: Height and width of the (3-channel) input images.
        mode: 'binary', 'categorical' or 'raw'; selects output activation,
            loss and metric. Any other value (including the default "class")
            makes the builder methods raise ValueError.
    """

    def __init__(self, n_out, img_dim=200, mode="class"):
        self.n_out = n_out
        self.img_dim = img_dim
        self.mode = mode

    def setParametor(self):
        """Return ``(activation, loss_function, metrics)`` for ``self.mode``.

        Raises:
            ValueError: If ``self.mode`` is not 'binary', 'categorical' or 'raw'.
        """
        if self.mode == "binary":
            return 'sigmoid', tf.keras.losses.binary_crossentropy, "accuracy"
        if self.mode == "categorical":
            return 'softmax', tf.keras.losses.categorical_crossentropy, "accuracy"
        if self.mode == "raw":
            return "linear", tf.keras.losses.mean_squared_error, "mae"
        # Previously this only printed the message and then failed with an
        # UnboundLocalError on the return statement.
        raise ValueError("modeが間違っています。 (mode=%r)" % (self.mode,))

    def _build(self, backbone, dropout_rates, lr, mc_dropout=False, **optimizer_kwargs):
        """Assemble and compile backbone + head.

        Args:
            backbone: Keras application constructor (e.g. ResNet50, VGG16).
            dropout_rates: Three dropout rates, in head order.
            lr: Learning rate for Adam.
            mc_dropout: If True the dropout layers are called with
                ``training=True`` so they stay active at inference time.
            **optimizer_kwargs: Extra keyword arguments forwarded to Adam.
        """
        activation, loss_function, metrics = self.setParametor()
        input_tensor = Input(shape=(self.img_dim, self.img_dim, 3))
        base_model = backbone(weights="imagenet", include_top=False, input_tensor=input_tensor)

        def drop(rate, tensor):
            layer = Dropout(rate)
            return layer(tensor, training=True) if mc_dropout else layer(tensor)

        x = GlobalAveragePooling2D()(base_model.output)
        x = drop(dropout_rates[0], x)
        x = Dense(512, activation=relu)(x)
        x = drop(dropout_rates[1], x)
        x = Dense(128, activation=relu)(x)
        x = drop(dropout_rates[2], x)
        x = BatchNormalization()(x)
        output_layer = Dense(self.n_out, activation=activation, name="Output_Layer")(x)
        model = Model(input_tensor, output_layer)

        # Fine-tune the whole network, backbone included.
        for layer in model.layers:
            layer.trainable = True

        # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
        optimizer = Adam(learning_rate=lr, **optimizer_kwargs)
        model.compile(optimizer=optimizer, loss=loss_function, metrics=[metrics])
        return model

    def resnet50(self):
        """ResNet50 backbone, dropout 0.3/0.2/0.2, Adam with learning rate 0.01."""
        return self._build(ResNet50, (0.3, 0.2, 0.2), lr=0.01)

    def MCresnet50(self):
        """ResNet50 backbone with always-on dropout (0.5 each), Adam with learning rate 0.001."""
        return self._build(ResNet50, (0.5, 0.5, 0.5), lr=0.001, mc_dropout=True)

    def vgg16(self):
        """VGG16 backbone, dropout 0.3/0.2/0.2, Adam with learning rate 0.001 and decay 0.1."""
        # NOTE(review): `decay` is a legacy-optimizer argument and may be
        # rejected by newer Keras optimizers — confirm against the installed
        # TensorFlow version.
        return self._build(VGG16, (0.3, 0.2, 0.2), lr=0.001, decay=0.1)

# Output layer size: a single unit for binary (sigmoid) and regression
# (linear) modes; one unit per class for multi-class classification, where a
# hard-coded 1 would not match the one-hot labels from the generator.
n_out = len(train_generator.class_indices) if mode == "categorical" else 1
ml = CreateModel(n_out=n_out, img_dim=img_size, mode=mode)
model = ml.MCresnet50()

In [None]:
# Start training
# NOTE(review): with epoch = 2 neither EarlyStopping (patience=3) nor
# ReduceLROnPlateau (patience=2) can trigger; raise it for a real run.
epoch = 2

# Output folder named after today's date. The zero-padded YYYYMMDD form avoids
# ambiguous names such as "2024111" (11 January vs. 1 November).
dt_now = datetime.datetime.now()
new_folder = dt_now.strftime("%Y%m%d")
os.makedirs(new_folder, exist_ok=True)

# Stop when val_loss has not improved by at least min_delta for 3 epochs.
ES = EarlyStopping(monitor='val_loss',
                   min_delta=0.0001,
                   patience=3,
                   verbose=1,
                   mode='auto')

# Multiply the learning rate by 0.1 after 2 epochs without val_loss improvement.
reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                              min_delta=0.0004,
                              patience=2,
                              factor=0.1,
                              min_lr=1e-6,
                              mode='auto',
                              verbose=1)

# Save the full model whenever val_loss reaches a new best value.
modelCheckpoint = ModelCheckpoint(filepath=new_folder + '/model_{epoch:02d}.h5',
                                  monitor='val_loss',
                                  verbose=1,
                                  save_best_only=True,
                                  save_weights_only=False,
                                  )

history = model.fit(x=train_generator,
                    steps_per_epoch=train_steps,
                    validation_data=val_generator,
                    validation_steps=val_steps,
                    epochs=epoch,
                    callbacks=[reduce_lr, modelCheckpoint, ES],
                    verbose=1,
                    workers=worker,
                    )

# Per-epoch metrics as CSV.
A = pd.DataFrame(history.history)
A.to_csv(new_folder + '/acc_loss.csv')

# Architecture as JSON plus the final weights. The context manager makes sure
# the JSON file is flushed and closed.
with open(new_folder + '/model.json', "w") as json_file:
    json_file.write(model.to_json())
model.save_weights(new_folder + "/model.h5")

def _plot_accuracy_curve(hist, out_path):
    """Plot train/validation accuracy, mark the best validation epoch, save and show."""
    fig, ax = plt.subplots()
    ax.plot(hist["accuracy"], label="Acc")
    ax.plot(hist["val_accuracy"], label="val_acc")
    ax.plot(np.argmax(hist["val_accuracy"]), np.max(hist["val_accuracy"]),
            marker="x", color="r", label="best model")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Accuracy")
    # Legend must be drawn before saving, otherwise the image lacks it.
    ax.legend()
    fig.savefig(out_path)
    plt.show()


def _plot_loss_curve(hist, out_path, ylabel):
    """Plot train/validation loss, mark the best validation epoch, save and show."""
    fig, ax = plt.subplots()
    ax.set_title("Learning curve")
    ax.plot(hist["loss"], label="loss")
    ax.plot(hist["val_loss"], label="val_loss")
    ax.plot(np.argmin(hist["val_loss"]), np.min(hist["val_loss"]),
            marker="x", color="r", label="best model")
    ax.set_xlabel("Epochs")
    ax.set_ylabel(ylabel)
    ax.legend()
    fig.savefig(out_path)
    plt.show()


# Classification modes get an accuracy figure and a loss figure; regression
# only has a loss figure. Accuracy and loss are drawn on separate axes (the
# binary branch used to overlay both and never wrote acc.png).
if mode in ("binary", "categorical"):
    _plot_accuracy_curve(history.history, new_folder + "/acc.png")
    _plot_loss_curve(history.history, new_folder + "/loss.png", ylabel="log_loss")
elif mode == "raw":
    # The regression loss is the mean squared error, not a log loss.
    _plot_loss_curve(history.history, new_folder + "/loss.png", ylabel="mean squared error")




In [None]:
# Folder that holds the saved model files; later cells also write their
# evaluation outputs here.
# NOTE(review): the training cell saves model.json / model.h5 into a
# date-named folder, not into this one — confirm how the files get here.
path = './モデル'
model_path = 'model.h5'

json_path = path + "/model.json"
weights_path = path + '/' + model_path

# Check up front: the old existence check ran only after the JSON file had
# already been opened, so it could never help.
for required_file in (json_path, weights_path):
    if not os.path.exists(required_file):
        raise FileNotFoundError("model file not found: " + required_file)

# Rebuild the architecture from JSON, then restore the weights.
with open(json_path, 'r') as json_file:
    loaded_model_json = json_file.read()
model = model_from_json(loaded_model_json)
model.load_weights(weights_path)

# (activation, loss, metric) per task type.
compile_settings = {
    "binary": ('sigmoid', tf.keras.losses.binary_crossentropy, "accuracy"),
    "categorical": ('softmax', tf.keras.losses.categorical_crossentropy, "accuracy"),
    "raw": ("linear", tf.keras.losses.mean_squared_error, "mae"),
}
if mode not in compile_settings:
    # Previously only printed, after which compile() failed with a NameError.
    raise ValueError("modeが間違っています。 (mode=%r)" % (mode,))
activation, loss_function, metrics = compile_settings[mode]

lr = 0.0005
# `learning_rate` is the supported argument name; `lr` is a deprecated alias.
optimizer = Adam(learning_rate=lr)
model.compile(optimizer=optimizer, loss=loss_function, metrics=[metrics])


In [None]:
# Drop rows with missing values from the test frame once more, then display it.
test = test.dropna()
test

In [None]:

# Classification evaluation: predict on the test set, save per-image results
# and draw the confusion matrix. The test generator is not shuffled, so row i
# of `preds` belongs to row i of the test frame.
preds = np.asarray(model.predict(x=test_generator,
                                 workers=worker))

if preds.ndim == 1 or preds.shape[1] == 1:
    # Single sigmoid output: argmax over one column is always 0, so threshold
    # the probability instead.
    pred_score = preds.reshape(-1)
    pred2 = (pred_score > 0.5).astype(int)
else:
    # One column per class: predicted class and the score of that class.
    pred_score = preds.max(axis=1)
    pred2 = preds.argmax(axis=1)

test = test.dropna()
# Built in one step; positional arrays avoid index-alignment surprises and a
# length mismatch between predictions and test rows raises immediately.
result = pd.DataFrame({
    'img_path': test['img_path'].to_numpy(),
    'true': test[class_label].astype('float').astype('int').to_numpy(),
    'pred': pred_score,
    'pred2': pred2,
}, index=test.index)

result.to_csv(path + '/test_result.csv')

cm = confusion_matrix(result["true"], result["pred2"])
print(cm)

plt.figure(figsize=(7, 5))
sns.set(font_scale=1.4)
ax = sns.heatmap(cm, annot=True, fmt="d", square=True)
ax.set_xlabel("predict")
ax.set_ylabel("true")
# Flip the y axis for however many classes the matrix has.
ax.set_ylim(cm.shape[0], 0)
plt.savefig(path + "/test_cm.png")
plt.show()

In [None]:
    
# Run the test set through the model 10 times and save each pass separately.
# NOTE(review): presumably meant for Monte Carlo dropout sampling (the model
# built earlier keeps dropout active at inference) — confirm that the loaded
# model is that one.
y_label = test['img_path'].reset_index(drop=True)
y2 = test[class_label].astype('float').astype('int').reset_index(drop=True)

for i in tqdm(range(10)):
    # No `steps` argument: the whole generator is consumed, so the last
    # partial batch is predicted as well.
    pred = model.predict(x=test_generator, workers=worker)
    pred2 = pd.DataFrame(pred)
    if len(pred2) != len(y_label):
        raise ValueError("got %d predictions for %d test rows" % (len(pred2), len(y_label)))
    pred3 = pred2.round().astype('int')
    # All four parts share a 0..n-1 index, so concat lines them up row by row.
    result = pd.concat([y_label, y2, pred2, pred3], axis=1)
    result.columns = ['Label', 'true', 'pred', 'pred2']
    result.to_csv(path + '/test_result_' + str(i) + '.csv')
    a = result.query('true == pred2')
    print('No', i)
    # Accuracy of this pass, in percent.
    print((len(a) / len(result) * 100))

In [None]:
# Regression ('raw' mode) evaluation: save the predictions and plot true vs.
# predicted values.
preds = model.predict(x=test_generator, workers=worker)
prob = pd.DataFrame(preds, columns=['predict'])
prob = prob.reset_index(drop=True)

# Reset the indexes so the three parts line up row by row in concat.
y_label = pd.DataFrame(test['img_path'])
y_label = y_label.reset_index(drop=True)
y2 = pd.DataFrame(test[class_label])
y2 = y2.reset_index(drop=True)

result = pd.concat([y_label, y2, prob], axis=1)
result.columns = ['Label', 'true', 'pred']

result.to_csv(path + '/test_result.csv', index=False)

plt.scatter(result['true'], result['pred'])
# x axis holds the true values, y axis the predictions (both labels used to be
# set on the y axis, leaving the x axis unlabelled).
plt.xlabel("true")
plt.ylabel("predict")
# NOTE(review): fixed 0-50 range — confirm it matches the target's value range.
plt.xlim(0, 50)
plt.ylim(0, 50)
plt.savefig(path + '/test_regression.png')
plt.show()


In [None]:
from sklearn.metrics import mean_squared_error
from scipy.stats import pearsonr
from sklearn.metrics import r2_score

# mean_squared_error returns the MSE; take the square root so the printed
# value really is the RMSE its label announces.
print('RMSE テスト: %.2f' %(
    np.sqrt(mean_squared_error(result['true'], result['pred']))
))

# Coefficient of determination on the test set.
print('R^2 テスト: %.2f' %(
    r2_score(result['true'], result['pred'])
))