### 데이터 분할을 위한 폴더 생성

In [1]:
import os
import shutil
 
original_dataset_dir = '../dataset'   
classes_list = os.listdir(original_dataset_dir) # 해당 경로 하위에 있는 모든 폴더의 목록을 가져오는 메서드
 
base_dir = '../splitted' 
os.mkdir(base_dir)
 
train_dir = os.path.join(base_dir, 'train') 
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

for cls in classes_list:     
    os.mkdir(os.path.join(train_dir, cls))
    os.mkdir(os.path.join(validation_dir, cls))
    os.mkdir(os.path.join(test_dir, cls))

### 데이터 분할과 클래스별 데이터 수 확인

In [2]:
import math
 
for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path) # path 위치에 존재하는 모든 이미지 파일의 목록을 변수 fnames에 저장
 
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)
    
    train_fnames = fnames[:train_size]
    print("Train size(",cls,"): ", len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname) # 복사할 원본 파일의 경로 지정
        dst = os.path.join(os.path.join(train_dir, cls), fname) # 복사한 후 저장할 파일의 경로 지정
        shutil.copyfile(src, dst) # src의 경로에 해당하는 파일을 dst의 경로에 저장
        
    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size(",cls,"): ", len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    test_fnames = fnames[(train_size+validation_size):(validation_size + train_size +test_size)]

    print("Test size(",cls,"): ", len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)

### 베이스라인 모델 학습을 위한 준비

In [8]:
import torch
import os

# MPS 지원 여부 확인
USE_MPS = torch.backends.mps.is_available()  # MPS 지원 여부
USE_CUDA = torch.cuda.is_available()        # CUDA 지원 여부

# 디바이스 선택 (MPS > CUDA > CPU 순으로 우선 선택)
if USE_MPS:
    DEVICE = torch.device("mps")
elif USE_CUDA:
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

BATCH_SIZE = 256
EPOCH = 30
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder 

# transforms.Compose : 이미지 데이터의 전처리, Augmentation 등의 과정에서 사용되는 메서드
# transforms.Resize : 이미지 크기를 조정
# transforms.ToTensor : 이미지를 Tensor 형태로 변환하고, 모든 값을 0에서 1 사이로 정규화
# ImageFolder : 데이터셋을 불러오는 메서드. 하나의 클래스가 하나의 폴더에 대응되는 구조의 데이터셋을 불러올 때 사용 (transform = 전처리 또는 Augmentation 방법 지정)
transform_base = transforms.Compose([transforms.Resize((64,64)), transforms.ToTensor()]) 
train_dataset = ImageFolder(root='../splitted/train', transform=transform_base) 
val_dataset = ImageFolder(root='../splitted/val', transform=transform_base)

from torch.utils.data import DataLoader

# DataLoader : 불러온 이미지 데이터를 주어진 조건에 따라 미니 배치 단위로 분리하는 역할
# shuffle = True : 데이터의 순서가 섞여 모델이 학습할 때 Label 정보의 순서를 기억하는 것을 방지할 수 있음
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

### 베이스라인 모델 설계

In [9]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
 
class Net(nn.Module): 
  
    def __init__(self):  # 사용할 모든 Layer를 정의
    
        super(Net, self).__init__() # nn.Module에 있는 메서드를 상속받아 사용

        self.conv1 = nn.Conv2d(3, 32, 3, padding=1) # (입력 채널 수, 출력 채널 수, 커널 크기)
        self.pool = nn.MaxPool2d(2,2)   # (커널 크기, Stride)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)  
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)  

        self.fc1 = nn.Linear(4096, 512) # Flatten 이후 사용할 Fully Connected Layer 정의
        self.fc2 = nn.Linear(512, 33) 
    
    def forward(self, x):  #모델이 학습 데이터를 입력받아 Forward Propagation을 실행시켜 Output을 계산하는 과정
    
        x = self.conv1(x) # conv1 layer를 통해 Feature Map 생성
        x = F.relu(x)  # 생성된 Feature Map에 활성 함수 적용
        x = self.pool(x) # MaxPooling
        x = F.dropout(x, p=0.25, training=self.training)  # 25% 노드를 Dropout

        x = self.conv2(x)
        x = F.relu(x) 
        x = self.pool(x) 
        x = F.dropout(x, p=0.25, training=self.training)

        x = self.conv3(x) 
        x = F.relu(x) 
        x = self.pool(x) 
        x = F.dropout(x, p=0.25, training=self.training)

        x = x.view(-1, 4096)  # 생성된 Feature Map을 1차원으로 Flatten
        x = self.fc1(x) 
        x = F.relu(x) 
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc2(x) 

        return F.log_softmax(x, dim=1)  # softmax() 함수를 적용하여 데이터가 각 클래스에 속할 확를을 Output값으로 출력

