# 学習と評価

このノートブックでは，Vision Transformerの学習と評価を行うためのパイプラインを実装する．今回は，画像を直接入力できるように，かつ，ヘッドからの予測結果がCNNの形状と同じになるようにViTを実装したので，モデルの定義以外はCNNのコードをすべて流用できる．

そのため，CNNではMNISTを使ったが，今回はカラー画像のCIFAR10を利用して学習させてみよう．

## データセットの準備

CIFAR10は10クラスからなる一般物体の画像認識用データセットであり，各データはRGBの3チャネル持つ．CIFAR10もMNISTと同じように `torchvision` でサポートされており，次のように簡単に読み込むことができる．

In [None]:
import torchvision.transforms as transforms
import torchvision.datasets as datasets

dataset = datasets.CIFAR10(
    root='./data', train=True, download=True, 
    transform=transforms.ToTensor())

CIFAR10も初回実行時は`root`にデータをダウンロードするため時間がかかる．正しくダウンロードできたならば，`train=True` としたので学習データセットが利用可能になる．

サンプル数と画像サイズをチェックする．

In [None]:
print('len(dataset):', len(dataset))

i = 0
x, y = dataset[i]

print('x.shape:', x.shape)

チャネル数が3チャネル，画像サイズが $32 \times 32$ であることが確認できる．

データの最小値・最大値は

In [None]:
print('x.min():', x.min())
print('x.max():', x.max())

となり，`ToTensor()` を渡しているので0から1の間に正規化されている．ここでは，さらに **標準化（Standardization）** を行おう．標準化はデータの平均を0，分散を1とする手法であり，ニューラルネットの学習においても性能改善のための重要なテクニックである．

各チャネルの平均と分散を計算しよう．内方表記を利用して学習データセットに含まれるデータを一度一つのテンソル化し，バッチ，幅，高さ方向に対して平均と標準偏差を次のように計算する．

In [None]:
import torch

data = torch.cat([d[0] for d in torch.utils.data.DataLoader(dataset)])
mean = data.mean(dim=[0, 2, 3])
std = data.std(dim=[0, 2, 3])

print('mean:', mean)
print('std:', std)

計算できたので，これを使って標準化を行う`transform`を作成する．今回は学習用にデータ拡張を施した`transform`も作成しよう．またオリジナルのCIFAR10の解像度は確認した通り $32 \times 32$ であるが，ViTへの入力（パッチ化）を考慮して，$96 \times 96$ とする

In [5]:
img_size = 96

train_transform = transforms.Compose([
    transforms.Resize(img_size),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])

test_transform = transforms.Compose([
    transforms.Resize(img_size),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)])

再度データセットを読み込んで学習・検証・評価データセットを作成する．

In [None]:
train_dataset = datasets.CIFAR10(
    root='./data', train=True, download=True,
    transform=train_transform)

train_size = int(len(train_dataset) * 0.9)
valid_size = int(len(train_dataset) * 0.1)
train_dataset, valid_dataset = \
    torch.utils.data.random_split(
        train_dataset, [train_size, valid_size])
        
test_dataset = datasets.CIFAR10(
    root='./data', train=False, download=True,
    transform=test_transform)

print(f'train data: {len(train_dataset)}')
print(f'validation data: {len(valid_dataset)}')
print(f'test data: {len(test_dataset)}')

データローダーを作成する．

In [7]:
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=128)
valid_loader = DataLoader(valid_dataset, batch_size=100)
test_loader = DataLoader(test_dataset, batch_size=100)

ミニバッチを次のように取り出して，画像データを可視化しよう．ここで利用した`torchvision.utils.make_grid` 関数はミニバッチの画像データの可視化の際に非常に便利である．

In [None]:
from torchvision.utils import make_grid
import matplotlib.pyplot as plt

x, y = next(iter(train_loader))

mean_ = torch.tensor(mean.numpy()).view(1, 3, 1, 1)
std_ = torch.tensor(std.numpy()).view(1, 3, 1, 1)
x = x * std_ + mean_

