In [None]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [None]:
!pip install torch torchvision

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-

In [None]:
# I want to start importing the base packages that we need to train our CNN
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split, Subset
from sklearn.metrics import f1_score
import numpy as np
from collections import defaultdict
import torch.nn.functional as F
from PIL import Image
import os

In [None]:
class PreprocessedDataset(torch.utils.data.Dataset):
    def __init__(self, folder):
        self.file_paths = [os.path.join(dp, f) for dp, dn, filenames in os.walk(folder) for f in filenames]
        self.classes = sorted(set(os.path.basename(os.path.dirname(fp)) for fp in self.file_paths))
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        image = torch.load(self.file_paths[idx])
        label = self.get_label_from_path(self.file_paths[idx])
        return image, label

    def get_label_from_path(self, path):
        # Get the class name (folder name) from the path
        class_name = os.path.basename(os.path.dirname(path))
        # Map the class name to an integer label
        return self.class_to_idx[class_name]

# Create datasets and dataloaders
base_train_dataset = PreprocessedDataset("/content/drive/MyDrive/YBIGTA 신입플/TransformedDatasets/base_NB_train")
blur_train_dataset = PreprocessedDataset("/content/drive/MyDrive/YBIGTA 신입플/TransformedDatasets/blur_NB_train")
val_dataset = PreprocessedDataset("/content/drive/MyDrive/YBIGTA 신입플/TransformedDatasets/NB_val")
test_dataset = PreprocessedDataset("/content/drive/MyDrive/YBIGTA 신입플/TransformedDatasets/NB_test")

In [None]:
base_train_loader = DataLoader(base_train_dataset, batch_size=32, shuffle=True, num_workers=1)
blur_train_loader = DataLoader(blur_train_dataset, batch_size=32, shuffle=True, num_workers=1)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=1)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=1)

In [None]:
base_dataloaders = {'train': base_train_loader, 'val': val_loader, "test": test_loader}
base_image_datasets = {'train': base_train_dataset, 'val': val_dataset, "test": test_dataset}
blur_dataloaders = {'train': blur_train_loader, 'val': val_loader, "test": test_loader}
blur_image_datasets = {'train': blur_train_dataset, 'val': val_dataset, "test": test_dataset}

I loaded my datasets that we will use to train a model with 2 purposes:
- Classification
- Feature Extraction that can be used to calculate the similarity between the 12 classes

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torch.optim as optim
import os
from typing import Type, List, Optional
from torchsummary import summary
from torch.optim import Optimizer
from sklearn.metrics import f1_score

In [None]:
class BasicBlock(nn.Module):
    expansion: int = 1

    def __init__(self, in_planes: int, planes: int, stride: int = 1) -> None:
        super(BasicBlock, self).__init__()

        # VGG의 철학을 따라 3 x 3 filter 사용
        self.conv1: nn.Conv2d = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn1: nn.BatchNorm2d = nn.BatchNorm2d(planes)

        # 마찬가지로 3 x 3 filter 사용, 2번째 conv layer에서는 차원이 변하지 않음
        self.conv2: nn.Conv2d = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2: nn.BatchNorm2d = nn.BatchNorm2d(planes)

        ######################## ResNet의 핵심 부분 ######################################
        #
        # Mapping을 추가해 주어야 하는데, dimension이 변하는 부분을 고려해야 함.
        # (ResNet에서는 complexity를 유지하기 위해서 dimension이 변하는 경우에만 stride = 2로 변경)
        #
        # 1. stride == 1인 경우 : dimension이 변하지 않음 -> identity mapping 사용
        # 2. stride != 1인 경우 : dimension이 변하는 경우이므로 identity mapping을 사용할 수 없음
        #                        -> 1 x 1 convolution을 활용하여 차원을 맞추어 줌.
        #

        self.shortcut: nn.Sequential = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 실제 논문에서 사용한 방식과 거의 유사함 (shortcut 위치 등)
        out: torch.Tensor = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [None]:
class BottleNeck(nn.Module):
    expansion: int = 4

    def __init__(self, in_channels: int, out_channels: int, stride: int = 1) -> None:
        super(BottleNeck, self).__init__()

        # First 1x1 convolution and batch no`rm
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        # 3x3 convolution and batch norm
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Last 1x1 convolution and batch norm
        self.conv3 = nn.Conv2d(out_channels, out_channels * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)

        # Shortcut connection for residuals
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels * self.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * self.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * self.expansion)
            )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # First 1x1 conv + batch norm + ReLU
        out = F.relu(self.bn1(self.conv1(x)))

        # 3x3 conv + batch norm + ReLU
        out = F.relu(self.bn2(self.conv2(out)))

        # Last 1x1 conv + batch norm
        out = self.bn3(self.conv3(out))

        # Add the shortcut (residual) connection
        out += self.shortcut(x)

        # Apply final ReLU activation
        out = F.relu(out)

        return out


