In [None]:
# Install the Kaggle CLI, store an API token in ~/.kaggle/kaggle.json and
# download the competition archive.
# NOTE(review): <my_username>/<my_key> are placeholders - fill them in locally
# and never commit real credentials with the notebook.
!pip install -U -q kaggle
!mkdir -p ~/.kaggle
!echo '{"username":"<my_username>","key":"<my_key>"}' > ~/.kaggle/kaggle.json
# Make the token file readable and writable by the current user only.
!chmod 600 ~/.kaggle/kaggle.json
!kaggle competitions download -c AIST4010-Spring2024-A1

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/84.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.6/84.6 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for kaggle (setup.py) ... [?25l[?25hdone
Downloading AIST4010-Spring2024-A1.zip to /content
 97% 140M/144M [00:04<00:00, 31.9MB/s]
100% 144M/144M [00:05<00:00, 30.0MB/s]


In [None]:
!unzip -q AIST4010-Spring2024-A1.zip

Reminder: First change the runtime to a GPU runtime

Check that the download succeeded

In [None]:
import torch
print(torch.__version__) # check the installed torch version

2.1.0+cu121


In [None]:
from torchvision.datasets import ImageFolder
from torchvision import transforms
from torchvision.io import read_image
from torch.utils.data import Dataset, DataLoader

import numpy as np
import pandas as pd

import os

# Run on the GPU when one is present, otherwise fall back to the CPU.
# `is_available` has to be *called*: the bare function object is always truthy,
# which would select 'cuda' even on a machine without a GPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [None]:
def collect_annotations(split_dir):
    """Build the annotation rows for one dataset split.

    Walks ``split_dir`` and, for every file found, records the file path
    relative to ``split_dir`` (``<class_dir>/<file>``) together with the class
    label, which is the integer after the last underscore of the directory
    name that contains the file.

    Args:
        split_dir: Directory that holds one sub-directory per class.

    Returns:
        A list of ``[relative_path, label]`` rows, ordered by directory name
        and then by file name.

    Raises:
        ValueError: If a directory containing files has a name whose last
            ``_``-separated part is not an integer.
    """
    annotations = []
    for dirname, subdirs, filenames in os.walk(split_dir):
        # os.walk yields entries in filesystem order; sorting in place makes
        # the traversal (and therefore the dataset indices) reproducible.
        subdirs.sort()
        if not filenames:
            continue
        # normpath/basename instead of split('/') so that trailing separators
        # and non-POSIX separators are handled as well.
        class_dir = os.path.basename(os.path.normpath(dirname))
        label = int(class_dir.rsplit('_', 1)[-1])
        annotations.extend(
            [os.path.join(class_dir, filename), label]
            for filename in sorted(filenames)
        )
    return annotations


path = './data/'

train_annotates = collect_annotations(path + 'train')
val_annotates = collect_annotations(path + 'val')

In [None]:
class CustomImageDataset(Dataset):
    """Dataset that pairs image files on disk with their labels.

    ``annotations_file`` is turned into a DataFrame whose first column holds
    image paths relative to ``img_dir`` and whose second column holds labels.
    ``transform`` is applied to the loaded image and ``target_transform`` to
    the label, when they are given.
    """

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.DataFrame(annotations_file)
        self.img_dir = img_dir
        self.transform, self.target_transform = transform, target_transform

    def __len__(self):
        """Number of annotated images."""
        return self.img_labels.shape[0]

    def __getitem__(self, idx):
        """Load the image at row ``idx`` and return ``(image, label)``."""
        relative_path = self.img_labels.iloc[idx, 0]
        picture = read_image(os.path.join(self.img_dir, relative_path))
        target = self.img_labels.iloc[idx, 1]

        # Both hooks are optional; a falsy value means "leave untouched".
        if self.transform:
            picture = self.transform(picture)
        if self.target_transform:
            target = self.target_transform(target)
        return picture, target

In [None]:
# Deterministic preprocessing shared by both splits: tensor -> PIL image,
# fixed 224x224 size, single grayscale channel, back to a float tensor.
base_steps = [
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.Grayscale(),
    transforms.ToTensor(),
]
normalize = transforms.Normalize((0.5,), (0.5,))

# Training pipeline: preprocessing plus random augmentation.
trans = transforms.Compose(base_steps + [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.GaussianBlur(kernel_size=(7, 11), sigma=(0.1, 4)),
    transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.5),
    normalize,
])

# Validation pipeline: no random augmentation, so the reported accuracy is
# repeatable and is measured on images preprocessed the same way as at
# inference time.
val_trans = transforms.Compose(base_steps + [normalize])

train_data = CustomImageDataset(train_annotates, path + 'train', transform=trans)
valid_data = CustomImageDataset(val_annotates, path + 'val', transform=val_trans)

In [None]:
# Shuffle only the training data; validation accuracy does not depend on the
# sample order, so a fixed order keeps evaluation reproducible.
train_dataloader = DataLoader(train_data, batch_size=64, shuffle=True)
val_dataloader = DataLoader(valid_data, batch_size=64, shuffle=False)

# Build the network

In [None]:
import torch
import torch.nn as nn

In [None]:
import torch.nn.functional as F

class Bottleneck(nn.Module):
    """Residual block made of a 1x1, a 3x3 and a 1x1 convolution.

    The last convolution produces ``out_channels * expansion`` channels. The
    3x3 convolution applies ``stride``. ``i_downsample``, when given, is
    applied to the block input before it is added to the main path.
    """
    expansion = 4

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()
        widened = out_channels * self.expansion

        # 1x1 convolution into the narrow width.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)

        # 3x3 convolution; the only layer of the main path that uses `stride`.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        # 1x1 convolution out to the widened channel count.
        self.conv3 = nn.Conv2d(out_channels, widened, kernel_size=1, stride=1, padding=0)
        self.batch_norm3 = nn.BatchNorm2d(widened)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        shortcut = x.clone()

        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.batch_norm2(out)
        out = self.relu(out)

        out = self.batch_norm3(self.conv3(out))

        # Project the shortcut when a projection module was supplied.
        if self.i_downsample is not None:
            shortcut = self.i_downsample(shortcut)

        out += shortcut
        return self.relu(out)

class Block(nn.Module):
    """Basic residual block made of two 3x3 convolutions (expansion 1).

    Args:
        in_channels: Number of channels of the block input.
        out_channels: Number of channels produced by both convolutions.
        i_downsample: Optional module applied to the block input before it is
            added to the main path (used when shape or channel count changes).
        stride: Stride of the first convolution.
    """
    expansion = 1

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super(Block, self).__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(out_channels)
        # The second convolution keeps the resolution: only conv1 applies
        # `stride`, so the main path is downsampled exactly once and its
        # output lines up with a shortcut that is downsampled once.
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=1, bias=False)
        self.batch_norm2 = nn.BatchNorm2d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        identity = x.clone()

        # Each convolution is followed by its own batch-norm layer.
        x = self.relu(self.batch_norm1(self.conv1(x)))
        x = self.batch_norm2(self.conv2(x))

        if self.i_downsample is not None:
            identity = self.i_downsample(identity)

        x += identity
        x = self.relu(x)
        return x

