In [None]:
# !pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# !pip3 install tqdm
# !pip3 install matplotlib pandas

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from tqdm import tqdm

import PIL
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch import Tensor
from typing import Any, Callable, List, Optional, Type, Union

import torchvision
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset


# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import pickle
import os
import time
# for dirname, _, filenames in os.walk('/kaggle/input/'):
# for dirname, _, filenames in os.walk('./kaggle/input/'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Choose the compute backend: Apple MPS when available, otherwise CUDA,
# otherwise fall back to the CPU. The result is always a torch.device.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# device = 'cpu'
print(f"Current device : {device}")

In [None]:
class Cutout(object):
    """Zero out randomly placed rectangular patches of an image tensor.

    Args:
        n_holes: number of patches blanked out per call.
        length: nominal patch side; each patch reaches ``length // 2`` pixels
            either side of a random centre and is clipped at the image edge.
    """

    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        """Return ``img`` multiplied by a freshly drawn 0/1 mask.

        Dimensions 1 and 2 of ``img`` are read as height and width; the 2-D
        mask is broadcast over the remaining leading dimension.
        """
        height, width = img.size(1), img.size(2)
        half = self.length // 2
        keep = np.ones((height, width), np.float32)

        for _ in range(self.n_holes):
            # Row is drawn before column so seeded runs stay reproducible.
            centre_y = np.random.randint(height)
            centre_x = np.random.randint(width)
            top, bottom = np.clip([centre_y - half, centre_y + half], 0, height)
            left, right = np.clip([centre_x - half, centre_x + half], 0, width)
            keep[top:bottom, left:right] = 0.

        keep_tensor = torch.from_numpy(keep).expand_as(img)
        return img * keep_tensor

In [None]:
# Per-channel mean / std passed to Normalize in both pipelines below.
CIFAR10_MEAN = (0.4914, 0.4822, 0.4465)
CIFAR10_STD = (0.2023, 0.1994, 0.2010)

# Training pipeline: geometric and colour augmentation, tensor conversion,
# Cutout on the tensor, then normalisation.
_train_steps = [
    # Random 32x32 crop after padding 4 pixels on each side
    transforms.RandomCrop(32, padding=4),
    # Random horizontal flip
    transforms.RandomHorizontalFlip(),
    # Random rotation (degrees=15)
    transforms.RandomRotation(15),
    # Small random changes of brightness, contrast, saturation and hue
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    # Convert image to a PyTorch tensor
    transforms.ToTensor(),
    # Cutout regularisation: blank out two patches of nominal side 8
    Cutout(n_holes=2, length=8),
    # Normalise each channel
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
]
transform_train = transforms.Compose(_train_steps)

# Evaluation pipeline: tensor conversion and the same normalisation, no augmentation.
_test_steps = [
    transforms.ToTensor(),
    transforms.Normalize(CIFAR10_MEAN, CIFAR10_STD),
]
transform_test = transforms.Compose(_test_steps)


In [None]:
# Custom Dataset class
import PIL.Image


class CIFAR10Custom(Dataset):
    """Dataset over in-memory image arrays and their labels.

    Args:
        data: indexable collection of image arrays accepted by
            ``PIL.Image.fromarray``.
        labels: indexable collection of labels aligned with ``data``.
        transform: optional callable applied to the PIL image of each sample.
    """

    def __init__(self, data, labels, transform=None):
        self.data, self.labels = data, labels
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for position ``idx``.

        With a transform set, the sample is the transform's output for the
        PIL image; otherwise the raw array is returned unchanged.
        """
        raw = self.data[idx]
        target = self.labels[idx]
        # The PIL conversion happens even when no transform is configured.
        pil_img = PIL.Image.fromarray(raw)
        sample = self.transform(pil_img) if self.transform else raw
        return sample, target

# Load data and labels from batch file
def load_cifar_batch(file):
    """Read one pickled batch file and return ``(data, labels)``.

    The array stored under ``b'data'`` is reshaped to ``(N, 3, 32, 32)`` and
    transposed to ``(N, 32, 32, 3)``; the object stored under ``b'labels'``
    is returned as-is.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(file, 'rb') as handle:
        payload = pickle.load(handle, encoding='bytes')
    flat = payload[b'data']
    targets = payload[b'labels']
    n_images = len(flat)
    images = flat.reshape((n_images, 3, 32, 32)).transpose(0, 2, 3, 1)
    return images, targets

