#### 必要なライブラリのインポート

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
from torchinfo import summary
from torchviz import make_dot
import japanize_matplotlib
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import torchvision.datasets as datasets
from tqdm.notebook import tqdm


#### 共通関数のインポート
著者が学習用、予測用、学習用ログ解析用、図示用の関数を用意してくださっているのでそちらを利用する

In [2]:
!git clone https://github.com/makaishi2/pythonlibs.git

from pythonlibs.torch_lib1 import *

fatal: destination path 'pythonlibs' already exists and is not an empty directory.


#### GPUの確認

In [3]:
# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

cuda:0


#### データの準備

In [4]:
import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from PIL import Image
from io import BytesIO

# Define a custom transform to upscale images
class UpscaleTransform:
    """Callable transform that rescales an image by a constant factor.

    The image is converted to a tensor, resampled with bilinear interpolation
    to ``int(dim * scale_factor)`` along height and width, and returned as a
    PIL image.
    """

    def __init__(self, scale_factor):
        # Multiplier applied to both spatial dimensions.
        self.scale_factor = scale_factor

    def __call__(self, img):
        # Tensor inputs are first turned into a PIL image so that both input
        # kinds go through the same ToTensor conversion below.
        pil_img = transforms.ToPILImage()(img) if isinstance(img, torch.Tensor) else img

        # Add a leading batch axis; bilinear interpolation works on batched input.
        batch = transforms.ToTensor()(pil_img).unsqueeze(0)

        # Target (height, width), truncated to whole pixels.
        height, width = batch.shape[2:]
        target_hw = [int(height * self.scale_factor), int(width * self.scale_factor)]

        resized = F.interpolate(batch, size=target_hw, mode='bilinear', align_corners=False)

        # Drop the batch axis again and hand back a PIL image.
        return transforms.ToPILImage()(resized.squeeze(0))


# ---- Data pipeline configuration --------------------------------------------
# Every tunable value lives here so that the transforms, datasets and loaders
# below cannot drift apart.
DATA_ROOT = '/usr/src/tiny-imagenet-200'  # dataset root directory; adjust per machine
IMAGE_SIZE = (512, 512)                   # (height, width) fed to the network
# NOTE(review): the dataset name suggests small source images; resizing to
# 512x512 makes every epoch considerably more expensive — confirm it is intended.
NORM_MEAN = [0.5, 0.5, 0.5]               # per-channel mean used by Normalize
NORM_STD = [0.5, 0.5, 0.5]                # per-channel std used by Normalize
BATCH_SIZE = 32

# Scale factor for the optional UpscaleTransform step. That step (and a
# RandomErasing step tried earlier) is currently not part of the pipeline.
scale_factor=1

# Training transform: resize, light augmentation, then tensor conversion and
# normalisation.
transform_train = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.RandomHorizontalFlip(p=0.5),  # Randomly flip the image horizontally
    transforms.RandomRotation(degrees=15),   # Randomly rotate the image
    transforms.ToTensor(),  # Convert image to tensor
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),  # Normalize the image
])

# Evaluation transform: same preprocessing without the random augmentation.
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),  # Convert image to tensor
    transforms.Normalize(mean=NORM_MEAN, std=NORM_STD),  # Normalize the image
])

# Define datasets (one sub-folder per class, as required by ImageFolder)
train_dataset = datasets.ImageFolder(root=f'{DATA_ROOT}/train', transform=transform_train)
test_dataset = datasets.ImageFolder(root=f'{DATA_ROOT}/test', transform=transform)
val_dataset = datasets.ImageFolder(root=f'{DATA_ROOT}/val_reorganized', transform=transform)

# Define data loaders; only the training data is shuffled
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

print(f"Train loader has {len(train_loader.dataset)} images.")
print(f"Test loader has {len(test_loader.dataset)} images.")
print(f"Validation loader has {len(val_loader.dataset)} images.")


Train loader has 100000 images.
Test loader has 10000 images.
Validation loader has 10000 images.


#### Data Augmentation

#### CNNドロップアウト

