In [14]:
import torch
import torchvision
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
import os
import urllib.request
import tarfile

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import argparse
import numpy as np
import time
import pickle
from copy import deepcopy # Add Deepcopy for args
import seaborn as sns
import matplotlib.pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [16]:
# CIFAR-100 데이터셋 다운로드
url = "https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz"
filename = "cifar-100-python.tar.gz"

if not os.path.exists(filename):
    urllib.request.urlretrieve(url, filename)

# 압축 풀기
with tarfile.open(filename, 'r:gz') as tar:
    tar.extractall()

# 데이터 불러오기
def unpickle(file):
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

# CustomCIFAR100 클래스 정의
class CustomCIFAR100(Dataset):
    def __init__(self, root, train=True, transform=None):
        self.root = os.path.abspath(root)  # root를 절대 경로로 설정
        self.train = train
        self.transform = transform

        data_file_name = "train" if self.train else "test"
        data_file = os.path.join(self.root, "cifar-100-python", data_file_name)

        with open(data_file, 'rb') as fo:
            data_dict = pickle.load(fo, encoding='bytes')
            self.data = data_dict[b'data']
            self.fine_labels = data_dict[b'fine_labels']

    #데이터셋의 전체 길이 반환
    def __len__(self):
        return len(self.data)

    # 주어진 index에 대한 data smaple 반환
    def __getitem__(self, idx):
      image = self.data[idx]
      label = self.fine_labels[idx]

      # 이미지 차원 확인
      if len(image.shape) == 1:

          # 이미지 차원이 1인 경우, 예를 들어 (3072,)인 경우 (32, 32, 3)으로 변경
          image = image.reshape(3, 32, 32).transpose(1, 2, 0)

      #이미지 변환이 지정되어 있다면 해당 변환 적용
      if self.transform:
          image = self.transform(image)

      return image, label


# 이미지에 대한 변환을 정의
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 이미지를 [-1, 1] 범위로 정규화합니다.
])

# 메타데이터 불러오기
meta = unpickle('cifar-100-python/meta')
fine_label_names = [t.decode('utf8') for t in meta[b'fine_label_names']]

# 훈련 데이터 불러오기
train = unpickle('cifar-100-python/train')

train_data = train[b'data']
train_labels = train[b'fine_labels']

# train과 validation data set 나누기 (8:2 비율)
# test_size 0.2가 validation에 20%를 사용하겠다는 의미.
# random_state : 데이터 분할 시 사용되는 무작위성을 제어
x_train, x_val, y_train, y_val = train_test_split(train[b'data'], train[b'fine_labels'], test_size=0.2, random_state=50)

# train dataset 정의
train_dataset =  CustomCIFAR100(root='/content', train=True, transform=transform)

# validation dataset 정의
val_dataset = CustomCIFAR100(root='/content', train=False, transform=transform)


# 훈련 세션에 사용할 데이터 partition
partition = {'train': train_dataset, 'val': val_dataset}

In [17]:
# VGGNet을 정의하는 dictionary
# 숫자: convolutional layer의 출력 채널 수, M: Max pooling layer
cfg = {
        'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M']
}

In [18]:
class CNN(nn.Module):

    def __init__(self, model_code, in_channels, out_dim, act, use_bn=True):
        super(CNN, self).__init__()

        # Activation 함수 선택
        if act == 'relu':
            self.act = nn.ReLU()
        elif act == 'sigmoid':
            self.act = nn.Sigmoid()
        elif act == 'tanh':
            self.act = nn.TanH()
        else:
            raise ValueError("Not a valid activation function code")

        # layers 생성하는 함수 호출
        self.layers = self._make_layers(model_code, in_channels, use_bn)

        # 분류를 위한 FC layer
        self.classifier = nn.Sequential(nn.Linear(512, 256),
                                        nn.ReLU(),
                                        nn.Linear(256, out_dim))

    # 모델의 forward 연산 정의
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

    def _make_layers(self, model_code, in_channels, use_bn):
        layers = []
        for x in cfg[model_code]:
            if x == 'M':
                # MaxPooling 레이어 생성
                layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
            else:
                # Convolution layer 구성
                layers += [nn.Conv2d(in_channels=in_channels,
                                     out_channels=x,  # out_channels은 forward 연산 이후 x
                                     kernel_size=3,   # CNN kernel_size
                                     stride=1,        # CNN kernel의 stride
                                     padding=1)]      # CNN kernel의 padding

                # BN(Batch Normalization을 사용하는 경우 아래를 사용)
                if use_bn:
                    layers += [nn.BatchNorm2d(x)]
                layers += [self.act]   # 레이어 이후 활성화 함수 적용
                in_channels = x        # 다음 들어가야하는 레이어에 입력 채널 수 업데이트

        return nn.Sequential(*layers)