# Load all training batches and combine them
# Directory holding the pickled CIFAR-10 batch files
cifar10_dir = './cifar-10-python/cifar-10-batches-py'

# Read data_batch_1 .. data_batch_5 and collect their images and labels
train_data = []
train_labels = []
for batch_idx in range(1, 6):
    batch_path = os.path.join(cifar10_dir, f'data_batch_{batch_idx}')
    batch_images, batch_targets = load_cifar_batch(batch_path)
    train_data.append(batch_images)
    train_labels.extend(batch_targets)

# Stack the per-batch arrays into one array; labels become a numpy array too
train_data = np.vstack(train_data)
train_labels = np.array(train_labels)

# Training dataset with the augmentation pipeline, served in shuffled batches of 128
trainset = CIFAR10Custom(train_data, train_labels, transform=transform_train)
trainloader = DataLoader(trainset, batch_size=128, shuffle=True)



In [None]:
#------------------------------------IMPORTANT--------------------------------------
#TRAIN VAL SPLIT. Please remove this during actual training. This is only for tuning.
# Validation pipeline: tensor conversion followed by per-channel normalisation.
_validation_steps = [
    transforms.ToTensor(),
    transforms.Normalize(
        (0.4914, 0.4822, 0.4465),
        (0.2023, 0.1994, 0.2010),
    ),
]
transform_validation = transforms.Compose(_validation_steps)
def load_cifar10_validation(filename):
    """Unpickle ``filename`` and return the stored object unchanged."""
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    with open(filename, 'rb') as handle:
        return pickle.load(handle)

# # Call the function to load the dataset
# The validation set is read from the CIFAR-10 `test_batch` file.
val_batch_path = "./cifar-10-python/cifar-10-batches-py/test_batch"
valdata, vallabel = load_cifar_batch(val_batch_path)
valset = CIFAR10Custom(valdata, np.array(vallabel), transform=transform_validation)

# Fixed-order loader over the validation set
validationloader = DataLoader(dataset=valset, batch_size=100, shuffle=False)

# Shuffled loader consumed by val_acc(); only created for a non-empty set
if len(valset):
    valloader = DataLoader(dataset=valset, batch_size=100, shuffle=True)


In [None]:
# Define the CIFAR-10 Custom Dataset class
class CIFAR10Custom(Dataset):
    """Dataset over a pickled batch of images identified by ids, not labels.

    NOTE: this definition reuses the name ``CIFAR10Custom`` and therefore
    replaces the earlier ``(data, labels, transform)`` class of the same name
    once this cell has run.

    Args:
        file: path of a pickle holding ``b'data'`` and ``b'ids'`` entries.
        transform: optional callable applied to each image array.
    """

    def __init__(self, file, transform=None):
        images, identifiers = self.load_cifar_batch(file)
        self.data = images
        self.ids = identifiers
        self.transform = transform

    def load_cifar_batch(self, file):
        """Unpickle ``file`` and return ``(data, ids)``.

        ``data`` is reshaped to ``(-1, 3, 32, 32)`` and transposed to
        ``(N, 32, 32, 3)``; ``ids`` is returned as stored.
        """
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        with open(file, 'rb') as handle:
            payload = pickle.load(handle, encoding='bytes')
        flat = payload[b'data']
        identifiers = payload[b'ids']
        images = flat.reshape((-1, 3, 32, 32)).transpose((0, 2, 3, 1))
        return images, identifiers

    def __len__(self):
        """Number of images in the batch."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, id)`` for ``idx``, transforming the image if configured."""
        sample = self.data[idx]
        sample_id = self.ids[idx]
        sample = self.transform(sample) if self.transform else sample
        return sample, sample_id

