In [None]:
import os
import gc

import numpy as np
import keras
import tensorflow
import glob
import pandas as pd
#from tensorflow.keras.utils import np_utils
from PIL import Image
from sklearn.model_selection import StratifiedKFold
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.optimizers import Adam 


#from tensorflow.keras.models import Sequential
#from tensorflow.keras import layers, models
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer
from sklearn.utils import shuffle
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import MinMaxScaler
#from tensorflow.keras.applications import VGG16
import matplotlib.pyplot as plt

from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import cross_val_score, KFold
from sklearn.metrics import r2_score

from keras import backend as K
from keras.layers import Conv2D, Add, Input, BatchNormalization, Activation, MaxPooling2D, GlobalAveragePooling2D, Dense
from keras.models import Model

# -------------------------------------------------------------------------------------
#                        初期設定部
# -------------------------------------------------------------------------------------

from keras import backend as K
from keras.layers import Conv2D, Add, Input, BatchNormalization, Activation, MaxPooling2D, GlobalAveragePooling2D, Dense
from keras.models import Model

def shortcut(conv, residual, stride, filters):
    conv_shape = K.int_shape(conv)
    residual_shape = K.int_shape(residual)
    
    # フィルタ数またはストライドが異なる場合、ショートカット経路で1x1の畳み込みを使用
    if conv_shape != residual_shape:
        residual = Conv2D(filters=filters, kernel_size=(1, 1), strides=(stride, stride), padding="same")(residual)
        residual = BatchNormalization()(residual)
    return Add()([conv, residual])

def bottleneck_res_block(x, filters, stride=1):
    # Bottleneck architecture with three layers
    conv = Conv2D(filters=filters, kernel_size=(1, 1), strides=(stride, stride), padding="same", kernel_initializer='he_normal')(x)
    conv = BatchNormalization()(conv)
    conv = Activation("relu")(conv)
    
    conv = Conv2D(filters=filters, kernel_size=(3, 3), strides=(1, 1), padding="same", kernel_initializer='he_normal')(conv)
    conv = BatchNormalization()(conv)
    conv = Activation("relu")(conv)
    
    conv = Conv2D(filters=4*filters, kernel_size=(1, 1), strides=(1, 1), padding="same", kernel_initializer='he_normal')(conv)
    conv = BatchNormalization()(conv)
    
    short_cut = shortcut(conv, x, stride, 4*filters)
    conv = Activation("relu")(short_cut)
    return conv

class ResNet50:
    def __init__(self, input_shape, nb_classes):
        self.input_shape = input_shape
        self.nb_classes = nb_classes
        self.model = self.make_model()

    def make_model(self):
        inputs = Input(self.input_shape)
        x = Conv2D(filters=64, kernel_size=(7, 7), strides=(2, 2), padding="same", kernel_initializer='he_normal')(inputs)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = MaxPooling2D(pool_size=(3, 3), strides=(2, 2), padding="same")(x)

        # ResNet50 layers: 3-4-6-3 structure with bottleneck blocks
        x = bottleneck_res_block(x, 64)
        x = bottleneck_res_block(x, 64)
        x = bottleneck_res_block(x, 64)

        x = bottleneck_res_block(x, 128, stride=2)
        x = bottleneck_res_block(x, 128)
        x = bottleneck_res_block(x, 128)
        x = bottleneck_res_block(x, 128)

        x = bottleneck_res_block(x, 256, stride=2)
        x = bottleneck_res_block(x, 256)
        x = bottleneck_res_block(x, 256)
        x = bottleneck_res_block(x, 256)
        x = bottleneck_res_block(x, 256)
        x = bottleneck_res_block(x, 256)

        x = bottleneck_res_block(x, 512, stride=2)
        x = bottleneck_res_block(x, 512)
        x = bottleneck_res_block(x, 512)

        x = GlobalAveragePooling2D()(x)
        outputs = Dense(units=self.nb_classes, activation='linear')(x)
        ResNetModel = Model(inputs=inputs, outputs=outputs)
        return ResNetModel

