<a href="https://colab.research.google.com/github/SY-256/basics-of-image-recognition/blob/main/chapter03.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## 畳み込みニューラルネットワーク

In [None]:
base_path = "/content/drive/MyDrive/colab-notebooks/basics-of-image-recognition/"

In [None]:
import numpy as np

def im2col(img, h, w, stride=1, pad=0):
    """
    畳み込みフィルタ
    img: (B, C, H, W)
    h: kernel size
    w: kernel width
    """
    B, C, H, W = img.shape
    img = np.pad(img, [(0, 0), (0, 0), (pad, pad), (pad, pad)], "constant")
    out_h = (H + 2*pad - h)//stride + 1
    out_w = (W + 2*pad - w)//stride + 1

    out = np.zeros((B, C, h, w, out_h, out_w))

    for y in range(h):
        y_ = y + stride*out_h
        for x in range(w):
            x_ = x + stride*out_w
            out[:, :, y, x, :, :] = img[:, :, y:y_:stride, x:x_:stride]

    out = out.transpose(0, 4, 5, 1, 2, 3).reshape(B*out_h*out_w, -1)
    return out

class Conv2D:
    """
    in_ch: number of input channels
    out_ch: number of output channels
    h, w: kernel size
    stride: stride of conv process
    pad: padding size
    img: input image (B, in_ch, H, W)
    """
    def __init__(self, in_ch, out_ch, h, w, stride=1, pad=0):
        self.out_ch = out_ch
        self.stride = stride
        self.pad = pad
        self.filters = np.random.randn(out_ch, in_ch, h, w)
        self.bias = np.zeros(out_ch)

    def forward(self, img):
        B, in_ch, H, W = img.shape
        out_ch, in_ch, h, w = self.filters.shape

        img = im2col(img, h, w, self.stride, self.pad)
        filters = self.filters.reshape(out_ch, -1).T

        out = np.dot(img, filters) + self.bias

        out_h = 1 + int((H + 2*self.pad - h) / self.stride)
        out_w = 1 + int((W + 2*self.pad - w) / self.stride)
        out = out.reshape(B, out_h, out_w, -1).transpose(0, 3, 1, 2)
        return out

In [None]:
### edeg detection
import matplotlib.pyplot as plt
from PIL import Image
img = Image.open(base_path + "keyboard.png").convert("L")
img = np.array(img, dtype=float) / 255
img = img[np.newaxis, np.newaxis, :, :]
conv = Conv2D(in_ch=1, out_ch=1, h=3, w=3, stride=1, pad=1)
### horizontal edge detectio
conv.filters[0, 0, :, :] = np.array([[-1, -2, -1], [0 ,0, 0], [1, 2, 1]])
conv.filters[0, 0, :, :] = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]])
out = conv.forward(img)
out = np.clip(out, 0, 1)

plt.subplot(121)
plt.imshow(img[0, 0], cmap="gray")
plt.title("Input Image")
plt.subplot(122)
plt.imshow(out[0, 0], cmap="gray")
plt.title("Output Image")
plt.savefig(base_path + "convoled_output.png")
plt.show()

# GAP（グローバルアベレージプーリング）をスクラッチで

In [None]:
### GAP（グローバルアベレージプーリング）
import numpy as np

def global_average_poooling_2d(x):
    """
    Global Average Pooling 2D

    Args:
        x: 入力テンソル（batch, hight, width, channels）または（height, width, chnnels）
    Returns:
        出力テンソル（batch_size, channels）または（channels,）
    """
    if x.ndim == 4: # バッチあり
        return np.mean(x, axis=(1,2))
    elif x.ndim == 3: # バッチなしの場合
        return np.mean(x, axis=(0, 1))
    else:
        raise ValueError("入力次元は3次元または4次元である必要があります")

def global_average_poooling_1d(x):
    """
    Global Average Pooling 1D

    Args:
        x: 入力テンソル（batch_size, length, channels）または（lenght, channels）

    Returns:
        出力テンソル（batch_size, channels）または（channels,）
    """
    if x.ndim == 3: # バッチあり
        return np.mean(x, axis=1)
    elif x.ndim == 2: # バッチなし
        return np.mean(x, axis=0)
    else:
        raise ValueError("入力は2次元または3次元である必要があります")

