## お手軽 Convolutional neural network (CNN)

### 概要
**簡易的な実装**なので、精度・速度ともに実用には不向きです。学習には、GPUを使っても多少時間がかるので、気長に実行してください（Colabのタイムアウトに注意してください）。

同シリーズ「お手軽BoVW」と同じデータセット（Caltech101）を使っていますので、比較しながら実行すると良いでしょう。また、コードを追うと、近年のモダンな深層学習ライブラリ（ここではPyTorchを使います）の便利さも実感できるかと思います。

ちなみに、実はCaltech101（約100クラス・約10000画像）は**小さいデータセット**なので、あまり深層学習には向かないです。が、これくらいのデータセットしか集められないこと、よくありますよね。。。そういう場合にどうするのか、よくある方法も紹介します。

参考：https://debuggercafe.com/getting-95-accuracy-on-the-caltech101-dataset-using-deep-learning/

### 準備
**GPUインスタンスで実行してください：**
1.   「ランタイム」から「ランタイムのタイプを変更」
2.   ハードウェアアクセラレータを「GPU」に

最後のセルなどで使っているサンプル画像は、githubリポジトリ`ou_dip/bovw/`以下にありますので、ランタイム起動後にアップロードしてください。




In [None]:
## 大量データにアクセスする場合、Googleドライブ上だと遅くなるのでコメントアウトしています
# Googleドライブへのマウント（Colab用コード）
# from google.colab import drive
# drive.mount('/content/drive')
# %cd "/content/drive/My Drive/Colab Notebooks/ou_dip/"

import cv2
import numpy as np  
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline
import time
import pickle
import os

def imshow(img):
  if img.ndim == 3:
    img = cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
    display(Image.fromarray(img))
  else:
    display(Image.fromarray(img))

## Step 0: 入力データの準備
ここでは、Caltech101というデータセットを使う。101クラスのデータセットで、およそ10000枚の画像が含まれる。

http://www.vision.caltech.edu/Image_Datasets/Caltech101/

ここでは、8割をtrain、1割をvalidation、1割をtestにランダムに分割する。ここで、各データセットは以下のものを指す（資料によっては呼び方が違ったりする）

* training datasetは、機械学習器の学習に使う
* validation datasetは、学習器のハイパーパラメータ（繰り返し回数など）選択などに使う。
* test datasetは、学習器の精度評価などに使う。**注意：ハイパーパラメータ探索にtestデータセットを使ってはいけない！**

データのダウンロードには、PyTorch（とtorchvision）という深層学習向けライブラリを使っている。


### 共通の設定

In [None]:
from tqdm.notebook import tqdm
import random

# バッチサイズ（1つのミニバッチに含まれるサンプル数）
batch_size = 64

# 学習するエポック数
n_epochs = 10

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

if torch.cuda.is_available():
    device = 'cuda' # GPUが使えるなら使う
else:
    device = 'cpu'

# 画像読み込み時の変換の指定。これを入れると、画像読み込み時に自動化してくれる
transforms = transforms.Compose([
    #transforms.ToPILImage(),        # PILImageという画像の保持形式にし、
    torchvision.transforms.Grayscale(num_output_channels=3),  # グレイスケール画像のチャンネル数を強制的に3にする
    transforms.Resize((224, 224)),  # 224x224にリサイズし、
    transforms.ToTensor(),          # PyTorchの多次元配列の型（Tensor型）にし、
    transforms.Normalize(mean = [0.485,0.456,0.406], std=[0.229,0.224,0.225]),  # 各チャネルを、引数で与えたmeanとstdで正規化する
])

# Caltech101データセットのダウンロード
dataset = torchvision.datasets.Caltech101(root='./data', download=True, transform=transforms)
categories = dataset.categories # category list

# train, val, testデータセットの作成（ランダム分割）
n_samples = len(dataset)
train_size = int(n_samples * 0.8)
val_size = int((n_samples-train_size)*0.5)
test_size = n_samples - train_size - val_size
print("train_size:", train_size, "val_size:", val_size, "test_size", test_size)

# split dataset into training and test datasets with fixed random seed (42)
trainval_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size+val_size, test_size], generator=torch.Generator().manual_seed(42))
train_dataset, val_dataset = torch.utils.data.random_split(trainval_dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))