In [12]:
class CNN_v3(nn.Module):
    """Six-convolution CNN with dropout and a two-layer classifier head.

    Layout: three stages of (conv3x3 -> ReLU -> conv3x3 -> ReLU -> 2x2 max-pool
    -> dropout) with 32, 64 and 128 channels, followed by
    Linear -> ReLU -> dropout -> Linear.

    Args:
        num_classes: Number of output logits.
        input_size: Spatial size of the input images, either an int (square
            images) or a ``(height, width)`` pair. The default of 512
            reproduces the previously hard-coded 524288 input features of the
            first linear layer.

    Raises:
        ValueError: If the input is too small to survive the three poolings.
    """

    def __init__(self, num_classes, input_size=512):
        super().__init__()

        if isinstance(input_size, int):
            in_h = in_w = input_size
        else:
            in_h, in_w = input_size
        # Each of the three 2x2 max-pools halves (and floors) both dimensions.
        feat_h, feat_w = in_h // 8, in_w // 8
        if feat_h < 1 or feat_w < 1:
            raise ValueError(f"input_size {input_size!r} is too small; need at least 8x8")

        # Shape comments use H x W for the input resolution.
        self.conv1 = nn.Conv2d(3, 32, 3, padding=(1,1))    # HxWx3       -> HxWx32
        self.conv2 = nn.Conv2d(32, 32, 3, padding=(1,1))   # HxWx32      -> HxWx32
        self.conv3 = nn.Conv2d(32, 64, 3, padding=(1,1))   # H/2xW/2x32  -> H/2xW/2x64
        self.conv4 = nn.Conv2d(64, 64, 3, padding=(1,1))   # H/2xW/2x64  -> H/2xW/2x64
        self.conv5 = nn.Conv2d(64, 128, 3, padding=(1,1))  # H/4xW/4x64  -> H/4xW/4x128
        self.conv6 = nn.Conv2d(128, 128, 3, padding=(1,1)) # H/4xW/4x128 -> H/4xW/4x128
        self.relu = nn.ReLU(inplace=True)
        self.flatten = nn.Flatten()
        self.maxpool = nn.MaxPool2d((2,2))
        self.l1 = nn.Linear(128 * feat_h * feat_w, 128)  # flattened H/8 x W/8 x 128 -> 128
        self.l2 = nn.Linear(128, num_classes)            # 128 -> num_classes
        self.dropout1 = nn.Dropout(0.2)
        self.dropout2 = nn.Dropout(0.3)
        self.dropout3 = nn.Dropout(0.4)

        # The parameter-free modules (relu, maxpool, dropout3) are shared
        # between positions. The attribute names and Sequential order are kept
        # as they were so state_dict keys stay compatible.
        self.features = nn.Sequential(
            self.conv1,
            self.relu,
            self.conv2,
            self.relu,
            self.maxpool,
            self.dropout1,
            self.conv3,
            self.relu,
            self.conv4,
            self.relu,
            self.maxpool,
            self.dropout2,
            self.conv5,
            self.relu,
            self.conv6,
            self.relu,
            self.maxpool,
            self.dropout3,
            )

        self.classifier = nn.Sequential(
            self.l1,
            self.relu,
            self.dropout3,
            self.l2
        )

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        feature_maps = self.features(x)
        flat = self.flatten(feature_maps)
        return self.classifier(flat)

    @classmethod
    def save_checkpoint(cls, epoch, model, optimizer, history, path='checkpoint.pth'):
        """Write epoch, model/optimizer state dicts and history to ``path``.

        Declared as a classmethod so that both ``CNN_v3.save_checkpoint(...)``
        and ``net.save_checkpoint(...)`` bind the arguments correctly.

        Args:
            epoch: Epoch counter to store.
            model: Module whose ``state_dict()`` is saved.
            optimizer: Optimizer whose ``state_dict()`` is saved.
            history: Training history object stored as-is.
            path: Destination file.
        """
        state = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'history': history
        }
        torch.save(state, path)

    @classmethod
    def load_checkpoint(cls, path='checkpoint.pth'):
        """Read a checkpoint written by ``save_checkpoint``.

        Returns:
            ``(epoch, model_state_dict, optimizer_state_dict, history)``, or
            ``(0, None, None, None)`` when ``path`` does not exist.
        """
        # Imported locally: this cell may run before any cell that imports os.
        import os

        if os.path.isfile(path):
            # NOTE(review): torch.load unpickles the file — only load
            # checkpoints from trusted sources. Depending on the PyTorch
            # version, the default weights_only setting may reject a pickled
            # `history` object; confirm and pass weights_only explicitly if so.
            checkpoint = torch.load(path)
            return checkpoint['epoch'], checkpoint['model_state_dict'], checkpoint['optimizer_state_dict'], checkpoint['history']
        else:
            print("No checkpoint found.")
            return 0, None, None, None

