In [1]:
import torch
import os

USE_CUDA = torch.cuda.is_available()
if USE_CUDA :
    DEVICE = torch.device('cuda')
else :
    DEVICE = torch.device('cpu')
BATCH_SIZE = 256
EPOCH = 30

In [2]:
# 모델 설계
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import models


- 특징 추출 부분(초기 Conv 레이어들)을 동결하고, 출력층(FC Layer)만 재학습
- AdaptiveAvgPool2d
1)텐서의 공간 차원(Height, Width)을 평균 연산으로 고정된 크기로 변환.
2) 공간 정보를 축소하거나, CNN 출력 크기를 Fully Connected Layer 입력으로 변환하기 전에 사용.
3) 예: 입력 크기가 (Batch, Channels, Height, Width)일 때 Height와 Width를 원하는 고정 값으로 줄임.

In [None]:
# resnet = models.vgg16(pretrained=True)
resnet = models.resnet152(pretrained=True)
resnet

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to C:\Users\ilhan/.cache\torch\hub\checkpoints\vgg16-397923af.pth


  0%|          | 0.00/528M [00:00<?, ?B/s]

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [3]:
# 이미지 데이터를 텐서 객체로 변환하기 위한 객체를 생성한다.
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
# 이미지 데이터를 64x64사이즈 크기의 텐서 객체로 만들어주는 도구
# Compose : 리스트 안에 들어있는 변환기 순서대로 데이터를 통과시켜 만들어준다.
# 이미지를 64x64 사이즈로 변환 -> 텐서객체로 생성
transform_base = transforms.Compose([transforms.Resize((64, 64)), 
                                     transforms.ToTensor()])
# 학습용 이미지 관리 객체
train_dataset = ImageFolder(root='./splitted/train/', transform=transform_base)
# 검증용 이미지 관리 객체
val_dataset = ImageFolder(root='./splitted/val/', transform=transform_base)

In [4]:
# 데이터 로더를 생성한다.
# num_workers : 작업시 멀티코어를 이용하겠다는 의미(4 : 4개의 코어에 나눠서 병렬처리)
# 만약 멀티코어가 지원되지 않는 컴퓨터 환경이면 오류가 발생한다. 만약 이렇다면
# num_workers를 지워주세요
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE,
                                           shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE,
                                         shuffle=True, num_workers=4)

In [None]:

class Net(nn.Module) :

    def __init__(self) :
        super(Net, self).__init__()
        # resnet 152 사전학습 모델
        # pretrained : True를 넣어주면 1400만장의 이미지를 학습해서 얻은
        # Conv 레이어의 커널의 가중치가 설정되어 있는 상태로 받아올 수 있다.
        self.resnet = models.resnet152(pretrained=True)
        # resnet이 가지고 있는 모든 가중치에 대해 동결시킨다.(학습때 참여하지 않는다)
        for param in self.resnet.parameters():
            param.requires_grad = False

        # for param in self.resnet.layer4.parameters():
        #     param.requires_grad = True
        # for param in self.resnet.fc.parameters():
        #     param.requires_grad = True

        # for param in self.resnet.layer4[1].parameters():
        #     param.requires_grad = True


        # resnet 이 전달해 주는 데이터의 개수
        cnt = self.resnet.fc.in_features

        # resnet의 출력층을 변경해준다.
        self.resnet.fc = nn.Linear(in_features=cnt, out_features=33)

    def forward(self, x) :
        x = self.resnet(x)

        return F.log_softmax(x, dim=1)
        

In [6]:
# 학습 모델 객체 생성한다.
model_base = Net().to(DEVICE)
# 경사 하강법
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

In [7]:
# 학습 함수
def train(model, train_loader, optimizer) :
    model.train()
    for data, target in train_loader :
        data = data.to(DEVICE)
        target = target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward()
        optimizer.step()

