# 第5回講義 宿題

## 課題

今Lessonで学んだことに工夫を加えて，CNNでより高精度なCIFAR10の分類器を実装してみましょう．精度上位者はリーダーボードに載ります．

### 目標値

Accuracy 78%

### ルール

- 訓練データはx_train， t_train，テストデータはx_testで与えられます．
- 予測ラベルは one_hot表現ではなく0~9のクラスラベル で表してください．
- **下のセルで指定されているx_train，t_train以外の学習データは使わないでください．**
- 今回から基本的にAPI制限はありません．
- ただしCNNベースでないモデル（Vision Transformerなど）やtorchvision等の既存モデル，学習済みモデルは用いないでください．

### 提出方法

- 2つのファイルを提出していただきます．
  - テストデータ (x_test) に対する予測ラベルをcsvファイル (ファイル名: submission_pred.csv) で提出してください．
  - それに対応するpythonのコードをsubmission_code.pyとして提出してください (%%writefileコマンドなどを利用してください)．

### 評価方法

- 予測ラベルのt_testに対する精度 (Accuracy) で評価します．
- 提出後即時採点を行い，Leader Boardが更新されます．
- 締切後の点数を最終的な評価とします．

In [None]:
# ドライブのマウント
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### データの読み込み

- この部分は修正しないでください

In [None]:
import random

import numpy as np
import pandas as pd
import torch
from torchvision import transforms
from tqdm import tqdm_notebook as tqdm
from PIL import Image
from sklearn.model_selection import train_test_split

#学習データ
x_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/x_train.npy')
t_train = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/t_train.npy')
    
#テストデータ
x_test = np.load('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/data/x_test.npy')

class train_dataset(torch.utils.data.Dataset):
    def __init__(self, x_train, t_train):
        data = x_train.astype('float32')
        self.x_train = []
        for i in range(data.shape[0]):
            self.x_train.append(Image.fromarray(np.uint8(data[i])))
        self.t_train = t_train
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_train)

    def __getitem__(self, idx):
        return self.transform(self.x_train[idx]), torch.tensor(t_train[idx], dtype=torch.long)

class test_dataset(torch.utils.data.Dataset):
    def __init__(self, x_test):
        data = x_test.astype('float32')
        self.x_test = []
        for i in range(data.shape[0]):
            self.x_test.append(Image.fromarray(np.uint8(data[i])))
        self.transform = transforms.ToTensor()

    def __len__(self):
        return len(self.x_test)

    def __getitem__(self, idx):
        return self.transform(self.x_test[idx])

trainval_data = train_dataset(x_train, t_train)
test_data = test_dataset(x_test)

### 畳み込みニューラルネットワーク(CNN)の実装

In [None]:
def fix_seed(seed=1234):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)


fix_seed(seed=42)


class gcn():
    def __init__(self):
        pass

    def __call__(self, x):
        mean = torch.mean(x)
        std = torch.std(x)
        return (x - mean)/(std + 10**(-6))  # 0除算を防ぐ


class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cuda"):  # 計算が重いのでGPUを用いる
        self.epsilon = epsilon
        self.device = device

    def fit(self, images):  # 変換行列と平均をデータから計算
        x = images[0][0].reshape(1, -1)
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # 各データについての平均を取る
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        self.E, self.V = torch.linalg.eigh(con_matrix)  # 固有値分解
        self.E = torch.max(self.E, torch.zeros_like(self.E)) # 誤差の影響で負になるのを防ぐ
        self.ZCA_matrix = torch.mm(torch.mm(self.V, torch.diag((self.E.squeeze()+self.epsilon)**(-0.5))), self.V.t())
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        x -= self.mean
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x


# (datasetのクラスを自作したので，このあたりの処理が少し変わっています)

zca = ZCAWhitening()
zca.fit(trainval_data)

val_size = 3000
train_data, val_data = torch.utils.data.random_split(trainval_data, [len(trainval_data)-val_size, val_size])  # 訓練データと検証データに分割


# 前処理を定義
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # randomly flip and rotate
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    gcn(),  # apply the global contrast normalization
    zca  # apply ZCA Whitening
])

transform = transforms.Compose([
    transforms.ToTensor(),
    gcn(),  # apply the global contrast normalization
    zca  # apply ZCA Whitening
])