In [19]:
def train(net, partition, optimizer, criterion, args):

    #데이터 로더 설정
    trainloader = torch.utils.data.DataLoader(partition['train'],
                                              batch_size=args.train_batch_size,
                                              shuffle=True, num_workers=2)
    # 네트워크를 학습 모드로 설정
    net.train()

    correct = 0
    total = 0
    train_loss = 0.0

    # DateLoader에서 미니배치 단위로 데이터를 가져와 학습 수행
    for i, data in enumerate(trainloader, 0):
        optimizer.zero_grad()

        # 미니배치에서 입력과 정답 가져오기
        inputs, labels = data
        inputs = inputs.cuda()
        labels = labels.cuda()

        # Foward
        outputs = net(inputs)

        # 손실 함수 계산
        loss = criterion(outputs, labels)

        # Backward 및 최적화
        loss.backward()
        optimizer.step()

        # loss 및 정확도 계산
        train_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # 전체 데이터에 대한 평균 loss와 정확도 계산
    train_loss = train_loss / len(trainloader)
    train_acc = 100 * correct / total

    # 네트워크, train loss, train acuuracy 반환
    return net, train_loss, train_acc



In [20]:
def validate(net, partition, criterion, args):
    # DataLoader를 사용하여 validation data set load
    valloader = torch.utils.data.DataLoader(partition['val'],
                                            batch_size=args.test_batch_size,
                                            shuffle=False, num_workers=2)

    # 네트워크를 평가 모드로 설정
    net.eval()

    correct = 0
    total = 0
    val_loss = 0

    # gradient계산 비활성화 -> 평가 모드에서는 필요 없음
    with torch.no_grad():

        for data in valloader:
            images, labels = data
            images = images.cuda()
            labels = labels.cuda()

            # Forward
            outputs = net(images)

            # loss 계산
            loss = criterion(outputs, labels)

            # loss 누적
            val_loss += loss.item()

            # 정확도 계산
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # 평균 loss 및 accuracy 계산
        val_loss = val_loss / len(valloader)
        val_acc = 100 * correct / total

    # validation loss와 accuary 반환
    return val_loss, val_acc

In [24]:
def experiment(partition, args):

    # CNN model 지정
    net = CNN(model_code = args.model_code,
              in_channels = args.in_channels,
              out_dim = args.out_dim,
              act = args.act,
              use_bn = args.use_bn
              )

   # GPU에 모델 할당하기
    net.cuda()

    # 손실함수 Cross-Entropy loss 설정
    criterion = nn.CrossEntropyLoss()

    # Optimizer 선택
    if args.optim == 'SGD':
        optimizer = optim.SGD(net.parameters(), lr=args.lr, weight_decay=args.l2)
    elif args.optim == 'RMSprop':
        optimizer = optim.RMSprop(net.parameters(), lr=args.lr, weight_decay=args.l2)
    elif args.optim == 'Adam':
        optimizer = optim.Adam(net.parameters(), lr=args.lr, weight_decay=args.l2)
    else:
        raise ValueError('In-valid optimizer choice')

    # train 및 validation loss/accuracy를 저장할 리스트 생성
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(args.epoch):  # 데이터 셋을 여러번 반복하는 epoch loof
        ts = time.time()

        # train 함수 호출하여 모델 훈련
        net, train_loss, train_acc = train(net, partition, optimizer, criterion, args)

        # validation 함수 호출하여 모델 평가
        val_loss, val_acc = validate(net, partition, criterion, args)
        te = time.time()

        # train, validation loss/accuracy를 리스트에 추가
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_accs.append(train_acc)
        val_accs.append(val_acc)

        # epoch별 train 및 validation 결과 출력
        print('Epoch {}, Acc(train/val): {:2.2f}/{:2.2f}, Loss(train/val) {:2.2f}/{:2.2f}. Took {:2.2f} sec'.format(epoch, train_acc, val_acc, train_loss, val_loss, te-ts))

    # 결과를 dictionary에 초기화 및 저장
    result = {}
    result['train_losses'] = train_losses
    result['val_losses'] = val_losses
    result['train_accs'] = train_accs
    result['val_accs'] = val_accs
    result['train_acc'] = train_acc
    result['val_acc'] = val_acc

    # 모델의 최종 가중치 저장
    weight_save_path = f"saved_weights_final_epoch_{args.epoch}.pth"
    torch.save(net.state_dict(), weight_save_path)

    # 결과 반환
    return vars(args), result