class GlobalAveragePooling2D:
    """クラス版のGlobal Average Pooling 2D"""

    def forward(self, x):
        self.input_shape = x.shape
        return global_average_poooling_2d(x)

    def backward(self, grad_output):
        """逆伝播の実装"""
        if len(self.input_shape) == 4:
            batch_size, height, width, channels = self.input_shape
            # 勾配を元の形状に戻す（平均で割った値を全ピクセルに分散）
            grad_input = np.zeros(self.input_shape)
            for i in range(batch_size):
                for c in range(channels):
                    grad_input[i, :, :, c] = grad_output[i, c] / (height * width)

        else:
            height, width, channels = self.input_shape
            grad_input = np.zeros(self.input_shape)
            for c in range(channels):
                grad_input[:, :, c] = grad_output[c] / (height * width)

        return grad_input


In [None]:
print("=== Global Average Pooling 2D ===")
# 単一画像の場合（height=4, width=4, channels=3）
x_2d = np.random.randn(4, 4, 3)
print(f"入力形状: {x_2d.shape}")
print(f"入力値: {x_2d}")

output_2d = global_average_poooling_2d(x_2d)
print(f"出力形状: {output_2d.shape}")
print(f"出力値: {output_2d}")

# バッチの場合（batch=2, height=4, width=4, channels=3）
x_batch_2d = np.random.randn(2, 4, 4, 3)
print(f"\n入力バッチ形状: {x_batch_2d.shape}")

output_batch_2d = global_average_poooling_2d(x_batch_2d)
print(f"バッチ出力形状: {output_batch_2d.shape}")

print("\n==== クラス版 ====")
gap_layer = GlobalAveragePooling2D()

x_test = np.random.randn(1, 3, 3, 2)
output = gap_layer.forward(x_test)
print(f"入力形状: {x_test.shape}")
print(f"出力形状: {output.shape}")

# 逆伝播のテスト
grad_output = np.ones_like(output)
grad_input = gap_layer.backward(grad_output)
print(f"勾配入力形状: {grad_input.shape}")
print(f"勾配値の例: {grad_input[0, 0, 0, :]}")


=== Global Average Pooling 2D ===
入力形状: (4, 4, 3)
入力値: [[[ 1.9216899   0.76574373  0.19227038]
  [ 0.90747202 -1.00150843 -0.93774835]
  [ 0.45887446  1.36584178 -0.61927072]
  [ 0.66800848 -0.97647051 -0.8558606 ]]

 [[ 1.51556905 -0.81584812  1.353832  ]
  [ 0.82094649 -1.34260133 -0.19318977]
  [-0.7527346   1.99628424 -1.3232735 ]
  [-0.76252744  1.4831022   1.13187252]]

 [[ 1.33507294  1.27968203  0.43559142]
  [ 0.19062981  0.41805862 -0.30810173]
  [-0.94957467 -0.17696031  0.8310906 ]
  [ 0.57156049 -0.66808225 -1.60692287]]

 [[-1.21114089  1.37205759 -0.22141397]
  [-0.19499464  0.24747499 -0.61352329]
  [ 0.11728387  0.46471609  0.68810329]
  [ 0.42640274  1.00966312 -0.35621508]]]
出力形状: (3,)
出力値: [ 0.31640863  0.33882209 -0.15017248]

入力バッチ形状: (2, 4, 4, 3)
バッチ出力形状: (2, 3)

==== クラス版 ====
入力形状: (1, 3, 3, 2)
出力形状: (1, 2)
勾配入力形状: (1, 3, 3, 2)
勾配値の例: [0.11111111 0.11111111]


# 残差学習をスクラッチで

In [None]:
import numpy as np

def relu(x):
    """ReLU活計化関数"""
    return np.maximum(0, x)

def conv2d(x, weights, bias=None, stride=1, padding=0):
    """2D畳み込み（簡易版）"""
    batch_size, in_height, in_width, in_channels = x.shape
    out_channels, kernel_h, kernel_w, _ = weights.shape

    # パディング
    if padding > 0:
        x = np.pad(x, ((0, 0), (padding, padding), (padding, padding), (0, 0,)), "constant")

    out_height = (x.shape[1] - kernel_h) // stride + 1
    out_width = (x.shape[2] - kernel_w) // stride + 1

    output = np.zeros((batch_size, out_height, out_width, out_channels))

    for b in range(batch_size):
        for oc in range(out_channels):
            for oh in range(out_height):
                for ow in range(out_width):
                    h_start = oh * stride
                    w_start = ow * stride
                    patch = x[b, h_start:h_start+kernel_h, w_start:w_start+kernel_w, :]
                    output[b, oh, ow, oc] = np.sum(patch * weights[oc]) + (bias[oc] if bias is not None else 0)
    return output

