In [1]:
# gdrive에 mount
from google.colab import drive
drive.mount('/content/drive', force_remount = True)

Mounted at /content/drive


In [2]:
# 경로 설정
import os
os.chdir('/content/drive/MyDrive/tobigs16_강의자료/9주차/정규세션_CNN심화')

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Load Data
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder 
from torch.utils.data import DataLoader

# Pytorch --> CNN
import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim

from torchsummary import summary 

In [4]:
import torch

if torch.cuda.is_available():
  DEVICE = torch.device('cuda')
else:
  DEVICE = torch.device('cpu')
print(DEVICE)

cuda


In [5]:
import shutil

original_dataset_dir = '/content/drive/MyDrive/tobigs16_강의자료/9주차/정규세션_CNN심화/data/'                   # 기존의 데이터
classes_list = os.listdir(original_dataset_dir) 
 
base_dir = '/splitted'                           # train-validation-test 나누기
os.makedirs(base_dir,exist_ok=True)
 
train_dir = os.path.join(base_dir, 'train') 
os.makedirs(train_dir,exist_ok=True)
validation_dir = os.path.join(base_dir, 'val')
os.makedirs(validation_dir,exist_ok=True)
test_dir = os.path.join(base_dir, 'test')
os.makedirs(test_dir,exist_ok=True)

for cls in classes_list:     
    os.makedirs(os.path.join(train_dir, cls),exist_ok=True)
    os.makedirs(os.path.join(validation_dir, cls),exist_ok=True)
    os.makedirs(os.path.join(test_dir, cls),exist_ok=True)

In [6]:
## 데이터 분할 후 클래스별 데이터 수 확인

import math
 
for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path)
 
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)
    
    train_fnames = fnames[:train_size]
    print("Train size(",cls,"): ", len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(train_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size(",cls,"): ", len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    test_fnames = fnames[(train_size+validation_size):(validation_size + train_size +test_size)]

    print("Test size(",cls,"): ", len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)

Train size( dog ):  197
Validation size( dog ):  65
Test size( dog ):  65
Train size( horse ):  90
Validation size( horse ):  30
Test size( horse ):  30
Train size( house ):  147
Validation size( house ):  49
Test size( house ):  49
Train size( person ):  239
Validation size( person ):  79
Test size( person ):  79
Train size( giraffe ):  141
Validation size( giraffe ):  47
Test size( giraffe ):  47
Train size( guitar ):  80
Validation size( guitar ):  26
Test size( guitar ):  26
Train size( elephant ):  123
Validation size( elephant ):  41
Test size( elephant ):  41


In [7]:
BATCH_SIZE = 64
EPOCH = 30 

In [17]:
transform_base = transforms.Compose([transforms.ToTensor()]) 
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base) 
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)

In [None]:
'''transform_base = transforms.Compose([transforms.Resize((64,64)),transforms.ToTensor()]) 
basic_ = '/content/drive/MyDrive/tobigs16_강의자료/9주차/정규세션_CNN심화'
train_dataset = ImageFolder(root=basic_+'/splitted/train', transform=transform_base) 
val_dataset = ImageFolder(root=basic_+'/splitted/val', transform=transform_base)'''

In [18]:
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset,batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

  cpuset_checked))


Baseline Model