In [25]:
import hashlib
import json
from os import listdir
from os.path import isfile, join
import pandas as pd

def save_exp_result(setting, result):
    # setting dictionary에서 'exp_name', 'epoch', 'test_batch_size'키 삭제
    exp_name = setting['exp_name']
    del setting['epoch']
    del setting['test_batch_size']

    # setting 딕셔너리를 해시하여 고유한 키 생성 (앞 6자리만 사용)
    hash_key = hashlib.sha1(str(setting).encode()).hexdigest()[:6]

    # 결과 파일 경로 및 이름 설정
    filename = './results/{}-{}.json'.format(exp_name, hash_key)

    # result 딕셔너리에 setting 정보 추가
    result.update(setting)

    # 결과를 JSON 파일로 저장
    with open(filename, 'w') as f:
        json.dump(result, f)


In [26]:
# ====== Random Seed Initialization ====== #
seed = 123
np.random.seed(seed)
torch.manual_seed(seed)

parser = argparse.ArgumentParser()
args = parser.parse_args("")
args.exp_name = "exp1_lr"

# ====== Model Capacity ====== #
args.model_code = 'VGG19'
args.in_channels = 3
args.out_dim = 100
args.act = 'relu'

# ====== Regularization ======= #
args.l2 = 0.001
args.use_bn = True

# ====== Optimizer & Training ====== #
args.optim = 'Adam'
args.lr = 0.0015
args.epoch = 30

args.train_batch_size = 256
args.test_batch_size = 1024


setting, result = experiment(partition, args)



Epoch 0, Acc(train/val): 1.89/2.04, Loss(train/val) 4.46/4.53. Took 26.74 sec
Epoch 1, Acc(train/val): 3.32/2.63, Loss(train/val) 4.21/4.36. Took 26.08 sec
Epoch 2, Acc(train/val): 3.76/4.43, Loss(train/val) 4.13/4.11. Took 26.33 sec
Epoch 3, Acc(train/val): 5.22/3.32, Loss(train/val) 4.00/4.29. Took 26.05 sec
Epoch 4, Acc(train/val): 6.75/6.03, Loss(train/val) 3.86/4.05. Took 26.08 sec
Epoch 5, Acc(train/val): 8.41/5.64, Loss(train/val) 3.72/4.47. Took 26.03 sec
Epoch 6, Acc(train/val): 11.18/4.88, Loss(train/val) 3.50/4.34. Took 25.95 sec
Epoch 7, Acc(train/val): 14.78/8.93, Loss(train/val) 3.26/3.76. Took 25.86 sec
Epoch 8, Acc(train/val): 18.40/11.94, Loss(train/val) 3.05/3.56. Took 25.69 sec
Epoch 9, Acc(train/val): 22.17/16.98, Loss(train/val) 2.86/3.34. Took 25.45 sec
Epoch 10, Acc(train/val): 25.69/23.96, Loss(train/val) 2.71/2.87. Took 25.43 sec
Epoch 11, Acc(train/val): 28.20/21.13, Loss(train/val) 2.59/3.25. Took 25.79 sec
Epoch 12, Acc(train/val): 30.50/19.79, Loss(train/va

In [27]:
save_exp_result(setting, result)