def batch_norm(x, gamma, beta, eps=1e-5):
    """バッチ正規化"""
    mean = np.mean(x, axis=(0, 1, 2), keepdims=True)
    var = np.var(x, axis=(0, 1, 2), keepdims=True)
    x_norm = (x - mean) / np.sqrt(var + eps)
    return gamma * x_norm + beta

class BasicBlock:
    """ResNetの基本ブロック（2層）"""

    def __init__(self, in_channels, out_channels, stride=1):
        self.stride = stride
        self.in_channels = in_channels
        self.out_channels = out_channels

        # 畳み込み層の重み初期化
        self.conv1d_weights = np.random.randn(out_channels, 3, 3, in_channels) * 0.1
        self.conv1d_bias = np.zeros(out_channels)

        self.conv2d_weights = np.random.randn(out_channels, 3, 3, out_channels) * 0.1
        self.conv2d_bias = np.zeros(out_channels)

        # バッチ正規化パラメータ
        self.bn1_gamma = np.ones(out_channels)
        self.bn1_beta = np.zeros(out_channels)
        self.bn2_gamma = np.ones(out_channels)
        self.bn2_beta = np.zeros(out_channels)

        # ショートカット接続（チャネル数が変わる場合）
        self.use_shortcut_conv = (stride != 1) or (in_channels != out_channels)
        if self.use_shortcut_conv:
            self.shortcut_weights = np.random.randn(out_channels, 1, 1, in_channels) * 0.1
            self.shortcut_bias = np.zeros(out_channels)
            self.shortcut_bn_gamma = np.ones(out_channels)
            self.shortcut_bn_beta = np.zeros(out_channels)

    def forward(self, x):
        """順伝播"""
        identity = x

        # メインパス
        out = conv2d(x, self.conv1d_weights, self.conv1d_bias, stride=self.stride, padding=1)
        out = batch_norm(out, self.bn1_gamma, self.bn1_beta)
        out = relu(out)

        out = conv2d(out, self.conv2d_weights, self.conv2d_bias, stride=1, padding=1)
        out = batch_norm(out, self.bn2_gamma, self.bn2_beta)

        # ショートカット接続
        if self.use_shortcut_conv:
            identity = conv2d(identity, self.shortcut_weights, self.shortcut_bias,
                              stride=self.stride, padding=0)
            identity = batch_norm(identity, self.shortcut_bn_gamma, self.shortcut_bn_beta)

        # 残差接続
        out = out + identity
        out = relu(out)

        return out

class SimpleResNet:
    """シンプルなResNet"""

    def __init__(self, num_classes=10):
        self.num_classes = num_classes

        # 初期畳み込み層
        self.conv1d_weights = np.random.randn(64, 7, 7, 3) * 0.1
        self.conv1d_bias = np.zeros(64)
        self.bn1_gamma = np.ones(64)
        self.bn1_beta = np.zeros(64)

        # 残差ブロック
        self.layer1 = [BasicBlock(64, 64) for _ in range(2)]
        self.layer2 = [BasicBlock(64, 128, stride=2)] + [BasicBlock(128, 128) for _ in range(1)]
        self.layer3 = [BasicBlock(128, 256, stride=2)] + [BasicBlock(256, 256) for _ in range(1)]

        # 分類層
        self.fc_weights = np.random.randn(256, num_classes) * 0.1
        self.fc_bias = np.zeros(num_classes)

    def forward(self, x):
        """順伝播"""
        # 初期畳み込み
        out = conv2d(x, self.conv1d_weights, self.conv1d_bias, stride=2, padding=3)
        out = batch_norm(out, self.bn1_gamma, self.bn1_beta)
        out = relu(out)

        # 残差ブロック
        for block in self.layer1:
            out = block.forward(out)

        for block in self.layer2:
            out = block.forward(out)

        for block in self.layer3:
            out = block.forward(out)

        # Global Average Pooling
        out = np.mean(out, axis=(1, 2))

        # 全結合層
        out = np.dot(out, self.fc_weights) + self.fc_bias

        return out

In [None]:
# テストデータ
batch_size = 2
x = np.random.randn(batch_size, 32, 32, 3)