img = make_grid(x[:25], nrow=5)
plt.imshow(img.permute(1,2,0).numpy())
plt.axis('off')

以下の処理

```
mean_ = torch.tensor(mean.numpy()).view(1, 3, 1, 1)
std_ = torch.tensor(std.numpy()).view(1, 3, 1, 1)
x = x * std_ + mean_
```

は標準化の逆を行い，元の範囲に戻している．

## モデルの定義

データローダーが構築できたのでViTの構築を行う．行数が長くなるが，ViTのノートブックから必要な処理をコピペしている．

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.fc1 = nn.Linear(dim, hidden_dim)
        self.act = nn.GELU()
        self.fc2 = nn.Linear(hidden_dim, dim)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x

class Attention(nn.Module):
    def __init__(self, dim, num_heads=4, dropout=0.5):
        super().__init__()
        self.head_dim = dim // num_heads
        self.num_heads = num_heads
        
        self.proj_q = nn.Linear(dim, dim, bias=False)
        self.proj_k = nn.Linear(dim, dim, bias=False)
        self.proj_v = nn.Linear(dim, dim, bias=False)
        self.proj = nn.Linear(dim, dim, bias=False)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x):
        bs, num_tokens, dim = x.shape
        
        q = self.proj_q(x)
        k = self.proj_q(x)
        v = self.proj_q(x)
        
        q = q.reshape(bs, num_tokens, self.num_heads, self.head_dim)
        k = k.reshape(bs, num_tokens, self.num_heads, self.head_dim)
        v = v.reshape(bs, num_tokens, self.num_heads, self.head_dim)
    
        attn_weight = q @ k.transpose(-2, -1) * dim ** -0.5
        attn_weight = F.softmax(attn_weight, dim=-1)
        attn_weight = self.dropout(attn_weight)
        x = attn_weight @ v
        
        x = x.transpose(1, 2).reshape(bs, num_tokens, dim)
        x = self.proj(x)
        x = self.dropout(x)
        return x

class Block(nn.Module):
    def __init__(self, dim, num_heads, dropout):
        super().__init__()
        self.attn = Attention(dim, num_heads, dropout)
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)
        self.mlp = MLP(dim, dim, dropout)

    def forward(self, x):
        h = self.norm1(x)
        h = self.attn(h)
        h = x + h
        h = self.norm2(h)
        h = self.mlp(h)
        h = x + h
        return h

class Head(nn.Module):
    def __init__(self, dim, num_classes):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fc = nn.Linear(dim, num_classes)

    def forward(self, x):
        x = self.norm(x)
        x = self.fc(x)
        return x

class PatchEmbed(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_channels=3, embed_dim=384):
        super().__init__()
        self.num_patches = (img_size // patch_size) * (img_size // patch_size)
        self.proj = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        x = self.proj(x).flatten(2).transpose(1, 2)
        return x

class ViT(nn.Module):
    def __init__(self, img_size, patch_size, in_channels, num_classes, embed_dim, num_heads, dropout):
        super().__init__()
        self.patch_embed = PatchEmbed(img_size, patch_size, in_channels, embed_dim)
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        num_patches = self.patch_embed.num_patches + 1
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dim))

        self.block1 = Block(embed_dim, num_heads, dropout)
        self.block2 = Block(embed_dim, num_heads, dropout)
        self.block3 = Block(embed_dim, num_heads, dropout)

        self.head = Head(embed_dim, num_classes)

    def forward(self, x):
        x = self.patch_embed(x)
        cls_tokens = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.torch.cat((cls_tokens, x), dim=1)

        x = self.block1(x)
        x = self.block2(x)
        x = self.block3(x)

        x = self.head(x[:,0])
        return x

インスタンス化を行う．

In [None]:
model = ViT(img_size, 16, 3, 10, 256, 4, 0.1)
print(model)

## 損失関数とオプティマイザの設定・GPUへの移動

続いて，損失関数，オプティマイザを設定する．

In [11]:
from torch import optim

loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

GPUへの移動も行う．

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

