## 파이토치 ResNet 구현

In [None]:
#라이브러리 호출

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torchvision
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import matplotlib.pyplot as plt
import numpy as np

import copy
from collections import namedtuple #'nametuple': 파이썬의 자료형 중 하나, 인덱스값 뿐 아니라 키 값으로 데이터에 접근할 수 있게 함
import os
import random
import time

import cv2
from torch.utils.data import DataLoader, Dataset
from PIL import Image

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
#이미지 데이터 전처리

class ImageTransform():
  """Callable that applies a phase-specific torchvision pipeline to an image.

  Two pipelines are built once in the constructor and stored in
  ``self.data_transform`` under the keys ``'train'`` and ``'val'``.

  Args:
    resize: output size given to ``RandomResizedCrop`` (train) and
      ``CenterCrop`` (val).
    mean: per-channel mean passed to ``transforms.Normalize``.
    std: per-channel standard deviation passed to ``transforms.Normalize``.
  """

  def __init__(self, resize, mean, std):
    def tensor_and_normalize():
      # Both pipelines end with the same two steps; build fresh instances
      # for each pipeline.
      return [transforms.ToTensor(), transforms.Normalize(mean, std)]

    # Training: random crop + random horizontal flip as augmentation.
    train_pipeline = transforms.Compose([
        transforms.RandomResizedCrop(resize, scale=(0.5, 1.0)),
        transforms.RandomHorizontalFlip(),
        *tensor_and_normalize(),
    ])
    # Validation: fixed resize followed by a center crop.
    val_pipeline = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(resize),
        *tensor_and_normalize(),
    ])
    self.data_transform = {'train': train_pipeline, 'val': val_pipeline}

  def __call__(self, img, phase='train'):
    """Run the pipeline registered for ``phase`` ('train' or 'val') on ``img``."""
    pipeline = self.data_transform[phase]
    return pipeline(img)

In [None]:
#데이터 전처리에 사용되는 변수 값 정의

# Target crop size handed to ImageTransform as `resize`.
size = 224
# Per-channel mean/std handed to transforms.Normalize via ImageTransform.
# NOTE(review): these look like the commonly used ImageNet statistics — confirm.
mean = (0.485, 0.456, 0.406)
std = (0.229, 0.224, 0.225)
# Number of samples per batch used when building the DataLoaders.
batch_size = 32

In [None]:
!unzip -q dogs-vs-cats.zip -d dogs-vs-cats

In [None]:
#훈련/테스트셋 불러오기

# Directories holding the extracted cat and dog images.
cat_directory = r'/content/dogs-vs-cats/Cat'
dog_directory = r'/content/dogs-vs-cats/Dog'

# Sorted full paths of every entry in each directory.
cat_images_filepaths = sorted(
    os.path.join(cat_directory, name) for name in os.listdir(cat_directory)
)
dog_images_filepaths = sorted(
    os.path.join(dog_directory, name) for name in os.listdir(dog_directory)
)

# Cats first, then dogs.
images_filepaths = cat_images_filepaths + dog_images_filepaths
# Keep only the files OpenCV can decode (cv2.imread returns None otherwise).
correct_images_filepaths = list(
    filter(lambda path: cv2.imread(path) is not None, images_filepaths)
)

In [None]:
#데이터셋 분리
# Fixed seed so the shuffle (and therefore the split) is reproducible.
random.seed(42)
random.shuffle(correct_images_filepaths)

# First 400 paths -> training, last 10 -> test, everything in between -> validation.
train_images_filepaths = correct_images_filepaths[:400]
test_images_filepaths = correct_images_filepaths[-10:]
val_images_filpaths = correct_images_filepaths[400:-10]

# Report the size of each split (train, val, test).
print(*map(len, (train_images_filepaths, val_images_filpaths, test_images_filepaths)))

400 92 10


In [None]:
#이미지 레이블 구분(가져온 데이터가 개이면 '1', 고양이이면 '0')