In [None]:
class ResNet(nn.Module):
    def __init__(self, block: Type[nn.Module], num_blocks: List[int], num_classes: int = 12, init_weights: bool = True) -> None:
        super().__init__()

        self.in_channels: int = 64

        self.conv1: nn.Sequential = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.conv2_x: nn.Sequential = self._make_layer(block, 64, num_blocks[0], 1)
        self.conv3_x: nn.Sequential = self._make_layer(block, 128, num_blocks[1], 2)
        self.conv4_x: nn.Sequential = self._make_layer(block, 256, num_blocks[2], 2)
        self.conv5_x: nn.Sequential = self._make_layer(block, 512, num_blocks[3], 2)

        self.avg_pool: nn.AdaptiveAvgPool2d = nn.AdaptiveAvgPool2d((1, 1))
        self.fc: nn.Linear = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block: Type[nn.Module], out_channels: int, num_blocks: int, stride: int) -> nn.Sequential:
        strides: List[int] = [stride] + [1] * (num_blocks - 1)
        layers: List[nn.Module] = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        output: torch.Tensor = self.conv1(x)
        output = self.conv2_x(output)
        output = self.conv3_x(output)
        output = self.conv4_x(output)
        output = self.conv5_x(output)
        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)
        return output

In [None]:
def resnet18() -> ResNet:
    return ResNet(BasicBlock, [2, 2, 2, 2])

def resnet34() -> ResNet:
    return ResNet(BasicBlock, [3,4,6,3])

def resnet50() -> ResNet:
    return ResNet(BottleNeck, [3,4,6,3])

def resnet101() -> ResNet:
    return ResNet(BottleNeck, [3,4,23,3])

def resnet152() -> ResNet:
    return ResNet(BottleNeck, [3,8,36,3])

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
net = resnet18()
model_name = 'resnet18'
net = net.to(device)
net = torch.nn.DataParallel(net)

learning_rate = 0.1
file_name = f'{model_name}_fashion.pt'

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0002)

In [None]:
def train(epoch: int) -> None:
    print("------------------------------------------------------------")
    print('\nEpoch %d (train)' % epoch)
    net.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(blur_train_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()

        benign_outputs = net(inputs)
        loss = criterion(benign_outputs, targets)
        loss.backward()

        optimizer.step()
        train_loss += loss.item()
        _, predicted = benign_outputs.max(1)

        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

        if batch_idx % 100 == 0:
            print('\nBatch', str(batch_idx))
            print('Accuracy (train):', str(predicted.eq(targets).sum().item() / targets.size(0)))
            print('Loss:', loss.item())

    print('\nTotal accuracy (train):', 100. * correct / total)
    print('Total loss:', train_loss)

In [None]:
def adjust_learning_rate(optimizer: Optimizer, epoch: int) -> None:
    lr = learning_rate
    if epoch >= 100:
        lr /= 10
    if epoch >= 150:
        lr /= 10
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [None]:
def validate(epoch: int) -> float:
    print('\nEpoch: %d (validation)' % epoch)
    net.eval()
    validation_loss = 0
    correct = 0
    total = 0
    all_targets = []
    all_predictions = []

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(val_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            total += targets.size(0)

            outputs = net(inputs)
            validation_loss += criterion(outputs, targets).item()

            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

            all_targets.extend(targets.cpu().numpy())
            all_predictions.extend(predicted.cpu().numpy())

    accuracy = 100. * correct / total
    f1 = f1_score(all_targets, all_predictions, average='weighted')

    print('\nAccuracy (validation):', accuracy)
    print('Validation average loss:', validation_loss / total)
    print('F1 Score (validation):', f1)

    return f1

In [None]:
def test(epoch: int) -> float:
    print('\nEpoch: %d (test)' % epoch)
    net.eval()
    loss = 0
    correct = 0
    total = 0
    all_targets = []
    all_predictions = []

    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        total += targets.size(0)

        outputs = net(inputs)
        loss += criterion(outputs, targets).item()

        _, predicted = outputs.max(1)
        correct += predicted.eq(targets).sum().item()

        all_targets.extend(targets.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

    accuracy = 100. * correct / total
    f1 = f1_score(all_targets, all_predictions, average='weighted')

    print('\nAccuracy (test):', accuracy)
    print('Test average loss:', loss / total)
    print('F1 Score (test):', f1)

    state = {
        'net': net.state_dict()
    }
    if not os.path.isdir('/content/drive/MyDrive/YBIGTA 신입플/Models'):
        os.mkdir('/content/drive/MyDrive/YBIGTA 신입플/Models')
    torch.save(state, '/content/drive/MyDrive/YBIGTA 신입플/Models/' + file_name)
    print('Model Saved!')

    return f1

In [None]:
f1_scores = []
for epoch in range(0, 10):
    adjust_learning_rate(optimizer, epoch)
    train(epoch)
    f1_val = validate(epoch)
    f1_test = test(epoch)
    f1_scores.append((f1_val, f1_test))

------------------------------------------------------------

Epoch 0 (train)

Batch 0
Accuracy (train): 0.15625
Loss: 2.6361513137817383

Batch 100
Accuracy (train): 0.15625
Loss: 2.5051424503326416

Batch 200
Accuracy (train): 0.03125
Loss: 2.521423578262329

Total accuracy (train): 11.273809523809524
Total loss: 742.7755472660065

Epoch: 0 (validation)

Accuracy (validation): 16.61111111111111
Validation average loss: 0.07520795543988545
F1 Score (validation): 0.12008587975536697

Epoch: 0 (test)

Accuracy (test): 15.88888888888889
Test average loss: 0.07537256300449371
F1 Score (test): 0.11436145076719909
Model Saved!
------------------------------------------------------------

Epoch 1 (train)

Batch 0
Accuracy (train): 0.25
Loss: 2.259626626968384

Batch 100
Accuracy (train): 0.3125
Loss: 2.1973302364349365

Batch 200
Accuracy (train): 0.09375
Loss: 2.4042673110961914

Total accuracy (train): 17.738095238095237
Total loss: 608.7052080631256

Epoch: 1 (validation)

Accuracy (valid