In [None]:
# import module
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T

from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
from torchvision.datasets import CIFAR100
from torchmetrics.aggregation import MeanMetric
from torchmetrics.functional.classification import accuracy

In [None]:
# Build config
title = 'resnet101_finetune'  # run name; used in the log sub-directories and the checkpoint filename
# Fall back to the CPU when no CUDA device is visible so the notebook still runs end to end.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
root = 'data'  # dataset root directory passed to CIFAR100
batch_size = 128
num_workers = 8  # number of DataLoader worker processes
lr = 0.01  # initial SGD learning rate. Rule of thumb: Adam at 0.001 is roughly comparable to SGD at 0.1
weight_decay = 1e-8
label_smoothing = 0.05
epochs = 20
log_dir = 'logs'  # TensorBoard log root
checkpoint_dir = 'checkpoints'  # directory the per-epoch checkpoint is written to
pretrained_model = 'resnet101-cd907fc2.pth'  # pretrained weights file; can be downloaded from the official PyTorch site

In [None]:
# Build dataset
# Per-channel normalization constants shared by the train and validation pipelines.
# NOTE(review): these look like the commonly quoted CIFAR-10 statistics while the
# dataset below is CIFAR-100 - confirm this is intended.
norm_mean = (0.4914, 0.4822, 0.4465)
norm_std = (0.2023, 0.1994, 0.2010)

# Training pipeline: PIL-level augmentations first, then tensor conversion,
# random erasing on the tensor, and finally normalization.
train_transform = T.Compose([
    T.RandomCrop(size=(32, 32), padding=4),
    T.RandomHorizontalFlip(),
    T.TrivialAugmentWide(),  # stochastically applies one of a variety of augmentations
    T.ToTensor(),
    T.RandomErasing(),
    T.Normalize(norm_mean, norm_std),
])
train_data = CIFAR100(root=root, train=True, download=True, transform=train_transform)
train_loader = DataLoader(
    dataset=train_data,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    drop_last=True,  # every training batch has exactly `batch_size` samples
)

# Validation pipeline: no augmentation, only tensor conversion and normalization.
val_transform = T.Compose([
    T.ToTensor(),
    T.Normalize(norm_mean, norm_std),
])
val_data = CIFAR100(root=root, train=False, download=True, transform=val_transform)
val_loader = DataLoader(
    dataset=val_data,
    batch_size=batch_size,
    num_workers=num_workers,
)

In [None]:
# ResNet building block
class Block(nn.Module):
    """Bottleneck residual block: 1x1 conv -> 3x3 conv -> 1x1 conv, plus a shortcut.

    Args:
        in_dim: number of channels of the input tensor.
        dim: channel width of the first two convolutions.
        stride: stride of the 3x3 convolution and of the projection shortcut.
        expansion: the block outputs ``dim * expansion`` channels.
    """

    def __init__(self, in_dim, dim, stride=1, expansion=4):
        super().__init__()
        out_dim = dim * expansion
        self.conv1 = nn.Conv2d(in_dim, dim, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(dim)
        self.conv2 = nn.Conv2d(dim, dim, 3, stride, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(dim)
        self.conv3 = nn.Conv2d(dim, out_dim, 1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_dim)
        self.relu = nn.ReLU()
        if stride != 1 or in_dim != out_dim:
            # The shortcut must reshape the block *input* (in_dim channels) to the
            # shape of the residual branch output, otherwise the addition in
            # forward() is impossible. It therefore has to consume in_dim channels.
            self.downsample = nn.Sequential(
                nn.Conv2d(in_dim, out_dim, 1, stride, bias=False),
                nn.BatchNorm2d(out_dim),
            )
        else:
            self.downsample = nn.Identity()

    def forward(self, x):
        """Return ``relu(residual(x) + shortcut(x))``."""
        identity = self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))  # conv -> batch norm -> relu
        out = self.relu(self.bn2(self.conv2(out)))  # conv -> batch norm -> relu
        out = self.bn3(self.conv3(out))  # conv -> batch norm
        out = out + identity  # shapes must match here; the shortcut guarantees that
        out = self.relu(out)
        return out