In [10]:
import numpy as np
import torch
import random

# 乱数の固定化
def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to every generator.
    """
    # cuDNN: request deterministic kernels and switch off benchmark mode.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # Apply the same seed to each random number generator in turn.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

# Fix the random seeds before building the model
seed_everything()

# Build the model, loss function and optimizer
net = CNN_v3(200).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)

# Train and validate
num_epochs = 25
history15 = np.zeros((0, 5))
history15 = fit(net, optimizer, criterion, num_epochs, train_loader, val_loader, device, history15)

# Show the evaluation history
evaluate_history(history15)

# Save a checkpoint under a model-specific file name. The default
# 'checkpoint.pth' is shared by every experiment in this notebook, so each run
# would otherwise overwrite the checkpoint of the previous one.
net.save_checkpoint(num_epochs, net, optimizer, history15, path='checkpoint_cnn_v3.pth')

NameError: name 'CNN_v3' is not defined

# ResNet構造

In [7]:
import torch
import torch.nn as nn
import os

class ResidualBlock(nn.Module):
    """Two 3x3 convolution + batch-norm layers wrapped by a skip connection.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to build the shortcut
            branch; when ``None`` the input itself is used as the shortcut.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        # Settings shared by both convolutions; only the first one is strided.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        main = self.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        # Residual addition happens before the final activation.
        main += shortcut
        return self.relu(main)

class ResNet(nn.Module):
    """ResNet-style image classifier assembled from a residual block class.

    Stem: 7x7 stride-2 convolution, batch norm, ReLU and a 3x3 stride-2
    max-pool. Body: four stages with 64, 128, 256 and 512 channels (stages 2-4
    start with stride 2). Head: global average pooling and a linear layer.

    Args:
        block: Block class called as
            ``block(in_channels, out_channels, stride, downsample)``. Its
            output must have ``out_channels`` channels, because the head
            expects 512 features.
        layers: Number of blocks in each of the four stages.
        num_classes: Number of output logits.
    """

    def __init__(self, block, layers, num_classes=200):
        super().__init__()
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage consisting of ``blocks`` residual blocks.

        Only the first block uses ``stride``. It also receives a 1x1
        convolution + batch-norm shortcut whenever the stride or the channel
        count changes, so that the shortcut matches the main branch.
        """
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch axis, flatten the rest
        x = self.fc(x)

        return x

    @classmethod
    def save_checkpoint(cls, epoch, model, optimizer, history, path='checkpoint.pth'):
        """Write epoch, model/optimizer state dicts and history to ``path``.

        Declared as a classmethod (matching ``load_checkpoint``) so that both
        ``ResNet.save_checkpoint(...)`` and ``net.save_checkpoint(...)`` bind
        the arguments correctly.

        Args:
            epoch: Epoch counter to store.
            model: Module whose ``state_dict()`` is saved.
            optimizer: Optimizer whose ``state_dict()`` is saved.
            history: Training history object stored as-is.
            path: Destination file.
        """
        state = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'history': history
        }
        torch.save(state, path)

    @classmethod
    def load_checkpoint(cls, path='checkpoint.pth'):
        """Read a checkpoint written by ``save_checkpoint``.

        Returns:
            ``(epoch, model_state_dict, optimizer_state_dict, history)``, or
            ``(0, None, None, None)`` when ``path`` does not exist.
        """
        if os.path.isfile(path):
            # NOTE(review): torch.load unpickles the file — only load
            # checkpoints from trusted sources. Depending on the PyTorch
            # version, the default weights_only setting may reject a pickled
            # `history` object; confirm and pass weights_only explicitly if so.
            checkpoint = torch.load(path)
            return checkpoint['epoch'], checkpoint['model_state_dict'], checkpoint['optimizer_state_dict'], checkpoint['history']
        else:
            print("No checkpoint found.")
            return 0, None, None, None

# Usage: two residual blocks in each of the four stages, 200 output classes
model = ResNet(block=ResidualBlock, layers=[2, 2, 2, 2], num_classes=200)


In [8]:
import numpy as np
import torch
import random

def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to every generator.
    """
    # cuDNN: request deterministic kernels and switch off benchmark mode.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # Apply the same seed to each random number generator in turn.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