# PyTorchにはDataLoaderというクラスがあり、iterationごとにデータを読み込む部分を隠蔽できて便利
## batch_sizeはミニバッチのサイズの指定。num_workersは読み込みの並列化に使う。
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

In [None]:
device  # ここがcudaになっていることを確認しましょう（なっていなかったらGPUインスタンスに切り替えてください）

## Step 1: モデルの構築
ResNet34（34層のResNetモデル）を使ってみます。ResNetは、層の数によって17,50,101,152もあり、torchvisionのmodelsで定義されています。他にもいろいろなモデルがありますので、興味があれば切り替えて使ってみてください。

https://pytorch.org/vision/stable/models.html

In [None]:
# torchvisionで提供されているResNet34の中身を見てみる
print(torchvision.models.resnet34())

In [None]:
# ResNet34をベースにしたネットワーク構造
class ResNet34_scratch(nn.Module):
    def __init__(self):
        super(ResNet34_scratch, self).__init__()
        self.model  = torchvision.models.resnet34()

        # avgpoolの出力が512次元。元のResNetは1000クラス分類用のfc層が最後についているので、
        # 最終層（全結合層: fc）だけ、分類したいクラス数（101）に合わせて変更するため上書きしてしまう
        # 元のFC層の定義　(fc): Linear(in_features=512, out_features=1000, bias=True)
        self.model.fc = nn.Linear(512, len(dataset.categories), bias=True)
    
    def forward(self, x):
        # forward passの定義。
        # backwardの計算（勾配を求めて云々。。。）はPyTorchが自動でやってくれる。
        
        # xに、入力データが入ってくる。あとは、__init__で定義したモジュールを組み合わせて完成
        out = self.model(x) 
        return out

model = ResNet34_scratch().to(device)


In [None]:
from torchsummary import summary
print(summary(model, input_size=(3, 224, 224)))  # 引数で指定された入力が与えられた場合のパラメータ数などを計算してくれる

## Step 2: 学習


In [None]:
# 損失関数。 nn.CrossEntropyLoss() はSoftmaxとCross entropyを同時に計算してくれる
loss_criterion = nn.CrossEntropyLoss() 

# optimizerの指定。Adamを使うことにする。引数で指定しているのは最適化対象のパラメータと学習率。
## 最適化対象については、model.parameters()で、モデル内の全パラメータを渡せる
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 任意のデータセット全体に対し、（パラメータの最適化をせず）lossだけ計算する関数
def validate(model, dataloader):
  model.eval()  # evalモード
  losses = 0.0
  correct_preds = 0
  with torch.no_grad(): # 最適化しないので、勾配を計算しない
    for data in dataloader:
      input, label = data[0].to(device), data[1].to(device)
      outputs = model(input)
      loss = loss_criterion(outputs, label)
      
      losses += loss.item()
      _, preds = torch.max(outputs.data, 1)
      correct_preds += (preds == label).sum().item()
    
    avg_loss = losses/len(dataloader)
    accuracy = float(correct_preds)/len(dataloader.dataset)
    
    return avg_loss, accuracy

In [None]:
# 学習
loss_train , acc_train = [], []
loss_val , acc_val = [], []

print('Training')
start = time.perf_counter()

