<a href="https://colab.research.google.com/github/freud-sensei/imfine_torch/blob/main/%5B%EB%B1%80%EA%B3%BC%ED%9A%83%EB%B6%88%5D_%EC%9E%91%EB%AC%BC_%EC%9E%8E_%EC%82%AC%EC%A7%84%EC%9C%BC%EB%A1%9C_%EC%A7%88%EB%B3%91_%EB%B6%84%EB%A5%98%ED%95%98%EA%B8%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 실험 설계를 위한 데이터 분할

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
os.chdir('/content/drive/MyDrive')

In [None]:
!unzip -qq './dataset.zip' -d './dataset'

## 데이터 분할을 위한 폴더 생성

In [None]:
import os
import shutil

original_dataset_dir = './dataset'
classes_list = os.listdir(original_dataset_dir) # 경로 하위에 있는 모든 폴더의 목록을 가져온다
base_dir = './splitted' # 나눈 데이터를 저장할 폴더
os.mkdir(base_dir)

# train, valid, test 폴더를 만드는 과정
train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

# train, valid, test 폴더 아래에 각 클래스별 폴더 생성
for clss in classes_list:
  os.mkdir(os.path.join(train_dir, clss))
  os.mkdir(os.path.join(validation_dir, clss))
  os.mkdir(os.path.join(test_dir, clss))

## 데이터 분할과 클래스별 데이터 수 확인

In [None]:
import math

for clss in classes_list:
  # train, validation, test dataset으로 분
  path = os.path.join(original_dataset_dir, clss) # 예: 'leaf_dataset/Apple__Apple_scab'
  fnames = os.listdir(path) # 각 class 폴더별 이미지의 리스트. 그 수를 확인할 수 있겠지
  train_size = math.floor(len(fnames) * 0.6)
  validation_size = math.floor(len(fnames) * 0.2)
  test_size = math.floor(len(fnames) * 0.2)

  # 원래 폴더에서 각 클래스별 폴더로 이동
  train_fnames = fnames[:train_size]
  print('Train size(', clss, '): ', len(train_fnames))
  for fname in train_fnames:
    source = os.path.join(path, fname) # 예: 'leaf_dataset/Apple__Apple_scab/image (1).JPG'
    dest = os.path.join(os.path.join(train_dir, clss), fname) # 예: 'splitted/train/Apple_Apple_scab/image (1).JPG'
    shutil.copyfile(source, dest)

  validation_fnames = fnames[train_size:(validation_size + train_size)]
  print('Validation size(', clss, '): ', len(validation_fnames))
  for fname in validation_fnames:
    source = os.path.join(path, fname)
    dest = os.path.join(os.path.join(validation_dir, clss), fname)
    shutil.copyfile(source, dest)

  test_fnames = fnames[(train_size + validation_size):(validation_size + train_size + test_size)]
  print('Test size(', clss, '): ', len(test_fnames))
  for fname in test_fnames:
    source = os.path.join(path, fname)
    dest = os.path.join(os.path.join(test_dir, clss), fname)
    shutil.copyfile(source, dest)

Train size( Pepper,_bell___healthy ):  886
Validation size( Pepper,_bell___healthy ):  295
Test size( Pepper,_bell___healthy ):  295
Train size( Grape___Esca_(Black_Measles) ):  829
Validation size( Grape___Esca_(Black_Measles) ):  276
Test size( Grape___Esca_(Black_Measles) ):  276
Train size( Pepper,_bell___Bacterial_spot ):  598
Validation size( Pepper,_bell___Bacterial_spot ):  199
Test size( Pepper,_bell___Bacterial_spot ):  199
Train size( Strawberry___healthy ):  273
Validation size( Strawberry___healthy ):  91
Test size( Strawberry___healthy ):  91
Train size( Grape___Black_rot ):  708
Validation size( Grape___Black_rot ):  236
Test size( Grape___Black_rot ):  236
Train size( Corn___Common_rust ):  715
Validation size( Corn___Common_rust ):  238
Test size( Corn___Common_rust ):  238
Train size( Apple___Apple_scab ):  378
Validation size( Apple___Apple_scab ):  126
Test size( Apple___Apple_scab ):  126
Train size( Potato___healthy ):  91
Validation size( Potato___healthy ):  30