class DogvsCatDataset(Dataset):
  """Dataset over image file paths that yields ``(image, label)`` pairs.

  The label is derived from the file name: the text before the first ``'.'``
  must be ``cat`` (label 0) or ``dog`` (label 1), compared case-insensitively.

  Args:
    file_list: sequence of image file paths.
    transform: callable invoked as ``transform(img, phase)``; when ``None``
      the RGB PIL image is returned untransformed.
    phase: phase name forwarded to ``transform`` (e.g. 'train' or 'val').
  """

  # File-name prefix -> integer class label.
  _LABELS = {'cat': 0, 'dog': 1}

  def __init__(self, file_list, transform=None, phase='train'):
    self.file_list = file_list
    self.transform = transform
    self.phase = phase

  def __len__(self):
    """Return the number of image paths in the dataset."""
    return len(self.file_list)

  @staticmethod
  def _label_from_path(img_path):
    """Map a path such as '.../dog.12.jpg' to its integer label.

    Raises:
      ValueError: if the file name does not start with 'cat' or 'dog'.
    """
    prefix = os.path.basename(img_path).split('.')[0].lower()
    try:
      return DogvsCatDataset._LABELS[prefix]
    except KeyError:
      # Fail loudly instead of leaking a string label into the batch.
      raise ValueError(
          f"cannot infer label from file name {img_path!r}; "
          "expected it to start with 'cat' or 'dog'"
      ) from None

  def __getitem__(self, idx):
    """Load, convert and transform the image at ``idx``; return (image, label)."""
    img_path = self.file_list[idx]
    label = self._label_from_path(img_path)

    # Close the file handle promptly and force 3 channels so that grayscale,
    # palette or RGBA files do not break the 3-channel normalization.
    with Image.open(img_path) as img:
      img = img.convert('RGB')

    if self.transform is not None:
      img = self.transform(img, self.phase)
    return img, label


In [None]:
#이미지 데이터셋 정의

# Training dataset: uses the 'train' pipeline of ImageTransform.
train_dataset = DogvsCatDataset(
    file_list=train_images_filepaths,
    transform=ImageTransform(size, mean, std),
    phase='train',
)
# Validation dataset: uses the 'val' pipeline of ImageTransform.
val_dataset = DogvsCatDataset(
    file_list=val_images_filpaths,
    transform=ImageTransform(size, mean, std),
    phase='val',
)

# Sanity check: show the tensor size and the label of one training sample.
index = 0
print(train_dataset[index][0].size())
print(train_dataset[index][1])

torch.Size([3, 224, 224])
0


In [None]:
#데이터셋의 데이터를 메모리로 불러오기

# Batch loaders: training data is reshuffled, validation data keeps its order.
train_iterator = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_iterator = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
dataloader_dict = dict(train=train_iterator, val=val_iterator)

# Pull a single training batch and show its input size and labels.
batch_iterator = iter(train_iterator)
inputs, label = next(batch_iterator)
print(inputs.size(), label, sep='\n')

torch.Size([32, 3, 224, 224])
tensor([1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 0,
        1, 0, 1, 1, 1, 1, 1, 1])


#### ResNet 네트워크 구성

In [None]:
#기본 블록 (3x3 합성곱 2개로 구성)

class BasicBlock(nn.Module):
  """Residual block made of two 3x3 convolutions plus a shortcut connection.

  Args:
    in_channels: number of channels of the block input.
    out_channels: number of channels produced by both convolutions.
    stride: stride of the first convolution (and of the shortcut projection).
    downsample: when true, the shortcut is projected with a 1x1 convolution
      and batch norm so its shape matches the main path.
  """
  expansion = 1  # output channels = out_channels * expansion

  def __init__(self, in_channels, out_channels, stride=1, downsample=False):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.relu = nn.ReLU(inplace=True)

    if downsample:
      # Projection shortcut: used when input and output shapes differ.
      conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False)
      bn = nn.BatchNorm2d(out_channels)
      downsample = nn.Sequential(conv, bn)
    else:
      downsample = None
    self.downsample = downsample

  def forward(self, x):
    """Return relu(F(x) + shortcut(x))."""
    identity = x

    out = self.conv1(x)
    out = self.bn1(out)
    out = self.relu(out)
    out = self.conv2(out)
    out = self.bn2(out)

    if self.downsample is not None:
      identity = self.downsample(identity)

    # Identity mapping: add the (possibly projected) block input to the
    # output of the convolution path.
    out += identity
    out = self.relu(out)
    return out

In [None]:
#Bottleneck block (1x1 -> 3x3 -> 1x1 convolution layers)