# Path to your test batch file
# Pickle file with the unlabeled test images
test_batch_file = './cifar_test_nolabels.pkl'

# Unlabeled test dataset, run through the evaluation transform
testset = CIFAR10Custom(file=test_batch_file, transform=transform_test)

# Fixed-order loader so predictions line up with the stored ids
testloader = DataLoader(dataset=testset, batch_size=100, shuffle=False)
# testloader = DataLoader(testset, batch_size=100, shuffle=False, num_workers=2)

In [None]:
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a bias-free 3x3 convolution whose padding equals its dilation."""
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) convolution."""
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise

class SE_Block(nn.Module):
    """Squeeze-and-excitation gate: rescales each channel of a 4-D input.

    Args:
        c: number of input channels.
        r: reduction ratio; the hidden width of the gate is ``c // r``.
    """

    def __init__(self, c, r=16):
        super().__init__()
        hidden = c // r
        # Squeeze: global average pool down to one value per channel.
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        # Excitation: bottleneck MLP ending in a sigmoid, giving per-channel weights.
        gate_layers = [
            nn.Linear(c, hidden, bias=False),
            nn.SiLU(inplace=True),
            nn.Linear(hidden, c, bias=False),
            nn.Sigmoid(),
        ]
        self.excitation = nn.Sequential(*gate_layers)

    def forward(self, x):
        """Return ``x`` with every channel multiplied by its learned gate value."""
        batch, channels, _, _ = x.shape
        pooled = self.squeeze(x).view(batch, channels)
        gate = self.excitation(pooled).view(batch, channels, 1, 1)
        return x * gate.expand_as(x)
    