print("=== BasicBlockテスト ===")
basic_block = BasicBlock(3, 64, stride=1)
output_basic = basic_block.forward(x)
print(f"入力形状: {x.shape}")
print(f"BasicBlock出力形状: {output_basic.shape}")

print("\n=== SimpleResNet ===")
model = SimpleResNet(num_classes=10)
output = model.forward(x)
print(f"ResNet出力形状: {output.shape}")
print(f"出力例: {output[0][:5]}") # 最初のサンプルの最初の5クラス

# 残差接続の効果を確認
print("\n=== 残差接続の効果確認 ===")
# 同じ入力に対して複数回実行（実際の学習では勾配が流れやすくなる）
x_test = np.random.randn(1, 8, 8, 64)
block_test = BasicBlock(64, 64)

# 入力と出力の差分（残差）を確認
output_test = block_test.forward(x_test)
residual = output_test - x_test # これが学習される残差
print(f"入力平均: {np.mean(x_test):.4f}")
print(f"出力平均: {np.mean(output_test):.4f}")
print(f"残差平均: {np.mean(residual):.4f}")
print("⇒ 残差学習により、恒等写像からの小さな変化を学習")

=== BasicBlockテスト ===
入力形状: (2, 32, 32, 3)
BasicBlock出力形状: (2, 32, 32, 64)

=== SimpleResNet ===
ResNet出力形状: (2, 10)
出力例: [ 0.88414622 -0.03603872  1.18273422 -0.09054755 -1.17715313]

=== 残差接続の効果確認 ===
入力平均: 0.0114
出力平均: 0.5692
残差平均: 0.5579
⇒ 残差学習により、恒等写像からの小さな変化を学習


### ResNet18

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import numpy as np
from tqdm import tqdm

# デバイスの設定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"使用デバイス: {device}")

使用デバイス: cuda


In [None]:
# データの前処理設定
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

In [None]:
# CIFAR-10データセットの読み込み
trainset = torchvision.datasets.CIFAR10(
    root="./data", train=True, download=True, transform=transform_train
)

trainloader = DataLoader(trainset, batch_size=128, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root="./data", train=False, download=True, transform=transform_test
)

testloader = DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

# クラス名定義
classes = ("plane", "car", "bird", "cat", "deer",
           "dog", "frog", "horse", "ship", "truck")

In [None]:
# ResNetモデルの読み込み（事前学習済みモデル）
from torchvision import models

# ResNet18を使用
model = models.resnet18(pretrained=True)



Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


100%|██████████| 44.7M/44.7M [00:00<00:00, 143MB/s]


In [None]:
# CIFAR-10は10クラスなので、最終層を追加
num_classes = 10
model.fc = nn.Linear(model.fc.in_features, num_classes) # 全結合層
model = model.to(device)

In [None]:
# 損失関数と最適化手法の設定
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=5e-4)
schedular = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)

In [None]:
# 学習関数
def train_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    progress_bar = tqdm(dataloader, desc="Training")
    for inputs, targets in progress_bar:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        progress_bar.set_postfix({
            "loss": running_loss / (progress_bar.n + 1),
            "acc": 100. * correct / total
        })

    return running_loss / len(dataloader), 100 * correct / total

In [None]:
# 検証関数
def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        progress_bar = tqdm(dataloader, desc="Validation")
        for inputs, targets in progress_bar:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            progress_bar.set_postfix({
                "loss": running_loss / (progress_bar.n + 1),
                "acc": 100. * correct / total
            })

    return running_loss / len(dataloader), 100. * correct / total


In [None]:
# 学習の実行
num_epochs = 10
best_acc = 0

print("学習開始...")
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch+1}/{num_epochs}")
    print("-"*50)

    train_loss, train_acc = train_epoch(model, trainloader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, testloader, criterion, device)

    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Validate Loss: {val_loss:.4f}, Validate Acc: {val_acc:.2f}%")

    schedular.step()

    # ベストモデルの保存
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "best_acc": best_acc,
        }, "best_resnet_model.pth")
        print(f"ベストモデルを保存しました (Acc: {best_acc:.2f}%)")

print(f"\n学習完了 ベスト精度: {best_acc:.2f}%")


学習開始...

Epoch 1/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:24<00:00, 15.86it/s, loss=1.31, auc=53.6]
Validation: 100%|██████████| 100/100 [00:02<00:00, 34.29it/s, loss=0.967, acc=66.1]


