In [None]:
# Colab-only setup: authenticate the user, then mount Google Drive.
from google.colab import auth, drive

auth.authenticate_user()

# force_remount=False keeps an already mounted drive in place.
drive.mount('/content/gdrive', force_remount=False)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
import os
from pathlib import Path

# Set `folder` to the Google Drive directory in which the project was saved.
folder = ""
project_dir = "02_cnn_pt"

base_path = Path("/content/gdrive/My Drive/")
project_path = base_path / folder / project_dir
os.chdir(project_path)

# Strip everything after the first space from each sub-directory name
# (e.g. "data (1)" -> "data").
# NOTE(review): presumably this cleans up suffixes added while uploading - confirm.
# The listing is materialised first so that renaming does not disturb iteration.
for entry in list(project_path.iterdir()):
    if not entry.is_dir():
        continue
    cleaned_name = entry.name.split(" ", 1)[0]
    if not cleaned_name or cleaned_name == entry.name:
        # Nothing to strip, or the name starts with a space and would become empty.
        continue
    target = entry.with_name(cleaned_name)
    if target.exists():
        # Renaming onto an existing directory would raise (or clobber); keep both.
        print(f"Skipping rename of '{entry.name}': '{cleaned_name}' already exists")
        continue
    entry.rename(target)
print(f"현재 디렉토리 위치: {os.getcwd()}")

현재 디렉토리 위치: /content/gdrive/My Drive/02_cnn_pt


In [None]:
## First code block
import torch

# Show which PyTorch build this runtime provides.
print(f'pytorch version: {torch.__version__}')

pytorch version: 1.6.0+cu101


## 1. Package load

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import glob
import os
import matplotlib.pyplot as plt
import numpy as np
import check_util.checker as checker 
%matplotlib inline

# Report the runtime environment.
print(f'pytorch version: {torch.__version__}')
print(f'GPU 사용 가능 여부: {torch.cuda.is_available()}')

# Store the device string according to GPU availability.
device = "cuda" if torch.cuda.is_available() else "cpu"

pytorch version: 1.6.0+cu101
GPU 사용 가능 여부: True


## 2. 데이터셋 다운로드 및 훈련, 검증, 테스트 데이터셋 구성

In [None]:
import zipfile
from pathlib import Path

# Locate the data folder relative to the current working directory.
current_path = Path.cwd()
data_path = current_path / "data"
print(f"현재 디렉토리 위치: {current_path}")

# Extract the archive only when the target folder does not exist yet.
if not (data_path / "my_cat_dog").exists():
    with zipfile.ZipFile(str(data_path / "my_cat_dog.zip"), "r") as zip_ref:
        zip_ref.extractall(str(data_path / "my_cat_dog"))
    print("Done!")
else:
    print("이미 'data/my_cat_dog' 폴더에 압축이 풀려있습니다. 확인해보세요!")

현재 디렉토리 위치: /content/gdrive/My Drive/02_cnn_pt
이미 'data/my_cat_dog' 폴더에 압축이 풀려있습니다. 확인해보세요!


## 3. 하이퍼파라미터 세팅

In [None]:
# Training hyperparameters shared by the experiments in this notebook.
batch_size = 100       # samples per mini-batch handed to the DataLoaders
num_epochs = 10        # number of passes over the training loader
learning_rate = 1e-4   # learning rate given to the Adam optimizers

## 4. Dataset 및 DataLoader 할당


