<a href="https://colab.research.google.com/github/chaen243/SeSAC_colab/blob/main/03_09_ResNet_exp_ipynb%EC%9D%98_%EC%82%AC%EB%B3%B8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torch.optim as optim
from torch.utils import data
print(torch.__version__)
print(torch.cuda.is_available())

2.5.1+cu121
True


In [2]:
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp
import tqdm
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)

![](https://img1.daumcdn.net/thumb/R1280x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FKqguG%2FbtqAyyU6Dlp%2F4k2pZ9IiJeZT3lB0KBotgK%2Fimg.png)

## 1.Convolution Operation  
PyTorch에서는 [nn.sequential](https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html#torch.nn.Sequential) 등의 함수를 이용해서 복잡한 모델 구조를 종종 축약해서 사용하곤 합니다.

아래의 예제는 conv-relu-maxpool의 model을 서로 다른 방법으로 표현한 것입니다.

In [3]:
input_image = torch.rand(64, 3, 32, 32)
input_image.shape

torch.Size([64, 3, 32, 32])

### nn.Sequential

conv2d - ReLU - MaxPool2d순으로 쌓아주세요!  
`nn.Conv2d`는 `input_channel`3, `output_channel`64 `kernel_size`3 로 쌓아주세요.

Conv1,2,3은 각각 다른 스타일로 설계해주세요!
- Conv1클래스는 변수에 모델 설계
- Conv2클래스는 `nn.Sequential`로 모델 설계
- Conv3클래스는 list append로 모델 설계

- `nn.Conv2d`
  - 사용예제 `nn.Conv2d(input_channel, output_channel, kernel_size, padding)`
- `nn.ReLU()`
- `nn.MaxPool2d(kernel_size)`

In [19]:
class Conv1(nn.Module):
    def __init__(self): # input image = batch_size x 3 x 32 x 32
        super(Conv1, self).__init__()
        self.conv = nn.Conv2d(3, 64, 3, padding=1)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(2)

    def forward(self, x):
        x = self.conv(x)
        x = self.relu(x)
        x = self.maxpool(x)
        return x


class Conv2(nn.Module):
    def __init__(self): # input image = batch_size x 3 x 32 x 32
        super(Conv2, self).__init__()
        self.layer = nn.Sequential(
            nn.Conv2d(3,64,3,padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2))

    def forward(self, x):
        x = self.layer(x)
        return x


class Conv3(nn.Module):
    def __init__(self): # input image = batch_size x 3 x 32 x 32
        super(Conv3, self).__init__()
        layer = []
        layer.append(nn.Conv2d(3,64,3,padding=1))
        layer.append(nn.ReLU())
        layer.append(nn.MaxPool2d(2))
        self.layer = nn.Sequential(*layer)

    def forward(self, x):
        x = self.layer(x)
        return x

model1 = Conv1()
model2 = Conv2()
model3 = Conv3()

In [16]:
print(model1)
output = model1(input_image)
print(output.size())

Conv1(
  (conv): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu): ReLU()
  (maxpool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
)
torch.Size([64, 64, 16, 16])


In [17]:
print(model2)
output = model2(input_image)
print(output.size())

Conv2(
  (layer): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
)
torch.Size([64, 64, 16, 16])


In [20]:
print(model3)
output = model3(input_image)
print(output.size())

Conv3(
  (layer): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
)
torch.Size([64, 64, 16, 16])


### Let's practice to calculate the shape of the network

![](https://d3s0tskafalll9.cloudfront.net/media/images/GC-1-P-table1.max-800x600.png)

위에 있는 그림 중 34-layer에 해당하는 resnet을 구현해놓은 예제입니다.  
빈칸에 알맞은 변수이름을 넣으시거나, 새롭게 구현하셔도 무방합니다!

In [37]:
class Conv_block(nn.Module):
    def __init__(self, input_dim, output_dim, kernel_size=3, strides=1, activation=True): # input image = batch_size x 3 x 32 x 32
        super(Conv_block, self).__init__()
        self.in_dim = input_dim
        self.out_dim = output_dim
        self.activation = activation
        self.kernel_size = kernel_size
        self.strides = strides
        self.relu = nn.ReLU()
        self.layer = nn.Sequential(
            nn.Conv2d(self.in_dim, self.out_dim, kernel_size=self.kernel_size, stride=self.strides, padding=self.kernel_size // 2, bias=False),
            nn.BatchNorm2d(self.out_dim)  # Batch Normalization
        )
        # conv와 batch만 넘깁니다.
    def forward(self, x):
        x = self.layer(x)
        if self.activation:
            x = self.relu(x)

        return x

In [38]:
class Build_resnet_block(nn.Module):
    def __init__(self, num_layer, input_dim=3, output_dim=64, block_num=0):
        super(Build_resnet_block, self).__init__()
        self.num_layer = num_layer
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.block_num = block_num
        self.relu = nn.ReLU()
        self.conv_block =  Conv_block(self.input_dim, self.output_dim, kernel_size=1, strides=2, activation=False)
        self.blocks = nn.ModuleList()

        for idx in range(self.num_layer):
            if self.block_num > 0 and idx == 0:
                self.blocks.append(Conv_block(self.input_dim, self.output_dim, kernel_size=3,strides=2))
                # self.blocks.append(Conv_block(self.output_dim, self.output_dim, kernel_size=3, stride=1))
            else:
                self.blocks.append(Conv_block(self.output_dim, self.output_dim, kernel_size=3, strides=1))
                # self.blocks.append(Conv_block(None))
                # self.blocks.append(Conv_block(None))

    def forward(self, x):
        st = 0
        for idx in range(self.num_layer):
            if self.block_num > 0 and idx == 0:
                shorcut = self.conv_block(x)
                for block in self.blocks[st:st+2]:
                    x = block(x)
                st = st+2
            else:
                shorcut = x
                for block in self.blocks[st:st+2]:
                    x = block(x)
                st = st+2
            x = x + shorcut
            x = self.relu(x)
        return x

In [39]:
class GlobalAvgPool2d(nn.Module):
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

    def forward(self, x):
        x = self.avgpool(x)
        # 배치와 채널을 유지하면서 2D 공간 차원을 제거합니다.
        x = x.view(x.size(0), -1)
        return x

In [44]:
class Build_resnet(nn.Module):
    def __init__(self, num_cnn_list=[3,4,6,3],
                 channel_list=[64,128,256,512],
                 num_classes=10,
                 activation="softmax"):
        super(Build_resnet, self).__init__()
        self.num_cnn_list = num_cnn_list
        self.channel_list = channel_list
        self.num_classes = num_classes
        self.conv_layer = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3)
        self.maxpool_layer = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.relu = nn.ReLU(inplace=True)
        self.global_avg_pool = GlobalAvgPool2d()
        self.linear = nn.Linear(512, self.num_classes)

        self.resnet_blocks = nn.ModuleList()

        assert len(self.num_cnn_list) == len(self.channel_list)

        input_dim = 64
        for block_num, (num_cnn, output_dim) in enumerate(zip(num_cnn_list, channel_list)):
            block = Build_resnet_block(num_cnn, input_dim = input_dim, output_dim= output_dim, block_num=block_num)
            self.resnet_blocks.append(block)
            input_dim = output_dim

        # Global Average Pooling 및 Fully Connected Layer
        self.global_avg_pool = GlobalAvgPool2d()
        self.linear = nn.Linear(512, self.num_classes)

    def forward(self,x):
        # first layer
        x = self.conv_layer(x)
        x = self.relu(x)
        x = self.maxpool_layer(x)



        for block in self.resnet_blocks:
            x = block(x)

        x = self.global_avg_pool(x)
        x = self.linear(x)

        return x

In [45]:
model =  Build_resnet()

In [46]:
print(model)

Build_resnet(
  (conv_layer): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (maxpool_layer): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (relu): ReLU(inplace=True)
  (global_avg_pool): GlobalAvgPool2d(
    (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
  )
  (linear): Linear(in_features=512, out_features=10, bias=True)
  (resnet_blocks): ModuleList(
    (0): Build_resnet_block(
      (relu): ReLU()
      (conv_block): Conv_block(
        (relu): ReLU()
        (layer): Sequential(
          (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(2, 2), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (blocks): ModuleList(
        (0-2): 3 x Conv_block(
          (relu): ReLU()
          (layer): Sequential(
            (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, 

In [47]:
output = model(input_image)
print(output.size())

torch.Size([64, 10])


### Check Implementations

In [48]:
model.parameters()

<generator object Module.parameters at 0x7845f27607b0>

# 3. Train CIFAR-10 with own ResNet

train-image: 50,000
test-image: 10,000

class: [비행기, 자동차, 트럭, 개구리, ...] 등 10개의 class

for more info, https://www.cs.toronto.edu/~kriz/cifar.html

### Use pre-defined dataset

PyTorch는 custom dataset과 dataloader를 사용해도 되지만 cifar-10과 같은 유명 데이터셋에 대해서는 pre-defined된 dataset이 존재합니다.

이번 실습에서는 custom dataset을 직접 만드는 대신 pre-trained dataset을 불러와서 실습을 진행해보도록 하겠습니다.

In [49]:
import torchvision.datasets as datasets
import torchvision.transforms as transforms

train_dataset = datasets.CIFAR10(root='./data/',
                                 train=True,
                                 transform=transforms.ToTensor(),
                                 download=True)

test_dataset = datasets.CIFAR10(root='./data/',
                                train=False,
                                transform=transforms.ToTensor())

batch_size = 64
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:03<00:00, 49.4MB/s]


Extracting ./data/cifar-10-python.tar.gz to ./data/


In [50]:
class Trainer():
    def __init__(self, trainloader, testloader, model, optimizer, criterion, device):
        """
        trainloader: train data's loader
        testloader: test data's loader
        model: model to train
        optimizer: optimizer to update your model
        criterion: loss function
        """
        self.trainloader = trainloader
        self.testloader = testloader
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.device = device

    def train(self, epoch = 1):
        self.model.train()
        loss_list = []
        acc_list = []
        for e in range(epoch):
            running_loss, running_acc = 0.0, 0.0
            for i, data in tqdm.tqdm(enumerate(self.trainloader, 0)):
                inputs, labels = data
                # model에 input으로 tensor를 gpu-device로 보낸다
                inputs = inputs.to(self.device)
                labels = labels.to(self.device)
                # zero the parameter gradients
                self.optimizer.zero_grad()
                # forward + backward + optimize
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
                pred = outputs.max(1, keepdim=True)[1]
                running_acc += pred.eq(labels.view_as(pred)).sum().item()

            running_loss = running_loss / len(self.trainloader)
            running_acc = running_acc / len(self.trainloader.dataset)
            loss_list.append(running_loss) # epoch loss
            acc_list.append(running_acc) # epoch acc
            print('epoch: %d  loss: %.3f  acc:%.3f' % (e + 1, running_loss, running_acc))

        return loss_list, acc_list

    def test(self):
        self.model.eval()
        correct = 0
        for inputs, labels in self.testloader:
            inputs = inputs.to(self.device)
            labels = labels.to(self.device)
            output = self.model(inputs)
            pred = output.max(1, keepdim=True)[1] # get the index of the max
            correct += pred.eq(labels.view_as(pred)).sum().item()
        test_acc = correct / len(self.testloader.dataset)
        print('test_acc: %.3f' %(test_acc))

In [52]:
model =  Build_resnet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
device = torch.device('cuda')
model.to(device)

Build_resnet(
  (conv_layer): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
  (maxpool_layer): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (relu): ReLU(inplace=True)
  (global_avg_pool): GlobalAvgPool2d(
    (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
  )
  (linear): Linear(in_features=512, out_features=10, bias=True)
  (resnet_blocks): ModuleList(
    (0): Build_resnet_block(
      (relu): ReLU()
      (conv_block): Conv_block(
        (relu): ReLU()
        (layer): Sequential(
          (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(2, 2), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (blocks): ModuleList(
        (0-2): 3 x Conv_block(
          (relu): ReLU()
          (layer): Sequential(
            (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, 

In [None]:
trainer = Trainer(trainloader = train_loader,
                  testloader = test_loader,
                  model = model,
                  criterion = criterion,
                  optimizer = optimizer,
                  device = device)

loss_list, acc_list = trainer.train(epoch = 14)

503it [00:16, 32.64it/s]

In [None]:
x = np.arange(10)

fig, axs = plt.subplots(1, 2)
axs[0].plot(x, loss_list)
axs[0].set_xlabel('epoch')
axs[0].set_title('loss_plot')

axs[1].plot(x, acc_list)
axs[1].set_xlabel('epoch')
axs[1].set_title('acc_plot')

plt.tight_layout()
plt.show()

In [None]:
trainer.test()