In [None]:
!mkdir -p ~/.content/competitiondata # this is used to create the content for the
# custom test data to generate the submission csv


In [None]:
from google.colab import files # used in colab for uploading files
files.upload()  # Used to upload the model.pth and the kaggle.json API for volab

# Make sure kaggle.json is in the location ~/.kaggle/kaggle.json
# all these are commands to get the custom data set for the CIFAR-10
# it isnt neccesary if the test data is something else.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle competitions download -c deep-learning-mini-project-spring-24-nyu
!unzip deep-learning-mini-project-spring-24-nyu.zip -d competitiondata

In [1]:
# imports
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.cuda.amp import GradScaler, autocast

In [None]:
# this is the residual block adapted from https://github.com/drgripal/resnet-cifar10

class ResidualBlock(nn.Module):
    """Two-convolution residual block with a ReLU inside and a GELU on the output.

    Main path: conv3x3 -> BatchNorm -> ReLU -> conv3x3 -> BatchNorm.
    Shortcut path: the input (or ``downsample(input)`` when a projection is
    given), adaptively average-pooled to the spatial size of the main path.
    The two paths are summed and passed through GELU.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution (the second always uses 1).
        downsample: Optional module applied to the input on the shortcut path,
            e.g. a 1x1 convolution that matches channels/stride. ``None`` means
            the raw input is used.
        padding: Padding used by both 3x3 convolutions. With the default of 1
            the convolutions keep the spatial size at stride 1; with 0 each
            convolution shrinks the map and the shortcut is pooled to match.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None, padding=1):
        super(ResidualBlock, self).__init__()
        # Attribute names and creation order are kept stable so that existing
        # checkpoints (state-dict keys) and seeded initialisation still match.
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=padding, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.gelu = nn.GELU()  # activation applied after the residual sum
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=padding, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample  # optional projection for the shortcut path

    def forward(self, x):
        """Return ``gelu(main_path(x) + shortcut(x))``."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        # Pool the shortcut to the main path's (height, width). Using both
        # dimensions (rather than the height alone) keeps the sum valid for
        # non-square feature maps; for square maps the result is unchanged.
        # The functional form avoids building a new module on every call.
        identity = F.adaptive_avg_pool2d(identity, tuple(out.shape[-2:]))
        out += identity
        out = self.gelu(out)

        return out

