<a href="https://colab.research.google.com/github/create-alt/CNN_for_Cifar10/blob/main/CNN_for_Cifar10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **CNNによるCifar-10分類**

環境：<br>
Google Colaboratory<br>
T4 GPU<br>
必要ライブラリ等はコード内で記述

# GPU確認とGoogleDrive接続

モデルの学習時にGPUが必要となるので接続を確認しておく。

In [None]:
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

学習した結果の重みをGoogleDriveに保存するため接続しておく。

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
cd {"/content/drive/MyDrive/重みを格納するフォルダ"}

# ライブラリのimportと定数・関数定義

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import pandas as pd
from skimage import io
from sklearn.model_selection import StratifiedKFold
import os
import cv2
import gc

import keras
import tensorflow as tf
from keras import backend as K
from keras.preprocessing.image import ImageDataGenerator,array_to_img,img_to_array
from keras.datasets import cifar10
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import BatchNormalization
from keras.layers import Conv2D, Conv3D, MaxPooling2D, MaxPooling3D, AveragePooling2D, AveragePooling3D

label_dict = {0:"飛行機", 1:"自動車", 2:"鳥", 3:"猫", 4:"鹿", 5:"犬", 6:"カエル", 7:"馬", 8:"船", 9:"トラック"} #label定義

#定数定義（class作成時にまとめられる）
NUM_CLASSES = 10
SEED=0

In [None]:
#データを読み込む
def load():
  (X_train,y_train),(X_test,y_test) = cifar10.load_data()
  print("default X_train's shape is",X_train.shape)
  print("default y_train's shape is",y_train.shape,"\n")

  # 入力データ[0, 1]の範囲に正規化
  X_train = X_train.astype('float32')
  X_test = X_test.astype('float32')
  # 255で割ったものを新たに変数とする->画素の最大が255なので、255で割ると[0, 1]になる
  X_train /= 255
  X_test /= 255

  #以下データ拡張

  #seed値固定
  tf.random.set_seed(SEED)

  params = {"zoom_range" : 0.3,
          "horizontal_flip" : True,
          "rotation_range" : 30,
          "height_shift_range": 0.1,
          "width_shift_range": 0.1,
          "channel_shift_range": 0.3}

  #データ拡張用オブジェクトの定義
  data_generator = keras.preprocessing.image.ImageDataGenerator(**params)

  # 同じ画像を複製する
  train_aug = X_train.copy()
  train_aug = np.concatenate([train_aug,train_aug])

  y_aug = np.concatenate([y_train,y_train])

  #データ拡張
  generator = data_generator.flow(train_aug, y_aug, batch_size=100000)

  # 変換後のデータを取得
  batch_x = generator.next()

  X_train = np.concatenate([X_train,batch_x[0]])
  y_train = np.concatenate([y_train,batch_x[1]])

  print(X_train.shape)

  return (X_train,y_train),(X_test,y_test)

#plot用関数(教材から引用)
def plot_history_of_model(model):
    # 学習をグラフ化（正解率）
    print(model.history.history)
    acc = model.history.history['accuracy']
    val_acc = model.history.history['val_accuracy']

    # Accuracy Plot
    plt.plot(acc)
    plt.plot(val_acc)
    plt.ylim(0.0,1.0)
    plt.title('Accuracy')
    plt.legend(['train', 'valid'], loc='upper left')
    plt.show()

    # Loss Plot
    loss = model.history.history['loss']
    val_loss = model.history.history['val_loss']
    plt.plot(loss ,label = 'training loss')
    plt.plot(val_loss, label= 'validation loss')
    #plt.ylim(0,3)
    plt.title('Training and Validation loss')
    plt.legend()
    plt.show()

#checkpointの設定
def create_checkpoint(path):
  #path(TensorFlow公式サイトより引用 https://www.tensorflow.org/tutorials/keras/save_and_load?hl=ja)

  #重みの格納場所(公式サイトでは.ckptを使用していたが安全性の面から.safetensorsを使用)
  checkpoint_path = "Weight_of_model/" + path + ".safetensors"
  checkpoint_dir = os.path.dirname(checkpoint_path)

  cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                  save_weights_only=True,
                                                  verbose=1)

  return checkpoint_path,cp_callback

