In [None]:
# Mount Google Drive into the Colab filesystem; later cells read and write
# paths under /content/drive (checkpoints and the dataset cache).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os

# Directory on the mounted Drive under which training checkpoints are stored.
checkpoints = '/content/drive/MyDrive/Colab Notebooks/VGGNet/checkpoints/'
# exist_ok=True makes this cell idempotent and avoids the check-then-create
# race of a separate os.path.exists() test.
os.makedirs(checkpoints, exist_ok=True)

In [None]:
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np

In [None]:
import wandb

# Seed the global PyTorch and NumPy random number generators once, up front.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# Hyper-parameters that stay constant across every run of the sweep.
fixed_config = {
    "scheduler": "CosineAnnealingLR",
    "weight_decay": 5E-04,
    "learning_rate": 0.1,
    "epochs": 200,
    # (per-channel mean, per-channel std) handed to transforms.Normalize
    "transform_normalize": ((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    "classifier": "avg",
    "batch_size": 128,
    "dropout_rate": 0.5,
}

# Parameters explored by the W&B sweep.
sweep_config = {
    "method": "grid",
    # The metric name has to match a key passed to wandb.log(); train() logs
    # the per-epoch validation loss as "val_losses" and never logs "loss".
    "metric": {"goal": "minimize", "name": "val_losses"},
    "parameters": {
        "nesterov": {"values": [False]}
    }
}

In [None]:
# Normalisation constants and the on-Drive dataset cache location.
norm_mean, norm_std = fixed_config["transform_normalize"]
dataset_root = "/content/drive/MyDrive/Colab Notebooks/VGGNet/dataset/"

# Training-time pipeline: random flip + padded random crop, then normalise.
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std)
])

# Evaluation pipeline: deterministic, no augmentation.
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std)
])

full_train_dataset = datasets.CIFAR10(root=dataset_root,
                                      train=True,
                                      download=True,
                                      transform=transform)

# Second view over the same training images, but with the evaluation
# transform. The files are already present after the call above.
full_train_eval_view = datasets.CIFAR10(root=dataset_root,
                                        train=True,
                                        download=False,
                                        transform=transform_test)

test_dataset = datasets.CIFAR10(root=dataset_root,
                                train=False,
                                download=True,
                                transform=transform_test)

num_classes = len(full_train_dataset.classes)

# 80 / 20 split of the official training set into train / validation.
train_size = int(0.8 * len(full_train_dataset))
validation_size = len(full_train_dataset) - train_size

train_dataset, augmented_validation_split = random_split(
    full_train_dataset, [train_size, validation_size])

# random_split keeps the parent's transform, so the validation subset would be
# randomly flipped/cropped on every access. Re-point the same indices at the
# non-augmented view so validation metrics are deterministic.
validation_dataset = torch.utils.data.Subset(
    full_train_eval_view, augmented_validation_split.indices)

In [None]:
# Values pulled out of the fixed configuration for use by later cells.
BATCH_SIZE = fixed_config["batch_size"]
dropout_rate = fixed_config["dropout_rate"]

# Only the training split is reshuffled each epoch; validation and test
# batches are served in a fixed order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Peek at a single training batch to record the per-sample tensor shape
# (everything after the batch dimension); X_train_size stays 0 if the loader
# yields nothing.
X_train_size = 0
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    X_train, Y_train = first_batch
    X_train_size = X_train.shape[1:]
    print(X_train_size)
    print(f"X_train: {X_train.size()} type: {X_train.type()}")
    print(f"Y_train: {Y_train.size()} type: {Y_train.type()}")

torch.Size([3, 32, 32])
X_train: torch.Size([128, 3, 32, 32]) type: torch.FloatTensor
Y_train: torch.Size([128]) type: torch.LongTensor


In [None]:
# Run on the GPU whenever CUDA is available, otherwise fall back to the CPU.
USE_CUDA = torch.cuda.is_available()
if USE_CUDA:
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")
print("사용하는 Device :", DEVICE)

사용하는 Device : cuda