import torch.optim as optim
import numpy as np

# Fix the random seeds before building the model
seed_everything()

# Build the model, loss function and optimizer
net = ResNet(ResidualBlock, [2, 2, 2, 2], num_classes=200).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

# Train and validate
num_epochs = 25
history20 = np.zeros((0, 5))
history20 = fit(net, optimizer, criterion, num_epochs, train_loader, val_loader, device, history20)

# Show the evaluation history
evaluate_history(history20)

# Save a checkpoint under a model-specific file name. The default
# 'checkpoint.pth' is shared by every experiment in this notebook, so each run
# would otherwise overwrite the checkpoint of the previous one.
net.save_checkpoint(num_epochs, net, optimizer, history20, path='checkpoint_resnet.pth')


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [1/25], loss: 4.73991 acc: 0.05278 val_loss: 4.25643, val_acc: 0.09930


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [2/25], loss: 3.91467 acc: 0.14915 val_loss: 3.64388, val_acc: 0.19090


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [3/25], loss: 3.34585 acc: 0.24164 val_loss: 3.08840, val_acc: 0.28640


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [4/25], loss: 2.93284 acc: 0.31665 val_loss: 2.76502, val_acc: 0.34390


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [5/25], loss: 2.63536 acc: 0.37439 val_loss: 2.53110, val_acc: 0.39620


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [6/25], loss: 2.39669 acc: 0.42490 val_loss: 2.39726, val_acc: 0.43020


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [7/25], loss: 2.21082 acc: 0.46185 val_loss: 2.27808, val_acc: 0.45490


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [8/25], loss: 2.04840 acc: 0.49800 val_loss: 2.15534, val_acc: 0.48750


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [9/25], loss: 1.90029 acc: 0.52758 val_loss: 2.10874, val_acc: 0.48880


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [10/25], loss: 1.76726 acc: 0.55655 val_loss: 2.08324, val_acc: 0.50360


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [11/25], loss: 1.64902 acc: 0.58248 val_loss: 2.01228, val_acc: 0.51780


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [12/25], loss: 1.53506 acc: 0.60690 val_loss: 1.99645, val_acc: 0.52820


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [13/25], loss: 1.42458 acc: 0.63302 val_loss: 1.97340, val_acc: 0.53420


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [14/25], loss: 1.32354 acc: 0.65394 val_loss: 1.98338, val_acc: 0.53650


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [15/25], loss: 1.22626 acc: 0.67598 val_loss: 2.03127, val_acc: 0.53150


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [16/25], loss: 1.13516 acc: 0.69752 val_loss: 2.00921, val_acc: 0.54260


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [17/25], loss: 1.04617 acc: 0.71678 val_loss: 2.10802, val_acc: 0.53650


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [18/25], loss: 0.96103 acc: 0.73777 val_loss: 2.10014, val_acc: 0.54130


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [19/25], loss: 0.88165 acc: 0.75633 val_loss: 2.15600, val_acc: 0.53710


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [20/25], loss: 0.81057 acc: 0.77258 val_loss: 2.14120, val_acc: 0.54610


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [21/25], loss: 0.73810 acc: 0.79275 val_loss: 2.23480, val_acc: 0.53630


  0%|          | 0/3125 [00:00<?, ?it/s]