model_base = Net().to(DEVICE)  # 정의한 CNN 모델 Net()의 새로운 객체 생성
optimizer = optim.Adam(model_base.parameters(), lr=0.001) 

### 모델 학습을 위한 함수

In [10]:
def train(model, train_loader, optimizer):
    model.train()  # 입력받는 모델을 학습 모드로 설정
    for batch_idx, (data, target) in enumerate(train_loader): 
        data, target = data.to(DEVICE), target.to(DEVICE) 
        optimizer.zero_grad() # 이전 배치의 gradient 초기화
        output = model(data)  
        loss = F.cross_entropy(output, target) 
        loss.backward() # loss값을 바탕으로 back propagation을 통해 계산한 gradient값을 각 parameter에 할당
        optimizer.step()  # 각 parameter에 할당된 gradient 값을 이용해 모델의 parameter를 업데이트

### 모델 평가를 위한 함수

In [6]:
def evaluate(model, test_loader):
    model.eval()  
    test_loss = 0 
    correct = 0   
    
    with torch.no_grad(): # 모델 평가 단계에서는 parameter를 업데이트하면 않아야 하므로 업데이트 중단
        for data, target in test_loader:  
            data, target = data.to(DEVICE), target.to(DEVICE)  
            output = model(data) 
            
            test_loss += F.cross_entropy(output,target, reduction='sum').item() 
 
            
            pred = output.max(1, keepdim=True)[1] # 가장 높은 값을 가진 인덱스를 예측값으로 저장
            # target.view_as(pred) : target Tensor 구조를 pred Tensor와 같은 모양으로 정렬
            # eq : pred와 target.view_as(pred)의 값이 일치하면 1, 그렇지 않으면 0
            correct += pred.eq(target.view_as(pred)).sum().item() 
   
    test_loss /= len(test_loader.dataset) 
    test_accuracy = 100. * correct / len(test_loader.dataset) 
    return test_loss, test_accuracy  

### 모델 학습 실행하기

In [7]:
import time
import copy
 
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  
    best_model_wts = copy.deepcopy(model.state_dict()) # 정확도가 가장 높은 모델을 저장할 변수
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()  
        train(model, train_loader, optimizer) 
        train_loss, train_acc = evaluate(model, train_loader) 
        val_loss, val_acc = evaluate(model, val_loader)
        
        if val_acc > best_acc: 
            best_acc = val_acc 
            best_model_wts = copy.deepcopy(model.state_dict()) # 해당 Epoch 모델을 best_model_wts에 저장
        
        time_elapsed = time.time() - since 
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 
    model.load_state_dict(best_model_wts)  
    return model
 

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)  # baseline 모델 학습
torch.save(base,'baseline.pt')

KeyboardInterrupt: 

### VisonTransformer를 위한 준비

In [28]:
VIT_BATCH_SIZE = 64

vit_transform_base = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize([0.5],[0.5])
])

vit_train_dataset = ImageFolder(root = '../splitted/train',transform=vit_transform_base)
vit_val_dataset = ImageFolder(root = '../splitted/val',transform=vit_transform_base)

vit_train_loader = DataLoader(vit_train_dataset, batch_size=VIT_BATCH_SIZE, shuffle=True, num_workers=4)
vit_val_loader = DataLoader(vit_val_dataset, batch_size=VIT_BATCH_SIZE, shuffle=True, num_workers=4)