# データセットに前処理を設定
trainval_data.transform = transform_train
test_data.transform = transform

batch_size = 64

dataloader_train = torch.utils.data.DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_valid = torch.utils.data.DataLoader(
    val_data,
    batch_size=batch_size,
    shuffle=True
)

dataloader_test = torch.utils.data.DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False
)


0/50000
10000/50000
20000/50000
30000/50000
40000/50000
completed!


In [None]:
import torch.nn as nn
class Conv(nn.Module):
    def __init__(self, filter_shape, function=lambda x: x, stride=(1, 1), padding=0):
        super().__init__()
        # Heの初期化
        # filter_shape: (出力チャンネル数)x(入力チャンネル数)x(縦の次元数)x(横の次元数)
        fan_in = filter_shape[1] * filter_shape[2] * filter_shape[3]
        fan_out = filter_shape[0] * filter_shape[2] * filter_shape[3]

        self.W = nn.Parameter(torch.tensor(rng.normal(
                        0,
                        np.sqrt(2/fan_in),
                        size=filter_shape
                    ).astype('float32')))

        # バイアスはフィルタごとなので, 出力フィルタ数と同じ次元数
        self.b = nn.Parameter(torch.tensor(np.zeros((filter_shape[0]), dtype='float32')))

        self.function = function  # 活性化関数
        self.stride = stride  # ストライド幅
        self.padding = padding  # パディング

    def forward(self, x):
        u = F.conv2d(x, self.W, bias=self.b, stride=self.stride, padding=self.padding)
        return self.function(u)

class Pooling(nn.Module):
    def __init__(self, ksize=(2, 2), stride=(2, 2), padding=0):
        super().__init__()
        self.ksize = ksize  # カーネルサイズ
        self.stride = stride  # ストライド幅
        self.padding = padding  # パディング

    def forward(self, x):
        return F.avg_pool2d(x, kernel_size=self.ksize, stride=self.stride, padding=self.padding)
        
class BatchNorm(nn.Module):
    def __init__(self, shape, epsilon=np.float32(1e-5)):
        super().__init__()
        self.gamma = nn.Parameter(torch.tensor(np.ones(shape, dtype='float32')))
        self.beta = nn.Parameter(torch.tensor(np.zeros(shape, dtype='float32')))
        self.epsilon = epsilon

    def forward(self, x):
        mean = torch.mean(x, (0, 2, 3), keepdim=True)  # WRITE ME
        std = torch.std(x, (0, 2, 3), keepdim=True)  # WRITE ME
        x_normalized = (x - mean) / (std**2 + self.epsilon)**0.5  # WRITE ME
        return self.gamma * x_normalized + self.beta  # WRITE ME