class SEBasicBlock(nn.Module):
    """Two-convolution residual block with a squeeze-and-excitation gate.

    The SE gate is applied to the residual branch before it is added to the
    shortcut. Whenever ``stride != 1`` or ``inplanes != planes`` the shortcut
    is a freshly built 1x1 convolution + BatchNorm2d, which replaces any
    ``downsample`` module passed in.

    Args:
        inplanes: input channel count.
        planes: output channel count.
        stride: stride of the first 3x3 convolution (and of the projection).
        downsample: optional shortcut module, used only when no projection
            is required.
        groups, base_width, dilation: only ``1``, ``64`` and ``<= 1`` are
            accepted respectively.
        norm_layer: normalisation layer factory for the residual branch;
            defaults to ``nn.BatchNorm2d``.
        r: reduction ratio handed to the SE gate.

    Raises:
        ValueError: if ``groups != 1`` or ``base_width != 64``.
        NotImplementedError: if ``dilation > 1``.
    """

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None, r=16):
        super().__init__()
        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        if not (groups == 1 and base_width == 64):
            raise ValueError('BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")

        # Residual branch; conv1 carries the stride.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.SiLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)

        # Shortcut branch: project whenever the shape changes.
        needs_projection = stride != 1 or inplanes != planes
        if needs_projection:
            downsample = nn.Sequential(
                nn.Conv2d(inplanes, planes, kernel_size=1, stride=stride, padding=0),
                nn.BatchNorm2d(planes)
            )
        self.downsample = downsample
        self.stride = stride

        # Channel gate applied to the residual branch.
        self.se = SE_Block(planes, r)

    def forward(self, x):
        """Compute ``act(se(residual(x)) + shortcut(x))``."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        residual = self.se(residual)

        shortcut = x if self.downsample is None else self.downsample(x)

        residual += shortcut
        return self.relu(residual)
    
class ModifiedResNetSE(nn.Module):
    """ResNet-style classifier built from SEBasicBlock stages.

    Layout: a three-convolution stem (``conv1`` once, then ``convmod`` twice,
    all sharing ``bn1``), four stages of SE residual blocks, global average
    pooling, dropout and a linear classifier.

    Args:
        num_classes: size of the classifier output.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        # Stem layers
        self.conv1 = conv3x3(3, 64)
        self.convmod = conv3x3(64, 64)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.SiLU(inplace=True)

        # (in_channels, out_channels, num_blocks, stride) for layer1..layer4
        stage_specs = (
            (64, 64, 3, 1),
            (64, 64, 4, 2),
            (64, 128, 4, 2),
            (128, 256, 3, 2),
        )
        for position, (c_in, c_out, depth, stride) in enumerate(stage_specs, start=1):
            setattr(self, f"layer{position}", self._make_layer(c_in, c_out, depth, stride=stride))

        # Classification head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)
        self.dropout = nn.Dropout(0.25)
        nn.init.kaiming_normal_(self.fc.weight)
        nn.init.zeros_(self.fc.bias)

    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        """Stack ``num_blocks`` SE blocks; only the first changes channels / stride."""
        blocks = [SEBasicBlock(in_channels, out_channels, stride)]
        blocks.extend(
            SEBasicBlock(out_channels, out_channels, stride=1)
            for _ in range(num_blocks - 1)
        )
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Map an image batch to class logits of shape ``(batch, num_classes)``."""
        out = self.relu(self.bn1(self.conv1(x)))
        # The same convmod / bn1 modules are applied twice in a row.
        for _ in range(2):
            out = self.relu(self.bn1(self.convmod(out)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            out = stage(out)
        out = torch.flatten(self.avgpool(out), 1)
        return self.fc(self.dropout(out))

# Instantiate the network with its 10-class output head.
model = ModifiedResNetSE(num_classes=10)

In [None]:
# Calculate the number of parameters in the model
# Move the network onto the selected device before counting / training.
model = model.to(device)
num_params = 0
for param in model.parameters():
    num_params += param.numel()
print(f"Number of parameters: {num_params}")

In [None]:
# #UNCOMMENT THIS BLOCK TO VISUALISE MODEL
# from torchview import draw_graph
# import graphviz
# graphviz.set_jupyter_format('png')

# model_graph = draw_graph(model, input_size=(1,3,32,32), expand_nested=True)
# model_graph.resize_graph(scale=5.0)
# model_graph.visual_graph

In [None]:
def val_acc(autosave=True, min_acc=80.0):
    """Evaluate the global ``model`` on ``valloader`` and return accuracy in percent.

    Args:
        autosave: when true, checkpoint the model if it beats both the
            global ``best_acc`` and ``min_acc``.
        min_acc: accuracy (percent) that must be exceeded before any
            checkpoint is written.

    Returns:
        Validation accuracy as a float percentage.

    Side effects:
        Puts ``model`` in eval mode. On a new best, writes the state dict to
        ``./checkpoint/modifiedresnet.pth`` and updates the global ``best_acc``.
    """
    global best_acc
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in valloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    acc = 100*correct/total
    if acc>best_acc and autosave and acc>min_acc:
        print("Saving model...")
        checkpoint_path = "./checkpoint/modifiedresnet.pth"
        # torch.save does not create missing parent directories.
        os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
        torch.save(model.state_dict(), checkpoint_path)
        best_acc = acc
    return acc

In [None]:
# Set to True to resume from the saved checkpoint instead of starting fresh.
load_model = False
# Best validation accuracy seen so far; val_acc() updates it when it checkpoints.
best_acc = 0.0
if load_model:
    # map_location remaps the stored tensors onto the current device, so a
    # checkpoint written on another device (e.g. CUDA) still loads here.
    model.load_state_dict(torch.load("./checkpoint/modifiedresnet.pth", map_location=device))
    best_acc = val_acc(autosave=False)
# if device!="cpu":
#     model = nn.DataParallel(model)
criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=0.1)
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
# Assuming 'optimizer' is defined and 'model' and 'criterion' are already defined and initialized

In [None]:
# Cosine-annealing schedule over 300 steps with a floor of 1e-5.
# NOTE(review): the training loop's `scheduler.step()` call is commented out,
# so this schedule is never advanced — confirm whether that is intended.
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=300,
    eta_min=1e-5,
)

In [None]:
def get_lr(optimizer):
    """Return the ``'lr'`` of the optimizer's first parameter group.

    Returns ``None`` when the optimizer has no parameter groups.
    """
    first_group = next(iter(optimizer.param_groups), None)
    if first_group is None:
        return None
    return first_group['lr']

In [None]:
# Per-epoch history, consumed by the plotting / saving cells further down.
train_loss_history = []
train_accuracy_list = []
test_accuracy_list = []
num_epochs = 300

for epoch in range(num_epochs):
    tt = time.time()
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for inputs, labels in tqdm(trainloader, total=len(trainloader), leave=False):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # Accumulate the batch loss; it is averaged once after the loop.
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

    # Mean per-batch loss for the epoch. Dividing inside the batch loop would
    # rescale the running sum on every step and record a meaningless value.
    epoch_loss = running_loss / len(trainloader)

    tr_acc = 100*correct/total
    train_accuracy_list.append(tr_acc)
    train_loss_history.append(epoch_loss)
    # Wall-clock time of the training pass only (validation excluded).
    tt = time.time() - tt

    # NOTE(review): the scheduler is never stepped, so the learning rate stays
    # at its initial value for all epochs — confirm this is intended.
    # scheduler.step()
    va = 0.0
    if valset:
        va = val_acc()
        test_accuracy_list.append(va)

    print(f"Epoch: {epoch+1}, LR: {get_lr(optimizer)}, Train accuracy: {round(tr_acc, 3)}%, Time Taken: {round(tt, 3)}s, Val accuracy: {round(va, 3)}%")

In [None]:
import matplotlib.pyplot as plt

# Two panels side by side: training loss (left) and accuracy curves (right).
fig, ax = plt.subplots(1, 2)
# fig.set_figheight(5)
fig.set_figwidth(15)

ax[0].plot(range(len(train_loss_history)), train_loss_history, '-', linewidth = 1, label = "Train error")
ax[0].set_xlabel('epoch')
ax[0].set_ylabel('loss')
ax[0].grid(True)
# A bare plt.legend() only decorates the current (last) axes, so each panel
# gets its own legend explicitly.
ax[0].legend()

ax[1].plot(range(len(train_accuracy_list)), train_accuracy_list, '-', linewidth = 1, label = "Train Accuracy ")
if valset:
    ax[1].plot(range(len(test_accuracy_list)), test_accuracy_list, '-', linewidth = 1, label = "Val Accuracy ")
ax[1].set_xlabel('epoch')
ax[1].set_ylabel('accuracy')
ax[1].grid(True)
ax[1].legend()

fig.savefig("MRSE_Cut_256_300")

In [None]:
# Restore the best checkpoint; map_location remaps the stored tensors onto the
# current device, so a checkpoint written on another device still loads here.
model.load_state_dict(torch.load("./checkpoint/modifiedresnet.pth", map_location=device))

model.eval()
predictions = []
actuals = []  # image ids yielded by the unlabeled test loader
with torch.no_grad():
    for inputs, ids in testloader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        predictions.extend(predicted.tolist())
        actuals.extend(ids.tolist())

# Histogram of predicted classes; the first occurrence counts as 1.
differ = {}
for pred in predictions:
    differ[pred] = differ.get(pred, 0) + 1
for i, j in differ.items():
    print(f"{i} : {j}")

# Write the submission file: one row per test image id.
df = pd.DataFrame({'ID': actuals, 'Labels': predictions})
df.to_csv('submission.csv', index=False)

In [None]:
# Row 0: per-epoch training accuracy, row 1: per-epoch validation accuracy.
accuracy_rows = (train_accuracy_list, test_accuracy_list)
trail = np.vstack(accuracy_rows)
np.save("./trail", trail)