def build(input_shape, nb_classes):
    return ResNet50(input_shape, nb_classes).model

model = ResNet50(input_shape=(224,224,3), nb_classes=1).model

# GrayScaleのときに1、COLORのときに3にする
COLOR_CHANNEL = 3

# 入力画像サイズ(画像サイズは正方形とする)
INPUT_IMAGE_SIZE = 224

# 訓練時のバッチサイズとエポック数
BATCH_SIZE = 32
EPOCH_NUM = 500

# 使用する訓練画像の入ったフォルダ(ルート)
TRAIN_PATH =  ""
# 使用する訓練画像の各クラスのフォルダ名
folder = [""]


# CLASS数を取得する
CLASS_NUM = len(folder)
print("クラス数 : " + str(CLASS_NUM))

#testを除いた中からvalidationの割合
validation_rate = 0.2

early_stopping = keras.callbacks.EarlyStopping(
   monitor='val_loss',
   min_delta=0,
   patience=100,
   verbose=0,
   mode='auto',
   baseline=None,
   restore_best_weights=True,
)

# 分割数を設定
k = 5

#分割交差検証の平均算出用
test_loss = [0]*k
test_mae = [0]*k
sum_mae = 0
sum_val_mae = 0
sum_test_mae = 0
sum_loss = 0
sum_val_loss = 0
sum_test_loss = 0
sum_r2 = 0

    # 各フォルダの画像を読み込む
v_image = []
v_label = []
for folder_name in folder:
    dir = os.path.join(TRAIN_PATH, folder_name)
    files = glob.glob(os.path.join(dir, "*.png"))
    print(dir)
    for file in files:
        if COLOR_CHANNEL == 1:
            img = load_img(file, color_mode="grayscale", target_size=(INPUT_IMAGE_SIZE, INPUT_IMAGE_SIZE))
        elif COLOR_CHANNEL == 3:
            img = load_img(file, color_mode="rgb", target_size=(INPUT_IMAGE_SIZE, INPUT_IMAGE_SIZE))
        array = img_to_array(img)
        v_image.append(array)
        v_label.append(int(folder_name))  # フォルダ名をラベルとして使用
v_image = np.array(v_image)
v_label = np.array(v_label)

# ラベルの確認
print(v_label)


# imageの画素値をint型からfloat型にする
v_image = v_image.astype('float32')
# 画素値を[0～255]⇒[0～1]とする
v_image = v_image / 255.0

# データの前処理
# ラベルを0から3の整数に変換
#label_binarizer = LabelBinarizer()
#v_label = label_binarizer.fit_transform(v_label)

# 正解ラベルの形式を変換
#v_label = np_utils.to_categorical(v_label, CLASS_NUM)

#正規化
v_label = v_label.reshape(-1,1)
scaler = MinMaxScaler()
v_label = scaler.fit_transform(v_label)



# ラベルの確認
print(v_label)