In [None]:
class VGGNet(nn.Module):
    """Convolutional ``features`` stack followed by a linear classification head.

    The flattened size of the feature output is measured once at construction
    time by pushing a dummy sample through ``features``.

    Args:
        features: Module applied to the input before flattening.
        fc_units: Hidden width of the two hidden layers of the ``"fc"`` head.
        input_size: Per-sample input shape (without the batch dimension) used
            for the dummy forward pass. Defaults to the notebook-level
            ``X_train_size``.
        classifier: ``"fc"`` (three linear layers with ReLU and dropout) or
            ``"avg"`` (a single linear layer). Defaults to
            ``fixed_config["classifier"]``.
        n_classes: Number of output logits. Defaults to the notebook-level
            ``num_classes``.
        dropout: Dropout probability of the ``"fc"`` head. Defaults to the
            notebook-level ``dropout_rate``; not read for the ``"avg"`` head.

    Raises:
        ValueError: If the classifier kind is neither ``"fc"`` nor ``"avg"``.
    """

    def __init__(self, features, fc_units=512, input_size=None, classifier=None,
                 n_classes=None, dropout=None):
        super().__init__()
        self.features = features

        # Explicit arguments win; otherwise fall back to the notebook globals
        # so existing ``VGGNet(features)`` calls keep working unchanged.
        self._input_size = tuple(X_train_size if input_size is None else input_size)
        self._classifier_kind = fixed_config["classifier"] if classifier is None else classifier
        self._num_classes = num_classes if n_classes is None else n_classes
        self._dropout_rate = dropout  # resolved lazily, only the "fc" head needs it

        self._feature_map_size = self._get_feature_map_size(features)
        self.fc = self._get_classifier(fc_units)
        self._initialize_weights()

    def _initialize_weights(self):
        """Re-initialise every Conv2d, BatchNorm2d and Linear layer in the model."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                # normal_ is the in-place initialiser; the un-suffixed
                # nn.init.normal is deprecated and emits a warning.
                nn.init.normal_(m.weight, std=1e-3)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

    def _get_feature_map_size(self, features):
        """Return the flattened per-sample size of ``features``' output."""
        # Probe in eval mode: a train-mode forward pass would fold the random
        # dummy input into the BatchNorm running statistics.
        was_training = features.training
        features.eval()
        try:
            with torch.no_grad():
                x = torch.randn(1, *self._input_size)
                x = features(x)
                return torch.flatten(x, 1).size(1)
        finally:
            features.train(was_training)

    def _get_classifier(self, fc_units):
        """Build the classification head selected by the classifier kind."""
        kind = self._classifier_kind
        if kind == "fc":
            p = dropout_rate if self._dropout_rate is None else self._dropout_rate
            return nn.Sequential(
                nn.Linear(self._feature_map_size, fc_units),
                nn.ReLU(),
                nn.Dropout(p=p),
                nn.Linear(fc_units, fc_units),
                nn.ReLU(),
                nn.Dropout(p=p),
                nn.Linear(fc_units, self._num_classes)
            )
        if kind == "avg":
            return nn.Sequential(
                nn.Linear(self._feature_map_size, self._num_classes)
            )
        # Previously an unknown kind surfaced as an UnboundLocalError.
        raise ValueError(f"Unknown classifier kind: {kind!r} (expected 'fc' or 'avg')")

    def forward(self, x):
        """Apply the feature stack, flatten per sample, and return class logits."""
        x = self.features(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

def make_layers(cfg, in_channels=3, classifier=None):
    """Translate a layer layout list into an ``nn.Sequential`` feature stack.

    Args:
        cfg: Sequence of layout tokens. ``'maxpool'`` adds a 2x2 / stride-2
            max-pool. ``'conv1'`` adds a 1x1 convolution that keeps the current
            channel count. Any other (non-string) value is used as the output
            channel count of a 3x3, padding-1 convolution. Every convolution is
            followed by BatchNorm2d and an in-place ReLU.
        in_channels: Channel count of the network input.
        classifier: Head kind the stack is built for; when it is ``"avg"`` a
            trailing ``AvgPool2d(kernel_size=1, stride=1)`` is appended.
            Defaults to ``fixed_config["classifier"]``.

    Returns:
        nn.Sequential containing the layers in ``cfg`` order.

    Raises:
        ValueError: If ``cfg`` contains a string other than ``'maxpool'`` or
            ``'conv1'``.
    """
    if classifier is None:
        classifier = fixed_config["classifier"]

    layers = []
    for v in cfg:
        if v == 'maxpool':
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue

        if v == 'conv1':
            # 1x1 convolution: channel count is carried over unchanged.
            v = in_channels
            conv2d = nn.Conv2d(in_channels, v, kernel_size=1)
        elif isinstance(v, str):
            # A misspelled token would otherwise fail obscurely inside Conv2d.
            raise ValueError(f"Unknown layer token in cfg: {v!r}")
        else:
            conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)

        layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
        in_channels = v

    if classifier == "avg":
        layers.append(nn.AvgPool2d(kernel_size=1, stride=1))
    return nn.Sequential(*layers)

# Channel count of the first stage; later stages use 2x, 4x, 8x and 8x of it.
min_width = 64