In [19]:
class Net(nn.Module):
    def __init__(self, num_classes = 7):
        super(Net, self).__init__()
        self.convnet = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=96, kernel_size=11, padding=0, stride=4),
            nn.ReLU(inplace=True),
            #nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            
            nn.Conv2d(in_channels=96, out_channels=256, kernel_size=5, padding=2, stride=1),
            nn.ReLU(inplace=True),
            #nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
            
            nn.Conv2d(in_channels=256, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            #nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=384, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            #nn.LocalResponseNorm(size=5, k=2),
            nn.Conv2d(in_channels=384, out_channels=256, kernel_size=3, padding=1, stride=1),
            nn.ReLU(inplace=True),
            #nn.LocalResponseNorm(size=5, k=2),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )

        self.fclayer = nn.Sequential(
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(4096, num_classes),
        )
    
    def forward(self, x):
        x = self.convnet(x)
        x = torch.flatten(x, 1)
        x = self.fclayer(x)
        return F.log_softmax(x)

model_base = Net().to(DEVICE)
print(model_base)
summary(model_base, (3, 227, 227)) 

Net(
  (convnet): Sequential(
    (0): Conv2d(3, 96, kernel_size=(11, 11), stride=(4, 4))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(96, 256, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(256, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fclayer): Sequential(
    (0): Linear(in_features=9216, out_features=4096, bias=True)
    (1): ReLU(inplace=True)
    (2): Dropout(p=0.5, inplace=False)
    (3): Linear(in_features=4096, out_features=4



In [20]:
# Optimizer, Loss function 은 편하신 대로 변경하시면 됩니다 :) 

optimizer = optim.Adam(model_base.parameters(), lr=0.001) 
criterion = nn.CrossEntropyLoss()

In [21]:
def train(model, train_loader, optimizer):
    model.train()                         # 모델 train 상태로
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)   # data, target 값 DEVICE에 할당
        optimizer.zero_grad()                                             # optimizer gradient 값 초기화
        output = model(data)                                              # 할당된 데이터로 output 계산
        loss = criterion(output,target)                                               # Cross Entropy Loss 사용해 loss 계산
        loss.backward()                                               # 계산된 loss back propagation
        optimizer.step()                                    # parameter update

In [22]:
def evaluate(model, test_loader):
    model.eval()      # 모델 평가 상태로
    test_loss = 0     # test_loss 초기화
    correct = 0       # 맞게 예측한 0 값으로 초기화
    
    with torch.no_grad(): 
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)     # data, target DEVICE에 할당
            output = model(data)                                                   # output 계산
            test_loss += criterion(output, target).item()         # loss 계산(총 loss 에 더해주기)
            pred = output.max(1, keepdim=True)[1]                 # 계산된 벡터값 중 가장 큰 값 가지는 class 예측
            correct += pred.eq(target.view_as(pred)).sum().item() # 맞게 예측한 값 세기
   
    test_loss /= len(test_loader.dataset)                         # 평균 loss
    test_accuracy = 100. * correct / len(test_loader.dataset)     # test(validation) 데이터 정확도
    return test_loss, test_accuracy  

Train Model

In [23]:
import time
import copy
 
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  # beset accuracy 초기화
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()                                     # 학습 시간 계산
        train(model, train_loader, optimizer)                   # train 데이터로 학습
        train_loss, train_acc = evaluate(model, train_loader)   # train_loss, train_acc 계산
        val_loss, val_acc = evaluate(model, val_loader)         # valid_loss, valid_acc 계산
        
        if val_acc>best_acc:  # update best accuracy
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since # 학습 시간 출력
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 

    model.load_state_dict(best_model_wts)  
    return model

base = train_baseline(model_base ,train_loader, val_loader, optimizer)  	# 모델 학습시키기
torch.save(base,'AlexNet.pt')                                             # 모델 저장

  cpuset_checked))


-------------- epoch 1 ----------------
train Loss: 0.0299, Accuracy: 23.50%
val Loss: 0.0336, Accuracy: 23.44%
Completed in 1m 2s
-------------- epoch 2 ----------------
train Loss: 0.0296, Accuracy: 23.50%
val Loss: 0.0336, Accuracy: 23.44%
Completed in 0m 9s
-------------- epoch 3 ----------------
train Loss: 0.0297, Accuracy: 21.14%
val Loss: 0.0337, Accuracy: 20.77%
Completed in 0m 9s
-------------- epoch 4 ----------------
train Loss: 0.0286, Accuracy: 28.32%
val Loss: 0.0330, Accuracy: 27.89%
Completed in 0m 9s
-------------- epoch 5 ----------------
train Loss: 0.0296, Accuracy: 21.73%
val Loss: 0.0333, Accuracy: 20.77%
Completed in 0m 9s
-------------- epoch 6 ----------------
train Loss: 0.0298, Accuracy: 23.50%
val Loss: 0.0353, Accuracy: 23.44%
Completed in 0m 9s
-------------- epoch 7 ----------------
train Loss: 0.0290, Accuracy: 26.25%
val Loss: 0.0330, Accuracy: 25.82%
Completed in 0m 9s
-------------- epoch 8 ----------------
train Loss: 0.0280, Accuracy: 28.71%
val Lo

Test Model

In [24]:
# Test Data Classification 하기
transform_base = transforms.Compose([transforms.ToTensor()]) 
test_base = ImageFolder(root='./splitted/test',transform=transform_base)  
test_loader_base = torch.utils.data.DataLoader(test_base, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

  cpuset_checked))


In [26]:
tmp = []
for batch_idx, (data,target) in enumerate(test_loader_base):
  data, target = data.to(DEVICE), target.to(DEVICE)
  output = model_base(data)
  tmp.append(output)

  cpuset_checked))


In [32]:
def predicit_test(model, data_loader):
    #### Test 데이터의 class 를 분류하는 함수를 만들어 주세요 ####
    total = 0
    correct = 0
    with torch.no_grad():
      for batch_idx, (data,target) in enumerate(data_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        output = model(data)
        _, predicted = torch.max(output.data,1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
    accuracy = 100*correct/total
    print('accuracy:',accuracy)
    return accuracy

In [33]:
predicit_test(model_base,test_loader_base)

  cpuset_checked))


accuracy: 34.4213649851632


34.4213649851632

Transfer Learning with 모델 학습
Transfer Learning이 익숙하지 않으신 분들은 PyTorch에서 제공하는 https://9bow.github.io/PyTorch-tutorials-kr-0.3.1/beginner/transfer_learning_tutorial.html 을 참고하세요 :)