## 베이스라인 모델 학습을 위한 준비

In [None]:
import torch
use_cuda = torch.cuda.is_available()
device = 'cuda' if use_cuda else 'cpu'
batch_size = 256
epoch = 1

**1. Dataset, DataLoader 생성**

In [None]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder # 데이터셋을 불러온다. 하나의 클래스가 하나의 폴더인 형태에 유용.

transform_base = transforms.Compose([transforms.Resize((64, 64)),
                                     transforms.ToTensor()])
# 이미지의 크기를 (64, 64)로 조정하고, 이후 Tensor 형태로 변환 후 값을 0에서 1 사이로 normalize한다.
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base)
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)

from torch.utils.data import DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

In [None]:
x = train_loader.__iter__().__next__()
print(x[0].shape, x[1].shape)

torch.Size([256, 3, 64, 64]) torch.Size([256])


## 베이스라인 모델 설계

**2. 모델 설계하기**

**3. 옵티마이저와 손실함수 설정하기**

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
  def __init__(self):
    super().__init__()
    self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
    self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
    self.dropout1 = nn.Dropout(p=0.25)
    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
    self.dropout2 = nn.Dropout(p=0.25)
    self.conv3 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
    self.dropout3 = nn.Dropout(p=0.25)
    self.fc1 = nn.Linear(4096, 512)
    self.dropout4 = nn.Dropout(p=0.5)
    self.fc2 = nn.Linear(512, 33)

  def forward(self, x):
    x = self.conv1(x)
    x = F.relu(x)
    x = self.pool(x)
    x = self.dropout1(x)

    x = self.conv2(x)
    x = F.relu(x)
    x = self.pool(x)
    x = self.dropout2(x)

    x = self.conv3(x)
    x = F.relu(x)
    x = self.pool(x)
    x = self.dropout3(x)

    x = x.view(-1, 4096) # (여기서 0번째 차원의 길이는 batch의 데이터 수가 된다.)
    x = self.fc1(x)
    x = F.relu(x)
    x = self.dropout4(x)
    x = self.fc2(x)

    return F.log_softmax(x, dim=1)

model_base = Net().to(device)
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

## 모델 학습을 위한 함수

In [None]:
def train(model, train_loader, optimizer):
  model.train()
  for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()

## 모델 평가를 위한 함수

In [None]:
def evaluate(model, test_loader):
  model.eval()
  test_loss = 0
  correct = 0

  with torch.no_grad(): # parameter 업데이트 중단
    for data, target in test_loader:
      data, target = data.to(device), target.to(device)
      output = model(data)
      test_loss += F.cross_entropy(output, target, reduction='sum').item()
      pred = output.max(1)[1]
      correct += (pred == target).sum()

  test_loss /= len(test_loader.dataset)
  test_accuracy = 100. * correct / len(test_loader.dataset)
  return test_loss, test_accuracy

## 학습 실행

In [None]:
import time
import copy

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs = 30):
  best_acc = 0.0
  best_model = copy.deepcopy(model.state_dict())
  # state_dict 는 간단히 말해 각 계층을 매개변수 텐서로 매핑되는 Python 사전(dict) 객체입니다.

  for epoch in range(num_epochs):
    since = time.time()
    train(model, train_loader, optimizer)
    train_loss, train_acc = evaluate(model, train_loader) # 해당 epoch에서의 학습 loss, 정확도
    val_loss, val_acc = evaluate(model, val_loader) # 해당 epoch에서의 검증 loss, 정확도
    if val_acc > best_acc:
      best_acc = val_acc
      best_model_wts = copy.deepcopy(model.state_dict())
    time_elapsed = time.time() - since

    print(f"epoch {epoch}")
    print(f"train loss: {train_loss:.4f}, accuracy: {train_acc:.2f}")
    print(f"validation loss: {val_loss:.4f}, accuracy: {val_acc:.2f}")
    print(f"completed in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")

  model.load_state_dict(best_model_wts)
  return model

base = train_baseline(model_base, train_loader, val_loader, optimizer, 3)
torch.save(base, 'baseline.pt')