In [None]:
class ResNet(nn.Module):
    """ResNet feature extractor (stem + four stages of bottleneck blocks).

    ``depths`` gives the number of ``Block`` modules in each of the four stages,
    e.g. ``[3, 4, 23, 3]`` for ResNet-101. The forward pass returns the final
    feature map averaged over its two spatial dimensions.
    """

    def __init__(self, depths):
        super().__init__()
        # Stem.
        # stride: the original ResNet (224 x 224 inputs) uses stride 2 here to shrink
        #   the image; for 32 x 32 CIFAR images that would leave almost no spatial
        #   extent, so stride 1 is used instead.
        # padding: with kernel size 7, padding 3 keeps the spatial size unchanged.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=1, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Each stage outputs `dim * expansion` channels, which is the input width
        # of the next stage.
        expansion = 4
        self.layer1 = self._make_layer(64, 64, depths[0], stride=1)
        self.layer2 = self._make_layer(64 * expansion, 128, depths[1], stride=2)
        self.layer3 = self._make_layer(128 * expansion, 256, depths[2], stride=1)
        self.layer4 = self._make_layer(256 * expansion, 512, depths[3], stride=2)

        # Re-initialize every conv / batch-norm layer created above.
        self.apply(self.init_weights)

    def _make_layer(self, in_dim, dim, depth, stride=1):
        """Build one stage made of ``depth`` blocks.

        Only the first block receives ``in_dim`` (the channel count entering the
        stage) and ``stride``; the remaining blocks take ``dim * 4`` channels
        (the output width of the previous block) and use stride 1 so the
        feature map is not shrunk again.
        """
        blocks = [Block(in_dim, dim, stride)]
        for _ in range(depth - 1):
            blocks.append(Block(dim * 4, dim, 1))
        return nn.Sequential(*blocks)

    def forward(self, x):
        # Stem: conv -> batch norm -> relu -> max pool
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        # PyTorch tensor layout is Batch x Channel x Height x Width,
        # so -1 is width and -2 is height: average over both spatial axes.
        return x.mean([-1, -2])

    def init_weights(self, m):
        """Initializer used with ``self.apply``: Kaiming-normal convs, unit-scale batch norms."""
        if isinstance(m, nn.BatchNorm2d):
            nn.init.constant_(m.weight, 1)
            nn.init.constant_(m.bias, 0)
            return
        if isinstance(m, nn.Conv2d):
            nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")

In [None]:
class ResNet101(nn.Module):
    """ResNet-101 classifier: ``ResNet`` backbone followed by a linear head."""

    def __init__(self):
        super().__init__()
        self.features = ResNet(depths=[3, 4, 23, 3])
        # Channel count of the last conv stage -> number of labels to classify.
        self.head = nn.Linear(2048, 100)

    def forward(self, x):
        pooled = self.features(x)
        return self.head(pooled)

model = ResNet101()

# Load the checkpoint onto the CPU first so it can be read regardless of the
# device it was saved from; the whole model is moved to `device` afterwards.
# NOTE: torch.load unpickles the file - only load checkpoints from a trusted source.
state_dict = torch.load(pretrained_model, map_location='cpu')

# strict=False lets checkpoint entries that have no counterpart in the backbone
# be skipped. It would equally hide backbone weights that are *absent* from the
# checkpoint, which would leave those layers randomly initialized - so check
# for that explicitly.
load_result = model.features.load_state_dict(state_dict, strict=False)
missing_keys = [key for key in load_result.missing_keys if not key.endswith('num_batches_tracked')]
if missing_keys:
    raise RuntimeError(f'pretrained checkpoint {pretrained_model!r} is missing backbone weights: {missing_keys}')

model = model.to(device)

In [None]:
# Build optimizer: SGD with Nesterov momentum over all model parameters
optimizer = optim.SGD(
    params=model.parameters(),
    lr=lr,
    momentum=0.9,
    nesterov=True,
    weight_decay=weight_decay,
)

# Build scheduler: cosine annealing spanning every optimizer step of the run
# (the scheduler is stepped once per batch, hence epochs * batches-per-epoch).
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=epochs * len(train_loader),
)

