In [57]:
import os
import shutil
from collections import OrderedDict

import torch 
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, datasets
#from torchsummary.torchsummary import summary

from torch.utils.data import DataLoader, random_split

In [59]:
import torch
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, random_split

def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        dict = pickle.load(fo, encoding='bytes')
    return dict

def dataloader_cifar():
    # Define transforms
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    # Load CIFAR-10 training data from local directory
    train_dataset = datasets.CIFAR10(
        root='/Users/aniketmane/Desktop/sem-2/DL/Project1/deep-learning-spring-2025-project-1/cifar-10-python',
        train=True,
        download=False,  # Set to False since data is already present
        transform=transform
    )

    # Split into training and validation sets (45000/5000)
    train_dataset, val_dataset = random_split(train_dataset, [45000, 5000])

    # Load CUSTOM TEST DATA (replace with your custom test data path)
    # Example: Custom test data stored in a file called 'custom_test_batch'
    test_dict = unpickle('/Users/aniketmane/Desktop/sem-2/DL/Project1/deep-learning-spring-2025-project-1/cifar-10-python/custom_test_batch')
    test_data = test_dict[b'data']  # Shape: [N, 3072]
    test_ids = test_dict[b'ids']    # List of N sample IDs

    # Convert test data to PyTorch Dataset
    class CustomTestDataset(torch.utils.data.Dataset):
        def __init__(self, data, ids, transform=None):
            self.data = data.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # Reshape to [N, 32, 32, 3]
            self.ids = ids
            self.transform = transform

        def __len__(self):
            return len(self.ids)

        def __getitem__(self, idx):
            image = self.data[idx]
            if self.transform:
                image = self.transform(image)
            return image, self.ids[idx]

    test_dataset = CustomTestDataset(test_data, test_ids, transform=transform)

    # Create DataLoaders
    BATCH_SIZE = 32
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return train_loader, val_loader, test_loader

In [11]:
!nvidia-smi

zsh:1: command not found: nvidia-smi


In [61]:
class LambdaLayer(nn.Module):
    """
    This class defines a Lambda Layer. It allows to perform arbitrary operations specified by the "lambd" argument.

    Attributes:
        lambd: a function that defines the operation to be performed on the input.
    """
    
    def __init__(self, lambd):
        """
        Init method for the Lambda Layer.

        Args:
            lambd (function): Function that defines the operation to be performed on the input.
        """
        super(LambdaLayer, self).__init__()
        self.lambd = lambd
    
    def forward(self, x):
        """
        Forward pass of the Lambda Layer. It applies the function to the input.

        Args:
            x (torch.Tensor): Input tensor to the Lambda Layer.

        Returns:
            torch.Tensor: The output of the Lambda Layer after applying the function.
        """
        return self.lambd(x)

