# PyTorch を利用した実装（畳み込みニューラルネットワーク）

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](http://colab.research.google.com/github/hseiyama/DeepLearning/blob/main/pytorch_basic_02_cnn.ipynb)

In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt

### パラメータの定義

In [None]:
epoch_num = 20  # エポック数を設定する
batch_size = 100  # バッチの数
learning_rate = 0.01  # 学習係数

### MNIST データセット

In [None]:
# 訓練データをdatasetsからダウンロード
training_data = datasets.MNIST(
    root='data',
    train=True,
    download=True,
    transform=ToTensor()
)

# テストデータをdatasetsからダウンロード
test_data = datasets.MNIST(
    root='data',
    train=False,
    download=True,
    transform=ToTensor()
)

### データローダー

In [None]:
# データローダーの作成
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print('X.shape [N, C, H, W]:', X.shape)
    print('y.shape:', y.shape, y.dtype)
    break

#### 【オプション】入力データの確認

In [None]:
for X, y in train_dataloader:
    x_train = X
    t_train = y
    break

# 画像を表示
figure = plt.figure(figsize=(12, 4))
rows, cols = 2, 6
for i in range(rows * cols):
    img, label = x_train[i], t_train[i]
    figure.add_subplot(rows, cols, i + 1)
    plt.title(f'{label} (idx={i})')
    plt.axis('off')
    plt.imshow(img.squeeze(), cmap='gray')  # 1x28x28 -> 28x28 に変換
plt.show()
print(y[:rows * cols])

### ニューラルネットワークの定義

In [None]:
# modelを定義します
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Sequential(
            # 畳み込み層（入力チャンネル数、フィルタ数、フィルタサイズ、ストライド、パディング）
            nn.Conv2d(1, 30, kernel_size=5, stride=1, padding=0),  # (batch)x1x28x28 -> (batch)x30x24x24
            nn.ReLU(),
            # プーリング層（領域のサイズ、ストライド、パディング）
            nn.MaxPool2d(2, stride=2, padding=0)  # (batch)x30x24x24 -> (batch)x30x12x12
        )
        self.layer2 = nn.Sequential(
            nn.Flatten(),  # (batch)x30x12x12 -> (batch)x(30*12*12) に変換
            nn.Linear(30*12*12, 100),  # 全結合層 (batch)x(30*12*12) -> (batch)x100
            nn.ReLU()
        )
        self.layer3 = nn.Linear(100, 10)  # 全結合層 (batch)x100-> (batch)x10

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        return x

### ニューラルネットワークのインスタンス化

In [None]:
# 訓練に際して、可能であればGPU（cuda）を設定します。GPUが搭載されていない場合はCPUを使用します
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Using {device} device')

model = NeuralNetwork().to(device)
print(model)

### 損失関数と最適化手法を定義

In [None]:
loss_fn = nn.CrossEntropyLoss()
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)  # SGD
#optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)  # Momentum SGD
#optimizer = torch.optim.Adagrad(model.parameters(), lr=learning_rate)  # Adagrad
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)  # Adam

### 訓練のプロセス

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    model.train()  # 訓練モード
    loss_list = []
    size = len(dataloader.dataset)
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        #y = torch.eye(10)[y]  # index -> one-hot 形式に変換
        
        # 損失誤差を計算
        pred = model(X)
        loss = loss_fn(pred, y)
        
        # バックプロパゲーション
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        loss_list.append(loss.item())
        print(f'\rtrain batch({batch + 1})', end='')
    
    return loss_list

### 評価のプロセス

In [None]:
def test(dataloader, model, title):
    model.eval()  # 評価モード
    correct = 0
    size = len(dataloader.dataset)
    with torch.no_grad():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            pred = model(X)
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            print(f'\rtest({title}) batch({batch + 1}){" " * 5}', end='')
    correct /= size
    
    return correct

### ミニバッチの学習