In [85]:
# Data Augmentation 
data_transforms = {
    'train': transforms.Compose([
        ### 데이터 전처리를 진행해 주세요 ###
        #transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        transforms.ToTensor()]),
    
    'val': transforms.Compose([
        ### 데이터 전처리를 진행해 주세요 ###
        #transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        transforms.ToTensor()])
}

train_dataset_resnet = ImageFolder(root='./splitted/train', transform = data_transforms['train'])
val_dataset_resnet = ImageFolder(root='./splitted/val', transform = data_transforms['val'])

train_loader_resnet = torch.utils.data.DataLoader(train_dataset_resnet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader_resnet = torch.utils.data.DataLoader(val_dataset_resnet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

  cpuset_checked))


In [86]:
from torchvision import models
 
resnet = models.resnet50(pretrained=True)   # resnet50 불러오기, pretrained = True로 학습된 파라미터 값들 불러오기
num_ftrs = resnet.fc.in_features            # resnet50의 fully connected layer의 input 노드 수
resnet.fc = nn.Linear(num_ftrs, 7)         # input 노드 수를 이용해 새로운 layer 추가
resnet = resnet.to(DEVICE)                  # resnet 모델 DEVICE에 할당
 
criterion = nn.CrossEntropyLoss()                                                           # CrossEntropyLoss 로 loss 계산
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001) # optimizer -> adam

In [87]:
# Pretrained Model의 일부 layer freeze 하기
ct = 0 
for child in resnet.children():  
    ct+= 1  
    if ct < 7: 
        for param in child.parameters():
            param.requires_grad = False

Fine Tuning을 진행해 주세요!

In [89]:
def train_resnet(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  # beset accuracy 초기화
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()                                     # 학습 시간 계산
        train(model, train_loader, optimizer)                   # train 데이터로 학습
        train_loss, train_acc = evaluate(model, train_loader)   # train_loss, train_acc 계산
        val_loss, val_acc = evaluate(model, val_loader)         # valid_loss, valid_acc 계산
        
        if val_acc>best_acc:  # update best accuracy
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = tㅈㄱㄱime.time() - since # 학습 시간 출력
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 

    model.load_state_dict(best_model_wts)  
    return model

In [90]:
resnet_train = train_resnet(resnet, train_loader_resnet, val_loader_resnet, optimizer_ft)  # resnet transfer learning 진행
torch.save(resnet_train,'resNet.pt')   

  cpuset_checked))


-------------- epoch 1 ----------------
train Loss: 0.5647, Accuracy: 37.76%
val Loss: 0.6108, Accuracy: 31.16%
Completed in 0m 27s
-------------- epoch 2 ----------------
train Loss: 0.0097, Accuracy: 85.55%
val Loss: 0.0221, Accuracy: 70.33%
Completed in 0m 47s
-------------- epoch 3 ----------------
train Loss: 0.0025, Accuracy: 95.38%
val Loss: 0.0130, Accuracy: 81.01%
Completed in 0m 28s
-------------- epoch 4 ----------------
train Loss: 0.0019, Accuracy: 95.97%
val Loss: 0.0154, Accuracy: 80.42%
Completed in 0m 27s
-------------- epoch 5 ----------------
train Loss: 0.0153, Accuracy: 81.71%
val Loss: 0.0344, Accuracy: 67.36%
Completed in 0m 27s
-------------- epoch 6 ----------------
train Loss: 0.0019, Accuracy: 96.56%
val Loss: 0.0128, Accuracy: 77.45%
Completed in 0m 27s
-------------- epoch 7 ----------------
train Loss: 0.0034, Accuracy: 93.81%
val Loss: 0.0149, Accuracy: 75.07%
Completed in 0m 27s
-------------- epoch 8 ----------------
train Loss: 0.0004, Accuracy: 99.12%

In [91]:
# Classification 하기
transform_res = transforms.Compose([transforms.ToTensor()]) 
test_res = ImageFolder(root='./splitted/test',transform=transform_res)  
test_loader_res = torch.utils.data.DataLoader(test_res, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

  cpuset_checked))


In [92]:
def prediction(model, test_loader):
    ### FineTuning을 진행한 모델을 가지고 예측을 진행해 주세요    
    total = 0
    correct = 0
    with torch.no_grad():
      for batch_idx, (data,target) in enumerate(test_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        output = model(data)
        _, predicted = torch.max(output.data,1)
        total += target.size(0)
        correct += (predicted == target).sum().item()
    accuracy = 100*correct/total
    print('accuracy:',accuracy)
    return accuracy
prediction(resnet,test_loader_res)

  cpuset_checked))


accuracy: 84.86646884272997


84.86646884272997

모델 평가
모델 평가를 진행할 때 accuracy, precision, recall f1-score 등 다양한 평가지표를 가지고 진행해 주세요 ~!