def _vgg_stages(convs_per_stage, with_conv1=False):
    """Expand per-stage 3x3-conv counts into a flat layout list.

    Each of the five stages contributes its channel width repeated
    ``convs_per_stage[i]`` times and is closed by ``'maxpool'``. With
    ``with_conv1`` set, stages three to five get a ``'conv1'`` token right
    before their pool.
    """
    width_multipliers = (1, 2, 4, 8, 8)
    layout = []
    for stage, (multiplier, n_convs) in enumerate(zip(width_multipliers, convs_per_stage)):
        layout.extend([min_width * multiplier] * n_convs)
        if with_conv1 and stage >= 2:
            layout.append('conv1')
        layout.append('maxpool')
    return layout


# Layer layouts keyed by configuration letter (A-E).
cfgs = {
    'A': _vgg_stages((1, 1, 2, 2, 2)),
    'B': _vgg_stages((2, 2, 2, 2, 2)),
    'C': _vgg_stages((2, 2, 2, 2, 2), with_conv1=True),
    'D': _vgg_stages((2, 2, 3, 3, 3)),
    'E': _vgg_stages((2, 2, 4, 4, 4)),
}

def _build_vgg(variant):
    """Build a VGGNet from the layout stored under ``variant`` in ``cfgs``."""
    layout = cfgs[variant]
    feature_stack = make_layers(layout)
    return VGGNet(feature_stack)


def vgg11():
    """Return a freshly constructed network for configuration A."""
    return _build_vgg('A')


def vgg13():
    """Return a freshly constructed network for configuration B."""
    return _build_vgg('B')


def vgg16_1():
    """Return a freshly constructed network for configuration C."""
    return _build_vgg('C')


def vgg16():
    """Return a freshly constructed network for configuration D."""
    return _build_vgg('D')


def vgg19():
    """Return a freshly constructed network for configuration E."""
    return _build_vgg('E')

In [None]:
# Instantiate configuration D and move it onto the selected device, then
# report the architecture and its total parameter count.
model = vgg16()
model = model.to(DEVICE)
total_params = sum(map(torch.numel, model.parameters()))
print(model)
print(f"총 파라미터 개수: {total_params}")

  nn.init.normal(m.weight, std=1e-3)


VGGNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU(inplace=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (9): ReLU(inplace=True)
    (10): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (12): ReLU(inplace=True)
    (13): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (14): Conv2d(128, 

In [None]:
from tqdm.auto import tqdm
from torch.optim.lr_scheduler import ReduceLROnPlateau

def _evaluate(loader, criterion):
    """Score the notebook-level ``model`` on ``loader`` without gradients.

    Puts the model into eval mode and returns a tuple
    ``(summed_batch_loss, mean_batch_loss, accuracy)`` where ``accuracy`` is a
    fraction in [0, 1].
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for X, Y in loader:
            X = X.to(DEVICE)
            Y = Y.to(DEVICE)

            output = model(X)
            loss_sum += criterion(output, Y).item()
            correct += (output.argmax(dim=1) == Y).sum().item()
            total += Y.size(0)
    return loss_sum, loss_sum / len(loader), correct / total


def train(config):
    """Train the notebook-level ``model``, checkpoint every epoch, then test it.

    Uses the module-level ``model``, data loaders, ``fixed_config``,
    ``checkpoints`` and ``DEVICE``. If the resume checkpoint exists, model,
    optimizer and scheduler state, the epoch counter, the best validation loss
    and the loss histories are restored from it; if it cannot be loaded the
    reason is printed and training starts from scratch.

    Args:
        config: Object exposing a ``nesterov`` attribute, forwarded to SGD.

    Returns:
        The mean validation loss of the last recorded epoch, or ``None`` when
        no epoch has been recorded.

    Raises:
        ValueError: If ``fixed_config["scheduler"]`` names an unsupported scheduler.
    """
    criterion = torch.nn.CrossEntropyLoss().to(DEVICE)
    total_epochs = fixed_config["epochs"]

    # torch.save does not create parent directories, so make sure the run
    # directory exists before the first checkpoint is written.
    run_dir = checkpoints + 'VGGNet_ablation_nesterov/'
    os.makedirs(run_dir, exist_ok=True)
    # NOTE(review): resume reads 'epoch93' while the loop below writes
    # 'last_epoch' / 'best_epoch' — confirm this file name is intentional.
    resume_path = run_dir + 'epoch93'

    checkpoint = None
    last_epoch = -1
    best_val_loss = float('inf')
    train_losses = []
    val_losses = []

    if os.path.exists(resume_path):
        try:
            # map_location lets a checkpoint saved on GPU be resumed on CPU.
            checkpoint = torch.load(resume_path, map_location=DEVICE)
            old_model_state_dict = checkpoint['model_state_dict']
            new_model_state_dict = model.state_dict()

            # Copy tensor by tensor so a shape mismatch in one entry is
            # reported without aborting the rest of the restore.
            for name, param in old_model_state_dict.items():
                if name in new_model_state_dict:
                    try:
                        new_model_state_dict[name].copy_(param)
                    except Exception as e:
                        print(f"Failed to copy param: {name}, due to {e}")

            model.load_state_dict(new_model_state_dict, strict=False)

            last_epoch = checkpoint['epoch']
            best_val_loss = checkpoint["best_val_loss"]
            train_losses = list(checkpoint.get('train_losses', []))
            val_losses = list(checkpoint.get('val_losses', []))
        except Exception as e:
            # Best effort: report why the resume failed instead of hiding it,
            # then fall back to a fresh run.
            print(f"Could not resume from {resume_path}, starting from scratch: {e}")
            checkpoint = None
            last_epoch = -1
            best_val_loss = float('inf')
            train_losses = []
            val_losses = []

    optimizer = torch.optim.SGD(model.parameters(), lr=fixed_config["learning_rate"], momentum=0.9,
                                weight_decay=fixed_config["weight_decay"], nesterov=config.nesterov)

    scheduler_name = fixed_config["scheduler"]
    if scheduler_name == "CosineAnnealingLR":
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=total_epochs)
    elif scheduler_name == "ReduceLROnPlateau":
        # mode='max': stepped with validation accuracy below.
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=5)
    else:
        raise ValueError(f"Unsupported scheduler: {scheduler_name!r}")

    if checkpoint is not None:
        if 'optimizer_state_dict' in checkpoint:
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        # Restore the scheduler too; otherwise its epoch counter restarts at
        # zero while the optimizer continues from the checkpointed LR.
        if 'scheduler_state_dict' in checkpoint:
            scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

    for epoch in tqdm(range(last_epoch + 1, total_epochs), desc='Epoch Progress'):
        model.train()
        train_loss = 0.0
        correct_train = 0
        total_train = 0

        with tqdm(total=len(train_loader), desc='Batch Progress') as batch_bar:
            for X, Y in train_loader:
                X = X.to(DEVICE)
                Y = Y.to(DEVICE)

                optimizer.zero_grad()
                hypothesis = model(X)
                loss = criterion(hypothesis, Y)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()

                predicted_train = hypothesis.detach().argmax(dim=1)
                total_train += Y.size(0)
                correct_train += (predicted_train == Y).sum().item()

                batch_bar.update()

        train_losses.append(train_loss / len(train_loader))
        train_accuracy = (100 * correct_train) / total_train

        val_loss, val_loss_mean, val_accuracy = _evaluate(validation_loader, criterion)
        val_losses.append(val_loss_mean)

        # ReduceLROnPlateau.step() requires the monitored metric.
        if isinstance(scheduler, ReduceLROnPlateau):
            scheduler.step(val_accuracy)
        else:
            scheduler.step()

        # best_val_loss tracks the *summed* per-batch validation loss, which
        # keeps it comparable with values stored in earlier checkpoints.
        # Update it before building the checkpoint so the saved value is current.
        improved = val_loss < best_val_loss
        if improved:
            best_val_loss = val_loss

        train_desc = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'scheduler_state_dict': scheduler.state_dict(),
            'train_accuracy': train_accuracy,
            'val_accuracy': val_accuracy * 100,
            'train_losses': train_losses,
            'val_losses': val_losses,
            'best_val_loss': best_val_loss
        }

        if improved:
            torch.save(train_desc, run_dir + 'best_epoch')

        torch.save(train_desc, run_dir + 'last_epoch')

        wandb.log({"train_accuracy": train_accuracy, "val_accuracy": val_accuracy*100,
                   "train_losses": train_losses[-1], "val_losses": val_losses[-1],
                   "learning_rate": optimizer.param_groups[0]['lr']
                   }, step=epoch)

        print('Epoch [{}/{}], Train Loss: {:.4f}, Train Accuracy: {:.4f}%, Val Loss: {:.4f}, Val Accuracy: {:.2f}%'
                .format(epoch, total_epochs, train_losses[-1], train_accuracy, val_losses[-1], val_accuracy*100))

    # Final evaluation on the held-out test set.
    # NOTE(review): test_accuracy is logged as a fraction, whereas the train /
    # validation accuracies above are logged as percentages.
    _, test_loss, test_accuracy = _evaluate(test_loader, criterion)

    wandb.log({"test_loss": test_loss, "test_accuracy": test_accuracy})

    wandb.alert("[nesterov]Training Task Finished", f"nesterov: {str(config.nesterov)}")
    # No epoch recorded (e.g. total_epochs == 0): nothing to report.
    return val_losses[-1] if val_losses else None