# 人工知能とソフトコンピューティング 第8回 CNN 演習
## 追加資料
* MNISTの手書き文字分類タスクに対してエポック数を増やして訓練した結果
* 同じタスクを1層の全結合ニューラルネットワークで分類した結果

## Google Collaboratory 利用の準備

以下の準備をする
* pytorch lightningを環境にインストールする
* Google Driveにアクセスできるようにする

In [None]:
!pip install lightning

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### 準備

In [None]:
base_path = '/content/drive/MyDrive/Colab Notebooks'

In [None]:
from pathlib import Path # ファイルパスの取り扱い
import matplotlib.pyplot as plt # グラフ描画
import numpy as np # 数値取扱い
import pandas as pd # データ解析
from sklearn.model_selection import train_test_split # 訓練データとテストデータの分割
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, random_split
import pytorch_lightning as L
from pytorch_lightning.utilities.model_summary import ModelSummary
from torchmetrics import Accuracy
from torchvision import transforms # 画像変換ライブラリ
from torchvision.datasets import MNIST # MNISTのデータ

#訓練データ
train_dataset = MNIST(root=Path(base_path, 'data'),
                      train=True,
                      transform=transforms.ToTensor(),
                      download = True)
#検証データ
test_dataset = MNIST(root=Path(base_path, 'data'),
                      train=False,
                      transform=transforms.ToTensor(),
                      download = True)

# 60000枚の学習用ベースデータを 48000枚（訓練用データ）と12000枚（訓練中の評価用データ）に分割
# 乱数シードを固定して毎回同じ分割になるようにする
generator = torch.Generator().manual_seed(0)
train_dataset, val_dataset = random_split(train_dataset, [0.8, 0.2], generator=generator)


### ニューラルネットワークの定義とコンパイル
* model_cnn は演習で用いた資料と同じもの
* model_fcn は全結合ネットワーク1層のみ（単純パーセプトロン）