#3次元データ用のニューラルネットワーク
def create_3Dmodel():
  #seed値固定
  tf.random.set_seed(SEED)
  # マスクを適応したカラー画像に関するモデル
  model = Sequential()

  model.add(Conv3D(64, kernel_size=(3, 3, 1),
                                padding="SAME",
                                activation='relu'))
  model.add(Dropout(0.3))
  model.add(Conv3D(64, (3, 3, 1), padding="SAME",activation='relu'))
  model.add(MaxPooling3D(pool_size=(3, 3, 1))) # max pooling layer
  model.add(BatchNormalization())
  model.add(Dropout(0.3))

  model.add(Conv3D(128, (3, 3, 1), padding="SAME" ,activation='relu'))
  model.add(Dropout(0.3))
  model.add(Conv3D(128, (3, 3, 1), padding="SAME" ,activation='relu'))
  model.add(MaxPooling3D(pool_size=(3, 3, 1))) # max pooling layer
  model.add(Dropout(0.3))

  model.add(Flatten())

  model.add(Dense(256, activation='relu')) # 全結合層
  model.add(Dense(NUM_CLASSES, activation='softmax')) #classごとに確率を出力

  # 損失関数,最適化関数,評価指標を指定してモデルをコンパイル->学習できる形にする
  model.compile(loss=keras.losses.categorical_crossentropy,
          #optimizer=keras.optimizers.Adadelta(),
          optimizer=tf.keras.optimizers.Adam(),
          metrics=['accuracy'])

  return model

#2次元データ用のニューラルネットワーク
def create_2Dmodel(input_size):
  #seed値固定
  tf.random.set_seed(SEED)
  # グレースケール化した画像に対するモデル
  model = Sequential()
  #addで層を追加する,↓だとconv2dを追加
  model.add(Conv2D(64, kernel_size=(3, 3),padding="SAME",
                        activation='relu',
                        input_shape=input_size)) # 3×3のカーネルサイズの2D Convolution layer
  model.add(Conv2D(64, (3, 3),padding="SAME", activation='relu'))
  model.add(Dropout(0.3))

  model.add(Conv2D(64, (3, 3),padding="SAME", activation='relu'))
  model.add(Dropout(0.3))
  model.add(Conv2D(64, (3, 3),padding="SAME", activation='relu'))
  model.add(MaxPooling2D(pool_size=(3, 3)))
  model.add(Dropout(0.3))

  model.add(Conv2D(128, (3,3),padding="SAME",activation='relu'))
  model.add(Conv2D(128, (3,3),padding="SAME",activation='relu'))
  model.add(Dropout(0.3))
  model.add(MaxPooling2D(pool_size=(3, 3)))

  model.add(Flatten())
  model.add(Dense(256, activation='relu')) # 全結合層
  model.add(Dense(NUM_CLASSES, activation='softmax'))

  # モデルの学習
  # 損失関数,最適化関数,評価指標を指定してモデルをコンパイル->学習できる形にする
  model.compile(loss=keras.losses.categorical_crossentropy,
                            #optimizer=keras.optimizers.Adadelta(),
                            optimizer=tf.keras.optimizers.Adam(),
                            metrics=['accuracy'])

  return model

# 分類モデルの構築と学習

In [None]:
(X_train,y_train),(X_test,y_test) = load()
#画像サイズ指定
img_rows, img_cols = X_train.shape[1], X_train.shape[2]

次のコードでは特に分類精度の低かった猫データについて追加拡張を行っている。

In [None]:
X_cat = X_train[np.where(y_train == 3)[0]]

tf.random.set_seed(SEED)

params = {"zoom_range" : 0.3,
        "horizontal_flip" : True,
        "rotation_range" : 30,
        "height_shift_range": 0.1,
        "width_shift_range": 0.1,
        "channel_shift_range": 0.3}

#データ拡張用オブジェクトの定義
data_generator = keras.preprocessing.image.ImageDataGenerator(**params)

