드라이브 마운트

In [None]:
# Mount Google Drive inside the Colab runtime at /content/gdrive/ so the
# project folder used by the following cells is reachable.
from google.colab import drive
# force_remount=True is passed so that re-running this cell does not depend on
# the state of a previous mount.
drive.mount("/content/gdrive/", force_remount=True)

기본 경로 설정

In [None]:
# Change the working directory to the project folder on the mounted Drive;
# relative paths used later ('./data', 'logs', saved PNGs) resolve against it.
cd "/content/gdrive/MyDrive/2024_MCL_Internship/CIFAR10"

라이브러리 불러오기

In [None]:
import os
from time import time
from matplotlib import pyplot as plt

import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
import torchvision
import torchvision.transforms as transforms

데이터셋 불러오기

In [None]:
# Load the CIFAR-10 train and test splits (downloaded into ./data when missing).
# The only preprocessing step is ToTensor; no further transform is applied.
transform = transforms.Compose([transforms.ToTensor()])

trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)

testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# Human-readable class names, indexed by the integer label.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

Dataset Visualization

In [None]:
import random
from torch.nn.functional import interpolate


def _upscaled_preview(image, factor=4):
    """Return ``image`` as a PIL image enlarged ``factor`` times.

    ``image`` is expected to be a single image tensor as produced by the
    dataset's ToTensor transform. A batch axis is added for ``interpolate``
    and dropped again afterwards; nearest-neighbour mode is used for the
    enlargement.
    """
    enlarged = interpolate(image.unsqueeze(0), scale_factor=factor, mode="nearest")
    return transforms.ToPILImage()(enlarged[0])


print("total length of train dataset : ", len(trainset))
print("total length of test dataset : ", len(testset))

# Show one randomly chosen training sample together with its class name.
i = random.randrange(len(trainset))
image_train, label_train = trainset[i]
image_train_PIL = _upscaled_preview(image_train)
display(image_train_PIL)
print(classes[label_train])
print()

# Same for one randomly chosen test sample.
i = random.randrange(len(testset))
image_test, label_test = testset[i]
image_test_PIL = _upscaled_preview(image_test)
display(image_test_PIL)
print(classes[label_test])

model 정의하기