KeyboardInterrupt: 

過学習を防ぐため、各ResidualBlockにDropout層を追加する

In [5]:
import torch
import torch.nn as nn
import os

class ResidualBlock(nn.Module):
    """Residual block with dropout on the main branch.

    Two 3x3 convolution + batch-norm layers form the main branch; dropout is
    applied to its output before the shortcut is added.

    Args:
        in_channels: Channels of the incoming feature map.
        out_channels: Channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input to build the shortcut
            branch; when ``None`` the input itself is used as the shortcut.
        dropout_rate: Drop probability of the dropout layer.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, dropout_rate=0.5):
        super().__init__()
        # Settings shared by both convolutions; only the first one is strided.
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.dropout = nn.Dropout(p=dropout_rate)
        self.downsample = downsample

    def forward(self, x):
        """Return ``relu(dropout(main_branch(x)) + shortcut(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)

        main = self.relu(self.bn1(self.conv1(x)))
        main = self.dropout(self.bn2(self.conv2(main)))

        # Residual addition happens after dropout and before the activation.
        main += shortcut
        return self.relu(main)

class ResNet(nn.Module):
    """ResNet-style image classifier whose residual blocks use dropout.

    Stem: 7x7 stride-2 convolution, batch norm, ReLU and a 3x3 stride-2
    max-pool. Body: four stages with 64, 128, 256 and 512 channels (stages 2-4
    start with stride 2). Head: global average pooling and a linear layer.

    Args:
        block: Block class called as
            ``block(in_channels, out_channels, stride, downsample, dropout_rate)``.
            Its output must have ``out_channels`` channels, because the head
            expects 512 features.
        layers: Number of blocks in each of the four stages.
        num_classes: Number of output logits.
        dropout_rate: Drop probability forwarded to every block.
    """

    def __init__(self, block, layers, num_classes=200, dropout_rate=0.5):
        super().__init__()
        # Running channel count; updated by _make_layer as stages are built.
        self.in_channels = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # NOTE(review): this dropout module is created but never applied in
        # forward(); only the per-block dropout is active. Confirm whether it
        # was meant to be used (e.g. before `fc`) or can be removed.
        self.dropout = nn.Dropout(dropout_rate)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0], dropout_rate=dropout_rate)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dropout_rate=dropout_rate)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dropout_rate=dropout_rate)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dropout_rate=dropout_rate)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1, dropout_rate=0.5):
        """Build one stage consisting of ``blocks`` residual blocks.

        Only the first block uses ``stride``. It also receives a 1x1
        convolution + batch-norm shortcut whenever the stride or the channel
        count changes, so that the shortcut matches the main branch. Every
        block is given ``dropout_rate``.
        """
        downsample = None
        if stride != 1 or self.in_channels != out_channels:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample, dropout_rate))
        self.in_channels = out_channels
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels, dropout_rate=dropout_rate))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)  # keep the batch axis, flatten the rest
        x = self.fc(x)

        return x

    @classmethod
    def save_checkpoint(cls, epoch, model, optimizer, history, path='checkpoint.pth'):
        """Write epoch, model/optimizer state dicts and history to ``path``.

        Declared as a classmethod so that both ``ResNet.save_checkpoint(...)``
        and ``net.save_checkpoint(...)`` bind the arguments correctly.

        Args:
            epoch: Epoch counter to store.
            model: Module whose ``state_dict()`` is saved.
            optimizer: Optimizer whose ``state_dict()`` is saved.
            history: Training history object stored as-is.
            path: Destination file.
        """
        state = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'history': history
        }
        torch.save(state, path)

    @classmethod
    def load_checkpoint(cls, path='checkpoint.pth'):
        """Read a checkpoint written by ``save_checkpoint``.

        Declared as a classmethod: without the decorator,
        ``ResNet.load_checkpoint(path)`` bound the path to ``cls`` and looked
        for the default file instead.

        Returns:
            ``(epoch, model_state_dict, optimizer_state_dict, history)``, or
            ``(0, None, None, None)`` when ``path`` does not exist.
        """
        if os.path.isfile(path):
            # NOTE(review): torch.load unpickles the file — only load
            # checkpoints from trusted sources. Depending on the PyTorch
            # version, the default weights_only setting may reject a pickled
            # `history` object; confirm and pass weights_only explicitly if so.
            checkpoint = torch.load(path)
            return checkpoint['epoch'], checkpoint['model_state_dict'], checkpoint['optimizer_state_dict'], checkpoint['history']
        else:
            print("No checkpoint found.")
            return 0, None, None, None