# 同じ画像を複製する
train_aug = X_cat.copy()

y_aug = y_train[np.where(y_train == 3)[0]]

#データ拡張
generator = data_generator.flow(train_aug, y_aug, batch_size=15000)

# 変換後のデータを取得
batch_x = generator.next()

X_train = np.concatenate([X_train,batch_x[0]])
y_train = np.concatenate([y_train,batch_x[1]])

print(X_train.shape)
print(y_train.shape)

In [None]:
n_split=3 #いくつに分割するか（何度学習するか）今回は1度のみ学習します
kf = StratifiedKFold(n_split,shuffle=True,random_state=0)
split_index=[[] for i in range(n_split)] #分割後のindexを格納するためのlist

#train_indexは学習用データのindex,valid_indexはtest用データのindex(X,y共用)
for fold,(train_index, valid_index) in enumerate(kf.split(X_train,y_train)):
    split_index[fold] = [train_index,valid_index]

print(split_index)

In [None]:
#データをkerasが読み取れる形式に変換する(one-hotベクトル)
y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)
print("y_train's shape after change to one-hot vec is",y_train.shape)

In [None]:
X_train = np.expand_dims(X_train, axis=-1)  # 最後の次元にチャンネル数を追加
X_test = np.expand_dims(X_test, axis=-1)  # 最後の次元にチャンネル数を追加

In [None]:
checkpoint_path, cp_callback = create_checkpoint("重みを保存するファイルのパスをここに記載")

In [None]:
NUMBER_OF_EPOCH = 32
BATCH_SIZE = 128

model_color = create_3Dmodel()

# モデルの学習
model_color.fit(X_train[split_index[0][0]], y_train[split_index[0][0]],
                batch_size=BATCH_SIZE,
                epochs=NUMBER_OF_EPOCH,
                verbose=1,
                validation_data=(X_train[split_index[0][1]], y_train[split_index[0][1]]),
                callbacks=[cp_callback])

plot_history_of_model(model_color)

In [None]:
"""
#保存した重みの読み込み
model_color = create_3Dmodel()

model_color.load_weights(checkpoint_path)
"""

In [None]:
pred_color = model_color.predict(X_test)
count = 0
for i in range(len(X_test)):
    if np.argmax(pred_color[i]) == np.argmax(y_test[i]):
        count +=1

print("accuracy :",count/len(X_test))

#accuracy : 0.8055

In [None]:
count = 0

negative_list = np.zeros(10)
pred_list = np.zeros(10)

true = np.zeros(10)
pred = np.zeros(10)

my_matrix = np.zeros((10,10))

for i in range(len(pred_color)):
    if np.argmax(pred_color[i]) == np.argmax(y_test[i]):
        count +=1
    else:
      negative_list[np.argmax(y_test[i])] += 1
      pred_list[np.argmax(pred_color[i])] += 1

    true[np.argmax(y_test[i])] += 1
    pred[np.argmax(pred_color[i])] += 1

    my_matrix[np.argmax(y_test[i])][np.argmax(pred_color[i])] += 1

print("accuracy :",count/len(pred_color))
print(label_dict)
print(negative_list)
print(pred_list,"\n")

print(true)
print(pred,"\n")

print(my_matrix)

In [None]:
del X_train,X_test
gc.collect()

# 元データにマスク処理を施したデータに対する分類モデル

In [None]:
(X_train,y_train),(X_test,y_test) = load()

In [None]:
X_cat = X_train[np.where(y_train == 3)[0]]

tf.random.set_seed(SEED)

params = {"zoom_range" : 0.3,
        "horizontal_flip" : True,
        "rotation_range" : 30,
        "height_shift_range": 0.1,
        "width_shift_range": 0.1,
        "channel_shift_range": 0.3}

#データ拡張用オブジェクトの定義
data_generator = keras.preprocessing.image.ImageDataGenerator(**params)

# 同じ画像を複製する
train_aug = X_cat.copy()

y_aug = y_train[np.where(y_train == 3)[0]]