class Bottleneck(nn.Module):
  """Residual block built from a 1x1, a 3x3 and a 1x1 convolution.

  The last convolution produces ``out_channels * expansion`` channels. The
  ``stride`` is applied in the 3x3 convolution and in the optional shortcut
  projection.

  Args:
    in_channels: number of channels of the block input.
    out_channels: channel width of the first two convolutions.
    stride: stride of the 3x3 convolution and of the shortcut projection.
    downsample: when true, project the shortcut with a 1x1 convolution and
      batch norm so it can be added to the main path.
  """
  expansion = 4

  def __init__(self, in_channels, out_channels, stride=1, downsample=False):
    super().__init__()
    wide_channels = out_channels * self.expansion

    self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
    self.bn1 = nn.BatchNorm2d(out_channels)
    self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
    self.bn2 = nn.BatchNorm2d(out_channels)
    self.conv3 = nn.Conv2d(out_channels, wide_channels, kernel_size=1, stride=1, bias=False)
    self.bn3 = nn.BatchNorm2d(wide_channels)
    self.relu = nn.ReLU(inplace=True)

    # Optional projection shortcut; None means the input is added as-is.
    self.downsample = (
        nn.Sequential(
            nn.Conv2d(in_channels, wide_channels, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(wide_channels),
        )
        if downsample
        else None
    )

  def forward(self, x):
    """Return relu(main_path(x) + shortcut(x))."""
    out = self.relu(self.bn1(self.conv1(x)))
    out = self.relu(self.bn2(self.conv2(out)))
    out = self.bn3(self.conv3(out))

    shortcut = x if self.downsample is None else self.downsample(x)

    out += shortcut
    return self.relu(out)

In [None]:
#ResNet 모델 네트워크

class ResNet(nn.Module):
  """ResNet backbone with a linear classification head.

  Args:
    config: ``(block, n_blocks, channels)`` triple. ``block`` is the
      residual block class (it must expose an ``expansion`` attribute),
      ``n_blocks`` lists the number of blocks in each of the four stages and
      ``channels`` lists the base channel width of each stage.
    output_dim: number of outputs of the final linear layer.
    zero_init_residual: when true, the scale of the last batch norm of every
      residual block is initialised to zero so each residual branch starts
      out contributing nothing.

  ``forward`` returns ``(logits, features)`` where ``features`` is the
  flattened pooled tensor fed to the linear layer.
  """

  def __init__(self, config, output_dim, zero_init_residual=False):
    super().__init__()

    block, n_blocks, channels = config
    self.in_channels = channels[0]
    assert len(n_blocks) == len(channels) == 4, 'config must describe exactly 4 stages'

    # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max pool.
    self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
    self.bn1 = nn.BatchNorm2d(self.in_channels)
    self.relu = nn.ReLU(inplace=True)
    self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    # Stage 1 keeps the resolution; stages 2-4 halve it with stride 2.
    self.layer1 = self.get_resnet_layer(block, n_blocks[0], channels[0])
    self.layer2 = self.get_resnet_layer(block, n_blocks[1], channels[1], stride=2)
    self.layer3 = self.get_resnet_layer(block, n_blocks[2], channels[2], stride=2)
    self.layer4 = self.get_resnet_layer(block, n_blocks[3], channels[3], stride=2)

    self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
    self.fc = nn.Linear(self.in_channels, output_dim)

    if zero_init_residual:
      # Zero the last batch-norm scale of each residual branch.
      for m in self.modules():
        if isinstance(m, Bottleneck):
          nn.init.constant_(m.bn3.weight, 0)
        elif isinstance(m, BasicBlock):
          nn.init.constant_(m.bn2.weight, 0)

  def get_resnet_layer(self, block, n_blocks, channels, stride=1):
    """Build one stage of ``n_blocks`` residual blocks.

    Only the first block applies ``stride``; it also gets a projection
    shortcut whenever the stride or the channel count changes the shape.
    Updates ``self.in_channels`` to the stage's output width.
    """
    out_channels = channels * block.expansion
    # The shortcut must be projected if either the spatial size (stride)
    # or the channel count differs between input and output.
    downsample = stride != 1 or self.in_channels != out_channels

    layers = [block(self.in_channels, channels, stride, downsample)]
    for _ in range(1, n_blocks):
      layers.append(block(out_channels, channels))

    self.in_channels = out_channels
    return nn.Sequential(*layers)

  def forward(self, x):
    """Return ``(logits, features)``; size comments assume a 224x224 input."""
    x = self.conv1(x)    # 224x224 -> 112x112 (stride 2)
    x = self.bn1(x)
    x = self.relu(x)
    x = self.maxpool(x)  # 112x112 -> 56x56 (stride 2)
    # residual stages
    x = self.layer1(x)   # 56x56
    x = self.layer2(x)   # 28x28
    x = self.layer3(x)   # 14x14
    x = self.layer4(x)   # 7x7
    x = self.avgpool(x)  # 1x1
    h = x.view(x.shape[0], -1)
    x = self.fc(h)
    return x, h


In [None]:
#ResNetConfig 정의

# Lightweight record describing a ResNet variant:
#   block    - residual block class to instantiate
#   n_blocks - number of blocks in each stage
#   channels - base channel width of each stage
ResNetConfig = namedtuple('ResNetConfig', ['block', 'n_blocks', 'channels'])

In [None]:
#기본 블록을 사용해서 ResNetConfig 정의

# Configurations built on BasicBlock (two 3x3 convolutions per block).
resnet18_config = ResNetConfig(block=BasicBlock, n_blocks=[2, 2, 2, 2], channels=[64, 128, 256, 512])
resnet34_config = ResNetConfig(block=BasicBlock, n_blocks=[3, 4, 6, 3], channels=[64, 128, 256, 512])

#Configurations built on the Bottleneck block

resnet50_config = ResNetConfig(block=Bottleneck, n_blocks=[3, 4, 6, 3], channels=[64, 128, 256, 512])
resnet101_config = ResNetConfig(block=Bottleneck, n_blocks=[3, 4, 23, 3], channels=[64, 128, 256, 512])
resnet152_config = ResNetConfig(block=Bottleneck, n_blocks=[3, 8, 36, 3], channels=[64, 128, 256, 512])

#사전 정의된 ResNet 사용

In [None]:
#사전 훈련된 ResNet 모델 사용

# Load torchvision's ResNet-50 with pretrained weights (downloaded on first use).
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
# `weights=` — confirm against the installed version.
pretrained_model = models.resnet50(pretrained=True)



Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


100%|██████████| 97.8M/97.8M [00:00<00:00, 113MB/s]


In [None]:
# Print the module tree of the pretrained torchvision model for inspection.
print(pretrained_model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [None]:
#ResNet50 Config 를 사용한 ResNet 모델 사용

# Size of the classifier output (labels are 0 = cat, 1 = dog).
OUTPUT_DIM = 2
# Build the locally defined ResNet from the ResNet-50 configuration.
model = ResNet(resnet50_config, OUTPUT_DIM)
# Print the module tree for inspection.
print(model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [None]:
#옵티마이저와 손실 함수 정의

# Adam optimizer over all model parameters.
# NOTE(review): lr=1e-7 is very small, and `optimizer` is rebound to an SGD
# optimizer later in this file before training starts — confirm which one is
# intended.
optimizer = optim.Adam(model.parameters(), lr=1e-7)
# Cross-entropy loss for the classification task.
criterion = nn.CrossEntropyLoss()

# Move the model and the loss module to the selected device.
model = model.to(device)
criterion = criterion.to(device)

In [None]:
#모델 학습 정확도 측정 함수 정의

def calculate_topk_accuracy(y_pred, y, k=2):
  """Compute top-1 and top-k accuracy for a batch of predictions.

  Args:
    y_pred: score tensor indexed as (sample, class).
    y: tensor of true class indices, one per sample.
    k: how many of the highest-scoring classes count as a hit.

  Returns:
    ``(acc_1, acc_k)``, each a one-element float tensor holding the fraction
    of samples whose true class is among the top 1 / top k predictions.
  """
  with torch.no_grad():
    n_samples = y.shape[0]
    # Indices of the k best classes per sample, transposed to (k, n_samples).
    ranked = y_pred.topk(k, 1)[1].t()
    # hits[r, s] is True when the r-th ranked class of sample s is correct.
    hits = ranked.eq(y.view(1, -1).expand_as(ranked))
    top1_hits, topk_hits = (
        hits[:rows].reshape(-1).float().sum(0, keepdim=True) for rows in (1, k)
    )
    acc_1 = top1_hits / n_samples
    acc_k = topk_hits / n_samples
  return acc_1, acc_k

In [None]:
#모델 학습 함수 정의

def train(model, iterator, optimizer, criterion, scheduler, device):
  """Train ``model`` for one epoch and return the averaged metrics.

  Args:
    model: network whose forward pass returns a tuple with the logits first.
    iterator: iterable of ``(x, y)`` batches that supports ``len()``.
    optimizer: optimizer updating the model parameters.
    criterion: loss called as ``criterion(logits, y)``.
    scheduler: learning-rate scheduler stepped once at the end of the epoch;
      pass ``None`` to skip scheduling.
    device: device the batches are moved to.

  Returns:
    ``(epoch_loss, epoch_acc_1, epoch_acc_5)`` averaged over the batches.
    The third value is the top-k accuracy from ``calculate_topk_accuracy``
    with its default ``k``; the ``_5`` suffix is kept for caller compatibility.
  """
  epoch_loss = 0
  epoch_acc_1 = 0
  epoch_acc_5 = 0

  # Put training-only layers (batch norm, dropout) into training mode.
  model.train()

  for (x, y) in iterator:
    x = x.to(device)
    y = y.to(device)

    optimizer.zero_grad()
    logits = model(x)[0]  # the model returns (logits, features)
    loss = criterion(logits, y)

    acc_1, acc_5 = calculate_topk_accuracy(logits, y)
    loss.backward()
    optimizer.step()

    epoch_loss += loss.item()
    epoch_acc_1 += acc_1.item()
    epoch_acc_5 += acc_5.item()

  n_batches = len(iterator)
  epoch_loss /= n_batches
  epoch_acc_1 /= n_batches
  epoch_acc_5 /= n_batches

  # Advance the learning-rate schedule once per epoch, after the optimizer
  # updates; previously the scheduler was accepted but never stepped.
  if scheduler is not None:
    scheduler.step()

  return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
#모델 평가 함수 정의

def evaluate(model, iterator, criterion, device):
  """Evaluate ``model`` on ``iterator`` without updating any parameters.

  Args:
    model: network whose forward pass returns a tuple with the logits first.
    iterator: iterable of ``(x, y)`` batches that supports ``len()``.
    criterion: loss called as ``criterion(logits, y)``.
    device: device the batches are moved to.

  Returns:
    ``(epoch_loss, epoch_acc_1, epoch_acc_5)`` averaged over the batches.
    The third value is the top-k accuracy from ``calculate_topk_accuracy``
    with its default ``k``; the ``_5`` suffix is kept for caller compatibility.
  """
  epoch_loss = 0
  epoch_acc_1 = 0
  epoch_acc_5 = 0

  model.eval()
  with torch.no_grad():
    for (x, y) in iterator:
      x = x.to(device)
      y = y.to(device)

      # Use the whole batch of logits; indexing the already-unpacked logits
      # with [0] would select only the first sample.
      logits = model(x)[0]

      loss = criterion(logits, y)
      acc_1, acc_5 = calculate_topk_accuracy(logits, y)

      epoch_loss += loss.item()
      epoch_acc_1 += acc_1.item()
      epoch_acc_5 += acc_5.item()

  # Average once, after every batch has been accumulated.
  n_batches = len(iterator)
  epoch_loss /= n_batches
  epoch_acc_1 /= n_batches
  epoch_acc_5 /= n_batches

  return epoch_loss, epoch_acc_1, epoch_acc_5

In [None]:
#모델 학습 시간 측정 함수

def epoch_time(start_time, end_time):
  """Split the span between two timestamps (in seconds) into minutes and seconds.

  Returns:
    ``(minutes, seconds)`` as integers, both truncated toward zero.
  """
  duration = end_time - start_time
  whole_minutes = int(duration / 60)
  # Whatever is left after removing the whole minutes, truncated to an int.
  leftover_seconds = int(duration - 60 * whole_minutes)
  return whole_minutes, leftover_seconds

In [None]:
from torch.optim import lr_scheduler

# SGD with momentum and weight decay; this rebinds `optimizer`, replacing the
# Adam optimizer created earlier in the file.
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
# StepLR multiplies the learning rate by `gamma` every `step_size` calls to
# scheduler.step().
scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
#모델 학습

# Lowest validation loss seen so far; used to decide when to checkpoint.
best_valid_loss = float('inf')
EPOCHS = 10

for epoch in range(EPOCHS):
  start_time = time.monotonic()
  # One pass over the training data, then one pass over the validation data.
  train_loss, train_acc_1, train_acc_5 = train(model, train_iterator, optimizer, criterion, scheduler, device)
  valid_loss, valid_acc_1, valid_acc_5 = evaluate(model, val_iterator, criterion, device)

  # Save the weights whenever the validation loss improves.
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    torch.save(model.state_dict(), 'tut5-model.pt')

  end_time = time.monotonic()
  epoch_mins, epoch_secs = epoch_time(start_time, end_time)

  # NOTE(review): the "@5" columns print the top-k accuracy returned by
  # calculate_topk_accuracy, whose default is k=2 — confirm the label.
  print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
  print(
    f'\t Train Loss: {train_loss:.3f} | '
    f'Train Acc @1: {train_acc_1*100:6.2f}% | '
    f'Train Acc @5: {train_acc_5*100:6.2f}%'
  )

  print(
      f'\t Val.  Loss: {valid_loss:.3f} | '
      f'Val.  Acc @1: {valid_acc_1*100:6.2f}% | '
      f'Val.  Acc @5: {valid_acc_5*100:6.2f}%'
  )

In [None]:
#테스트 데이터셋을 이용한 모델 예측

import pandas as pd

# Collected per-image ids and predicted probabilities of class 1 (dog).
id_list = []
pred_list = []
_id = 0

# Loop-invariant setup: build the transform and switch to eval mode once.
transform = ImageTransform(size, mean, std)
model.eval()

with torch.no_grad():
  for test_path in test_images_filepaths:
    # File names look like '<label>.<id>.<ext>'; keep the middle part as id.
    _id = test_path.split('/')[-1].split('.')[1]

    # Close the file promptly and force 3 channels so the 3-channel
    # normalization also works for grayscale / RGBA files.
    with Image.open(test_path) as img:
      img = img.convert('RGB')
    img = transform(img, phase='val')
    img = img.unsqueeze(0)  # add the batch dimension
    img = img.to(device)

    outputs = model(img)  # (logits, features)
    preds = F.softmax(outputs[0], dim=1)[:, 1].tolist()
    id_list.append(_id)
    pred_list.append(preds[0])

res = pd.DataFrame({
    'id': id_list,
    'label': pred_list
})

# NOTE: ids are strings, so this ordering is lexicographic, not numeric.
res.sort_values(by='id', inplace=True)
res.reset_index(drop=True, inplace=True)

res.to_csv('submission.csv', index=False)
res.head(10)

In [None]:
#모델 예측에 대한 결과 출력

# Integer label -> class name used for the plot titles.
class_ = classes = {0:'cat', 1:'dog'}
def display_image_grid(images_filepaths, predicted_labels=(), cols=5):
  """Show the images in a grid, each titled with its predicted class.

  Args:
    images_filepaths: paths of the images to display.
    predicted_labels: optional per-image scores aligned with
      ``images_filepaths``; a value above 0.5 is shown as 'dog', otherwise
      'cat'. For images without an entry, the score is looked up in the
      global ``res`` DataFrame by the id embedded in the file name
      ('<label>.<id>.<ext>').
    cols: number of grid columns.
  """
  n_images = len(images_filepaths)
  if n_images == 0:
    return  # nothing to draw

  # Round up so a partially filled last row is still shown.
  rows = -(-n_images // cols)
  # squeeze=False keeps `ax` a 2-D array even for a single row/column.
  figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(12, 6), squeeze=False)
  axes = ax.ravel()

  for i, image_filepath in enumerate(images_filepaths):
    image = cv2.imread(image_filepath)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

    if i < len(predicted_labels):
      score = predicted_labels[i]
    else:
      # Use the prediction stored for this very image.
      image_id = image_filepath.split('/')[-1].split('.')[1]
      score = res.loc[res['id'] == image_id, 'label'].values[0]

    label = 1 if score > 0.5 else 0

    axes[i].imshow(image)
    axes[i].set_title(class_[label])
    axes[i].set_axis_off()

  # Hide the unused cells of the last row.
  for unused_ax in axes[n_images:]:
    unused_ax.set_axis_off()

  plt.tight_layout()
  plt.show()

display_image_grid(test_images_filepaths)