In [None]:
# CNNの定義（同じもの）
class MNISTCNN(L.LightningModule):
    def __init__(self):
        super().__init__()
        self.save_hyperparameters() # ハイパーパラメータを保存
        self.example_input_array = torch.zeros((1, 1, 28, 28)) # 枚数×色チャネル数×横ピクセル数×縦ピクセル数

        # 出力クラス数（数字 0～9 の10クラス分類）
        self.num_classes = 10 
        # CNNモデル定義
        self.model = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),

            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, padding=0),
            nn.Flatten(), # Flatten層
            nn.Linear(in_features=64, out_features=self.num_classes), # 全結合層
        )

        # 損失関数: クロスエントロピー損失関数（多クラス分類用）
        self.criterion = nn.CrossEntropyLoss()
        
        # 評価指標: Kerasのmetrics=['accuracy'] に対応
        self.train_acc = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.val_acc = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.test_acc = Accuracy(task="multiclass", num_classes=self.num_classes)

    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        # オプティマイザ: Adam (デフォルトの学習率 0.001)
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001) # デフォルトの学習率を設定
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.train_acc(preds, y)
        
        self.log('train_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('train_accuracy', self.train_acc, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.val_acc(preds, y)
        
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_accuracy', self.val_acc, on_step=False, on_epoch=True)

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.test_acc(preds, y)
        
        self.log('test_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('test_accuracy', self.test_acc, on_step=False, on_epoch=True)

model_cnn = MNISTCNN()
print(ModelSummary(model_cnn, max_depth=-1))

In [None]:
class MNISTFCN(L.LightningModule):
    def __init__(self):
        super().__init__()
        self.save_hyperparameters()
        self.num_classes = 10
        self.input_size = 28
        self.example_input_array = torch.zeros((1, 1, self.input_size, self.input_size))

        self.model = nn.Sequential(
            nn.Flatten(), # Flatten層: 1次元データへの変換
            nn.Linear(in_features=self.input_size*self.input_size, out_features=self.num_classes)
        )

        # 損失関数: クロスエントロピー損失関数（多クラス分類用）
        self.criterion = nn.CrossEntropyLoss()
        
        # 評価指標: 訓練・検証・テストでの精度計算用
        self.train_acc = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.val_acc = Accuracy(task="multiclass", num_classes=self.num_classes)
        self.test_acc = Accuracy(task="multiclass", num_classes=self.num_classes)

    def forward(self, x):
        return self.model(x)

    def configure_optimizers(self):
        # オプティマイザ: Adam (デフォルトの学習率 0.001)
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001) # デフォルトの学習率を設定
        return optimizer

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.train_acc(preds, y)
        
        self.log('train_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('train_accuracy', self.train_acc, on_step=False, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.val_acc(preds, y)
        
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_accuracy', self.val_acc, on_step=False, on_epoch=True)

    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.criterion(logits, y)

        preds = torch.argmax(logits, dim=1)
        self.test_acc(preds, y)
        
        self.log('test_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('test_accuracy', self.test_acc, on_step=False, on_epoch=True)

model_fcn = MNISTFCN()
print(ModelSummary(model_fcn, max_depth=-1))

### 定義した二つのネットワークの訓練
* エポック数100で訓練（時間がかかる可能性があるので実行する場合は適切に調整）
* バッチサイズや投入するデータは演習と同じ
* 訓練履歴はそれぞれの変数に格納

In [None]:
epochs = 10 # エポック数
batch_size = 32 # バッチサイズ

# 1. データローダーとモデルのインスタンス化

train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True
)

val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False
)

# 訓練履歴を保存するフォルダ
log_folder = Path(base_path, "logs")

# CNNモデルの学習
checkpoint_callback_cnn = L.callbacks.ModelCheckpoint(
    save_last=True,
)
cnn_logger = L.loggers.CSVLogger(log_folder, name='cnn2')
cnn_trainer = L.Trainer(
    max_epochs=epochs,
    logger=cnn_logger,
    callbacks=[checkpoint_callback_cnn],
    deterministic=True,
    enable_progress_bar=True
)
cnn_trainer.fit(model_cnn, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)

# FCNモデルの学習
checkpoint_callback_fcn = L.callbacks.ModelCheckpoint(
    save_last=True,
)
fcn_logger = L.loggers.CSVLogger(log_folder, name='fcn2')
fcn_trainer = L.Trainer(
    max_epochs=epochs,
    logger=fcn_logger,
    callbacks=[checkpoint_callback_fcn],
    deterministic=True,
    enable_progress_bar=True
)
fcn_trainer.fit(model_fcn, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)

In [None]:
def load_history(log_path):
    log_file = Path(log_path, "metrics.csv")
    history = pd.read_csv(log_file)
    train_history = history.dropna(subset=['train_loss']).reset_index(drop=True) 
    val_history = history.dropna(subset=['val_loss']).reset_index(drop=True)
    return train_history, val_history

def show_accuracy_and_loss_graphs(train_history, val_history, epochs, epoch_from = 1): # 訓練履歴（精度・損失）を描画する関数
    training_accuracies = train_history["train_accuracy"] # 訓練用データに対する精度
    validation_accuracies = val_history["val_accuracy"] # 評価用データに対する精度
    training_losses = train_history["train_loss"] # 訓練用データに対する損失
    validation_losses = val_history["val_loss"] # 評価用データに対する損失
    
    epochs_range = range(epoch_from, epochs + 1) # 1 から epochs までの描画範囲を指定
    figure = plt.figure(figsize = (10, 5))
    subplot = plt.subplot(1, 2, 1)
    subplot.plot(epochs_range, training_accuracies, label = "Training Accuracy") # 訓練用データに対する精度のグラフ描画
    subplot.plot(epochs_range, validation_accuracies, label = "Validation Accuracy") # 評価用データに対する精度のグラフ描画
    subplot.set_xlabel("epochs")
    subplot.legend()
    plt.title("Training and Validation Accuracy")
    subplot = plt.subplot(1, 2, 2)
    subplot.plot(epochs_range, training_losses, label = "Training Loss") # 訓練用データに対する損失のグラフ描画
    subplot.plot(epochs_range, validation_losses, label = "Validation Loss") # 評価用データに対する損失のグラフ描画
    subplot.set_xlabel("epochs")
    subplot.legend()
    plt.title("Training and Validation Loss")
    plt.show()

In [None]:
print("")
print("CNN エポック数{}の訓練履歴".format(epochs))
cnn_log_path = Path(log_folder, "cnn2", f"version_{cnn_trainer.logger.version}")
cnn_train_history, cnn_val_history = load_history(cnn_log_path)
show_accuracy_and_loss_graphs(cnn_train_history, cnn_val_history, epochs)

print("")
print("FCN エポック数{}の訓練履歴".format(epochs))
fcn_log_path = Path(log_folder, "fcn2", f"version_{fcn_trainer.logger.version}")
fcn_train_history, fcn_val_history = load_history(fcn_log_path)
show_accuracy_and_loss_graphs(fcn_train_history, fcn_val_history, epochs)

### 二つのモデルにテストデータを与えて評価・分類

In [None]:
test_dataloader = DataLoader(
    test_dataset,
    batch_size=len(test_dataset)
)
score_cnn = cnn_trainer.test(model_cnn, dataloaders=test_dataloader, verbose=False)[0] # CNN評価
score_fcn = fcn_trainer.test(model_fcn, dataloaders=test_dataloader, verbose=False)[0] # FCN評価

print("CNN {}エポック 精度 =".format(epochs), score_cnn['test_accuracy'])
print("CNN {}エポック 損失 =".format(epochs), score_cnn['test_loss'])
print("FCN {}エポック 精度 =".format(epochs), score_fcn['test_accuracy'])
print("FCN {}エポック 損失 =".format(epochs), score_fcn['test_loss'])

In [None]:
test_images, test_labels = next(iter(test_dataloader)) # 画像とラベルをTensorで取得

predict_data_loader = DataLoader(
    test_images,
    batch_size=len(test_images)
)

def predict(trainer, data_loader):
    probs = trainer.predict(dataloaders=data_loader)
    predictions = torch.cat(probs, dim=0) # リストprobの要素を結合して1つのTensorにする
    predictions = nn.functional.softmax(predictions, dim=1) # 各クラスの予測値の合計が1になるようにする（確率に変換）
    return predictions

predictions_cnn = predict(cnn_trainer, predict_data_loader) # CNN分類
predictions_fcn = predict(fcn_trainer, predict_data_loader) # FCN分類

In [None]:
def show_prediction_result(image, correct, result):
    figure = plt.figure(figsize = [6.4, 2.4])
    subplot = plt.subplot(1, 2, 1)
    plt.imshow(image, cmap = plt.cm.binary)
    plt.title("correct label = {}".format(correct))
    plt.axis("off")
    subplot = plt.subplot(1, 2, 2)
    plt.bar(range(len(result)), result)
    plt.xticks(range(len(result)), range(len(result)))
    plt.ylim(0, 1)
    plt.title("probabilites")
    plt.show()

num_examples = 5
count = 0
print("")
print("CNNで正しく分類された画像の例") 
for i in range(len(predictions_cnn)):
    if (np.argmax(predictions_cnn[i]) == test_labels[i]):
        show_prediction_result(test_images[i].squeeze(), test_labels[i], predictions_cnn[i])
        count += 1
        if (count > num_examples):
            break
print("")
print("CNNで誤って分類された画像の例")
count = 0
for i in range(len(predictions_cnn)):
    if (np.argmax(predictions_cnn[i]) != test_labels[i]):
        show_prediction_result(test_images[i].squeeze(), test_labels[i], predictions_cnn[i])
        count += 1
        if (count > num_examples):
            break
count = 0
print("")
print("FCNで正しく分類された画像の例") 
for i in range(len(predictions_fcn)):
    if (np.argmax(predictions_fcn[i]) == test_labels[i]):
        show_prediction_result(test_images[i].squeeze(), test_labels[i], predictions_fcn[i])
        count += 1
        if (count > num_examples):
            break
print("")
print("FCNで誤って分類された画像の例")
count = 0
for i in range(len(predictions_fcn)):
    if (np.argmax(predictions_fcn[i]) != test_labels[i]):
        show_prediction_result(test_images[i].squeeze(), test_labels[i], predictions_fcn[i])
        count += 1
        if (count > num_examples):
            break