# VGG 16

### 라이브러리 import

In [26]:
# !pip install opencv-python

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
import os
import pandas as pd
import cv2 # 이미지 처리를 도와주는 라이브러리
import numpy as np
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import json
import h5py
from torch.utils.data import DataLoader, Dataset
from torchsummary import summary

## 데이터 로드

In [None]:
# hdf5, json 파일 로드
hdf5_path = os.path.join(os.getcwd(), "data/butterfly_original.hdf5")
json_path = os.path.join(os.getcwd(), "data/butterfly_original_classes.json")

with h5py.File(hdf5_path, 'r') as hf:
    train_data = hf['train_data'][:]
    train_label = hf['train_label'][:]
    test_data = hf['test_data'][:]

with open(json_path, 'r') as j:
    train_classes = json.load(j)

# key값들이 str로 들어오기에 int로 변환해서 다시 저장
train_classes = {int(key):value for key, value in train_classes.items()}

In [None]:
# 이미지 샘플 확인
def imshow(images, labels, classes):
    plt.figure(figsize=(12, 8))

    for i in range(len(images)):
        plt.subplot(1, len(images), i+1)
        plt.imshow(images[i])
        plt.title(classes[labels[i]])
    plt.shwo()

In [None]:
imshow(train_data[:4], train_label[:4], train_classes)

In [None]:
# 데이터셋 생성 클래스
# init: self, image, label, classes, transform, target_transform, test
# len: train - image, label / test - image
# getitem: 정규화
class CustomImageDataset(Dataset):
    def __init__(self, image, label=None, classes=None, transform=None, target_transform=None, test=False):
        self.image = image
        self.label = label
        self.classes = classes
        self.transform = transform
        self.target_transform = target_transform
        self.test = test

    def __len__(self):
        if not(self.test):
            return len(list(zip(self.image, self.label)))
        else:
            return len(list(self.image))
        
    def __getitem__(self, index):
        if not(self.test):
            image = self.image[index]
            label = self.label[index]

            if self.transform:
                image = self.transform(image)
            if self.target_transform:
                label = self.transform(label)

            return image, label
        else:
            image = self.image[index]

            if self.transform:
                image = self.transform(image)

            return image

In [28]:
# 픽셀값들을 정규화 해줌
# 픽셀들은 0~255사이의 값을 가지기 때문에 값들간의 편차가 크다
# 따라서 값들간의 차이를 줄여 가중치의 영향을 줄여주기 위함
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [None]:
# train test dataset 객체 생성
train_dataset = CustomImageDataset(
    image=train_data,
    label=train_label,
    classes=train_classes,
    transform=transform,
    test=False
)

test_dataset = CustomImageDataset(
    image=test_data,
    transform=transform,
    test=True
)

In [None]:
# dataloader 생성
train_dataloader = DataLoader(dataset=train_dataset, batch_size=16, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=16, shuffle=True)

### 모델 구현

In [None]:
class VGG_16(nn.Module):
    def __init__(self):
        super(VGG_16,self).__init__()
        self.convnet = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 224 -> 112

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 112 -> 56

            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 56 -> 28
            
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 28 -> 14

            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            # 14 -> 7

            nn.Flatten(), # Flatten은 layer로 안쳐준다.
            nn.Dropout(.5),
            nn.Linear(in_features=512*7*7, out_features=4096),
            nn.ReLU(),
            nn.Dropout(.5),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Linear(in_features=4096, out_features=1000)
        )
    def forward(self, x):
        return self.convnet(x)
    

### GPU 사용을 위한 코드

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

### 모델 객체 생성, 손실함수, 최적화 함수 설정

In [None]:
model = VGG_16().to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(params=model.parameters(), lr=1e-2, momentum=.9, weight_decay=5e-4)

### 모델 구조 확인

In [None]:
summary(model, (3, 224, 224))

### BestScoreSave

In [None]:
class BestScoreSave():
    
    def __init__(self, model_name, mode="accuracy", delta=0.0, save_mode="weight", vervose=True, save=False):
        self.best_score = 0 if mode == "accuracy" else np.inf   # mode가 accuracy이면 0, loss면 무한
        self.mode = mode                                        # loss : loss가 최소가 될때 저장, accuracy : accuracy가 제일 높을 때 저장
        self.save_mode = save_mode                              # weight : weight만 저장, model : model 구조와 weight 모두 저장
        self.delta = delta                                      # 작은 값 무시
        self.vervose = vervose                                  # Save 기록 출력할 것인지
        self.save = save                                        # save 할 것인지
        self.model_name = model_name                            # model명
        self.model_path = os.path.join(os.getcwd(), self.model_name)
        if not(os.path.isdir(self.model_path)):                 # 폴더 확인 후 없으면 생성
            os.mkdir(self.model_path)
        
    def __call__(self, score):
        if self.mode == "accuracy":
            if score > (self.best_score + self.delta):
                self.save = True
                self.best_score = score
            else:
                self.save = False
        elif self.mode == "loss":
            if score < (self.best_score - self.delta):
                self.save = True
                self.best_score = score
            else:
                self.save = False