In [None]:
class Linear_Model(nn.Module):
    """Fully connected CIFAR-10 classifier used as a baseline.

    The image is flattened to a ``3 * 32 * 32`` vector and passed through 15
    hidden ``Linear -> BatchNorm1d -> ReLU`` stages, followed by a final
    ``Linear`` layer that produces one score per class (10 classes).

    ``forward`` returns raw logits. There is intentionally no trailing
    ``nn.Softmax``: ``nn.CrossEntropyLoss`` applies log-softmax internally, so
    a softmax inside the model would be applied twice during training, which
    compresses the loss range and weakens the gradients. Apply
    ``torch.softmax(output, dim=1)`` to the result when probabilities are
    needed; ``argmax`` predictions are identical either way.
    """

    def __init__(self):
        super().__init__()

        self.flatten = nn.Flatten()

        # Output width of every hidden stage, in order.
        hidden_widths = (
            (2048,) + (1024,) * 6 + (512,) * 2 + (256,) * 3 + (128, 64, 32)
        )

        layers = []
        in_features = 3 * 32 * 32
        for out_features in hidden_widths:
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.BatchNorm1d(out_features))
            layers.append(nn.ReLU(inplace=True))
            in_features = out_features
        # Classification head: one logit per class, no activation.
        layers.append(nn.Linear(in_features, 10))

        # Modules are numbered 0, 1, 2, ... inside the Sequential, so the
        # parameters are named linear.0.weight, linear.1.weight, and so on.
        self.linear = nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape (batch, 10) for a batch of images ``x``."""
        flat = self.flatten(x)
        logits = self.linear(flat)
        return logits

class vgg_model(nn.Module):
    """VGG16-style CNN with batch normalization for 3 x 32 x 32 CIFAR-10 images.

    Five convolutional stages (2, 2, 3, 3 and 3 ``Conv -> BatchNorm -> ReLU``
    units, each stage ending in a 2x2 max-pool) reduce the image to a
    512-element feature vector, which a 512 -> 256 -> 100 -> 10 fully
    connected head turns into one score per class.

    ``forward`` returns raw logits. There is intentionally no trailing
    ``nn.Softmax``: ``nn.CrossEntropyLoss`` applies log-softmax internally, so
    a softmax inside the model would be applied twice during training, which
    compresses the loss range and weakens the gradients. Apply
    ``torch.softmax(output, dim=1)`` to the result when probabilities are
    needed; ``argmax`` predictions are identical either way.
    """

    def __init__(self):
        super(vgg_model, self).__init__()
        # conv1_1, conv1_2, pool1:           3 x 32 x 32 ->  64 x 16 x 16
        self.feature_layer1 = self._conv_stage(3, 64, num_convs=2)
        # conv2_1, conv2_2, pool2:          64 x 16 x 16 -> 128 x  8 x  8
        self.feature_layer2 = self._conv_stage(64, 128, num_convs=2)
        # conv3_1, conv3_2, conv3_3, pool3: 128 x  8 x  8 -> 256 x  4 x  4
        self.feature_layer3 = self._conv_stage(128, 256, num_convs=3)
        # conv4_1, conv4_2, conv4_3, pool4: 256 x  4 x  4 -> 512 x  2 x  2
        self.feature_layer4 = self._conv_stage(256, 512, num_convs=3)
        # conv5_1, conv5_2, conv5_3, pool5: 512 x  2 x  2 -> 512 x  1 x  1
        self.feature_layer5 = self._conv_stage(512, 512, num_convs=3)
        # Fully connected head 512 -> 256 -> 100 -> 10 with ReLU in between;
        # the last layer outputs logits (no activation).
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 100),
            nn.ReLU(inplace=True),
            nn.Linear(100, 10),
        )

    @staticmethod
    def _conv_stage(in_channels, out_channels, num_convs):
        """Build ``num_convs`` 3x3 Conv -> BatchNorm -> ReLU units plus a 2x2 max-pool.

        The convolutions use stride 1 and padding 1, so only the pooling
        layer changes the spatial size (it halves height and width).
        """
        layers = []
        channels = in_channels
        for _ in range(num_convs):
            layers.append(
                nn.Conv2d(channels, out_channels, kernel_size=3, stride=1, padding=1)
            )
            layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))
            channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape (batch, 10) for a batch of images ``x``."""
        features = self.feature_layer1(x)
        features = self.feature_layer2(features)
        features = self.feature_layer3(features)
        features = self.feature_layer4(features)
        features = self.feature_layer5(features)
        # Collapse (batch, 512, 1, 1) to (batch, 512) for the linear head.
        features = features.view(features.size(0), -1)
        logits = self.classifier(features)
        return logits

In [None]:
from torchsummary import summary as summ

# Build both networks on the GPU and print a layer-by-layer summary of each
# for a single 3 x 32 x 32 input.
model_linear = Linear_Model().cuda()
model_vgg = vgg_model().cuda()

for network in (model_linear, model_vgg):
    summ(network, (3, 32, 32))

데이터셋 sampling