Train Loss: 1.3092, Train Acc: 53.57%
Validate Loss: 0.9667, Validate Acc: 66.08%
ベストモデルを保存しました (Acc: 66.08%)

Epoch 2/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.93it/s, loss=0.896, auc=68.7]
Validation: 100%|██████████| 100/100 [00:03<00:00, 28.81it/s, loss=0.821, acc=72.6]


Train Loss: 0.8962, Train Acc: 68.69%
Validate Loss: 0.7798, Validate Acc: 72.58%
ベストモデルを保存しました (Acc: 72.58%)

Epoch 3/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:24<00:00, 16.00it/s, loss=0.778, auc=72.7]
Validation: 100%|██████████| 100/100 [00:03<00:00, 28.15it/s, loss=0.701, acc=75.6]


Train Loss: 0.7776, Train Acc: 72.69%
Validate Loss: 0.7013, Validate Acc: 75.60%
ベストモデルを保存しました (Acc: 75.60%)

Epoch 4/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.89it/s, loss=0.698, auc=75.8]
Validation: 100%|██████████| 100/100 [00:03<00:00, 26.29it/s, loss=0.659, acc=77.3]


Train Loss: 0.6964, Train Acc: 75.76%
Validate Loss: 0.6524, Validate Acc: 77.30%
ベストモデルを保存しました (Acc: 77.30%)

Epoch 5/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.89it/s, loss=0.646, auc=77.4]
Validation: 100%|██████████| 100/100 [00:03<00:00, 30.59it/s, loss=0.639, acc=78.3]


Train Loss: 0.6447, Train Acc: 77.39%
Validate Loss: 0.6267, Validate Acc: 78.30%
ベストモデルを保存しました (Acc: 78.30%)

Epoch 6/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.64it/s, loss=0.6, auc=78.9]
Validation: 100%|██████████| 100/100 [00:02<00:00, 36.28it/s, loss=0.612, acc=79.7]


Train Loss: 0.5989, Train Acc: 78.93%
Validate Loss: 0.6000, Validate Acc: 79.66%
ベストモデルを保存しました (Acc: 79.66%)

Epoch 7/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.40it/s, loss=0.566, auc=79.9]
Validation: 100%|██████████| 100/100 [00:02<00:00, 39.36it/s, loss=0.581, acc=79.9]


Train Loss: 0.5664, Train Acc: 79.89%
Validate Loss: 0.5756, Validate Acc: 79.86%
ベストモデルを保存しました (Acc: 79.86%)

Epoch 8/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:24<00:00, 16.20it/s, loss=0.54, auc=81.1]
Validation: 100%|██████████| 100/100 [00:02<00:00, 40.25it/s, loss=0.579, acc=80.8]


Train Loss: 0.5390, Train Acc: 81.08%
Validate Loss: 0.5614, Validate Acc: 80.85%
ベストモデルを保存しました (Acc: 80.85%)

Epoch 9/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:23<00:00, 16.32it/s, loss=0.509, auc=82.2]
Validation: 100%|██████████| 100/100 [00:02<00:00, 39.62it/s, loss=0.582, acc=80.8]


Train Loss: 0.5072, Train Acc: 82.23%
Validate Loss: 0.5588, Validate Acc: 80.77%

Epoch 10/10
--------------------------------------------------


Training: 100%|██████████| 391/391 [00:24<00:00, 16.19it/s, loss=0.489, auc=82.9]
Validation: 100%|██████████| 100/100 [00:02<00:00, 40.03it/s, loss=0.553, acc=81.1]


Train Loss: 0.4875, Train Acc: 82.92%
Validate Loss: 0.5416, Validate Acc: 81.13%
ベストモデルを保存しました (Acc: 81.13%)

学習完了 ベスト精度: 81.13%


### SE-ResNet
- Gloval Average Pooling + MLP（2層全結合層）

**特徴**
1. SEBlock（Squeeze-and-Excitation Block）

    Global Average Poolingでチャネルごとの特徴を圧縮
    2つの全結合層（削減→復元）でチャネル間の依存関係を学習
    Sigmoidで0-1の重みを生成し、元の特徴マップにスケーリング

2. SEBasicBlock / SEBottleneck

    通常のResNetブロックにSEBlockを追加
    ResNet-18/34用のBasicBlockと、ResNet-50以上用のBottleneckに対応