### VisonTransformer 모델 설계

In [29]:
from torchvision.models.vision_transformer import vit_b_16

class ViTClassifier(nn.Module):
    def __init__(self, num_classes = 33):
        super(ViTClassifier, self).__init__()
        self.vit = vit_b_16(weights = None)
        self.vit.heads = nn.Linear(768, num_classes)
    
    def forward(self,x):
        return self.vit(x)

model_vit = ViTClassifier(num_classes=len(vit_train_dataset.classes)).to(DEVICE)
optimizer = optim.Adam(model_vit.parameters(), lr = 0.001)

### VisonTransformer 학습 함수

In [30]:
def vit_train(model, train_loader, optimizer):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = nn.CrossEntropyLoss()(output, target)
        loss.backward()
        optimizer.step()

### VisonTransformer 평가 함수

In [31]:
def vit_evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data,target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            test_loss += nn.CrossEntropyLoss()(output, target).item()
            pred = output.argmax(dim = 1, keepdim = True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    test_accuracy = 100 * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

### VisonTransformer 모델 학습 하기

In [32]:
def vit_train_baseline(model, train_loader, val_loader, optimizer, num_epochs = EPOCH):
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(1, num_epochs + 1):
        since = time.time()
        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)
        
        if val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since
        print(f'-------------- epoch {epoch} ----------------')
        print(f'train Loss: {train_loss:.4f}, Accuracy: {train_acc:.2f}%')
        print(f'val Loss: {val_loss:.4f}, Accuracy: {val_acc:.2f}%')
        print(f'Completed in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    
    model.load_state_dict(best_model_wts)
    return model

vit_model = vit_train_baseline(model_vit, vit_train_loader, vit_val_loader, optimizer, EPOCH)
torch.save(vit_model, 'vit_model.pt')

RuntimeError: MPS backend out of memory (MPS allocated: 20.34 GB, other allocations: 6.14 MB, max allowed: 20.40 GB). Tried to allocate 110.81 MB on private pool. Use PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0 to disable upper limit for memory allocations (may cause system failure).

### Transfer Learning을 위한 준비

In [52]:
data_transforms = {
    'train': transforms.Compose([transforms.Resize([64,64]), 
        transforms.RandomHorizontalFlip(), # 이미지 무작위 좌우 반전, 파라미터 p를 입력하여 반전되는 이미지 비율 설정(기본값 0.5)
        transforms.RandomVerticalFlip(),  # 이미지 무작위 상하 반전
        transforms.RandomCrop(52), # 이미지의 일부를 랜덤하게 잘라내어 52*52사이즈로 변경 (이미지의 가운데 부분, 오른쪽 위 또는 아래 부분만 선택 가능), Centercrop, Fivecrop등 다양한 Crop 존재
        transforms.ToTensor(), 
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]), # 정규화 시행 첫 번째 대괄호 -> Red,Green,Blue 채널 값에서 정규화를 적용할 평균값, 두번쨰 -> 표준편차 값
        # 사용된 평균과 표준편차 값은 Pre-Trained-Model의 학습에 사용된 ImageNet 데이터의 값
    
    'val': transforms.Compose([transforms.Resize([64,64]),  
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}

In [53]:
data_dir = '../splitted' 
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x), transform=data_transforms[x]) for x in ['train', 'val']} 
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=BATCH_SIZE, shuffle=True, num_workers=4) for x in ['train', 'val']} # 불러온 이미지를 미니배치 단위로 분리 
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

class_names = image_datasets['train'].classes

### Pre-Trained Model 불러오기

In [54]:
from torchvision import models
 
