# 3-1. 프로젝트: ResNet Ablation Study

In [None]:
import tensorflow as tf
import numpy as np

print(tf.__version__)
print(np.__version__)

2.12.0
1.22.4


# 1) ResNet 기본 블록 구성하기

- ResNet-34와 ResNet-50에서 사용되는 블록의 공통점

: conv block이라고 불리는 블록 구조를 각각 3, 4, 6, 3개씩 반복해서 쌓은 형태

- ResNet-34와 ResNet-50에서 사용되는 블록의 차이점

: ResNet-34의 경우 Block은 3x3 kernel인 Convolution layer로만 구성되어있지만, ResNet-50은 1x1 Convolution이 앞뒤로 붙어 더 많은 레이어를 한 블록 내에 가지고 있음

# 2) ResNet-34, ResNet-50 Complete Model

[코드 작성]
VGG와 같이 블록을 만드는 함수를 사용해서 직접 전체 모델을 만들어 봅시다. ResNet-34와 ResNet-50의 차이에 따라 달라지는 구성(configuration)을 함수에 전달해서 같은 생성 함수 build_resnet()를 통해서 ResNet의 여러 가지 버전들을 모두 만들어 낼 수 있도록 해야 합니다.

얕은 모델의 경우 BasicBlock을 사용함

깊은 모델의 경우 BottleneckBlock을 사용함

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import matplotlib.pyplot as plt

import copy
from collections import namedtuple
import os
import random
import time

import cv2
from torch.utils.data import DataLoader, Dataset
from PIL import Image

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
class ImageTransform() :
    def __init__(self, resize, mean, std) :
        self.data_transform = {
            'train' : transforms.Compose([
                transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize(mean, std)
            ]),
            'val' : transforms.Compose([
                transforms.Resize(256),
                transforms.CenterCrop(resize),
                transforms.ToTensor(),
                transforms.Normalize(mean, std)
            ])
        }

    def __call__(self, img, phase) :
        return self.data_transform[phase](img)

In [None]:
# define variables
size = 224
mean = (0.485, 0.456, 0.456)
std = (0.229, 0.224, 0.225)
batch_size = 32

In [None]:
#이미지 레이블 구분

class DogvsCatDataset(Dataset) :
    def __init__(self, file_list, transform=None, phase='train') :
        self.file_list = file_list
        self.transform = transform
        self.phase = phase

    def __len__(self) :
        return len(self.file_list)


    def __getitem__(self, idx) :
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        img_transformed = self.transform(img, self.phase)

        label = img_path.split('/')[-1].split('.')[0]
        if label == 'dog' :
            label = 1

        elif label == 'cat' :
            label = 0

        return img_transformed, label

In [None]:
#이미지 데이터셋 정의


cat_images_filepaths = ["/content/sample_data/dogs-vs-cats/train"]
dog_images_filepaths = ["/content/sample_data/dogs-vs-cats/train"]

image_filepaths = [*cat_images_filepaths, *dog_images_filepaths]
correct_images_filepaths = [i for i in image_filepaths if cv2.imread(i) is not None]
train_images_filepaths = correct_images_filepaths[:400]
val_images_filepaths = correct_images_filepaths[400:-10]
test_images_filepaths = correct_images_filepaths[-10:]

train_dataset = DogvsCatDataset(train_images_filepaths, transform=ImageTransform(size, mean, std),
                                 phase='train')
val_dataset = DogvsCatDataset(val_images_filepaths, transform=ImageTransform(size, mean, std),
                               phase='val')
index = 0


print(train_dataset.__getitem__(index)[0].size())
print(train_dataset.__getitem__(index)[1])

IndexError: ignored

In [None]:
# load data to memory
train_iterator = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_iterator = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
dataloader_dict = {'train' : train_iterator, 'val' : valid_iterator}

batch_iterator = iter(train_iterator)
inputs, label = next(batch_iterator)
print(inputs.size())
print(label)

NameError: ignored

In [None]:
class BasicBlock(nn.Module) :
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=False) :
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                              stride=stirde, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                              stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)

        if downsample : # 다운샘플이 적용되는 부분(출력 데이터크기가 다를 경우 사용)
            conv = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                            stride=stride, bias=False)

            bn = nn.BatchNorm2d(out_channels)
            downsample = nn.Sequential(conv, bn)

        else :
            downsample = None
        self.downsample = downsample

    def forward(self, x) :
        i = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)

        if self.downsample is not None :
            i = self.downsample(i)

        x += i # identity mapping
        x = self.relu(x)

        return x