### Early Stopping

In [None]:
class EarlyStopping:
    def __init__(self, patience=10, delta=0.0, mode='min', verbose=True):
        """
        patience (int): loss or score가 개선된 후 기다리는 기간. default: 3
        delta  (float): 개선시 인정되는 최소 변화 수치. default: 0.0
        mode     (str): 개선시 최소/최대값 기준 선정('min' or 'max'). default: 'min'.
        verbose (bool): 메시지 출력. default: True
        """
        self.early_stop = False
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        
        self.best_score = np.inf if mode == 'min' else 0
        self.mode = mode
        self.delta = delta
        

    def __call__(self, score):

        if self.best_score is None:
            self.best_score = score
            self.counter = 0
        elif self.mode == 'min':
            if score < (self.best_score - self.delta):
                self.counter = 0
                self.best_score = score
                if self.verbose:
                    print(f'[EarlyStopping] (Update) Best Score: {self.best_score:.5f}')
            else:
                self.counter += 1
                if self.verbose:
                    print(f'[EarlyStopping] (Patience) {self.counter}/{self.patience}, ' \
                          f'Best: {self.best_score:.5f}' \
                          f', Current: {score:.5f}, Delta: {np.abs(self.best_score - score):.5f}')
                
        elif self.mode == 'max':
            if score > (self.best_score + self.delta):
                self.counter = 0
                self.best_score = score
                if self.verbose:
                    print(f'[EarlyStopping] (Update) Best Score: {self.best_score:.5f}')
            else:
                self.counter += 1
                if self.verbose:
                    print(f'[EarlyStopping] (Patience) {self.counter}/{self.patience}, ' \
                          f'Best: {self.best_score:.5f}' \
                          f', Current: {score:.5f}, Delta: {np.abs(self.best_score - score):.5f}')
                
            
        if self.counter >= self.patience:
            if self.verbose:
                print(f'[EarlyStop Triggered] Best Score: {self.best_score:.5f}')
            # Early Stop
            self.early_stop = True
        else:
            # Continue
            self.early_stop = False

### 학습 함수

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    model.train()   # torch안의 module안에 train이라는 메소드가 있다.
                    # 학습이라는 것을 명시해주는 것
    train_loss = []
    
    for batch, data in enumerate(dataloader):
        image, label = data.to(device)

        # Compute prediction error 
        pred = model(image) # model.forward(image)
        loss = loss_fn(pred, label)

        # backpropagation
        loss.backward() # 쓸모없는 노드를 지운다. # 매개변수를 안정해주면 랜덤으로 잡고 경사하강을 한다.
        optimizer.step() # learning rate 만큼 내려간다.
        optimizer.zero_grad() # 초기화해주는데 해주는 이유는 이전 스텝을 기억하고있으면 다음 스텝에 영향을 줄 수 있기때문에 초기화한다.


        if not(batch % 100):
            print(f"loss : {loss.item():>7f}   [{(batch + 1) * len(image):>5d}/{len(dataloader):>5d}]")
            train_loss.append(loss.item())

### 테스트 함수

In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval() #평가를 해주겠다 명시
    test_loss, correct = 0, 0
    
    with torch.no_grad(): #no_grad -> 최적화를 수행하지 않겠다.
        for image, label in dataloader:
            pred = model(image)
            # softmax = nn.Softmax(dim=1)
            # pred_probab = softmax(pred)
            test_loss += loss_fn(pred, label).item() #.item() -> loss_fn에 있는 데이터값을 불러준다.
            correct += (pred.argmax(1) == label).type(torch.float).sum().item()
                            #argmax --> 예측값중에 가장 큰 값을 뽑아오는 것
    
    test_loss /= num_batches
    correct /=size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

### 실행

In [None]:
epochs = 30
for i in range(epochs):
    print(f"Epoch {i + 1}\n------------------------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done")

Epoch 1
------------------------------------------------


ValueError: too many values to unpack (expected 2)