3. SE-ResNet本体

    se_resnet18、se_resnet34、se_resnet50の3つのバリエーション
    CIFAR-10での学習例（10クラス分類）

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import os

In [None]:
class SEBlock(nn.Module):
    """Squeeeze-and-Excitation Block"""
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        # Sequeeze: Gloval Average Pooling
        y = self.squeeze(x).view(b, c)
        # Excitation: FC -> ReLU -> FC -> Sigmoid
        y = self.excitation(y).view(b, c, 1, 1)
        # Scale
        return x * y.expand_as(x)

class SEBasickBlock(nn.Module):
    """SE-ResNetの基本ブロック(ResNet-18/34用)"""
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsampling=None, reduction=16):
        super(SEBasickBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.se = SEBlock(out_channels, reduction)
        self.downsampling = downsampling

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # SE Block
        out = self.se(out)

        if self.downsampling is not None:
            identity = self.downsampling(x)

        out += identity
        out = self.relu(out)

        return out

class SEBottleneck(nn.Module):
    """SE-ResNetのボトルネックブロック(ResNet-50/101/152用)"""
    expansion = 4

    def __init__(self, in_cahnnels, out_channels, stride=1, downsampling=None, reduction=16):
        super(SEBottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_cahnnels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                                stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion,
                               kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.se = SEBlock(out_channels * self.expansion, reduction)
        self.downsampling = downsampling

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        # SE Block
        out = self.se(out)

        if self.donsampling is not None:
            identity = self.downsampling(x)

        out += identity
        out = self.relu(out)

        return out

class SEResNet(nn.Module):
    """SE-ResNet"""
    def __init__(self, block, layers, num_classes=10, reduction=16):
        super(SEResNet, self).__init__()
        self.in_channels = 64
        self.reduction = reduction

        # 初期層
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNetレイヤー
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # 分類層
        self.avggpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # 重みの初期化
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        downsampling = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsampling = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion)
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsampling, self.reduction))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels, reduction=self.reduction))
        self.block.ex
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x


## EfficientNet

In [1]:
import torch
import torch.nn as nn
import math
from typing import Optional, Tuple

class Swish(nn.Module):
    """Swish activation function (also known as SiLU)"""
    def forward(self, x):
        return x * torch.sigmoid(x)


class SEBlock(nn.Module):
    """Squeeze-and-Excitation Block"""
    def __init__(self, in_channels: int, se_ratio: float = 0.25):
        super().__init__()
        squeeze_channels = max(1, int(in_channels * se_ratio))
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels, squeeze_channels, 1),
            Swish(),
            nn.Conv2d(squeeze_channels, in_channels, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return x * self.se(x)


class MBConvBlock(nn.Module):
    """Mobile Inverted Bottleneck Convolution Block"""
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        kernel_size: int,
        stride: int,
        expand_ratio: int,
        se_ratio: float = 0.25,
        drop_connect_rate: float = 0.2
    ):
        super().__init__()
        self.stride = stride
        self.use_residual = (stride == 1 and in_channels == out_channels)
        self.drop_connect_rate = drop_connect_rate

        # Expansion phase
        expanded_channels = in_channels * expand_ratio
        self.expand = None
        if expand_ratio != 1:
            self.expand = nn.Sequential(
                nn.Conv2d(in_channels, expanded_channels, 1, bias=False),
                nn.BatchNorm2d(expanded_channels),
                Swish()
            )

        # Depthwise convolution
        self.depthwise = nn.Sequential(
            nn.Conv2d(
                expanded_channels,
                expanded_channels,
                kernel_size,
                stride=stride,
                padding=kernel_size // 2,
                groups=expanded_channels,
                bias=False
            ),
            nn.BatchNorm2d(expanded_channels),
            Swish()
        )

        # Squeeze-and-Excitation
        self.se = SEBlock(expanded_channels, se_ratio)

        # Output projection
        self.project = nn.Sequential(
            nn.Conv2d(expanded_channels, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels)
        )

    def forward(self, x):
        identity = x

        # Expansion
        if self.expand is not None:
            x = self.expand(x)

        # Depthwise + SE
        x = self.depthwise(x)
        x = self.se(x)

        # Projection
        x = self.project(x)

        # Skip connection with drop connect
        if self.use_residual:
            if self.training and self.drop_connect_rate > 0:
                x = self._drop_connect(x, self.drop_connect_rate)
            x = x + identity

        return x

    @staticmethod
    def _drop_connect(x, drop_rate):
        """Drop connect (stochastic depth)"""
        if not x.requires_grad:
            return x
        keep_prob = 1 - drop_rate
        random_tensor = keep_prob + torch.rand(
            (x.shape[0], 1, 1, 1), dtype=x.dtype, device=x.device
        )
        random_tensor.floor_()
        return x.div(keep_prob) * random_tensor