#データ拡張
generator = data_generator.flow(train_aug, y_aug, batch_size=15000)

# 変換後のデータを取得
batch_x = generator.next()

X_train = np.concatenate([X_train,batch_x[0]])
y_train = np.concatenate([y_train,batch_x[1]])

print(X_train.shape)
print(y_train.shape)

次のコードでは画像へマスク処理を施している。

In [None]:
row, col = np.ogrid[:img_rows, :img_cols]

cnt_row, cnt_col = img_rows / 2, img_cols / 2 # Center of the disk
outer_disk_mask = ((row - cnt_row)**2 + (col - cnt_col)**2 > (img_cols / 2)**2)

print(outer_disk_mask.shape)

X_train_masked = X_train.copy()
X_test_masked = X_test.copy()
for i in range(X_train.shape[0]):
    X_train_masked[i][outer_disk_mask] = 0
for i in range(X_test.shape[0]):
    X_test_masked[i][outer_disk_mask] = 0

#メモリ開放
del X_train,row,col,cnt_row,cnt_col
gc.collect()

In [None]:
#データをkerasが読み取れる形式に変換する(one-hotベクトル)
y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)
print("y_train's shape after change to one-hot vec is",y_train.shape)

In [None]:
X_train_masked = np.expand_dims(X_train_masked, axis=-1)  # 最後の次元にチャンネルを追加
X_test_masked = np.expand_dims(X_test, axis=-1)

In [None]:
#checkpointの設定
checkpoint_path,cp_callback = create_checkpoint("重みを保存するファイルのパスをここに記載")

In [None]:
#学習時には実行する
NUMBER_OF_EPOCH = 32
BATCH_SIZE = 128

model_color_masked = create_3Dmodel()

# モデルの学習
model_color_masked.fit(X_train_masked[split_index[0][0]], y_train[split_index[0][0]],
                      batch_size=BATCH_SIZE,
                      epochs=NUMBER_OF_EPOCH,
                      verbose=1,
                      validation_data=(X_train_masked[split_index[0][1]], y_train[split_index[0][1]]),
                      callbacks=[cp_callback])

In [None]:
plot_history_of_model(model_color_masked)

In [None]:
"""
#保存した重みを読み込み、精度を出力
model_color_masked = create_3Dmodel()

model_color_masked.load_weights(checkpoint_path)

loss, acc = model_color_masked.evaluate(X_test_masked, y_test, verbose=2)
print("Untrained model, accuracy: {:5.2f}%".format(100 * acc))
"""

In [None]:
del X_train_masked

gc.collect()

In [None]:
pred_color_masked = model_color_masked.predict(X_test_masked)

count = 0
for i in range(len(X_test_masked)):
    if np.argmax(pred_color_masked[i]) == np.argmax(y_test[i]):
        count +=1

print("accuracy :",count/len(X_test_masked))

In [None]:
del X_test_masked

gc.collect()

# グレースケール化した画像にマスクを適応させた画像についての分類モデル

In [None]:
(X_train,y_train),(X_test,y_test) = load()

In [None]:
X_cat = X_train[np.where(y_train == 3)[0]]

tf.random.set_seed(SEED)

params = {"zoom_range" : 0.3,
        "horizontal_flip" : True,
        "rotation_range" : 30,
        "height_shift_range": 0.1,
        "width_shift_range": 0.1,
        "channel_shift_range": 0.3}

#データ拡張用オブジェクトの定義
data_generator = keras.preprocessing.image.ImageDataGenerator(**params)

# 同じ画像を複製する
train_aug = X_cat.copy()

y_aug = y_train[np.where(y_train == 3)[0]]

#データ拡張
generator = data_generator.flow(train_aug, y_aug, batch_size=15000)

# 変換後のデータを取得
batch_x = generator.next()

X_train = np.concatenate([X_train,batch_x[0]])
y_train = np.concatenate([y_train,batch_x[1]])

print(X_train.shape)
print(y_train.shape)

前回のモデル作成時に行ったマスク処理を施す。