for epoch in range(n_epochs):
    model.train()
    losses = 0.0
    correct_preds = 0
    
    print("Epoch", epoch, "/", n_epochs-1)
    for data in tqdm(train_loader):
        input, label = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()
        outputs = model(input)

        loss = loss_criterion(outputs, label)
        losses += loss.item()
        _, preds = torch.max(outputs.data, 1)
        correct_preds += (preds == label).sum().item()

        loss.backward()
        optimizer.step()
        
    avg_loss = losses/len(train_loader)
    acc = float(correct_preds)/len(train_loader.dataset)    
    print(f"Train Loss: {avg_loss:.4f}, Train Acc: {acc:.4f}")

    print('Validating...')
    vl, va = validate(model, val_loader)
    print(f'Val Loss: {vl:.4f}, Val Acc: {va:.4f}')
    
    loss_train.append(avg_loss)
    acc_train.append(acc)
    loss_val.append(vl)
    acc_val.append(va)

    # 各エポック終了時のモデルを保存しておく
    path = str(epoch) + ".pt"
    torch.save({'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict()}, path)

print("Elapsed time:",time.perf_counter()-start,"[s]")

In [None]:
best_epoch = loss_val.index(min(loss_val))  # eval_setに対するlossが最も小さかったエポック数
print("Best epoch:", best_epoch, "train loss:", loss_train[best_epoch], "val loss:", loss_val[best_epoch])

# 学習曲線の描画
fig = plt.figure()
ax1 = fig.add_subplot()
ax1.set_xlabel('iteration')
ax1.set_ylabel('loss')

ax1.plot(loss_train, label='train loss')
ax1.plot(loss_val, label='val loss')
plt.legend()
plt.show()
plt.close()

## Step 3: 学習済みモデルを使った推論

学習済みのモデルを使って、

*   testデータセットに対する推論をして精度評価をする
*   任意の画像を入力として推論する

**validation datasetは既にハイパーパラメータの設定（best iterationの選択）に使ったので評価に使ってはいけない**



### testデータセットを使った評価

最もvalidation setに対する精度が良かったエポックのモデルを使う

#### Tips: 学習時と評価時の振る舞いの変化

trainに対する精度が、学習中のログとして出力したものと異なる（多くの場合、学習時より精度が高くなる）ことがある。学習中と評価時は一部の処理の内容が異なるためである。例えば、
* batch normalizationは、学習時はミニバッチ内の平均・分散を使うが、評価時は全データを使う
* dropoutは、学習時のみ導入される

これらの振る舞いは、model.train()とmodel.eval()で切り替えることができる

ちなみに、学習曲線をみると、たまに学習に使っていないvalidation setに対する精度がtrainに対するものより良いことがある。この理由も上記によることが多い。

In [None]:
checkpoint = torch.load(str(best_epoch) + ".pt")
model.load_state_dict(checkpoint['model_state_dict'])

test_loss, test_acc = validate(model, test_loader)
train_loss, train_acc = validate(model, train_loader)
val_loss, val_acc = validate(model, val_loader)

print("---Accuracy evaluation---")

print("Test accuracy:",test_acc)
print("Train accuracy (for reference)':",train_acc)
print("Val accuracy (for reference):",val_acc)

### 任意の画像に対する予測

In [None]:
path = "wrightflyer.jpg"

image = Image.open(path)
image = transforms(image).to(device)
image = image.unsqueeze(0)

with torch.no_grad():
    output = model(image)
    preds = torch.max(output.data, 1).indices.cpu().numpy()
    pred = preds[0] # 1つの画像しか入力していないので、最初の要素をとってくればOK

print("Predicted as", categories[pred], ", category id:",pred)
imshow(cv2.imread(path))

## Step 4: 大規模データセットで学習されたモデルをfine-tuneする

Caltech101は**小さな**データセットであるため、これだけを学習に使うとすぐにtrainデータセットへの過適合を起こしてしまう。

よくある物体（飛行機とか人とか・・・）の認識に使うような画像を使う場合、ImageNetという大規模な画像データセット（と1000クラスの分類用のラベル）を使って学習されたパラメータを初期値として、あるいはそのまま利用して学習（fine-tuning）することが多い。

Fine-tuningといっても様々な方法があるが、ここでは最後の全結合層 (fc) のパラメータのみを最適化対象として、その他は事前学習された（pretrained）重みをそのまま使うことにする。

今回のfine-tuning方法がうまくいくのは、ImageNetとCaltech101のドメイン（タスクや含まれる画像）がかなり似通っているから。事前学習に使われたドメインとfine-tuneにつかうデータセットのドメインもう少し離れている場合は、もうすこし多くのパラメータを更新対象とするようなことを検討すると良い。（あまりにかけ離れたドメインの場合は、pretrained modelを使うよりも、ランダム重みから学習する方が良いこともある）


In [None]:
# ResNet34をベースにしたネットワーク構造
class ResNet34_finetune(nn.Module):
    def __init__(self):
        super(ResNet34_finetune, self).__init__()
        self.model = torchvision.models.resnet34(pretrained=True) # pretrained=Trueとすると、ImageNetで事前学習された重みを読み込む

        # 事前学習されたパラメータのうち、前の方の層は固定したい。
        for param in model.parameters():  # modelに含まれる全パラメータについて
          param.requires_grad = False # requires_grad = Falseとすると、そのパラメータを固定してくれる。
        
        # avgpoolの出力が512次元。元のResNetは1000クラス分類用のfc層が最後についているので、
        # 最終層（全結合層: fc）だけ、分類したいクラス数（101）に合わせて変更するため上書きしてしまう
        # 元のFC層の定義　(fc): Linear(in_features=512, out_features=1000, bias=True)
        self.model.fc = nn.Linear(512, len(dataset.categories), bias=True)       
        self.model.fc.weight.requires_grad = True # ここは更新対象にしたい
        self.model.fc.bias.requires_grad = True # ここは更新対象にしたい
    
    def forward(self, x):
        # forward passの定義。
        # backwardの計算（勾配を求めて云々。。。）はPyTorchが自動でやってくれる。
        
        # xに、入力データが入ってくる。あとは、__init__で定義したモジュールを組み合わせて完成
        out = self.model(x) 
        return out

model_ft = ResNet34_finetune().to(device)

optimizer = optim.Adam(model_ft.parameters(), lr=1e-4)
#optimizer = optim.Adam(filter(lambda p: p.requires_grad, model_ft.parameters()), lr=1e-4)

# 学習
loss_train , acc_train = [], []
loss_val , acc_val = [], []

print('Training')
start = time.perf_counter()

for epoch in range(n_epochs):
    model_ft.train()
    losses = 0.0
    correct_preds = 0
    
    print("Epoch", epoch, "/", n_epochs-1)
    for data in tqdm(train_loader):
        input, label = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()
        outputs = model_ft(input)

        loss = loss_criterion(outputs, label)
        losses += loss.item()
        _, preds = torch.max(outputs.data, 1)
        correct_preds += (preds == label).sum().item()

        loss.backward()
        optimizer.step()
        
    avg_loss = losses/len(train_loader)
    acc = float(correct_preds)/len(train_loader.dataset)    
    print(f"Train Loss: {avg_loss:.4f}, Train Acc: {acc:.4f}")

    print('Validating...')
    vl, va = validate(model_ft, val_loader)
    print(f'Val Loss: {vl:.4f}, Val Acc: {va:.4f}')
    
    loss_train.append(avg_loss)
    acc_train.append(acc)
    loss_val.append(vl)
    acc_val.append(va)

    # 各エポック終了時のモデルを保存しておく
    path = str(epoch) + "_finetune.pt"
    torch.save({'model_state_dict': model_ft.state_dict(),'optimizer_state_dict': optimizer.state_dict()}, path)

print("Elapsed time:",time.perf_counter()-start,"[s]")

In [None]:
best_epoch = loss_val.index(min(loss_val))  # eval_setに対するlossが最も小さかったエポック数
print("Best epoch:", best_epoch, "train loss:", loss_train[best_epoch], "val loss:", loss_val[best_epoch])

# 学習曲線の描画
fig = plt.figure()
ax1 = fig.add_subplot()
ax1.set_xlabel('iteration')
ax1.set_ylabel('loss')

ax1.plot(loss_train, label='train loss')
ax1.plot(loss_val, label='val loss')
plt.legend()
plt.show()
plt.close()

In [None]:
# 最もvalidation setに対する精度が良かったエポックのモデルを使って評価
checkpoint = torch.load(str(best_epoch) + "_finetune.pt")
model_ft.load_state_dict(checkpoint['model_state_dict'])

test_loss, test_acc = validate(model_ft, test_loader)
train_loss, train_acc = validate(model_ft, train_loader)
val_loss, val_acc = validate(model_ft, val_loader)

print("---Accuracy evaluation for fine-tuned model---")

print("Test accuracy:",test_acc)
print("Train accuracy (for reference)':",train_acc)
print("Val accuracy (for reference):",val_acc)

In [None]:
path = "wrightflyer.jpg"

image = Image.open(path)
image = transforms(image).to(device)
image = image.unsqueeze(0)

with torch.no_grad():
    output = model_ft(image)
    preds = torch.max(output.data, 1).indices.cpu().numpy()
    pred = preds[0] # 1つの画像しか入力していないので、最初の要素をとってくればOK

print("Predicted as", categories[pred], ", category id:",pred)
imshow(cv2.imread(path))