In [8]:
# 모델 평가 함수
def evaluate(model, test_loader) :
    model.eval()
    # 오차값을 담을 변수
    test_loss = 0
    # 얼마나 잘 맞췄지..
    correct = 0

    # 경사 하강법이 동작하지 않는 부분
    with torch.no_grad() :
        # 데이터의 수 만큼 반복한다.
        for data, target in test_loader :
            data = data.to(DEVICE)
            target = target.to(DEVICE)
            output = model(data)
            # 오차를 누적한다.
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            # 맞춘 개수를 누적한다.
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
            
    # 누적한 오차를 데이터의 수 만큼 나줘서 평균 오차를 구한다.
    test_loss /= len(test_loader.dataset)
    # 맞춘 개수를 전체 개수로 나눠 맞춘 확률을 구한다.
    test_accuracy = 100.0 * correct / len(test_loader.dataset)

    return test_loss, test_accuracy

In [9]:
# 모델을 학습 하는 함수
import time
import copy

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs=30) :
    # 학습 중 측정하는 정확도 최대치를 담을 변수
    best_acc = 0
    # 모델의 성능이 개선되었을 때만 학습모델의 가중치 데이터를 담을 변수
    best_model_wts = copy.deepcopy(model.state_dict())
    # 학습 횟수 만큼 반복한다.
    for epoch in range(1, num_epochs + 1) :
        # 학습 시작 시간을 담아준다.
        since = time.time()
        # 학습 함수 호출
        train(model, train_loader, optimizer)
        # 평가 함수 호출
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)

        # 검증용 데이터에 대한 평가수치가 이전에 가장 좋았던 것 보다 더 좋아지면..
        if val_acc > best_acc :
            # 최고 평가 점수 변수에 덮어 씌워준다.
            best_acc = val_acc
            # 학습 모델의 가중치 데이터를 덮어씌워준다.
            best_model_wts = copy.deepcopy(model.state_dict())
            
        
        # 현재시간에서 학습 시작시간을 빼서 얼마나 걸렸는지 구한다.
        time_elapsed = time.time() - since
        print(f'{epoch} 번째 학습')
        print(f'학습 데이터 오차 : {train_loss}, 학습 데이터 정확도 : {train_acc}')
        print(f'검증 데이터 오차 : {val_loss}, 검증 데이터 정확도 : {val_acc}')
        print(f'학습 소요 시간 : {time_elapsed}')
        print('--------------')

    # 학습이 완료되면 지금까지에서 얻은 최상의 가중치를 학습 모델에 넣어준다.
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# 학습
# base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
base = train_baseline(model_base, train_loader, val_loader, optimizer, 2)
# 학습이 완료된 모델을 저장한다.
torch.save(base, 'resnet152.pt')

In [None]:
# 예측
test_dataset = ImageFolder(root='./splitted/test/', transform=transform_base)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE,
                                         shuffle=True, num_workers=4)
data_iter = iter(test_loader)   # DataLoader의 iterator 생성
batch_x, batch_y = next(data_iter)  # 첫 번째 배치 가져오기
# 데이터 출력
print("Feature (x):", batch_x)       # 입력 데이터
print("Label (y):", batch_y)         # 정답 라벨
print("Feature shape:", batch_x.shape)  # 배치 크기 확인
print("Label shape:", batch_y.shape)

In [None]:
# 예측
test_dataset = ImageFolder(root='./splitted/test/', transform=transform_base)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE,
                                         shuffle=True, num_workers=4)

model = torch.load('resnet152.pt')

with torch.no_grad() :
    result = []
    # 데이터의 수 만큼 반복한다.
    for data, target in test_loader :
        data = data.to(DEVICE)
        target = target.to(DEVICE)
        output = model(data)
        # 결과를 예측한다.
        pred = output.max(1, keepdim=True)[1]
        pred = pred.cpu().numpy()
        result = result + pred.reshape(-1).tolist()

print(result)

In [None]:
# 결과를 원래의 이름으로 변경한다.
results_data = []
# 각 이름별 숫자값을 가지고 있는 딕셔너리를 추출한다.
classes_dict = test_dataset.class_to_idx
# 이름만 추출한다.
classes_names = list(classes_dict.keys())
# 예측한 결과의 수 만큼 반복한다.
for r1 in result :
    # 이름에서 결과번째 이름을 가져온다.
    n1 = classes_names[r1]
    # 결과에 담아준다.
    results_data.append(n1)

print(results_data)