In [None]:
X_train_masked = X_train.copy()
X_test_masked = X_test.copy()
for i in range(X_train.shape[0]):
    X_train_masked[i][outer_disk_mask] = 0
for i in range(X_test.shape[0]):
    X_test_masked[i][outer_disk_mask] = 0

In [None]:
del X_train,X_test,outer_disk_mask
gc.collect()

グレースケール化を施す。

In [None]:
X_train_masked_gray = np.empty((X_train_masked.shape[0],32,32))

for i in range(X_train_masked.shape[0]):
    im_gray = cv2.cvtColor(X_train_masked[i], cv2.COLOR_BGR2GRAY)
    X_train_masked_gray[i] = im_gray

print("X_train_masked_gray's shape is",X_train_masked_gray.shape)


X_test_masked_gray = np.empty((X_test_masked.shape[0],X_test_masked.shape[1],X_test_masked.shape[2]))

for i in range(X_test_masked.shape[0]):
    im_gray = cv2.cvtColor(X_test_masked[i], cv2.COLOR_BGR2GRAY)
    X_test_masked_gray[i] = im_gray

print("X_test_masked_gray's shape is",X_test_masked_gray.shape)

In [None]:
del X_train_masked,X_test_masked
gc.collect()

In [None]:
#データをkerasが読み取れる形式に変換する(one-hotベクトル)
y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)
print("y_train's shape after change to one-hot vec is",y_train.shape)

In [None]:
#checkpointの設定
checkpoint_path,cp_callback = create_checkpoint("重みを保存するファイルのパスをここに記載")

In [None]:
#https://qiita.com/yy1003/items/c590d1a26918e4abe512やhttps://deepage.net/deep_learning/2016/10/26/batch_normalization.html参照

NUMBER_OF_EPOCH = 32
BATCH_SIZE = 128

input_size  = (img_rows,img_cols,1)

model_masked_gray = create_2Dmodel(input_size)

model_masked_gray.fit(X_train_masked_gray[split_index[0][0]], y_train[split_index[0][0]],
                      batch_size=BATCH_SIZE,
                      epochs=NUMBER_OF_EPOCH,
                      verbose=1,
                      validation_data=(X_train_masked_gray[split_index[0][1]], y_train[split_index[0][1]]),
                      callbacks=[cp_callback])

plot_history_of_model(model_masked_gray)

In [None]:
"""
#model作成
input_size  = (img_rows,img_cols,1)
model_masked_gray = create_2Dmodel(input_size)
#保存済み重みのロード
model_masked_gray.load_weights(checkpoint_path)

loss, acc = model_masked_gray.evaluate(X_test_masked_gray, y_test, verbose=2)
print("Untrained model, accuracy: {:5.2f}%".format(100 * acc))
"""

In [None]:
del X_train_masked_gray
gc.collect()

In [None]:
pred_masked_gray = model_masked_gray.predict(X_test_masked_gray)

count = 0
for i in range(len(X_test_masked_gray)):
    if np.argmax(pred_masked_gray[i]) == np.argmax(y_test[i]):
        count +=1

print("accuracy :",count/len(X_test_masked_gray))

In [None]:
del X_test_masked_gray,model_masked_gray
gc.collect()

# 元データをグレースケール化したデータの分類モデル

In [None]:
(X_train,y_train),(X_test,y_test) = load()

In [None]:
X_cat = X_train[np.where(y_train == 3)[0]]

tf.random.set_seed(SEED)

params = {"zoom_range" : 0.3,
        "horizontal_flip" : True,
        "rotation_range" : 30,
        "height_shift_range": 0.1,
        "width_shift_range": 0.1,
        "channel_shift_range": 0.3}

#データ拡張用オブジェクトの定義
data_generator = keras.preprocessing.image.ImageDataGenerator(**params)

# 同じ画像を複製する
train_aug = X_cat.copy()

y_aug = y_train[np.where(y_train == 3)[0]]

#データ拡張
generator = data_generator.flow(train_aug, y_aug, batch_size=15000)

# 変換後のデータを取得
batch_x = generator.next()

X_train = np.concatenate([X_train,batch_x[0]])
y_train = np.concatenate([y_train,batch_x[1]])