class EfficientNet(nn.Module):
    """EfficientNet implementation"""
    def __init__(
        self,
        width_mult: float = 1.0,
        depth_mult: float = 1.0,
        dropout_rate: float = 0.2,
        drop_connect_rate: float = 0.2,
        num_classes: int = 1000
    ):
        super().__init__()

        # Building blocks configuration: [expand_ratio, channels, repeats, stride, kernel_size]
        blocks_config = [
            [1, 16, 1, 1, 3],   # Stage 1
            [6, 24, 2, 2, 3],   # Stage 2
            [6, 40, 2, 2, 5],   # Stage 3
            [6, 80, 3, 2, 3],   # Stage 4
            [6, 112, 3, 1, 5],  # Stage 5
            [6, 192, 4, 2, 5],  # Stage 6
            [6, 320, 1, 1, 3],  # Stage 7
        ]

        # Stem
        out_channels = self._round_filters(32, width_mult)
        self.stem = nn.Sequential(
            nn.Conv2d(3, out_channels, 3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            Swish()
        )

        # Building blocks
        self.blocks = nn.ModuleList([])
        total_blocks = sum([self._round_repeats(config[2], depth_mult)
                           for config in blocks_config])
        block_idx = 0

        in_channels = out_channels
        for expand_ratio, channels, repeats, stride, kernel_size in blocks_config:
            out_channels = self._round_filters(channels, width_mult)
            repeats = self._round_repeats(repeats, depth_mult)

            for i in range(repeats):
                # Drop connect rate increases linearly
                drop_rate = drop_connect_rate * block_idx / total_blocks

                self.blocks.append(
                    MBConvBlock(
                        in_channels=in_channels,
                        out_channels=out_channels,
                        kernel_size=kernel_size,
                        stride=stride if i == 0 else 1,
                        expand_ratio=expand_ratio,
                        drop_connect_rate=drop_rate
                    )
                )
                in_channels = out_channels
                block_idx += 1

        # Head
        final_channels = self._round_filters(1280, width_mult)
        self.head = nn.Sequential(
            nn.Conv2d(in_channels, final_channels, 1, bias=False),
            nn.BatchNorm2d(final_channels),
            Swish(),
            nn.AdaptiveAvgPool2d(1),
            nn.Dropout(dropout_rate)
        )

        # Classifier
        self.classifier = nn.Linear(final_channels, num_classes)

        self._initialize_weights()

    def forward(self, x):
        x = self.stem(x)
        for block in self.blocks:
            x = block(x)
        x = self.head(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

    @staticmethod
    def _round_filters(filters: int, width_mult: float, divisor: int = 8) -> int:
        """Round number of filters based on width multiplier"""
        filters *= width_mult
        new_filters = max(divisor, int(filters + divisor / 2) // divisor * divisor)
        if new_filters < 0.9 * filters:
            new_filters += divisor
        return int(new_filters)

    @staticmethod
    def _round_repeats(repeats: int, depth_mult: float) -> int:
        """Round number of repeats based on depth multiplier"""
        return int(math.ceil(depth_mult * repeats))

    def _initialize_weights(self):
        """Initialize weights"""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

def efficientnet_b0(num_classes: int = 10000):
    """EfficientNet-B0"""
    return EfficientNet(
        width_mult=1.0,
        depth_mult=1.0,
        dropout_rate=0.2,
        num_classes=num_classes
    )

def efficientnet_b1(num_classes: int = 10000):
    """EfficientNet-B1"""
    return EfficientNet(
        width_mult=1.1,
        depth_mult=1.2,
        dropout_rate=0.3,
        num_classes=num_classes
    )

def efficientnet_b2(num_classes: int = 10000):
    """EfficientNet-B2"""
    return EfficientNet(
        width_mult=1.1,
        depth_mult=1.2,
        dropout_rate=0.3,
        num_classes=num_classes
    )

def efficientnet_b3(num_classes: int = 10000):
    """EfficientNet-B3"""
    return EfficientNet(
        width_mult=1.2,
        depth_mult=1.4,
        dropout_rate=0.3,
        num_classes=num_classes
    )

In [None]:
# Test the model
model = efficientnet_b0(num_classes=10)
x = torch.randn(2, 3, 224, 224)
y = model(x)
print(f"Input shape: {x.shape}")
print(f"Output shape: {y.shape}")
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")

Input shape: torch.Size([2, 3, 224, 224])
Output shape: torch.Size([2, 10])
Total parameters: 7,155,658


### モデル学習

In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

import os
import time
import matplotlib.pyplot as plt
from tqdm import tqdm

In [5]:
class AverageMeter:
    """計算と平均値の保存"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

In [6]:
def train_one_epoch(model, train_loader, criterion, optimizer, device, epoch):
    """1エポックの学習"""
    model.train()
    losses = AverageMeter()
    accuracies = AverageMeter()

    pbar = tqdm(train_loader, desc=f"Epoch {epoch} [Train]")
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)
        batch_size = images.size(0)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        _, predicted = outputs.max(1)
        correct = predicted.eq(labels).sum().item()
        accuracy = correct / batch_size

        # Update metrics
        losses.update(loss.item(), batch_size)
        accuracies.update(accuracy, batch_size)

        # Update progress bar
        pbar.set_postfix({
            "loss": f"{losses.avg:.4f}",
            "acc": f"{accuracies.avg:.4f}"
        })

    return losses.avg, accuracies.avg

In [7]:
def validation(model, val_loader, criterion, device, epoch):
    """検証"""
    model.eval()
    losses = AverageMeter()
    accuracies = AverageMeter()

    with torch.no_grad():
        pbar = tqdm(val_loader, desc=f"Epoch {epoch} [Val]")
        for images, labels in pbar:
            images, labels = images.to(device), labels.to(device)
            batch_size = images.size(0)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Calculate accuracy
            _, predicted = outputs.max(1)
            correct = predicted.eq(labels).sum().item()
            accuracy = correct / batch_size

            # Update metrics
            losses.update(loss.item(), batch_size)
            accuracies.update(accuracy, batch_size)

            # Update progress bar
            pbar.set_postfix({
                "loss": f"{losses.avg:.4f}",
                "acc": f"{accuracies.avg:.4f}"
            })

    return losses.avg, accuracies.avg

In [8]:
def train(
        model,
        train_loader,
        val_loader,
        num_epochs,
        learning_rate,
        device,
        save_dir="checkpoints"
):
    """学習メイン関数"""
    os.mkdir(save_dir, exist_ok=True)

    # Loss and Optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Learning rate scheduler
    schedular = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

    # History
    history = {
        "train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": [],
    }

    best_val_acc = 0.0

    for epoch in range(1, num_epochs + 1):
        start_time = time.time()

        # Train
        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, device, epoch
        )

        # Validation
        val_loss, val_acc = validate(
            model, val_loader, criterion, device, epoch
        )

        # Update scheduler
        schedular.step()

        # Save history
        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)

        epoch_time = time.time() - start_time

        print(f"\nEpoch {epoch}/{num_epochs} - {epoch_time:.2f}s")
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
        print(f"Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}\n")

        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save({
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "val_acc": val_acc,
            }, os.path.join(save_dir, "best_model.pth"))
            print(f"Best model saved with val_acc: {val_acc:.4f}\n")

        # Save latest model
        torch.save({
            "epoch": epoch,
            "model_state_dict": model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
            "history": history,
        }, os.path.join(save_dir, "latest_model.pth"))

    return history

In [9]:
def plot_history(history, save_path="training_history.png"):
    """学習履歴のプロット"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))

    # Loss
    ax1.plot(history["train_loss"], label="Train Loss")
    ax1.plot(history["val_loss"], label="Val Loss")
    ax1.set_xlabel("Epoch")
    ax1.set_ylabel("Loss")
    ax1.set_title("Training and Validation Loss")
    ax1.legend()
    ax1.grid(True)

    # Accuracy
    ax2.plot(history["train_acc"], label="Train Acc")
    ax2.plot(history["val_acc"], label="Val Acc")
    ax2.set_xlabel("Epoch")
    ax2.set_ylabel("Accuracy")
    ax2.set_title("Training and Validation Accuracy")
    ax2.legend()
    ax2.grid(True)

    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches="tight")
    print(f"Training history saved to {save_path}")