class BasicConvBlock(nn.Module):
    
    ''' The BasicConvBlock takes an input with in_channels, applies some blocks of convolutional layers 
    to reduce it to out_channels and sum it up to the original input. 
    If their sizes mismatch, then the input goes into an identity. 
    
    Basically The BasicConvBlock will implement the regular basic Conv Block + 
    the shortcut block that does the dimension matching job (option A or B) when dimension changes between 2 blocks
    '''
    
    def __init__(self, in_channels, out_channels, stride=1, option='A'):
        """
        Init method for the Basic Convolution Block.

        Args:
            in_channels (int): Number of channels in the input tensor.
            out_channels (int): Number of channels in the output tensor.
            stride (int, optional): Stride for the convolution operation. Default is 1.
            option (str, optional): Option for the shortcut connection to match dimensions. Default is 'A'.
        """
        super(BasicConvBlock, self).__init__()
        
        self.features = nn.Sequential(OrderedDict([
            ('conv1', nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)),
            ('bn1', nn.BatchNorm2d(out_channels)),
            ('act1', nn.ReLU()),
            ('conv2', nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)),
            ('bn2', nn.BatchNorm2d(out_channels))
        ]))

        self.shortcut = nn.Sequential()
        
        '''  When input and output spatial dimensions don't match, we have 2 options, with stride:
            - A) Use identity shortcuts with zero padding to increase channel dimension.    
            - B) Use 1x1 convolution to increase channel dimension (projection shortcut).
         '''
        if stride != 1 or in_channels != out_channels:
            if option == 'A':
                # Use identity shortcuts with zero padding to increase channel dimension.
                pad_to_add = out_channels//4
                ''' ::2 is doing the job of stride = 2
                F.pad apply padding to (W,H,C,N).
                
                The padding lengths are specified in reverse order of the dimensions,
                F.pad(x[:, :, ::2, ::2], (0,0, 0,0, pad,pad, 0,0))

                [width_beginning, width_end, height_beginning, height_end, channel_beginning, channel_end, batchLength_beginning, batchLength_end ]

                '''
                self.shortcut = LambdaLayer(lambda x:
                            F.pad(x[:, :, ::2, ::2], (0,0, 0,0, pad_to_add, pad_to_add, 0,0)))
            if option == 'B':
                self.shortcut = nn.Sequential(OrderedDict([
                    ('s_conv1', nn.Conv2d(in_channels, 2*out_channels, kernel_size=1, stride=stride, padding=0, bias=False)),
                    ('s_bn1', nn.BatchNorm2d(2*out_channels))
                ]))
        
    def forward(self, x):
        """
        Forward pass of the Basic Convolution Block. It applies the sequence of layers and adds the shortcut connection.

        Args:
            x (torch.Tensor): Input tensor to the Basic Convolution Block.

        Returns:
            torch.Tensor: The output of the Basic Convolution Block.
        """
        out = self.features(x)
        # sum it up with shortcut layer
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [25]:
class ResNet(nn.Module):
    """ ResNet-56 architecture for CIFAR-10 Dataset of shape 32*32*3.
    
    Args:
        block_type (nn.Module): The type of residual block to use.
        num_blocks (list): List containing the number of blocks for each layer.
    
    Attributes:
        in_channels (int): Number of input channels.
        conv0 (nn.Conv2d): Initial convolutional layer.
        bn0 (nn.BatchNorm2d): Batch normalization layer.
        block1 (nn.Sequential): First block layer.
        block2 (nn.Sequential): Second block layer.
        block3 (nn.Sequential): Third block layer.
        avgpool (nn.AdaptiveAvgPool2d): Adaptive average pooling layer.
        linear (nn.Linear): Linear layer for classification. """
    def __init__(self, block_type, num_blocks):
        super(ResNet, self).__init__()
        
        self.in_channels = 16
        
        self.conv0 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn0 = nn.BatchNorm2d(16)
        
        self.block1 = self.__build_layer(block_type, 16, num_blocks[0], starting_stride=1)
        
        self.block2 = self.__build_layer(block_type, 32, num_blocks[1], starting_stride=2)
        
        self.block3 = self.__build_layer(block_type, 64, num_blocks[2], starting_stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.linear = nn.Linear(64, 10)
    
    def __build_layer(self, block_type, out_channels, num_blocks, starting_stride):
        """
        Build a layer consisting of multiple residual blocks.
        
        Args:
            block_type (nn.Module): The type of residual block to use.
            out_channels (int): Number of output channels.
            num_blocks (int): Number of blocks in the layer.
            starting_stride (int): Stride value for the first block.
        
        Returns:
            nn.Sequential: Sequential container of the residual blocks.
        """
        
        strides_list_for_current_block = [starting_stride] + [1]*(num_blocks-1)
        ''' Above line will generate an array whose first element is starting_stride
        And it will have (num_blocks-1) more elements each of value 1
         '''
        # print('strides_list_for_current_block ', strides_list_for_current_block)
        
        layers = []
        
        for stride in strides_list_for_current_block:
            layers.append(block_type(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        
        return nn.Sequential(*layers)
    
    def forward(self, x):
        """
        Forward pass of the ResNet model.
        
        Args:
            x (torch.Tensor): Input tensor.
        
        Returns:
            torch.Tensor: Output tensor.
        """
        out = F.relu(self.bn0(self.conv0(x)))
        out = self.block1(out)
        out = self.block2(out)        
        out = self.block3(out)
        out = self.avgpool(out)
        out = torch.flatten(out, 1)
        out = self.linear(out)
        return out

In [63]:
def ResNet56():
    return ResNet(block_type=BasicConvBlock, num_blocks=[9,9,9])

In [65]:
model = ResNet56()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
#device = 'cpu'
model.to(device)
#summary(model, (3, 32, 32))

ResNet(
  (conv0): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn0): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (block1): Sequential(
    (0): BasicConvBlock(
      (features): Sequential(
        (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (act1): ReLU()
        (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (shortcut): Sequential()
    )
    (1): BasicConvBlock(
      (features): Sequential(
        (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (act1): ReLU()
        (conv2): Con

In [69]:
class CustomTestDataset(torch.utils.data.Dataset):
    def __init__(self, data, ids, transform=None):
        self.data = data.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)  # Shape [N, 32, 32, 3]
        self.ids = ids
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, idx):
        image = self.data[idx]
        if self.transform:
            image = self.transform(image)
        return image, self.ids[idx]

In [79]:
def dataloader_cifar():
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Fixed for 3 channels
    ])

    # Load CIFAR-10 Training Data
    train_dataset = datasets.CIFAR10(
        root='/Users/aniketmane/Desktop/sem-2/DL/Project1/deep-learning-spring-2025-project-1/cifar-10-python',
        train=True,
        download=False,
        transform=transform
    )

    # Load CUSTOM TEST DATA (correct path)
    test_dict = unpickle('/Users/aniketmane/Desktop/sem-2/DL/Project1/deep-learning-spring-2025-project-1/cifar_test_nolabel.pkl')  # Include filename
    test_data = test_dict[b'data']
    test_ids = test_dict[b'ids']
    test_dataset = CustomTestDataset(test_data, test_ids, transform=transform)

    # Split into training and validation sets
    train_dataset, val_dataset = random_split(train_dataset, (45000, 5000))

    print("Image shape of a random sample image: {}".format(train_dataset[0][0].shape))
    print("Training Set: {} images".format(len(train_dataset)))
    print("Validation Set: {} images".format(len(val_dataset)))
    print("Test Set: {} images".format(len(test_dataset)))

    BATCH_SIZE = 32
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return train_loader, val_loader, test_loader

In [81]:
train_loader, val_loader, test_loader = dataloader_cifar()

Image shape of a random sample image: torch.Size([3, 32, 32])
Training Set: 45000 images
Validation Set: 5000 images
Test Set: 10000 images


Training 


In [83]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [91]:
def train_model():
    EPOCHS = 30
    train_samples_num = 45000
    val_samples_num = 5000
    train_costs, val_costs = [], []
    
    #Training phase.    
    for epoch in range(EPOCHS):

        train_running_loss = 0
        correct_train = 0
        
        model.train().cuda()
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            """ for every mini-batch during the training phase, we typically want to explicitly set the gradients 
            to zero before starting to do backpropragation """
            optimizer.zero_grad()
            
            # Start the forward pass
            prediction = model(inputs)
                        
            loss = criterion(prediction, labels)
          
            # do backpropagation and update weights with step()
            loss.backward()         
            optimizer.step()
            
            # print('outputs on which to apply torch.max ', prediction)
            # find the maximum along the rows, use dim=1 to torch.max()
            _, predicted_outputs = torch.max(prediction.data, 1)
            
            # Update the running corrects 
            correct_train += (predicted_outputs == labels).float().sum().item()
            
            ''' Compute batch loss
            multiply each average batch loss with batch-length. 
            The batch-length is inputs.size(0) which gives the number total images in each batch. 
            Essentially I am un-averaging the previously calculated Loss '''
            train_running_loss += (loss.data.item() * inputs.shape[0])


        train_epoch_loss = train_running_loss / train_samples_num
        
        train_costs.append(train_epoch_loss)
        
        train_acc =  correct_train / train_samples_num

        # Now check trained weights on the validation set
        val_running_loss = 0
        correct_val = 0
      
        model.eval().cuda()
    
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                # Forward pass.
                prediction = model(inputs)

                # Compute the loss.
                loss = criterion(prediction, labels)

                # Compute validation accuracy.
                _, predicted_outputs = torch.max(prediction.data, 1)
                correct_val += (predicted_outputs == labels).float().sum().item()

            # Compute batch loss.
            val_running_loss += (loss.data.item() * inputs.shape[0])

            val_epoch_loss = val_running_loss / val_samples_num
            val_costs.append(val_epoch_loss)
            val_acc =  correct_val / val_samples_num
        
        info = "[Epoch {}/{}]: train-loss = {:0.6f} | train-acc = {:0.3f} | val-loss = {:0.6f} | val-acc = {:0.3f}"
        
        print(info.format(epoch+1, EPOCHS, train_epoch_loss, train_acc, val_epoch_loss, val_acc))
        
        torch.save(model.state_dict(), '/content/checkpoint_gpu_{}'.format(epoch + 1)) 
                                                                
    torch.save(model.state_dict(), '/content/resnet-56_weights_gpu')  
        
    return train_costs, val_costs

    

In [89]:
test_samples_num = 10000
correct = 0 

model.eval().cuda()

with  torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # Make predictions.
        prediction = model(inputs)

        # Retrieve predictions indexes.
        _, predicted_class = torch.max(prediction.data, 1)

        # Compute number of correct predictions.
        correct += (predicted_class == labels).float().sum().item()

test_accuracy = correct / test_samples_num
print('Test accuracy: {}'.format(test_accuracy))


AssertionError: Torch not compiled with CUDA enabled

In [None]:
model.eval().to(device)
predictions = []
ids = []

with torch.no_grad():
    for images, batch_ids in test_loader:
        images = images.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        predictions.extend(predicted.cpu().numpy())
        ids.extend(batch_ids.numpy())

# Create DataFrame and save as CSV
import pandas as pd
df = pd.DataFrame({'ID': ids, 'Labels': predictions})
df.to_csv('submission.csv', index=False)
print("Submission CSV saved!")