In [None]:
train_loss_list = []
train_acc_list = [0.0]
test_acc_list = [0.0]

for t in range(epoch_num):
    train_loss = train(train_dataloader, model, loss_fn, optimizer)
    train_loss_list += train_loss
    print(f'\repoch({t + 1}) loss = {train_loss_list[-1]}')
    train_acc = test(train_dataloader, model, 'train_data')
    test_acc = test(test_dataloader, model, 'test_data')
    train_acc_list.append(train_acc)
    test_acc_list.append(test_acc)
    print(f'\repoch({t + 1}) accuracy(train, test) = ({train_acc}, {test_acc})')

print('Done!')

### 損失関数の推移

In [None]:
# グラフの描画
x = torch.arange(len(train_loss_list))
plt.plot(x, train_loss_list, label='train loss')
plt.xlabel('batchs')
plt.ylabel('loss')
plt.title('loss by batchs')
plt.legend(loc='upper right')
plt.show()

### 認識精度の推移

In [None]:
# グラフの描画
x = torch.arange(len(train_acc_list))
plt.plot(x, train_acc_list, label='train acc', marker='o')
plt.plot(x, test_acc_list, label='test acc', linestyle='--', marker='^')
plt.xlabel('epochs')
plt.ylabel('accuracy')
plt.title('accuracy by epochs')
plt.ylim(0, 1.0)
plt.legend(loc='lower right')
plt.show()

#### 【オプション】テストの結果

In [None]:
x_test = None
t_test = None
test_predict = None

# テストの実行
model.eval()
with torch.no_grad():
    for X, y in test_dataloader:
        X, y = X.to(device), y.to(device)
        pred = model(X)
        X, y, pred = X.cpu(), y.cpu(), pred.cpu()
        x_test = torch.cat((x_test,X),0) if x_test is not None else X
        t_test = torch.cat((t_test,y),0) if t_test is not None else y
        test_predict = torch.cat((test_predict,pred),0) if test_predict is not None else pred

print(test_predict.shape)
for index in range(3):
    print(f'index = {index}')
    print(test_predict[index])
    print(t_test[index])

#### 【オプション】エラー結果の確認

In [None]:
# エラー結果の要素を抽出
index_error = torch.argmax(test_predict, axis=1) != t_test
error_list = torch.arange(len(test_predict))[index_error]
print(f'error rate = {len(error_list) / len(test_predict) * 100}[%]',
      f'(num = {len(error_list)}/{len(test_predict)})')

offset = 0
# 画像を表示
figure = plt.figure(figsize=(12, 2))
rows, cols = 1, 6
for i in range(rows * cols):
    index = error_list[offset + i]
    img, label_t, label_y = x_test[index], t_test[index], test_predict[index].argmax()
    figure.add_subplot(rows, cols, i + 1)
    plt.title(f'{label_y} (t={label_t})')
    plt.axis('off')
    plt.imshow(img.squeeze(), cmap='gray')  # 1x28x28 -> 28x28 に変換
plt.show()
print(test_predict[error_list[offset:offset + rows * cols]])

#### 【オプション】モデルの保存と読み込み

In [None]:
#torch.save(model, 'model_cnn.pth')
model = torch.load('model_cnn.pth')

#### 【オプション】1層目の重みの可視化

In [None]:
def filter_show(filters, nx=10):
    FN, C, FH, FW = filters.shape
    ny = int(FN / nx)

    fig = plt.figure()
    fig.subplots_adjust(left=0, right=1.6, bottom=0, top=0.7, hspace=0.1, wspace=0.1)

    for i in range(FN):
        ax = fig.add_subplot(ny, nx, i+1, xticks=[], yticks=[])
        ax.imshow(filters[i, 0], cmap=plt.cm.gray_r, interpolation='nearest')
    plt.show()

# 学習後の重み
filter_show(model.state_dict()['layer1.0.weight'].cpu())