In [None]:
class CatDogDataset(Dataset):
    """Image dataset reading every file matching ``<data_dir>/<mode>/*/*``.

    The label comes from the file name: a base name starting with ``"cat"``
    gives label 0, any other file gives label 1.

    Args:
        data_dir: Root directory of the dataset.
        mode: Name of the split sub-directory below ``data_dir``.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, data_dir, mode, transform=None):
        # Sorted so the index -> file mapping is stable between runs.
        self.all_data = sorted(glob.glob(os.path.join(data_dir, mode, '*', '*')))
        self.transform = transform

    def __getitem__(self, index):
        """Return ``(image, label)`` for the file stored at position ``index``."""
        data_path = self.all_data[index]

        # Always hand a 3-channel RGB image to the transform: grayscale, RGBA or
        # palette files would otherwise not match a 3-channel Normalize.
        # convert() returns a new image, so the file handle can be closed right away.
        with Image.open(data_path) as raw_img:
            img = raw_img.convert("RGB")
        if self.transform is not None:
            img = self.transform(img)

        # Label 0 for cat files, 1 for everything else.
        label = 0 if os.path.basename(data_path).startswith("cat") else 1
        return img, label

    def __len__(self):
        """Return the number of files found for this split."""
        return len(self.all_data)
        

### 데이터 증식(Data Augmentation) 


In [None]:
# Pre-processing pipelines for 120x120 inputs.
# 'train' adds light augmentation (small rotation, horizontal flip, near-full random crop);
# 'val' only resizes. Both normalise with the same per-channel mean/std
# (these appear to be the standard ImageNet statistics).
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomResizedCrop(120, scale=(0.96, 1.0), ratio=(0.95, 1.05)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize([120, 120]),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# The test split uses the deterministic 'val' pipeline.
train_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='train', transform=data_transforms['train'])
val_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='val', transform=data_transforms['val'])
test_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='test', transform=data_transforms['val'])

# Training drops the last incomplete batch; evaluation loaders keep it so that
# every validation/test sample is scored.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, drop_last=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)

## 5. 네트워크 설계

In [None]:
class SimpleCNN(nn.Module):
    """Four-stage convolutional classifier producing 2 logits per image.

    Each stage is Conv2d(3x3, no padding) -> BatchNorm2d -> ReLU -> MaxPool2d(2).
    For a 120x120 input the spatial size evolves as
    120 -> 118 -> 59 -> 57 -> 28 -> 26 -> 13 -> 11 -> 5,
    so the flattened feature vector has 128 * 5 * 5 elements, which is what
    ``fc1`` expects. Other input sizes will not match ``fc1``.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor.
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, 3),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, 3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(64, 128, 3),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(128, 128, 3),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            # Final pooling reduces 11x11 to 5x5; without it the flattened size
            # (128*11*11) does not match fc1. The layer has no parameters, so
            # state_dict keys are unaffected.
            nn.MaxPool2d(2),
        )

        # Fully connected classifier head.
        self.fc1 = nn.Linear(128*5*5, 512)
        self.fc2 = nn.Linear(512, 2)

    def forward(self, x):
        """Map a batch of images ``(N, 3, 120, 120)`` to logits ``(N, 2)``."""
        x = self.conv(x)
        x = x.view(x.shape[0], -1)  # flatten everything except the batch dimension
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

## 6. train, validation, test 함수 정의

### 훈련 함수


In [None]:
def train(num_epochs, model, data_loader, criterion, optimizer, saved_dir, val_every, device,
          val_data_loader=None):
    """Train ``model`` and checkpoint it whenever the validation loss improves.

    Args:
        num_epochs: Number of passes over ``data_loader``.
        model: Network to optimise (already on ``device``).
        data_loader: Iterable yielding ``(imgs, labels)`` training batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer updating the model's parameters.
        saved_dir: Directory passed to ``save_model`` for the best checkpoint.
        val_every: Run validation every ``val_every`` epochs.
        device: Device the batches are moved to.
        val_data_loader: Loader passed to ``validation``. When omitted, the
            notebook-level ``val_loader`` variable is used, as in the original code.
    """
    print('Start training..')
    best_loss = float('inf')
    model.train()  # make sure layers such as BatchNorm are in training mode
    for epoch in range(num_epochs):
        for i, (imgs, labels) in enumerate(data_loader):
            imgs, labels = imgs.to(device), labels.to(device)

            outputs = model(imgs)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Batch accuracy from the predictions made before this update step.
            _, argmax = torch.max(outputs, 1)
            accuracy = (labels == argmax).float().mean()

            # Progress is reported every third step.
            if (i+1) % 3 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'.format(
                    epoch+1, num_epochs, i+1, len(data_loader), loss.item(), accuracy.item() * 100))

        if (epoch + 1) % val_every == 0:
            # Resolved lazily so the global is only required when validation actually runs.
            loader_for_val = val_loader if val_data_loader is None else val_data_loader
            avrg_loss = validation(epoch + 1, model, loader_for_val, criterion, device)
            if avrg_loss < best_loss:
                print('Best performance at epoch: {}'.format(epoch + 1))
                print('Save model in', saved_dir)
                best_loss = avrg_loss
                save_model(model, saved_dir)

### 검증 함수


In [1]:
def validation(epoch, model, data_loader, criterion, device):
    """Evaluate ``model`` on ``data_loader`` and return the mean per-batch loss.

    The model is switched to eval mode for the pass and put back into train
    mode afterwards, even if evaluation raises.

    Args:
        epoch: Epoch number, used only in the printed messages.
        model: Network to evaluate.
        data_loader: Iterable yielding ``(imgs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        float: Average of the per-batch losses, or ``nan`` when the loader
        yields no batches (``nan < best`` is False, so nothing is saved).
    """
    print('Start validation #{}'.format(epoch))
    model.eval()
    total = 0
    correct = 0
    total_loss = 0.0
    cnt = 0
    try:
        with torch.no_grad():
            for imgs, labels in data_loader:
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                loss = criterion(outputs, labels)
                total += imgs.size(0)
                _, argmax = torch.max(outputs, 1)
                correct += (labels == argmax).sum().item()
                # Accumulate a Python float rather than a tensor.
                total_loss += loss.item()
                cnt += 1
    finally:
        model.train()

    if cnt == 0:
        # An empty loader would otherwise cause a division by zero.
        print('Validation #{}  skipped: data loader yielded no batches'.format(epoch))
        return float('nan')

    avrg_loss = total_loss / cnt
    print('Validation #{}  Accuracy: {:.2f}%  Average Loss: {:.4f}'.format(epoch, correct / total * 100, avrg_loss))
    return avrg_loss