In [None]:
class Bottleneck(nn.Module) :
    expansion = 4 # 병목 블록을 정의하기 위한 하이퍼파라미터

    def __init__(self, in_channels, out_channels, stride=1, downsample=False) :
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1,
                              stride=1, bias=False) # 1x1 합성곱층
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                              stride=stride, padding=1, bias=False)

        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                              stride=stride, padding=1, bias=False)
        self.conv3 = nn.Conv2d(out_channels, self.expansion * out_channels,
                               kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*out_channels)
        self.relu = nn.ReLU(inplace=True)

        if downsample :
            conv = nn.Conv2d(in_channels, self.expansion*out_channels, kernel_size=1,
                            stride=stride, bias=False)
            bn = nn.BatchNorm2d(self.expansion*out_channels)
            downsample = nn.Sequential(conv, bn)
        else :
            downsample = None

        self.downsample = downsample

    def forward(self, x) :
        i = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.downsample is not None :
            i = self.downsample(i)

        x += i
        x = self.relu(x)

        return x

In [None]:
class ResNet(nn.Module) :
   def __init__(self, config, output_dim, zero_init_residual=False) :
       super().__init__()

       block, n_blocks, channels = config
       self.in_channels = channels[0]
       assert len(n_blocks) == len(channels) == 4

       self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2,
                             padding=3, bias=False)
       self.bn1 = nn.BatchNorm2d(self.in_channels)
       self.relu = nn.ReLU(inplace=True)
       self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

       self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
       self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride=2)
       self.layer3 = self.get_resnet_layer(block, n_blocks[1], channels[2], stride=2)
       self.layer4 = self.get_resnet_layer(block, n_blocks[1], channels[3], stride=2)

       self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
       self.fc = nn.Linear(self.in_channels, output_dim)


       if zero_init_residual :
           for m in self.modules() :
               if isinstance(m, BottleNeck) :
                   nn.init.constant_(m.bn3.weight, 0)
               elif isinstance(m, BasicBlock) :
                   nn.init.constant_(m.bn2.weight, 0)

   def get_resnet_layer(self, block, n_blocks, channels, stride=1) :
       layers = []
       if self.in_channels != block.expansion * channels :
           downsample = True
       else :
           downsample = False

       layers.append(block(self.in_channels, channels, stride, downsample))

       for i in range(1, n_blocks) :
           layers.append(block(block.expansion*channels, channels))

       self.in_channels = block.expansion * channels
       return nn.Sequential(*layers)


   def forward(self, x) :
       x = self.conv1(x)
       x = self.bn1(x)
       x = self.relu(x)
       x = self.maxpool(x)
       x = self.layer1(x)
       x = self.layer2(x)
       x = self.layer3(x)
       x = self.layer4(x)
       x = self.avgpool(x)
       h = x.view(x.shape[0], -1)
       x = self.fc(h)
       return x, h