class Flatten(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x.view(x.size()[0], -1)

class Dense(nn.Module):
    def __init__(self, in_dim, out_dim, function=lambda x: x):
        super().__init__()
        # Heの初期化
        # in_dim: 入力の次元数，out_dim: 出力の次元数
               
        self.W = nn.Parameter(torch.tensor(rng.normal(
                        0,
                        np.sqrt(2/in_dim),
                        size=(in_dim, out_dim)
                    ).astype('float32')))

        self.b = nn.Parameter(torch.tensor(np.zeros([out_dim]).astype('float32')))
        self.function = function

    def forward(self, x):
        return self.function(torch.matmul(x, self.W) + self.b)

class Activation(nn.Module):
    def __init__(self, function=lambda x: x):
        super().__init__()
        self.function = function

    def __call__(self, x):
        return self.function(x)

In [None]:
import torch.optim as optim
import torch.autograd as autograd
import torch.nn.functional as F

rng = np.random.RandomState(1234)
random_state = 42
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


conv_net = nn.Sequential(
    Conv((32, 3, 3, 3)),        # 画像の大きさ：32x32x3 -> 30x30x32  # WRITE ME(入出力の画像サイズ）
    BatchNorm((32, 30, 30)),
    Activation(F.relu),
    Pooling((2, 2)),            # 30x30x32 -> 15x15x32  # WRITE ME(入出力の画像サイズ）
    Conv((64, 32, 3, 3)),       # 15x15x32 -> 13x13x64  # WRITE ME(入出力の画像サイズ）
    BatchNorm((64, 13, 13)),
    Activation(F.relu),
    Pooling((2, 2)),            # 13x13x64 -> 6x6x64  # WRITE ME(入出力の画像サイズ）
    Conv((128, 64, 3, 3)),      # 6x6x64 -> 4x4x128  # WRITE ME(入出力の画像サイズ）
    BatchNorm((128, 4, 4)),
    Activation(F.relu),
    Pooling((2, 2)),            # 4x4x128 -> 2x2x128  # WRITE ME(入出力の画像サイズ）
    Flatten(),
    Dense(2*2*128, 256, F.relu),  # WRITE ME(in_features)
    Dense(256, 10)
)



def init_weights(m):  # Heの初期化
    if type(m) == nn.Linear or type(m) == nn.Conv2d:
        torch.nn.init.kaiming_normal_(m.weight)
        m.bias.data.fill_(0.0)


conv_net.apply(init_weights)


n_epochs = 50
lr = 0.0005
device = 'cuda'

conv_net.to(device)
optimizer = optim.Adam(conv_net.parameters(), lr=lr)
loss_function = loss_function = nn.CrossEntropyLoss()

In [None]:
best_valid_acc = 0.0  # 最高の検証精度を保存するための変数を初期化
for epoch in range(n_epochs):
    losses_train = []
    losses_valid = []

    conv_net.train()
    n_train = 0
    acc_train = 0
    for x, t in dataloader_train:
        n_train += t.size()[0]

        conv_net.zero_grad()  # 勾配の初期化

        x = x.to(device)  # テンソルをGPUに移動

        t_hot = torch.eye(10)[t]  # 正解ラベルをone-hot vector化

        t = t.to(device)
        t_hot = t_hot.to(device)  # 正解ラベルとone-hot vectorをそれぞれGPUに移動

        y = conv_net.forward(x)  # 順伝播

        loss = -(t_hot*torch.log_softmax(y, dim=-1)).sum(axis=1).mean()  # 誤差(クロスエントロピー誤差関数)の計算

        loss.backward()  # 誤差の逆伝播

        optimizer.step()  # パラメータの更新

        pred = y.argmax(1)  # 最大値を取るラベルを予測ラベルとする

        acc_train += (pred == t).float().sum().item()
        losses_train.append(loss.tolist())

    conv_net.eval()
    n_val = 0
    acc_val = 0
    for x, t in dataloader_valid:
        # WRITE ME

        n_val += t.size()[0]

        x = x.to(device)  # テンソルをGPUに移動

        t_hot = torch.eye(10)[t]  # 正解ラベルをone-hot vector化

        t = t.to(device)
        t_hot = t_hot.to(device)  # 正解ラベルとone-hot vectorをそれぞれGPUに移動

        y = conv_net.forward(x)  # 順伝播

        loss = -(t_hot*torch.log_softmax(y, dim=-1)).sum(axis=1).mean()  # 誤差(クロスエントロピー誤差関数)の計算

        pred = y.argmax(1)  # 最大値を取るラベルを予測ラベルとする

        acc_val += (pred == t).float().sum().item()
        losses_valid.append(loss.tolist())


    acc_valval = acc_val / n_val
    if acc_valval > best_valid_acc:
      best_valid_acc = acc_val
      conv_net.eval()

      t_pred = []
      for x in dataloader_test:

          x = x.to(device)

          # 順伝播
          y = conv_net.forward(x)

          # モデルの出力を予測値のスカラーに変換
          pred = y.argmax(1).tolist()

          t_pred.extend(pred)

      submission = pd.Series(t_pred, name='label')
      submission.to_csv('drive/MyDrive/Colab Notebooks/DLBasics2023_colab/Lecture05/submission_pre.csv', header=True, index_label='id')

      print('EPOCH: {}, Train [Loss: {:.3f}, Accuracy: {:.3f}], Valid [Loss: {:.3f}, Accuracy: {:.3f}]'.format(
          epoch,
          np.mean(losses_train),
          acc_train/n_train,
          np.mean(losses_valid),
          acc_val/n_val
      ))

EPOCH: 0, Train [Loss: 1.411, Accuracy: 0.505], Valid [Loss: 1.130, Accuracy: 0.597]