class ResNet(nn.Module):
    """Residual network: a convolutional stem, four stages of residual blocks,
    global average pooling and a linear classifier.

    Args:
        ResBlock: Block class used for every stage; it must expose an
            ``expansion`` class attribute.
        layer_list: Number of blocks in each of the four stages.
        num_classes: Size of the classifier output.
        num_channels: Number of channels of the input images.
    """

    def __init__(self, ResBlock, layer_list, num_classes, num_channels=3):
        super().__init__()
        # Channel count fed to the next block; updated while stages are built.
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution, batch norm, ReLU, 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(num_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (planes, stride) of the four stages, registered as layer1..layer4.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (planes, stride) in enumerate(stage_specs):
            stage = self._make_layer(ResBlock, layer_list[position], planes=planes, stride=stride)
            setattr(self, f'layer{position + 1}', stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * ResBlock.expansion, num_classes)

    def forward(self, x):
        """Map a batch of images to a batch of class scores."""
        features = self.conv1(x)
        features = self.batch_norm1(features)
        features = self.relu(features)
        features = self.max_pool(features)

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = self.avgpool(features)
        # Collapse everything but the batch dimension before the classifier.
        pooled = pooled.reshape(pooled.shape[0], -1)
        return self.fc(pooled)

    def _make_layer(self, ResBlock, blocks, planes, stride=1):
        """Create one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride`` and, when the stride is not 1
        or the channel count changes, a 1x1 convolution + batch norm shortcut.
        """
        stage_width = planes * ResBlock.expansion

        shortcut = None
        if not (stride == 1 and self.in_channels == stage_width):
            shortcut = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width, kernel_size=1, stride=stride),
                nn.BatchNorm2d(stage_width),
            )

        members = [ResBlock(self.in_channels, planes, i_downsample=shortcut, stride=stride)]
        self.in_channels = stage_width
        members.extend(ResBlock(self.in_channels, planes) for _ in range(blocks - 1))

        return nn.Sequential(*members)

def ResNet50(num_classes, channels=3):
    """Build a ResNet from Bottleneck blocks with a 3-4-6-3 stage layout.

    Args:
        num_classes: Size of the classifier output.
        channels: Number of channels of the input images.
    """
    stage_depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, stage_depths, num_classes, channels)


In [None]:
# Build the single-channel, 1000-class network, then move its parameters to
# the selected device.
resnet50 = ResNet50(num_classes=1000, channels=1)
resnet50 = resnet50.to(device)

## Build the training loop

