### 第4章画像分類（その2）
#### 畳み込みニューラルネットワーク

In [1]:
from collections import deque
import copy
from tqdm import tqdm

import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision
import torchvision.transforms as T

from src import utils, transform

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# 各チャネルのデータセット全体の平均と標準偏差を計算する関数
def get_dataset_statistics(dataset: DataLoader):
    data = []
    for i in range(len(dataset)):
        # [チャネル数, 高さ, 幅]の画像を取得
        img = dataset[i][0]
        data.append(img)
        data = torch.stack(data)
        
        # 各チャネルの平均と標準偏差を計算
        channel_mean = data.mean(dim=(0, 2, 3))
        channel_std = data.std(dim=(0, 2, 3))
        
        return channel_mean, channel_std

#### ResNet18の構造
- 畳み込み層と全結合層を合わせて18層持つ
- スキップ接続、残差接続を持つ

In [4]:
# 残差ブロックの実装
class BasicBlock(nn.Module):
    """
    ResNet18における残差ブロック
    in_channels  : 入力チャネル数
    out_channels : 出力チャネル数
    stride       : 畳み込み層のストライド
    """
    def __init__(self, in_channels: int, out_channels: int, stride: int=1):
        super().__init__()
        
        """残差接続"""
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels,
                               kernel_size=3, paddind=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        """"""
        
        # ストライドが1より大きい時にスキップ接続と残差接続から得られる
        # 特徴マップの高さと幅を合わせるために、別途畳み込み演算を用意
        self.downsample = None
        if stride > 1:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
    """
    順伝播関数
    x : 入力,[バッチサイズ, 入力チャネル数, 高さ, 幅] 
    """
    def forward(self, x: torch.Tensor):
        """残差接続"""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        """"""
        
        # 残差接続て特徴マップが縮小される場合
        # スキップ接続の特徴マップを縮小してあわせる
        if self.downsample is not None:
            x = self.downsample(x)
            
        # 残差写像と恒等写像の要素ごとの和を計算
        out += x
        
        out = self.relu(out)
        
        return out

In [5]:
# ResNet18の実装
class ResNet18(nn.Module):
    """
    ResNet18モデル
    num_classes : 分類対象の物体クラス数
    """
    def __init__(self, num_classes: int):
        super().__init__()
        
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2,
                               padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        self.layer1 = nn.Sequential(
            BasicBlock(64, 64),
            BasicBlock(64, 64),
        )
        self.layer2 = nn.Sequential(
            BasicBlock(64, 128, stride=2),
            BasicBlock(128, 128),
        )
        self.layer3 = nn.Sequential(
            BasicBlock(128, 256, stride=2),
            BasicBlock(256, 256),
        )
        self.layer4 = nn.Sequential(
            BasicBlock(256, 512, stride=2),
            BasicBlock(512, 512),
        )
        
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        
        self.linear = nn.Linear(512, num_classes)
        
    """
    順伝播関数
    x            : 入力, [バッチサイズ, 入力チャネル数, 高さ, 幅]
    return_embed : 特徴量を返すかロジットを返すか選択する真偽値 
    """
    def forward(self, x: torch.Tensor, return_embed: bool=False):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.max_pool(x)
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avg_pool(x)
        x = x.flatten(1)
        
        if return_embed:
            return x
        
        x = self.linear(x)
        
        return x
    
    """
    モデルが保存されているデバイスを返す関数
    """
    def get_device(self):
        return self.linear.weight.device
    
    """
    モデルを複製して返す関数
    """
    def copy(self):
        return copy.deepcopy(self)