epoch 0
train loss: 0.0475, accuracy: 98.91
validation loss: 0.1961, accuracy: 93.78
completed in 1m 55s
epoch 1
train loss: 0.0475, accuracy: 98.91
validation loss: 0.1961, accuracy: 93.78
completed in 1m 54s
epoch 2
train loss: 0.0475, accuracy: 98.91
validation loss: 0.1961, accuracy: 93.78
completed in 2m 2s


# Transfer Learning

* 높은 성능의 이미지 분류 모델을 구축하기 위해선, 질 좋은 데이터셋이 필요하다. 하지만 이를 대량으로 구하기는 어려움
* **Transfer Learning(전이학습)**: 대량의 데이터셋으로 학습된 모델(pre-trained model)을 재활용한 후, 일부를 조정(fine-tuning)하여 다른 주제의 이미지 분류 모델에 사용하는 것
* `torchvision.models` 패키지의 모델을 사용 가능

**가중치 학습**
* 처음부터 딥러닝 모델을 설계하고 학습을 진행할 땐, 초기 parameter 값이 random하게 설정된다.
* transfer learning을 할 땐, 대량의 데이터로 학습된 parameter 값을 불러온 후 학습 과정에서 update하게 된다.

**Fine-tuning은 일종의 가중치 갱신**
* parameter 값 일부를 데이터셋의 특성에 맞게 다시 학습하여 parameter를 조정하는 것 (layer를 freeze)
* pre-trained model의 일부 layer는 업데이트되지 않고, 일부 layer는 업데이트되게끔 함

**Freeze할 Layer를 결정하는 방법**
* 큰 데이터셋 + pre-trained model 데이터셋과 유사: 일부 레이어를 훈련시킨다. (유사도가 높아 일부만 update해도 높은 효과)
* 큰 데이터셋 + pre-trained model 데이터셋과 상이: 모델 전체를 훈련시킨다.
* 작은 데이터셋 + pre-trained model 데이터셋과 유사: Convolutional Base를 freeze한다.
* 작은 데이터셋 + pre-trained model 데이터셋과 상이: 일부 레이어를 훈련시킨다. (전체를 훈련하면 과적합의 위험)

## Transfer Learning을 위한 준비

In [None]:
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize([64, 64]),
        transforms.RandomHorizontalFlip(), # 무작위샘플을 좌우반전 (default p=.5)
        transforms.RandomVerticalFlip(), # 무작위샘플을 상하반전 (default p=.5)
        transforms.RandomCrop(52), # 일부를 랜덤하게 잘라내어 52*52 사이즈로 변경
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
        # RGB 채널 값에서 정규화를 적용할 평균값, 표준편차 값
        # 정규화는 모델 최적화 및 local minimum 방지에 도움이 됨
    ]),
    'val': transforms.Compose([
        transforms.Resize([64, 64]),
        transforms.RandomCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])
}

data_dir = './splitted'
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x), transform=data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], shuffle=True, batch_size=batch_size, num_workers=2) for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

In [None]:
from torchvision import models
resnet = models.resnet50(pretrained=True)
num_features = resnet.fc.in_features # ResNet의 마지막 Layer의 입력 채널 수
# 모형의 마지막 FCN 대신 출력채널의 수가 33개인 새로운 Layer를 추가해야

resnet.fc = nn.Linear(num_features, 33) # 이것만 바꿔치기
resnet = resnet.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)
# 설정한 일부 layer의 parameter만을 업데이트
# requires_grad가 True로 설정된 parameter만

from torch.optim import lr_scheduler
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# optimizer의 learning rate를 epoch에 따라 변경하는 역할
# 7 epoch마다 0.1씩 곱해 lr을 감소시킨다


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 159MB/s]


본 transfer learning에서는 fully connected layer 앞부분의 parameter는 고정시키고, 뒷부분만 학습하도록 하자.

In [None]:
count = 0
for child in resnet.children(): # 모델의 자식 모듈을 iterable로 반환하는 메서드
  count += 1
  if count < 6:
    for param in child.parameters():
      param.requires_grad = False
      # 1~5번 layer의 parameter는 update X. 6~10번만.