```python
ResNetConfig = namedtuple('ResNetConfig', ['block', 'n_blocks', 'channels'])

In [None]:
#restnet config def

resnet34_config = ResNetConfig(block=BasicBlock,
                             n_blocks=[3, 4, 6, 3,],
                             channels=[64, 128, 256, 512])

resnet50_config = ResNetConfig(block=Bottleneck,
                             n_blocks=[3, 4, 6, 3],
                             channels=[64,128,256,512])

In [None]:
output_dim = 2
model = ResNet(resnet50_config, output_dim)
print(model)

In [None]:
#resnet 50

# 손실함수 정의
optimizer = optim.Adam(model.parameters(), lr=1e-7)
criterion = nn.CrossEntropyLoss()

model = model.to(device)
criterion = criterion.to(device)

In [None]:
def calculate_topk_accuracy(y_pred, y, k=2) :
    with torch.no_grad() :
        batch_size = y.shape[0]
        _, top_pred = y_pred.topk(k, 1)
        top_pred = top_pred.t()
        correct = top_pred.eq(y.view(1, -1).expand_as(top_pred))
        correct_1 = correct[:1].reshape(-1).float().sum(0, keepdim=True)
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        acc_1 = correct_1 / batch_size
        acc_k = correct_k / batch_size

    return acc_1, acc_k

In [None]:
def train(model, iterator, optimizer, criterion, device) :
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    model.train()
    for (x, y) in iterator :
        x = x.to(device)
        y = y.to(device)

        optimizer.zero_grad()
        y_pred = model(x)
        loss = criterion(y_pred[0], y)

        acc_1, acc_5 = calculate_topk_accuracy(y_pred[0], y)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc_1 += acc_1.item()
        epoch_acc_5 += acc_5.item()

    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)

    return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
def evaluate(model, iterator, criterion, device) :
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0

    model.eval()
    with torch.no_grad() :
        for (x, y) in iterator :
            x = x.to(device)
            y = y.to(device)
            y_pred = model(x)
            loss = criterion(y_pred[0], y)

            acc_1, acc_5 = calculate_topk_accuracy(y_pred[0], y)
            epoch_loss += loss.item()
            epoch_acc_1 += acc_1.item()
            epoch_acc_5 += acc_5.item()

        epoch_loss /=len(iterator)
        epoch_acc_1 /= len(iterator)
        epoch_acc_5 /= len(iterator)

        return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
def epoch_time(start_time, end_time) :
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
#model 학습

best_valid_loss = float('inf')
epochs = 10

for epoch in range(epochs) :
    start_time = time.monotonic()

    train_loss, train_acc_1, train_acc_5 = train(model, train_iterator, optimizer,
                                                criterion, device)
    valid_loss, valid_acc_1, valid_acc_5 = evaluate(model, valid_iterator, criterion,
                                                   device)

    if valid_loss < best_valid_loss :
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), '../080289-main/chap06/data/ResNet-model.pt')

    end_time = time.monotonic()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    print(f'Epoch : {epoch+1:02} | Epoch Time : {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss : {train_loss:.3f} | Train Acc @1 : {train_acc_1*100:6.2f}% | Train Acc @5 : {train_acc_5*100:6.2f}%')
    print(f'\tValid Loss : {valid_loss:.3f} | Valid Acc @1 : {valid_acc_1*100:6.2f}% | Valid Acc @5 : {valid_acc_5*100:6.2f}%')

In [None]:
#테스트 데이터셋으로 모델 예측

import pandas as pd
id_list = []
pred_list = []
_id = 0

with torch.no_grad() :
    for test_path in test_images_filepaths :
        img = Image.open(test_path)
        _id = test_path.split('/')[-1].split('.')[1]
        transform = ImageTransform(size, mean, std)
        img = transform(img, phase='val')
        img = img.unsqueeze(0)
        img = img.to(device)

        model.eval()
        outputs = model(img)
        preds = F.softmax(outputs[0], dim=1)[:, 1].tolist()
        id_list.append(_id)
        pred_list.append(preds[0])

res = pd.DataFrame({
        'id' : id_list,
        'label' : pred_list
})

res.sort_values(by='id', inplace=True)
res.reset_index(drop=True, inplace=True)

res.to_csv('../080289-main/chap06/data/ResNet.csv', index=False)
res.head(10)

In [None]:
#예측결과 시각화 하여 확인
import pandas as pd
id_list = []
pred_list = []
_id = 0

with torch.no_grad() :
    for test_path in test_images_filepaths :
        img = Image.open(test_path)
        _id = test_path.split('/')[-1].split('.')[1]
        transform = ImageTransform(size, mean, std)
        img = transform(img, phase='val')
        img = img.unsqueeze(0)
        img = img.to(device)

        model.eval()
        outputs = model(img)
        preds = F.softmax(outputs[0], dim=1)[:, 1].tolist()
        id_list.append(_id)
        pred_list.append(preds[0])

res = pd.DataFrame({
        'id' : id_list,
        'label' : pred_list
})

res.sort_values(by='id', inplace=True)
res.reset_index(drop=True, inplace=True)

res.to_csv('../080289-main/chap06/data/ResNet.csv', index=False)
res.head(10)
class_ = classes = {0 : 'cat', 1 : 'dog'}

def display_image_grid(images_filepaths, predicted_labels=(), cols=5) :
    rows = len(images_filepaths) // cols
    fig , ax = plt.subplots(nrows=rows, ncols=cols, figsize=(12, 6))
    for i, image_filepath in enumerate(images_filepaths) :
        image = cv2.imread(image_filepath)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        a = random.choice(res['id'].values)
        label = res.loc[res['id'] == a, 'label'].values[0]

        if label > 0.5 :
            label = 1
        else :
            label = 0

        ax.ravel()[i].imshow(image)
        ax.ravel()[i].set_title(class_[label])
        ax.ravel()[i].set_axis_off()
    plt.tight_layout()
    plt.show()

display_image_grid(test_images_fileapths)