class ModifiedResNet(nn.Module):
    """ResNet-style classifier built from four stages of residual blocks.

    Layout: a 3->16 convolutional stem, four stages with 32, 64, 115 and 256
    output channels (stages 2-4 start with a stride-2 block), an adaptive
    average pool to 3x3 and a single linear classifier. According to the
    original author's note the widths were picked to stay under 5 million
    parameters.

    Args:
        block: Block class, called as ``block(in_ch, out_ch, stride, downsample)``
            for the first block of a stage and ``block(out_ch, out_ch)`` after.
        layers: Sequence whose first four entries give the number of blocks in
            stages 1-4.
        num_classes: Size of the classifier output.
    """

    def __init__(self, block, layers, num_classes=10):
        super().__init__()
        # Channel count entering the next stage; updated by _make_layer.
        self.in_channels = 16
        self.conv1 = nn.Conv2d(
            in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.dropout = nn.Dropout(p=0.432)     # applied right after the stem
        self.dropoutlow = nn.Dropout(p=0.287)  # applied after stages 1 and 3
        self.bn1 = nn.BatchNorm2d(num_features=16)
        # Forces the feature map to 24x24 between stages 2 and 3, whatever its
        # incoming size is.
        self.maxpool1 = nn.AdaptiveMaxPool2d(output_size=24)
        self.relu = nn.ReLU(inplace=True)

        # (output channels, stride of the first block) for layer1..layer4.
        stage_specs = ((32, 1), (64, 2), (115, 2), (256, 2))
        for index, (width, first_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[index], stride=first_stride)
            setattr(self, f"layer{index + 1}", stage)

        self.avgpool = nn.AdaptiveAvgPool2d(output_size=(3, 3))
        self.fc = nn.Linear(in_features=256 * 3 * 3, out_features=num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage: ``blocks`` residual blocks producing ``out_channels``.

        Only the first block receives ``stride``; it also receives a 1x1
        conv + BatchNorm projection for its shortcut whenever the stride or the
        channel count changes. ``self.in_channels`` is advanced to
        ``out_channels`` as a side effect.
        """
        shape_preserved = stride == 1 and self.in_channels == out_channels
        projection = None
        if not shape_preserved:
            projection = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )

        head = block(self.in_channels, out_channels, stride, projection)
        self.in_channels = out_channels
        tail = [block(out_channels, out_channels) for _ in range(blocks - 1)]

        return nn.Sequential(head, *tail)

    def forward(self, x):
        """Map a batch of images to class logits of shape (batch, num_classes)."""
        # Stem: conv -> BN -> ReLU -> dropout.
        stem = self.dropout(self.relu(self.bn1(self.conv1(x))))

        # Residual stages with the interleaved dropout / adaptive max pool.
        features = self.dropoutlow(self.layer1(stem))
        features = self.maxpool1(self.layer2(features))
        features = self.dropoutlow(self.layer3(features))
        features = self.layer4(features)

        # Classifier head.
        pooled = self.avgpool(features)
        return self.fc(torch.flatten(pooled, 1))


# Build the network on the GPU when one is available and fall back to the CPU
# otherwise, instead of failing outright on a runtime without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ModifiedResNet(ResidualBlock, [3, 4, 11, 2]).to(device)
# the model being created: 3, 4, 11 and 2 residual blocks in the four stages

# Use torchsummary for a detailed summary and parameter count.
# torchsummary creates its dummy input on the device named here (its default
# is "cuda"), so it has to be told where the model actually lives.
from torchsummary import summary
summary(model, (3, 32, 32), device=device.type)


In [None]:
# Training-time augmentation: every step before ToTensor works on the PIL image
# and is re-sampled randomly each time an image is fetched.
augmentation_steps = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),                           # vertical flip
    transforms.RandomRotation(degrees=10),                     # rotate by up to +/-10 degrees
    transforms.RandomCrop(size=32, padding=4),                 # pad by 4, then crop back to 32x32
    transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),  # shift by up to 10% per axis
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
]
transform = transforms.Compose(augmentation_steps)

# CIFAR-10 training split (downloaded into ./data on first use), served in
# shuffled mini-batches of 64 images.
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    transform=transform,
    download=True,
)
trainloader = DataLoader(dataset=trainset, batch_size=64, shuffle=True)

In [3]:
# Training configuration: hyperparameters, loss, optimizer, AMP scaler, LR scheduler.

# --- hyperparameters ---
epochs, lr = 200, 1e-4
grad_accumulation = 3   # batches processed between two optimizer steps
model_save = 5          # a checkpoint is written every `model_save` epochs
GPU = True              # switches mixed precision (autocast / GradScaler) on

# --- loss and optimizer ---
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=lr, weight_decay=0.001)
# Alternative that was tried (SGD with different momentum values):
#optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.1)

# --- learning-rate schedule ---
# ReduceLROnPlateau lowers the LR once the monitored value has stopped
# decreasing for more than `patience` epochs; its step() expects that value.
# Alternative that was tried:
#scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=2)

# --- mixed precision ---
scaler = GradScaler(enabled=GPU)  # loss scaler for automatic mixed precision

# --- optional: resume from a saved checkpoint ---
# Uncomment the lines below to restore the model, optimizer, scaler and
# scheduler state from best_model_deep.pth before training or testing;
# otherwise the freshly initialised model is used.
#checkpoint = torch.load('best_model_deep.pth')
#model.load_state_dict(checkpoint['model_state_dict'])
#optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
#scaler.load_state_dict(checkpoint['scaler'])
#scheduler.load_state_dict(checkpoint['scheduler'])

In [None]:
# training model
import time


def save_checkpoint(path, epoch, loss):
    """Write the current training state to ``path``.

    Stores the epoch index, the last batch loss and the state dicts of the
    notebook-level ``model``, ``optimizer``, ``scheduler`` and ``scaler``.
    The model is moved to the CPU while saving so the stored weights are CPU
    tensors, and is then moved back to the device it was on.
    """
    original_device = next(model.parameters()).device
    model.cpu()
    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
        'scheduler': scheduler.state_dict(),
        'scaler': scaler.state_dict(),
    }, path)
    model.to(original_device)


# Feed batches to whatever device the model currently lives on.
train_device = next(model.parameters()).device
batches_per_epoch = len(trainloader)

# train model and keep a tab on running loss
for epoch in range(epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0     # loss summed since the last progress print
    running_batches = 0    # number of batches in that sum
    epoch_loss = 0.0       # loss summed over the whole epoch (scheduler metric)
    optimizer.zero_grad()  # never start an epoch with stale gradients

    for i, data in enumerate(trainloader, 0):
        inputs, labels = data[0].to(train_device), data[1].to(train_device)

        # automatic mixed precision for faster training
        with autocast(enabled=GPU):
            outputs = model(inputs)
            loss = criterion(outputs, labels)

        # Gradient accumulation: gradients of `grad_accumulation` batches are
        # summed before one optimizer step, so each batch contributes its loss
        # divided by that count to keep the step equal to a mean over the group.
        scaler.scale(loss / grad_accumulation).backward()

        # Also step on the final batch so a trailing, incomplete group is
        # applied in this epoch instead of leaking into the next one.
        is_last_batch = (i + 1) == batches_per_epoch
        if (i + 1) % grad_accumulation == 0 or is_last_batch:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        batch_loss = loss.item()
        epoch_loss += batch_loss
        running_loss += batch_loss
        running_batches += 1
        if i % 200 == 0:    # print on the first iteration and every 200 after it
            # Average over the batches actually accumulated since the last
            # print (a single batch for the very first line of an epoch).
            print(f'[{epoch + 1}, {i + 1}] loss: {running_loss / running_batches:.3f}')
            running_loss = 0.0
            running_batches = 0

    # ReduceLROnPlateau needs the monitored value; use the epoch's mean
    # training loss (the scheduler was created in 'min' mode).
    scheduler.step(epoch_loss / max(batches_per_epoch, 1))

    # save model in case something goes wrong
    if epoch % model_save == 0:
        save_checkpoint("model.pth", epoch, loss)
        print("just saved model incase something goes wrong")

    end_time = (time.time() - start_time) // 1
    print(f"finished epoch {epoch + 1} in {end_time} seconds")

print('Finished Training')
# save final model (same file as the periodic checkpoints)
save_checkpoint("model.pth", epoch, loss)
print("just saved final model incase something goes wrong")

In [None]:
# testing data for colab if the testing is done on the custom data
# testing data code adopted from https://github.com/hzhao20/DLMiniproject/blob/main/GenerateCSV.py.

import torch
import numpy as np
import pandas as pd
import pickle
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

# unpickle function
def unpickle(file):
    """Load and return the object stored in the pickle file at path ``file``.

    The file is read with ``encoding='bytes'``, so strings written by a
    Python 2 pickle (including dictionary keys) come back as ``bytes``.

    NOTE: unpickling can execute arbitrary code; only use this on files from
    a trusted source.
    """
    with open(file, 'rb') as fo:
        # Local name avoids shadowing the built-in ``dict``.
        contents = pickle.load(fo, encoding='bytes')
    return contents

# Evaluation preprocessing: no augmentation, only resizing to 32x32, tensor
# conversion and the same (0.5, 0.5, 0.5) normalisation used for training.
eval_steps = [
    # transforms.ToPILImage(),  # enable this first step when feeding raw arrays from the custom data
    transforms.Resize(size=(32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
]
transform = transforms.Compose(eval_steps)

# CIFAR-10 test split (downloaded into ./data on first use), read in order in
# mini-batches of 64 images.
testdata = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    transform=transform,
    download=True,
)
test_loader = DataLoader(dataset=testdata, batch_size=64, shuffle=False)
# The CIFAR-10 test loader built above can be commented out when evaluating on
# the custom competition data instead.

# The triple-quoted string below is disabled code kept for reference; as a bare
# string literal it has no effect when the cell runs. To evaluate on the
# competition data, move its contents out of the string: it reads the pickle
# with unpickle(), takes the b'data' and b'ids' entries, reshapes the images to
# (N, 32, 32, 3), applies `transform` to each image and stacks the results.
# The DataLoader line inside it is itself commented out and must be enabled too.
"""
All this here is loading for the test data that was from teh competition
test_data_dict = unpickle('/content/competitiondata/cifar_test_nolabels.pkl')
test_images = test_data_dict[b'data']
test_ids = test_data_dict[b'ids']


# transform
test_images = test_images.reshape(len(test_images), 3, 32, 32).transpose(0, 2, 3, 1)  # reshaping data to the proper formats

# preprocess


test_images = torch.stack([transform(img) for img in test_images])

# data loader
#test_loader = DataLoader(TensorDataset(test_images, torch.tensor(test_ids)), batch_size=64, shuffle=False)
# the top line can be uncommented if the testing is done on the custom data
# else keeping it commented should load the original CIFAR-10 data set
"""

# Put the model on the evaluation device and switch to inference mode
# (dropout disabled, BatchNorm using its running statistics).
model = model.to(device)
model.eval()

# predict
correct = 0
total = 0
predicted_labels = []
# For the competition data the second element of each batch holds image ids,
# not class labels: replace `labels` with `_` in the loop header and comment
# out the lines marked below.
with torch.no_grad():
    for images, labels in test_loader:
        # Use the batch yielded by the loader and run it through the model;
        # predictions must come from this batch, not from variables left over
        # from the training cell.
        images = images.to(device)
        labels = labels.to(device)  # comment out for competition data
        outputs = model(images)
        _, preds = torch.max(outputs, 1)
        predicted_labels.extend(preds.cpu().numpy())
        total += labels.size(0)  # comment out for competition data
        correct += (preds == labels).sum().item()  # comment out for competition data

# comment out for competition data (there are no labels to score against)
print(f'Accuracy of the network on the 10000 test images: {100 * correct / total} %')
# this should be commented it custom data

# generate CSV
# The triple-quoted string below is disabled code kept for reference; as a bare
# string literal it does nothing except being echoed as the cell's output.
# To produce the competition submission, move its contents out of the string:
# it builds a DataFrame with the columns 'ID' (from test_ids) and 'Labels'
# (from predicted_labels) and writes it to submission.csv without the index.
# test_ids is only defined by the disabled competition-data loading snippet
# earlier in this cell, so that snippet has to be enabled as well.
"""
uncomment these lines if running to generate the competiion csv
submission_df = pd.DataFrame({
    'ID': test_ids,
    'Labels': predicted_labels
})

predicted_labels = np.array(predicted_labels)

submission_df.to_csv('submission.csv', index=False)
"""