In [None]:
class CIFAR10_sampling(torch.utils.data.Dataset):
    """Class-balanced, in-memory subset of a labelled dataset.

    Walks ``dataset`` in order and keeps the first ``quota`` samples of every
    class, where ``quota = int(len(dataset) // num_classes * rate)``. The scan
    stops as soon as every class has reached its quota, so the remaining
    samples are never loaded or transformed.

    Args:
        dataset: Sized iterable of ``(image, label)`` pairs. Labels must be
            integers in ``range(num_classes)``.
        rate: Fraction of each class's share of the dataset to keep.
        num_classes: Number of distinct labels (10 for CIFAR-10).
    """

    def __init__(self, dataset, rate, num_classes=10):
        self.img_list = []
        self.label_list = []

        per_class_quota = int(len(dataset) // num_classes * rate)
        if per_class_quota <= 0:
            # Nothing can be selected; skip the scan entirely.
            return

        target_total = per_class_quota * num_classes
        kept_per_class = [0] * num_classes

        for img, label in dataset:
            if kept_per_class[label] >= per_class_quota:
                continue
            self.img_list.append(img)
            self.label_list.append(label)
            kept_per_class[label] += 1
            # Every quota is full: no later sample could be accepted.
            if len(self.label_list) == target_total:
                break

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at position ``idx``."""
        return self.img_list[idx], self.label_list[idx]

    def __len__(self):
        """Return the number of samples that were kept."""
        return len(self.label_list)

train 코드 정의

In [None]:
def _evaluate(model, loader, criterion, device):
    """Run ``model`` over ``loader`` without gradients.

    Returns:
        A ``(mean_loss, num_correct)`` tuple: the average of the per-batch
        losses as a float, and the number of samples whose arg-max prediction
        equals the label.
    """
    model.eval()
    mean_loss = 0.0
    num_correct = 0
    num_batches = len(loader)

    with torch.no_grad():
        for images, labels in loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            mean_loss += criterion(outputs, labels).item() / num_batches

            predictions = torch.argmax(outputs, dim=1)
            num_correct += torch.sum(predictions == labels).item()

    return mean_loss, num_correct


def train_model(info):
    """Train one model on (a subset of) CIFAR-10 and evaluate it every epoch.

    Args:
        info: Dictionary with the run configuration:
            "model": ``"vgg"`` or ``"linear"``; selects the network.
            "writer": object with an ``add_scalar(tag, value, step)`` method
                (a TensorBoard ``SummaryWriter``) that receives the loss and
                accuracy curves.
            "train_sampling_rate": fraction of the training set to use,
                passed to ``CIFAR10_sampling``.
            "batch_size": batch size for both data loaders.
            "lr": initial learning rate for Adam.
            "epochs": number of epochs to run.

    Returns:
        ``(log_loss_train, log_loss_test, log_acc_test)``: three lists with
        one entry per epoch holding the mean training loss, the mean test
        loss and the test accuracy in percent.

    Raises:
        ValueError: if ``info["model"]`` is not a known model name.
    """
    # Model selection. An unknown name is a caller error, so raise instead of
    # terminating the interpreter.
    if info["model"] == "vgg":
        model = vgg_model()
    elif info["model"] == "linear":
        model = Linear_Model()
    else:
        raise ValueError(
            f"Model Error: unknown model {info['model']!r} (expected 'vgg' or 'linear')"
        )

    # Use the GPU when present, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Checkpoint directory <model>/pth (kept ready for the optional checkpoint
    # saving below) and the TensorBoard writer.
    ckpt_path = info["model"]
    pth_path = os.path.join(ckpt_path, "pth")
    os.makedirs(pth_path, exist_ok=True)
    writer = info["writer"]

    # Datasets
    transform = transforms.Compose(
        [transforms.ToTensor()]
    )
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                            download=True, transform=transform)
    testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                           download=True, transform=transform)

    # Optionally train on a class-balanced subset for faster experiments.
    # Remember the real size first so it can be reported without dividing by
    # the sampling rate.
    full_train_size = len(trainset)
    trainset = CIFAR10_sampling(trainset, info["train_sampling_rate"])

    print(f"Length of train dataset: {len(trainset)} / {full_train_size}")

    train_loader = torch.utils.data.DataLoader(trainset, batch_size=info["batch_size"],
                                               shuffle=True, num_workers=2)
    test_loader = torch.utils.data.DataLoader(testset, batch_size=info["batch_size"],
                                              shuffle=False, num_workers=2)

    # Loss, optimizer and learning-rate schedule (lr is multiplied by 0.9
    # after every epoch).
    # NOTE: nn.CrossEntropyLoss applies log-softmax internally, so the model
    # is expected to output raw logits.
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=info["lr"])
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.9)

    log_loss_train = []
    log_loss_test = []
    log_acc_test = []

    num_train_batches = len(train_loader)

    for epoch in range(info["epochs"]):
        # ---- train ----
        start_time = time()
        train_loss_per_epoch = 0
        model.train()

        for step, (images, labels) in enumerate(train_loader):
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)  # size: batch x 10
            loss_train = criterion(outputs, labels)

            optimizer.zero_grad()
            loss_train.backward()
            optimizer.step()

            loss_value = loss_train.item()
            train_loss_per_epoch += loss_value / num_train_batches

            # One point per optimizer step on the global step axis.
            writer.add_scalar(f"{info['model']}/Train_Loss", loss_value,
                              epoch * num_train_batches + step)

        print(f"Epoch[{epoch:3d}] Train Loss: {train_loss_per_epoch:.4f}", end='')
        log_loss_train.append(train_loss_per_epoch)

        # ---- test ----
        loss_test_epoch, total_num_accs = _evaluate(model, test_loader, criterion, device)

        acc_rate = (total_num_accs / len(testset)) * 100
        writer.add_scalar(f"{info['model']}/Test_Loss", loss_test_epoch, epoch)
        writer.add_scalar(f"{info['model']}/Test_Accuracy", acc_rate, epoch)
        log_loss_test.append(loss_test_epoch)
        log_acc_test.append(acc_rate)

        # Optional checkpointing (disabled):
        # if (epoch + 1) % 10 == 0:
        #     torch.save({'epoch': epoch,
        #                 'model_state_dict': model.state_dict(),
        #                 'optimizer_state_dict': optimizer.state_dict()}, f'{pth_path}/Epoch{epoch:03d}.pth')

        # Measured from the start of training, so it covers the test pass too.
        time_per_epoch = time() - start_time
        print(f" Test Loss: {loss_test_epoch:.4f}"
              f" Accuracy Rate: {acc_rate:.2f}%"
              f" lr: {scheduler.get_last_lr()[0]:.1E}"
              f" time per epoch: {time_per_epoch:.2f}sec"
              )

        scheduler.step()

    return log_loss_train, log_loss_test, log_acc_test

main 함수

In [None]:
# TensorBoard event files for both runs go to ./logs.
os.makedirs('logs', exist_ok=True)
writer = SummaryWriter('logs')

# Shared run configuration; only "model" changes between the two runs.
info = {
    "epochs" : 15,
    "lr" : 0.0001,
    "model" : "linear",
    "writer" : writer,
    "train_sampling_rate": 0.4,
    "batch_size": 500,
}

# Run 1: fully connected baseline. Despite its name, my_cnn holds the
# (train loss, test loss, test accuracy) logs of the linear model.
print(f"mode: {info['model']} | epochs: {info['epochs']} | learning rate: {info['lr']}")
my_cnn = train_model(info)

# Run 2: VGG-style CNN with the same hyper-parameters.
info["model"] = "vgg"
print(f"\nmode: {info['model']} | epochs: {info['epochs']} | learning rate: {info['lr']}")
vgg = train_model(info)

# Write any buffered scalars to disk so TensorBoard sees the complete runs.
writer.flush()

print("Train End")

pyplot

In [None]:
# --- Logs Visualization
def _save_log_plot(linear_log, vgg_log, filename, num_epochs):
    """Plot the two per-epoch curves on one figure and save it to ``filename``.

    The x axis gets roughly five ticks. The tick step is clamped to at least
    1 because ``num_epochs // 5`` is 0 for fewer than five epochs, and
    ``range`` rejects a zero step.
    """
    plt.plot(linear_log, label='Linear_Model')
    plt.plot(vgg_log, label='VGG16')
    tick_step = max(1, num_epochs // 5)
    plt.xticks(list(range(0, num_epochs, tick_step)))
    plt.legend()
    plt.savefig(filename)
    # Clear the figure so the next plot starts from an empty canvas.
    plt.clf()


# Index 0: training loss, 1: test loss, 2: test accuracy.
for log_index, filename in enumerate(('train_loss.png', 'test_loss.png', 'test_acc.png')):
    _save_log_plot(my_cnn[log_index], vgg[log_index], filename, info["epochs"])

tensorboard 실행

In [None]:
# Load the TensorBoard notebook extension.
%load_ext tensorboard

# Start TensorBoard inside the notebook, reading the event files under ./logs.
%tensorboard --logdir logs