# 学習用データと検証用データに分割する
kf = KFold(n_splits=k, shuffle=True, random_state=0)
for fold_num, (train_index, test_index) in enumerate(kf.split(v_image)):
    print("Train:", train_index, "Test:", test_index)
    print(f"Fold {fold_num+1}:")
    # 訓練データとラベル
    train_data, train_labels = v_image[train_index], v_label[train_index]

    # テストデータとラベル
    test_data, test_labels = v_image[test_index], v_label[test_index]
    
    # 訓練データのラベル分布 
    train_label_counts = pd.Series(train_labels.ravel()).value_counts()
    # テストデータのラベル分布 
    test_label_counts = pd.Series(test_labels.ravel()).value_counts() 
    print(f"Train label distribution:\n{train_label_counts}") 
    print(f"Test label distribution:\n{test_label_counts}") 
    print("-" * 30)
    

    #labelbinarizer
    #train_labels = label_binarizer.fit_transform(train_labels)
    #test_labels = label_binarizer.fit_transform(test_labels)

    #シャッフル
    train_data, train_labels = shuffle(train_data, train_labels, random_state=0)

    model = ResNet50(input_shape=(224,224,3), nb_classes=1).model

    # モデルのコンパイル
    model.compile(optimizer=Adam(learning_rate=0.00005),
              loss='mean_squared_error',
              metrics=['mae'])

    # 訓練中に損失とMAEを記録するためのリスト
    history = model.fit(train_data, train_labels, validation_split = validation_rate, batch_size=BATCH_SIZE, epochs=EPOCH_NUM, callbacks = early_stopping)
    # 訓練中の損失とMAEの変化をプロット
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    mae = history.history['mae']
    val_mae = history.history['val_mae']
    epochs = range(1, len(loss) + 1)

    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, 'b', label='Training Loss')
    plt.plot(epochs, val_loss, 'r', label='Validation Loss')
    plt.title('Training and Validation Loss')
    #plt.yscale('log')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epochs, mae, 'b', label='Training MAE')
    plt.plot(epochs, val_mae, 'r', label='Validation MAE')
    plt.title('Training and Validation MAE')
    #plt.yscale('log')
    plt.xlabel('Epochs')
    plt.ylabel('MAE')
    plt.legend()
    plt.show()

    score = model.evaluate(test_data, test_labels, verbose=0)
    print(len(test_data))
    test_loss[fold_num]=score[0]
    test_mae[fold_num]=score[1]
    print('Loss:', score[0])
    print('Mae:', score[1])

    # 予測値と正解の散布図を作成
    y_pred = model.predict(test_data)

    sum_mae = sum_mae + mae[-1]
    sum_val_mae = sum_val_mae + val_mae[-1]
    sum_test_mae = sum_test_mae + score[1]
    sum_loss = sum_loss + loss[-1]
    sum_val_loss = sum_val_loss + val_loss[-1]
    sum_test_loss = sum_test_loss + score[0]
    sum_r2 = sum_r2 + r2_score(test_labels, y_pred)


    # 1次元に変換する
    #y_pred = y_pred.astype(np.int32)
    #y_pred = y_pred.flatten()
    #print("valid_labelsのデータ型:", type(test_labels[0]))
    #print("y_predのデータ型:", type(y_pred[0]))
    #print(test_labels)
    #print(len(test_labels))
    #print(y_pred)
    #print(len(y_pred))

    # 1次元に変換する
    #y_pred = y_pred.flatten()
    # 逆変換の例 
    y_pred_scale = scaler.inverse_transform(y_pred)
    test_labels_scale = scaler.inverse_transform(test_labels)
    plt.scatter(test_labels_scale, y_pred_scale)
    plt.xlabel('True Values')
    plt.ylabel('Predictions')
    plt.title('True Values vs. Predictions')
    plt.show()

    #R2scoreの算出
    print(f"r2:{r2_score(test_labels, y_pred)}")


ave_mae = sum_mae/k
ave_val_mae = sum_val_mae/k
ave_test_mae = sum_test_mae/k
ave_loss = sum_loss/k
ave_val_loss = sum_val_loss/k
ave_test_loss = sum_test_loss/k
ave_r2 = sum_r2/k

for j in range(k):
    print(f"hold{j+1}_test_loss:{test_loss[j]}")


for j in range(k):
    print(f"hold{j+1}_test_mae:{test_mae[j]}")


print(f"平均mae: {ave_mae}")
print(f"平均val_mae: {ave_val_mae}")
print(f"平均test_mae: {ave_test_mae}")
print(f"平均loss:{ave_loss}")
print(f"平均val_loss:{ave_val_loss}")
print(f"平均test_loss:{ave_test_loss}")
print(f"平均r2:{ave_r2}")

In [None]:
gc.collect()