print(X_train.shape)
print(y_train.shape)

グレースケール化を施す。

In [None]:
X_train_gray = np.empty((X_train.shape[0],32,32))

for i in range(X_train.shape[0]):
    im_gray = cv2.cvtColor(X_train[i], cv2.COLOR_BGR2GRAY)
    X_train_gray[i] = im_gray

print("X_train_gray's shape is",X_train_gray.shape)


X_test_gray = np.empty((X_test.shape[0],X_test.shape[1],X_test.shape[2]))

for i in range(X_test.shape[0]):
    im_gray = cv2.cvtColor(X_test[i], cv2.COLOR_BGR2GRAY)
    X_test_gray[i] = im_gray

print("X_test_gray's shape is",X_test_gray.shape)

del X_train,X_test
gc.collect()

In [None]:
#データをkerasが読み取れる形式に変換する(one-hotベクトル)
y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)
print("y_train's shape after change to one-hot vec is",y_train.shape)

In [None]:
checkpoint_path,cp_callback = create_checkpoint("重みを保存するファイルのパスをここに記載")

In [None]:
#https://qiita.com/yy1003/items/c590d1a26918e4abe512やhttps://deepage.net/deep_learning/2016/10/26/batch_normalization.html参照

NUMBER_OF_EPOCH = 32
BATCH_SIZE = 128

input_size  = (img_rows,img_cols,1)

model_gray = create_2Dmodel(input_size)

model_gray.fit(X_train_gray[split_index[0][0]], y_train[split_index[0][0]],
              batch_size=BATCH_SIZE,
              epochs=NUMBER_OF_EPOCH,
              verbose=1,
              validation_data=(X_train_gray[split_index[0][1]], y_train[split_index[0][1]]),
              callbacks=[cp_callback])

plot_history_of_model(model_gray)

In [None]:
"""
#保存済みの重みを読み込み
input_size  = (img_rows,img_cols,1)
model_gray = create_2Dmodel(input_size)

model_gray.load_weights(checkpoint_path)
"""

In [None]:
del X_train_gray,y_train
gc.collect()

In [None]:
pred_gray = model_gray.predict(X_test_gray)

count = 0
for i in range(len(X_test_gray)):
    if np.argmax(pred_gray[i]) == np.argmax(y_test[i]):
        count +=1

print("accuracy :",count/len(X_test_gray))

# アンサンブル学習

これまでの予測結果をアンサンブルすることで精度の向上を目指す。

In [None]:
import seaborn as sns

pred_average = (pred_color + pred_color_masked + pred_gray + pred_masked_gray) / 4
count = 0

negative_list = np.zeros(10)
pred_list = np.zeros(10)

true = np.zeros(10)
pred = np.zeros(10)

my_matrix = np.zeros((10,10))

for i in range(len(pred_color)):
    if np.argmax(pred_average[i]) == np.argmax(y_test[i]):
        count +=1
    else:
      negative_list[np.argmax(y_test[i])] += 1
      pred_list[np.argmax(pred_average[i])] += 1

    true[np.argmax(y_test[i])] += 1
    pred[np.argmax(pred_average[i])] += 1

    my_matrix[np.argmax(y_test[i])][np.argmax(pred_average[i])] += 1

print("accuracy :",count/len(pred_color))
print(label_dict)
print(negative_list)
print(pred_list,"\n")

print(true)
print(pred,"\n")

print(my_matrix)

上記のコードで出力した混同行列を視覚的に理解しやすく成形する。

In [None]:
labels = ['飛行機', '自動車', '鳥', '猫', '鹿', '犬', 'カエル', '馬', '船', 'トラック']

In [None]:
df = pd.DataFrame(data=my_matrix, index=labels, columns=labels)

In [None]:
df

In [None]:
!pip install japanize_matplotlib

In [None]:
import japanize_matplotlib

In [None]:
plt.figure(figsize = (12,9))
plt.xlabel("pred")
plt.ylabel("true")
sns.heatmap(df, annot=True, cmap="hot",fmt='.0f')

plt.show()