resnet = models.resnet50(pretrained=True)  # ResNet50 모델을 불러오고 pretrained=True를 통해 미리 학습된 모델의 Parameter값을 그대로 가져옴/ False이면 모델만 가져오고 Parameter 값은 랜덤으로 설정
num_ftrs = resnet.fc.in_features # 불러온 resnet50은 다른 주제를 위해 설계되어 마지막 Layer의 출력 채널 수가 33개 아님. 이를 위해 이 프로젝트 모델의 마지막 Fully Connected Layer 대신 출력 채널 수가 33개인 새로운 Layer 추가 예정
resnet.fc = nn.Linear(num_ftrs, 33)  # 불러온 모델의 마지막 Fully Connected Layer를 새로운 Layer로 교체
resnet = resnet.to(DEVICE)
 
criterion = nn.CrossEntropyLoss() # 모델 학습 시 사용할 Loss함수를 지정하는 변수
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001) # 앞에 baseline model은 모든 파라미커를 업데이트했지만, 이 모델은 일부 Layer의 파라미터만 업데이트해야함, requires_grad=True로 설정된 Layer의 파라미터에만 적용
 
from torch.optim import lr_scheduler
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) # StepLR 메서드는 Epoch에 따라 Learning Rate를 변경하는 역할
# step_size = 7, gamma = 0.1로 설정하면 7 EPoch마다 0.1씩 곱해 learning rate를 감소시킨다는 의미



### Pre-Trained Model의 일부 Layer Freeze하기

In [55]:
ct = 0 # 해당 Layer가 몇번째인지 나타내는 변수

# children : 모델의 자식 모듈을 반복 가능한 객체로 반환하는 메서드
for child in resnet.children():  # 생성한 resnet 모델의 모든 Layer 정보를 담고 있음
    ct += 1  
    if ct < 6: # ResNet50에 존재하는 10개의 Layer 중에서 1번 부터 5번 Layer의 파라미터는 업데이트되지 않도록 고정하고, 6번부터 10번은 학습 과정에서 파라미터 업데이트 되도록 설정
        for param in child.parameters(): # child.parameters() : 각 Layer의 Parameter Tensor 의미. 각 Tensor에는 requires_grad 옵션이 있고, 기본값은 True로 설정
            param.requires_grad = False # 파라미터가 업데이트 되지 않도록 설정한다는 의미 layer 번호가 6보다 작으면 파라미터가 업데이트되지 않도록 설정

### Transfer Learning 모델 학습과 검증을 위한 함수

In [56]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):

    best_model_wts = copy.deepcopy(model.state_dict())  
    best_acc = 0.0  
    
    for epoch in range(num_epochs):
        print('-------------- epoch {} ----------------'.format(epoch+1)) 
        since = time.time()                                     
        for phase in ['train', 'val']: 
            if phase == 'train': 
                model.train() 
            else:
                model.eval()     
 
            running_loss = 0.0  
            running_corrects = 0  
 
            
            for inputs, labels in dataloaders[phase]: # 현재 모드에 해댱하는 Dataloader에서 데이터를 입력받음
                inputs = inputs.to(DEVICE)  
                labels = labels.to(DEVICE)  
                
                optimizer.zero_grad() 
                
                with torch.set_grad_enabled(phase == 'train'):  # 학습 단계에서만 모델의 gradient를 업데이트, 검증 단계에서는 업데이트하지 않도록 함
                    outputs = model(inputs)  
                    _, preds = torch.max(outputs, 1) 
                    loss = criterion(outputs, labels)  
    
                    if phase == 'train':   
                        loss.backward()
                        optimizer.step()
 
                running_loss += loss.item() * inputs.size(0) # inputs.size(0) : Dataloader에서 전달되는 미니 배치의 데이터 수 의미(배치사이즈)
                running_corrects += torch.sum(preds == labels.data)  # 모델을 통해 예측한 값이 Target과 같으면 running_corrects를 1만큼 증가시키고, 그렇지 않으면 증가시키지 않음
            if phase == 'train':  
                scheduler.step()
 
            epoch_loss = running_loss/dataset_sizes[phase]  
            epoch_acc = running_corrects.float()/dataset_sizes[phase]  
 
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc)) 
 
          
            if phase == 'val' and epoch_acc > best_acc: 
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
 
        time_elapsed = time.time() - since  
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
 
    model.load_state_dict(best_model_wts) 

    return model

### 모델 학습 실행