# Usage: two residual blocks in each of the four stages, 200 output classes
model = ResNet(block=ResidualBlock, layers=[2, 2, 2, 2], num_classes=200)


In [6]:
import numpy as np
import torch
import random

def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and make cuDNN deterministic.

    Args:
        seed: Integer seed applied to every generator.
    """
    # cuDNN: request deterministic kernels and switch off benchmark mode.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

    # Apply the same seed to each random number generator in turn.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(seed)

import torch.optim as optim
import numpy as np

# Fix the random seeds before building the model
seed_everything()

# Build the model, loss function and optimizer
net = ResNet(ResidualBlock, [2, 2, 2, 2], num_classes=200).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.001)

# Train and validate
num_epochs = 30
history21 = np.zeros((0, 5))
history21 = fit(net, optimizer, criterion, num_epochs, train_loader, val_loader, device, history21)

# Show the evaluation history
evaluate_history(history21)

# Save a checkpoint under a model-specific file name. The default
# 'checkpoint.pth' is shared by every experiment in this notebook, so each run
# would otherwise overwrite the checkpoint of the previous one.
net.save_checkpoint(num_epochs, net, optimizer, history21, path='checkpoint_resnet_dropout.pth')


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [1/30], loss: 4.75682 acc: 0.04882 val_loss: 4.21070, val_acc: 0.10190


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [2/30], loss: 3.93999 acc: 0.14187 val_loss: 3.55286, val_acc: 0.20060


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [3/30], loss: 3.35984 acc: 0.23562 val_loss: 3.32755, val_acc: 0.25080


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [4/30], loss: 2.97883 acc: 0.30624 val_loss: 2.90603, val_acc: 0.31850


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [5/30], loss: 2.71863 acc: 0.35625 val_loss: 2.80275, val_acc: 0.35340


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [6/30], loss: 2.51009 acc: 0.39909 val_loss: 2.59025, val_acc: 0.39320


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [7/30], loss: 2.34144 acc: 0.43496 val_loss: 2.47305, val_acc: 0.41540


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [8/30], loss: 2.20269 acc: 0.46367 val_loss: 2.35882, val_acc: 0.44610


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [9/30], loss: 2.06733 acc: 0.49154 val_loss: 2.35531, val_acc: 0.44580


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [10/30], loss: 1.95469 acc: 0.51527 val_loss: 2.39986, val_acc: 0.43990


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [11/30], loss: 1.85797 acc: 0.53552 val_loss: 2.28724, val_acc: 0.46850


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [12/30], loss: 1.76945 acc: 0.55435 val_loss: 2.29994, val_acc: 0.47160


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [13/30], loss: 1.68159 acc: 0.57336 val_loss: 2.17209, val_acc: 0.49720


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [14/30], loss: 1.60527 acc: 0.59187 val_loss: 2.11769, val_acc: 0.51130


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [15/30], loss: 1.52790 acc: 0.60599 val_loss: 2.19062, val_acc: 0.49680


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [16/30], loss: 1.45628 acc: 0.62350 val_loss: 2.17207, val_acc: 0.50650


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [17/30], loss: 1.39085 acc: 0.63803 val_loss: 2.20037, val_acc: 0.51330


  0%|          | 0/3125 [00:00<?, ?it/s]

Epoch [18/30], loss: 1.32919 acc: 0.64937 val_loss: 2.12095, val_acc: 0.52200


  0%|          | 0/3125 [00:00<?, ?it/s]