### 테스트 함수

In [None]:
def test(model, data_loader, device):
    print('Start test..')
    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for i, (imgs, labels) in enumerate(data_loader):
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            _, argmax = torch.max(outputs, 1)    # max()를 통해 최종 출력이 가장 높은 class 선택
            total += imgs.size(0)
            correct += (labels == argmax).sum().item()

        print('Test accuracy for {} images: {:.2f}%'.format(total, correct / total * 100))
    model.train()

## 7. 모델 저장 

In [None]:
def save_model(model, saved_dir, file_name='best_model.pt'):
    """Write the model's state dict to ``<saved_dir>/<file_name>``.

    The checkpoint is a dict holding the state dict under the key ``'net'``.
    ``saved_dir`` is created when it does not exist yet.
    """
    os.makedirs(saved_dir, exist_ok=True)
    torch.save({'net': model.state_dict()}, os.path.join(saved_dir, file_name))

## 8. 모델 생성 및 Loss function, Optimizer 정의


In [None]:
torch.manual_seed(7777)  # fixed random seed for consistent weight initialization

# Build the network once and move it to the selected device.
model = SimpleCNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

val_every = 1                      # validate after every epoch
saved_dir = './saved/SimpleCNN'    # where the best checkpoint is written

## 9. Training

In [None]:
# Train SimpleCNN; train() validates every `val_every` epochs and saves the
# checkpoint with the lowest validation loss under `saved_dir`.
train(num_epochs, model, train_loader, criterion, optimizer, saved_dir, val_every, device)

## 10. 저장된 모델 불러오기 및 test

In [None]:
model_path = './saved/SimpleCNN/best_model.pt'
# model_path = './saved/pretrained/SimpleCNN/best_model.pt' # use this when training was not run to completion

# Start from a newly constructed (untrained) network so the result below
# depends only on the weights restored from the checkpoint.
model = SimpleCNN().to(device)

# Read the checkpoint onto the active device and restore the weights stored under 'net'.
checkpoint = torch.load(model_path, map_location=device)
state_dict = checkpoint['net']
model.load_state_dict(state_dict)
## 코드 종료 ##

<All keys matched successfully>

In [None]:
# Report the accuracy of `model` on the test loader.
test(model, test_loader, device)

Start test..
Test accuracy for 2000 images: 76.50%


## 11. Transfer Learning(ResNet)


In [None]:
# Load torchvision's ResNet-50 with pretrained weights as the base for transfer learning.
new_model = torchvision.models.resnet50(pretrained=True)

In [None]:
# Pre-processing pipelines for 224x224 inputs, replacing the earlier 120x120 ones.
# 'train' adds light augmentation (small rotation, horizontal flip, near-full random crop);
# 'val' only resizes. Both normalise with the same per-channel mean/std
# (these appear to be the standard ImageNet statistics).
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomRotation(5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomResizedCrop(224, scale=(0.96, 1.0), ratio=(0.95, 1.05)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize([224, 224]),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# The test split uses the deterministic 'val' pipeline.
train_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='train', transform=data_transforms['train'])
val_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='val', transform=data_transforms['val'])
test_data = CatDogDataset(data_dir='./data/my_cat_dog', mode='test', transform=data_transforms['val'])

# Training drops the last incomplete batch; evaluation loaders keep it so that
# every validation/test sample is scored.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, drop_last=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, drop_last=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, drop_last=False)

In [None]:
# Freeze every existing parameter of the pretrained network.
for param in new_model.parameters():
    param.requires_grad = False

# Replace the final layer with a new 2-class head; a newly created nn.Linear
# has requires_grad=True, so it is the only part that gets trained.
num_ftrs = new_model.fc.in_features
new_model.fc = nn.Linear(num_ftrs, 2)
criterion = nn.CrossEntropyLoss()
new_model = new_model.to(device)

# Give the optimizer only the trainable parameters instead of the frozen ones too.
optimizer = torch.optim.Adam(
    params=[p for p in new_model.parameters() if p.requires_grad],
    lr=learning_rate,
)
val_every = 1
saved_dir = './saved/ResNet50'

In [None]:
# Train `new_model`; train() validates every `val_every` epochs and saves the
# checkpoint with the lowest validation loss under `saved_dir`.
train(num_epochs, new_model, train_loader, criterion, optimizer, saved_dir, val_every, device)

In [None]:
model_path = './saved/ResNet50/best_model.pt'
# model_path = './saved/pretrained/ResNet50/best_model.pt' # use this when training was not run to completion

# map_location places the tensors on the active device, so a checkpoint saved
# on a GPU can still be loaded on a CPU-only runtime.
checkpoint = torch.load(model_path, map_location=device)
state_dict = checkpoint['net']
new_model.load_state_dict(state_dict=state_dict)

<All keys matched successfully>

In [None]:
# Report the accuracy of `new_model` on the test loader.
test(new_model, test_loader, device)

Start test..
Test accuracy for 2000 images: 98.05%