## 모델 학습 및 검증

In [None]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=10):
  best_model = copy.deepcopy(model.state_dict())
  best_acc = .0

  for epoch in range(3):
    print(f"epoch {epoch + 1}")
    since = time.time()

    for phase in ['train', 'val']:
      if phase == 'train':
        model.train()
      else:
        model.eval()
      running_loss = .0
      running_corrects = 0

      for data, target in dataloaders[phase]:
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad()

        with torch.set_grad_enabled(phase=='train'):
          # phase가 train일 때만 gradient를 계산
          outputs = model(data)
          preds = torch.max(outputs, axis=1)[1]
          loss = criterion(outputs, target)
          if phase == 'train':
            loss.backward()
            optimizer.step()

        running_loss += loss.item() * data.size(0) # data.size(0)은 배치 사이즈, 모든 데이터 Loss를 합산하고자 함
        running_corrects += torch.sum(preds == target)

      if phase == 'train':
        scheduler.step()
        l_r = [x['lr'] for x in optimizer.param_groups]
        # 각 epoch의 learning rate를 불러온다
        print('learning rate: ', l_r)

      epoch_loss = running_loss / dataset_sizes[phase]
      epoch_acc = running_corrects.double() / dataset_sizes[phase]
      print(f"{phase} Loss: {epoch_loss:.4f} Accuracy: {epoch_acc:.4f}")

      if phase == 'val' and epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print(f"completed in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")

  print(f"Best val acc: {best_acc:.4f}")
  model.load_state_dict(best_model)

  return model

In [None]:
model_resnet50 = train_resnet(resnet, criterion, optimizer,
                              exp_lr_scheduler, num_epochs=epoch)
torch.save(model_resnet50, 'resnet50.pt')

# 아마 훨씬 빠를 겁니다. 그러길 바랍니다.

epoch 1
learning rate:  [0.001]
train Loss: 0.2390 Accuracy: 0.9223
val Loss: 0.2135 Accuracy: 0.9389
completed in 1m 21s
epoch 2
learning rate:  [0.001]
train Loss: 0.1686 Accuracy: 0.9443
val Loss: 0.1561 Accuracy: 0.9483
completed in 1m 21s
epoch 3
learning rate:  [0.001]
train Loss: 0.1213 Accuracy: 0.9601
val Loss: 0.1326 Accuracy: 0.9562
completed in 1m 20s
Best val acc: 0.9562


# 모델 평가

## 베이스라인 모델 평가를 위한 전처리

In [None]:
transform_base = transforms.Compose([transforms.Resize([64, 64]),
                                     transforms.ToTensor()])
test_base = ImageFolder(root='./splitted/test',
                        transform=transform_base)
test_loader_base = torch.utils.data.DataLoader(test_base,
                      batch_size=batch_size, shuffle=True,
                                               num_workers=2)

## Transfer Learning 모델 평가를 위한 전처리

In [None]:
transform_resNet = transforms.Compose([
    transforms.Resize([64, 64]),
    transforms.RandomCrop(52),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

test_resNet = ImageFolder(root='./splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=batch_size,
                                                 shuffle=True, num_workers=2)

## 베이스라인 모델 성능 평가하기

In [None]:
baseline = torch.load('baseline.pt')
baseline.eval()
test_loss, test_accuracy = evaluate(baseline, test_loader_base)
print(test_accuracy)

tensor(94.1044, device='cuda:0')


## Transfer Learning 모델 성능 평가하기

In [None]:
resnet50 = torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)
print(test_accuracy)

tensor(95.8818, device='cuda:0')


**Transfer Learning의 장점**

* 대량의 데이터로 사전 학습한 모델을 사용하는 만큼, 더 높은 예측 성능을 보인다. (내가 고생하기 싫어서 에폭을 3으로만 설정해서 그렇지, 값을 바꾸면 아마 성능 격차가 더 커질 거다)
* 모델 훈련 속도가 더 빠르다. (**아주 큰 장점이다**)
* 이미지 분류 외 영상, 자연어 처리 등 다양한 분야에 사용될 수 있다.