## 学習・検証ループ

こちらもCNNからの流用でコピペして実行する．

In [None]:
import numpy as np

def train_one_epoch(model, loader, loss_function, optimizer, device):
    model.train()
    train_loss, train_acc = [], []
    for batch in loader:
        x, y = batch
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        output = model(x)
        loss = loss_function(output, y)
        loss.backward()
        optimizer.step()
            
        acc = (output.max(1)[1] == y).float().mean()
        train_loss.append(loss.item())
        train_acc.append(acc.item())
    return np.mean(train_loss), np.mean(train_acc)

def test(model, loader, loss_function, device):
    model.eval()
    test_loss, test_acc = [], []
    with torch.no_grad():
        for batch in loader:
            x, y = batch
            x, y = x.to(device), y.to(device)
            output = model(x)
            loss = loss_function(output, y)
      
            acc = (output.max(1)[1] == y).float().mean()
            test_loss.append(loss.item())
            test_acc.append(acc.item())
    return np.mean(test_loss), np.mean(test_acc)

epochs = 10
train_loss, train_acc = [], []
valid_loss, valid_acc = [], []
for epoch in range(1, epochs+1):
    print(f'Epoch {epoch}/{epochs}')
    loss, acc = train_one_epoch(model, train_loader, loss_function, optimizer, device)
    print(f'train_loss - {loss:.4f}, train_acc - {acc:.4f}')
    train_loss.append(loss)
    train_acc.append(acc)
    
    loss, acc = test(model, valid_loader, loss_function, device)
    print(f'valid_loss - {loss:.4f}, valid_acc - {acc:.4f}')
    valid_loss.append(loss)
    valid_acc.append(acc)
    
    print('')
    
test_loss, test_acc = test(model, test_loader, loss_function, device)
print('test_loss = ', test_loss)
print('test_acc = ', test_acc)

モデルサイズ，入力サイズ，GPUの性能によってこのセルの実行時間は異なるが，Block数を3に制限したとしても，学習にはCNN以上に時間がかかる．またViTは一般的に大規模なデータセットで事前学習を行う必要があり，パラメータを制限したとしても学習はCNNほど上手く進まない．

別ノートブックで事前学習済みモデルの利用方法についても紹介したい．

続いて，予測結果を出力する．

In [None]:
model.eval()
for batch in test_loader:
    x, y = batch
    x, y = x.to(device), y.to(device)
    
    with torch.no_grad():
        output = model(x)
        _, prediction = torch.max(output, 1)

    x = x.cpu()
    mean_ = torch.tensor(mean.numpy()).view(1, 3, 1, 1)
    std_ = torch.tensor(std.numpy()).view(1, 3, 1, 1)
    x = x * std_ + mean_
    x = x.permute(0,2,3,1)
        
    fig, axes = plt.subplots(1, 10, figsize=(15, 2))
    for i in range(10):
        img = x[i].cpu().numpy().squeeze()
        axes[i].imshow(img)
        axes[i].set_title(f"Pred: {prediction[i].item()} - GT: {y[i].item()}")
        axes[i].axis("off")

    plt.tight_layout()
    plt.show()
    break

## モデルの保存

モデルを保存する．保存方法はこれまでと同様である．

In [15]:
import os
os.makedirs('output', exist_ok=True)

save_path = 'output/model.pth'
torch.save(model.state_dict(), save_path)

## 学習結果の表示と保存

学習曲線の表示と結果の保存もこれまでと同様のコードで実行できる．

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 4))
plt.subplot(1, 2, 1)
plt.plot(train_loss, label='train_loss')
plt.plot(valid_loss, label='valid_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True)

plt.subplot(1, 2, 2)
plt.plot(train_acc, label='train_acc')
plt.plot(valid_acc, label='valid_acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show();

In [17]:
np.savetxt('output/train_loss.txt', train_loss)
np.savetxt('output/train_acc.txt', train_acc)

np.savetxt('output/valid_loss.txt', valid_loss)
np.savetxt('output/valid_acc.txt', valid_acc)