In [None]:
# Training configuration: Adam with a cosine learning-rate schedule that
# decays from `learning_rate` to `eta_min` over EPOCH epochs.
EPOCH = 80
criterion = nn.CrossEntropyLoss()
learning_rate = 0.001
optimizer = torch.optim.Adam(resnet50.parameters(), lr=learning_rate)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCH, eta_min=0.0001)

for epoch in range(EPOCH):

    train_total = 0
    train_correct = 0
    train_loss_sum = 0.0  # sum of per-sample losses over the epoch

    # ---- train for one epoch ----
    resnet50.train()
    for image, label in train_dataloader:

        optimizer.zero_grad()

        # retrieve image and label
        image = image.to(device)
        label = label.to(device)

        # forward step
        output = resnet50(image)
        loss = criterion(output, label)

        # optimization step
        loss.backward()
        optimizer.step()

        # Predictions are only used for bookkeeping, so detach them from the graph.
        predicted = output.detach().argmax(dim=1)

        batch_size = label.size(0)
        train_total += batch_size
        train_correct += (predicted == label).sum().item()
        # The criterion returns the batch mean; weight it by the batch size so
        # the epoch figure is a true per-sample average (the last batch may be smaller).
        train_loss_sum += loss.item() * batch_size

    scheduler.step()

    # Report the mean loss over the whole epoch rather than the loss of the
    # final mini-batch, which is noisy and not representative.
    # max(..., 1) keeps the report from dividing by zero on an empty loader.
    train_loss = train_loss_sum / max(train_total, 1)
    train_accuracy = 100 * train_correct / max(train_total, 1)
    print(f'Epoch {epoch+1}/{EPOCH}, Loss: {train_loss:.4f}, train_accuracy: {train_correct}/{train_total}, {train_accuracy}%')

    # ---- validate ----
    resnet50.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for image_val, label_val in val_dataloader:
            image_val = image_val.to(device)
            label_val = label_val.to(device)

            output_val = resnet50(image_val)
            predicted_val = output_val.argmax(dim=1)

            total += label_val.size(0)
            correct += (predicted_val == label_val).sum().item()

    val_accuracy = 100 * correct / max(total, 1)
    print(f'Accuracy of validation images: {correct}/{total}, {val_accuracy}%')


Epoch 1/80, Loss: 6.3122, train_accuracy: 508/93134, 0.5454506410118753%
Accuracy of validation images: 19/2000, 0.95%
Epoch 2/80, Loss: 5.5856, train_accuracy: 2524/93134, 2.7100736573109714%
Accuracy of validation images: 82/2000, 4.1%
Epoch 3/80, Loss: 4.5627, train_accuracy: 9249/93134, 9.930852320312669%
Accuracy of validation images: 235/2000, 11.75%
Epoch 4/80, Loss: 4.5075, train_accuracy: 21198/93134, 22.760753323168768%
Accuracy of validation images: 431/2000, 21.55%
Epoch 5/80, Loss: 2.9074, train_accuracy: 31497/93134, 33.81901346447055%
Accuracy of validation images: 567/2000, 28.35%
Epoch 6/80, Loss: 2.9512, train_accuracy: 39906/93134, 42.847939527991926%
Accuracy of validation images: 735/2000, 36.75%
Epoch 7/80, Loss: 2.1031, train_accuracy: 47151/93134, 50.6270534928168%
Accuracy of validation images: 795/2000, 39.75%
Epoch 8/80, Loss: 1.5534, train_accuracy: 53619/93134, 57.57188567010974%
Accuracy of validation images: 780/2000, 39.0%
Epoch 9/80, Loss: 1.3669, train

In [None]:
# Predict a label for every test image and write the Kaggle submission file.

# Deterministic preprocessing for inference (no random augmentation).
# Built once here instead of once per image inside the loop.
trans2 = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))])

dirname = './data/test'
# np.loadtxt returns a 0-d array when the list holds a single file name;
# atleast_1d makes the result iterable in that case too.
test_files = np.atleast_1d(np.loadtxt('./data/test_list.txt', dtype='str'))

rows = []

resnet50.eval()
with torch.no_grad():
    for filename in test_files:
        test_img = read_image(os.path.join(dirname, filename))

        # Add the batch dimension the network expects, then move to the device.
        test_img = trans2(test_img).unsqueeze(0).to(device)
        test_output = resnet50(test_img)

        predicted = test_output.argmax(dim=1)

        # Submission labels are the predicted class index prefixed with 'a1_'.
        rows.append([filename, 'a1_'+str(predicted.item())])

submission = pd.DataFrame(rows, columns=['id', 'label'])
submission.to_csv('submission.csv', index=False)

In [None]:
!kaggle competitions submit -c <competition> -f submission.csv

100% 47.8k/47.8k [00:00<00:00, 74.9kB/s]
Successfully submitted to AIST4010-Spring2024-A1

In [None]:
# Unassign the Colab runtime as the last step of the notebook.
# NOTE(review): presumably meant to stop consuming Colab compute once the run
# is done; anything kept only in the runtime is expected to be lost - confirm.
from google.colab import runtime
runtime.unassign()