# Build loss function
loss_fn = nn.CrossEntropyLoss(label_smoothing=label_smoothing)

# Build metric function
# NOTE(review): this is later called as metric_fn(outputs, targets); whether
# `accuracy` accepts that call without a `task` argument depends on the
# installed torchmetrics version - confirm.
metric_fn = accuracy

# Build logger: separate TensorBoard writers for the train and validation curves
train_logger = SummaryWriter(log_dir=f'{log_dir}/train/{title}')
val_logger = SummaryWriter(log_dir=f'{log_dir}/val/{title}')

In [None]:
# Define training loop
def train(loader, model, optimizer, scheduler, loss_fn, metric_fn, device):
    """Train ``model`` for one pass over ``loader``.

    The scheduler is stepped once per batch, right after the optimizer.

    Assumes ``loss_fn`` and ``metric_fn`` return a per-batch mean; the epoch
    averages weight every batch by its number of samples, so they stay correct
    when the final batch is smaller than the others.

    Returns:
        dict with the epoch-averaged ``'loss'`` and ``'metric'``.
    """
    model.train()
    loss_mean = MeanMetric()
    metric_mean = MeanMetric()

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        outputs = model(inputs)
        loss = loss_fn(outputs, targets)
        metric = metric_fn(outputs, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Detach before logging so the running mean never holds on to autograd history.
        n_samples = inputs.size(0)
        loss_mean.update(loss.detach().to('cpu'), weight=n_samples)
        metric_mean.update(metric.detach().to('cpu'), weight=n_samples)

        scheduler.step()

    summary = {'loss': loss_mean.compute(), 'metric': metric_mean.compute()}

    return summary

In [None]:
# Define evaluation loop
def evaluate(loader, model, loss_fn, metric_fn, device):
    """Evaluate ``model`` on every batch of ``loader`` without tracking gradients.

    Assumes ``loss_fn`` and ``metric_fn`` return a per-batch mean. Each batch is
    weighted by its number of samples so that a smaller final batch does not
    distort the reported averages.

    Returns:
        dict with the sample-averaged ``'loss'`` and ``'metric'``.
    """
    model.eval()
    loss_mean = MeanMetric()
    metric_mean = MeanMetric()

    for inputs, targets in loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        with torch.no_grad():
            outputs = model(inputs)
            loss = loss_fn(outputs, targets)
            metric = metric_fn(outputs, targets)

        n_samples = inputs.size(0)
        loss_mean.update(loss.to('cpu'), weight=n_samples)
        metric_mean.update(metric.to('cpu'), weight=n_samples)

    summary = {'loss': loss_mean.compute(), 'metric': metric_mean.compute()}

    return summary

In [None]:
# Main loop
# torch.save does not create missing parent directories, so make sure the
# checkpoint directory exists before the first epoch finishes.
os.makedirs(checkpoint_dir, exist_ok=True)
checkpoint_path = f'{checkpoint_dir}/{title}_last.pth'

try:
    for epoch in range(epochs):
        # train one epoch
        train_summary = train(train_loader, model, optimizer, scheduler, loss_fn, metric_fn, device)

        # evaluate one epoch
        val_summary = evaluate(val_loader, model, loss_fn, metric_fn, device)

        # write log (epochs are reported 1-based)
        train_logger.add_scalar('Loss', train_summary['loss'], epoch + 1)
        train_logger.add_scalar('Accuracy', train_summary['metric'], epoch + 1)
        val_logger.add_scalar('Loss', val_summary['loss'], epoch + 1)
        val_logger.add_scalar('Accuracy', val_summary['metric'], epoch + 1)

        # save model: overwrite the single "last" checkpoint after every epoch.
        # The scheduler state is stored as well so the learning-rate schedule
        # can be restored together with the optimizer.
        state_dict = {
            'epoch': epoch + 1,
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'scheduler': scheduler.state_dict(),
        }
        torch.save(state_dict, checkpoint_path)
finally:
    # Close the writers even if training is interrupted so pending events are flushed.
    train_logger.close()
    val_logger.close()