In [57]:
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH) 

torch.save(model_resnet50, 'resnet50.pt')

-------------- epoch 1 ----------------
train Loss: 0.5775 Acc: 0.8277
val Loss: 0.3133 Acc: 0.9067
Completed in 1m 23s
-------------- epoch 2 ----------------
train Loss: 0.2215 Acc: 0.9287
val Loss: 0.2581 Acc: 0.9219
Completed in 1m 21s
-------------- epoch 3 ----------------
train Loss: 0.1746 Acc: 0.9431
val Loss: 0.2387 Acc: 0.9294
Completed in 1m 20s
-------------- epoch 4 ----------------
train Loss: 0.1409 Acc: 0.9544
val Loss: 0.1920 Acc: 0.9397
Completed in 1m 20s
-------------- epoch 5 ----------------
train Loss: 0.1184 Acc: 0.9613
val Loss: 0.1950 Acc: 0.9409
Completed in 1m 21s
-------------- epoch 6 ----------------
train Loss: 0.1076 Acc: 0.9651
val Loss: 0.1519 Acc: 0.9522
Completed in 1m 21s
-------------- epoch 7 ----------------
train Loss: 0.0919 Acc: 0.9707
val Loss: 0.1206 Acc: 0.9619
Completed in 1m 21s
-------------- epoch 8 ----------------
train Loss: 0.0463 Acc: 0.9849
val Loss: 0.0493 Acc: 0.9829
Completed in 1m 20s
-------------- epoch 9 ----------------


### 베이스라인 모델 평가를 위한 전처리

In [58]:
transform_base = transforms.Compose([transforms.Resize([64,64]),transforms.ToTensor()])
test_base = ImageFolder(root='../splitted/test',transform=transform_base)  
test_loader_base = torch.utils.data.DataLoader(test_base, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

### VisionTransformer 모델 평가를 위한 전처리

In [None]:
test_dataset = ImageFolder(root='../splitted/test', transform=transform_base)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

### Transfer Learning 모델 평가를 위한 전처리

In [59]:
transform_resNet = transforms.Compose([
        transforms.Resize([64,64]),  
        transforms.RandomCrop(52),  
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
    ])
    
test_resNet = ImageFolder(root='../splitted/test', transform=transform_resNet) 
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

### 베이스라인 모델 성능 평가하기

In [60]:
baseline=torch.load('baseline.pt') 
baseline.eval()  
test_loss, test_accuracy = evaluate(baseline, test_loader_base)

print('baseline test acc:  ', test_accuracy)

baseline test acc:   93.41594692702466


### Vision Transformer 모델 성능 평가

In [None]:
vit_model = torch.load('vit_model.pt')
vit_model.eval()
test_loss, test_accuracy = evaluate(vit_model, test_loader)
print('ViT test acc: ',test_accuracy)

### Transfer Learning 모델 성능 평가하기

In [61]:
baseline=torch.load('baseline.pt') 
baseline.eval()  
test_loss, test_accuracy = evaluate(baseline, test_loader_base)

print('baseline test acc:  ', test_accuracy)

baseline test acc:   93.41594692702466


### 결과

모델을 직접 구축하여 처음부터 학습시키는 것보다 많은 양의 데이터셋으로 미리 학습된 모델을 불러와 일부를 Fine-Tuning하는 것이 더 높은 예측 성능을 낸다고 볼 수 있음

이는 우리가 가지고 있는 데이터의 수가 약 40,000개로 적지 않음. 베이스라인 모델도 어느 정도 좋은 성능을 보였음 하지만 Pre-Trained 모델은 약 1,400만 개의 이미지를 학습해놓은 모델이고, 이 모델에는 다양한 이미지의 Feature가 학습되어 있음. 

베이스라인 모델은 초기 파라미터는 랜덤으로 설정되는 반면, Pre-Trained Model의 초기 파라미터는 수백만장의 이미지를 통해 미리 학습해 놓은 모델의 파라미터이다. 따라서 Pre-Trained Model이 더 좋은 성능을 나타낼 가능성이 높음