In [33]:
import torch
import torch.nn as nn
from torch import Tensor

class ConvNet(nn.Module):
    """
    A simple convolutional network with the following architecture:

    [conv - bn - relu] x M - global_average_pooling - affine - softmax

    "[conv - bn - relu] x M" means the "conv-bn-relu" block is repeated for
    M times, where M is implicitly defined by the convolution layers' parameters.
    Whether to use the batch normalization layer (bn) in-between is a design choice.

    For each convolution layer, we do downsampling of factor 2 by setting the stride
    to be 2. So we can have a large receptive field size.

    The network operates on minibatches of data that have shape (N, C, H, W)
    consisting of N images, each with height H and width W and with C input
    channels.
    """

    def __init__(self, input_dim=(3, 32, 32), filter_sizes=[7], filter_channels=[32],
            num_classes=10, use_batch_norm=True):
        """
        Initialize a new CNN.

        Inputs:
        - input_dim: Tuple (C, H, W) giving size of input data
        - filter_sizes: Width/height of filters to use in the convolutional layer. It is a
          list whose length defines the number of convolution layers.
        - filter_channels: Number of filters to use in each convolutional layer. It has the
          same length as filter_sizes.
        - num_classes: Number of output classes
        - use_batch_norm: A boolean variable indicating whether to use batch normalization
        """
        super().__init__()

        assert len(filter_sizes) == len(filter_channels), "Inconsistent filter sizes and channels."

        ############################################################################
        # TODO: Define a set of layers according to the user input.                #
        #                                                                          #
        # IMPORTANT:                                                               #
        # 1. For this assignment, you can assume that the padding of the every     #
        # convolutional layer are chosen so that **the width and height of the     #
        # input are preserved** (without considering the stride). You need to      #
        # carefully set the `pad` parameter for the convolution.                   #
        #                                                                          #
        # 2. For each convolution layer, we use stride of 2 to do downsampling.    #
        ############################################################################
        self.use_batch_norm = use_batch_norm
        C, H, W = input_dim
        layers = []
        in_channels = C

        # build each conv-bn-relu block
        for i in range(len(filter_sizes)):
            filter_size = filter_sizes[i]
            out_channels = filter_channels[i]

            # padding to preserve H and W
            padding = (filter_size - 1) // 2
            stride = 2
            use_bias = not use_batch_norm

            # define the conv layer
            curr_layer = nn.Conv2d(in_channels, out_channels, kernel_size= filter_size,
                                    stride= stride, padding=padding, bias= use_bias)

            layers.append(curr_layer)
            if use_batch_norm:
                layers.append(nn.BatchNorm2d(out_channels))
            layers.append(nn.ReLU(inplace=True))

            in_channels = out_channels
        # stack the layers together in order
        self.conv_layers = nn.Sequential(*layers)
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(in_channels, num_classes)
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################

    def forward(self, x):
        logits = None
        feat_before_gap = None

        ############################################################################
        # TODO: Implement the forward pass for the simple convolutional net,       #
        # computing the class scores for x and storing them in the logits          #
        # variable. Also, store the feature map right before the global average    #
        # pooling (GAP) layer in the feat_before_gap variable for debugging        #
        # purpose only.                                                            #
        ############################################################################
        out = self.conv_layers(x)
        feat_before_gap = out
        out = self.global_avg_pool(out)
        out = torch.flatten(out, 1)
        # we won't apply softmax directly because we are gonna use cross entropy loss
        logits = self.fc(out)
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################

        return logits, feat_before_gap

In [34]:
import torch
import torch.nn as nn

# Function to test an already trained model
def test_model(model, data_loader):
    """
    Compute accuracy of the model.

    Inputs:
      - model: A CNN implemented in PyTorch
      - data_loader: A data loader that will provide batched images and labels
    """

    # set the model in evaluation mode so the batch norm layers will behave correctly
    model.eval()

    correct = 0
    total = 0
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for batch_data in data_loader:
            images, labels = batch_data
            images = images.cuda()
            labels = labels.cuda()
            labels = labels.long()

            predicted = None
            ############################################################################
            # TODO: Compute the predicted labels of the batched input images and store #
            # them in the predicted varaible.                                          #
            ############################################################################
            outputs, _ = model(images)
            # torch max returns the max value and the index at whih they happen at
            # we use the index as the predicted label
            _, predicted = torch.max(outputs.data, 1)
            ############################################################################
            #                             END OF YOUR CODE                             #
            ############################################################################

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    acc = 100 * correct // total
    return acc

In [35]:


def train_val_model(model, train_data_loader, val_data_loader, loss_fn, optimizer, lr_scheduler, num_epochs, print_freq=50):

    """
    Training and validating a CNN model using PyTorch.

    Inputs:
      - model: A CNN implemented in PyTorch
      - data_loader: A data loader that will provide batched images and labels
      - loss_fn: A loss function (e.g., cross entropy loss)
      - lr_scheduler: Learning rate scheduler
      - num_epochs: Number of epochs in total
      - print_freq: Frequency to print training statistics

    Output:
      - model: Trained CNN model
    """

    for epoch_i in range(num_epochs):
        # set the model in the train mode so the batch norm layers will behave correctly
        model.train()

        running_loss = 0.0
        running_total = 0.0
        running_correct = 0.0
        for i, batch_data in enumerate(train_data_loader):
            # Every data instance is an image + label pair
            images, labels = batch_data
            images = images.cuda()
            labels = labels.cuda()
            labels = labels.long()

            predicted = None
            ############################################################################
            # TODO: Finish loss computation, gradient backpropagation, weight update,  #
            # and computing the predicted labels of the input images and store them in #
            # the predicted varaible, which will be used to monitor the training       #
            # accuracy.                                                                #
            #                                                                          #
            # Note: The learning rate is updated after each **epoch**.                 #
            ############################################################################
            # zero the parameter gradients
            optimizer.zero_grad()
            outputs, _ = model(images)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()
            _, predicted = torch.max(outputs.data, 1)
            ############################################################################
            #                             END OF YOUR CODE                             #
            ############################################################################

            # print statistics
            running_loss += loss.item()
            running_total += labels.size(0)
            running_correct += (predicted == labels).sum().item()
            if i % print_freq == 0:    # print every certain number of mini-batches
                running_loss = running_loss / print_freq
                running_acc = running_correct / running_total * 100
                last_lr = lr_scheduler.get_last_lr()[0]
                print(f'[{epoch_i + 1}/{num_epochs}, {i + 1:5d}/{len(train_data_loader)}] loss: {running_loss:.3f} acc: {running_acc:.3f} lr: {last_lr:.5f}')
                running_loss = 0.0
                running_total = 0.0
                running_correct = 0.0

        # adjust the learning rate
        lr_scheduler.step()

        val_acc = test_model(model, val_data_loader)
        print(f'[{epoch_i + 1}/{num_epochs}] val acc: {val_acc:.3f}')

    return model

In [36]:
def set_up_loss_optimizer_lr_scheduler(model, learning_rate, momentum, lr_step_size, lr_gamma):
    """
    In this programming assignment, we will adopt the most common choice for the optimizer:
    SGD + momentum and learning rate scheduler: StepLR. Please refer to https://pytorch.org/docs/stable/optim.html#algorithms
    and https://pytorch.org/docs/stable/generated/torch.optim.lr_scheduler.StepLR.html#torch.optim.lr_scheduler.StepLR for more details.
    """
    loss_fn = None
    optimizer = None
    lr_scheduler = None

    ############################################################################
    # TODO: Define the loss function, optimizer (SGD + momentum), and          #
    # learning rate scheduler (StepLR).                                        #
    #                                                                          #
    # Note: We expect you to set up the learning rate in an epoch-based way.   #
    # We will run the learning rate scheduler after each epoch.                #
    ############################################################################
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr= learning_rate, momentum= momentum)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size= lr_step_size, gamma= lr_gamma)
    ############################################################################
    #                             END OF YOUR CODE                             #
    ############################################################################

    return loss_fn, optimizer, lr_scheduler

In [42]:
import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import torch

def get_dataloaders(data_dir="../datasets/fer2013", batch_size=64, val_split=0.1, 
                   model_type='efficient', augmentation=True):
    """
    Get data loaders for FER2013 dataset.
    
    Args:
        model_type: 'efficient' for our CNN, 'resnet' for ResNet
        augmentation: Whether to use data augmentation for training
    """
    print("hi", data_dir)
    if data_dir is None:
        data_dir = os.path.join(os.path.dirname(__file__), "..", "datasets", "fer2013")
        data_dir = os.path.abspath(data_dir)
    
    if model_type == 'resnet':
        # original ResNet configuration
        transform_train = transforms.Compose([
            transforms.Grayscale(num_output_channels=3), # ResNet expects 3 channels
            transforms.Resize((224, 224)), # Resnet input size is (224, 224)
            transforms.ToTensor(),
            # Mean and std come from ImageNet dataset used to train ResNet
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        transform_test = transform_train
        
    else: # our CNN model
        if augmentation:
            transform_train = transforms.Compose([
                transforms.Grayscale(num_output_channels=1),
                transforms.Resize((48, 48)),
                transforms.RandomHorizontalFlip(p=0.5),
                transforms.RandomRotation(degrees=10),
                transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
                transforms.ColorJitter(brightness=0.2, contrast=0.2),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5], std=[0.5]),  # Grayscale normalization
                transforms.RandomErasing(p=0.1, scale=(0.02, 0.33))  # Cutout augmentation
            ])
        else:
            transform_train = transforms.Compose([
                transforms.Grayscale(num_output_channels=1),
                transforms.Resize((48, 48)),
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.5], std=[0.5])
            ])
        
        # Test/val without augmentation
        transform_test = transforms.Compose([
            transforms.Grayscale(num_output_channels=1),
            transforms.Resize((48, 48)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5])
        ])
    
    # Load datasets
    training_data = datasets.ImageFolder(os.path.join(data_dir, "train"), transform=transform_train)
    testing_data = datasets.ImageFolder(os.path.join(data_dir, "test"), transform=transform_test)
    # training data validation
    validation_size = int(len(training_data) * val_split)
    training_size = len(training_data) - validation_size
    
    # Use a fixed seed for reproducible splits
    generator = torch.Generator().manual_seed(42)
    training_data, validation_data = random_split(
        training_data, 
        [training_size, validation_size],
        generator=generator
    )
    
    # For validation data, we need to override the transform to remove augmentation
    if model_type != 'resnet':
        validation_data.dataset.transform = transform_test
    
    # Create data loaders with optimized settings
    training_loader = DataLoader(
        training_data, 
        batch_size=batch_size, 
        shuffle=True, 
        num_workers=4,  # Increase if you have more CPU cores
        pin_memory=True,  # Faster GPU transfer
        persistent_workers=True  # Keep workers alive between epochs
    )
    
    validation_loader = DataLoader(
        validation_data, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=4,
        pin_memory=True,
        persistent_workers=True
    )
    
    testing_loader = DataLoader(
        testing_data, 
        batch_size=batch_size, 
        shuffle=False, 
        num_workers=4,
        pin_memory=True,
        persistent_workers=True
    )
    
    num_classes = len(testing_data.classes)
    print(f"Dataset loaded: {training_size} train, {validation_size} val, {len(testing_data)} test samples")
    print(f"Number of classes: {num_classes}")
    print(f"Batch size: {batch_size}")
    
    return training_loader, validation_loader, testing_loader, num_classes

In [38]:
# In practice, this is a hyperparameter to tune.
# But here we use a fixed number to make the comparisons fair.
num_epochs = 3

model = None
loss_fn = None
optimizer = None
lr_scheduler = None
############################################################################
# TODO: Set up and tune the hyper parameters.                              #
############################################################################
batch_size = 16
learning_rate = 0.001
momentum = 0.99
lr_gamma = 1

model = ConvNet(input_dim=(1, 48, 48),
                filter_sizes=[3, 3, 3, 3],
                filter_channels=[64, 128, 256, 512], 
                use_batch_norm=True)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr= learning_rate, momentum= momentum)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size= 1, gamma= lr_gamma)
############################################################################
#                             END OF YOUR CODE                             #
############################################################################

model = model.cuda()
num_params = sum(p.numel() for p in model.parameters())
print('Number of parameters: {:.3f}K'.format(num_params / 1000))

# set up the data loaders
# note the usage of the batch_size hyperparameter here
train_loader, val_loader, test_loader, _ = get_dataloaders(model_type='cnn', batch_size=batch_size)

model = train_val_model(model, train_loader, val_loader, loss_fn, optimizer, lr_scheduler, num_epochs)
test_acc = test_model(model, test_loader)
print(f"testing accuracy: {test_acc:.3f}")

Conv layer 0: in_channels=1, out_channels=64
Conv layer 3: in_channels=64, out_channels=128
Conv layer 6: in_channels=128, out_channels=256
Conv layer 9: in_channels=256, out_channels=512
Number of parameters: 1555.914K
hi ../datasets/fer2013
Dataset loaded: 25839 train, 2870 val, 7178 test samples
Number of classes: 7
Batch size: 16
[1/3,     1/1615] loss: 0.048 acc: 0.000 lr: 0.00100
[1/3,    51/1615] loss: 1.981 acc: 22.625 lr: 0.00100
[1/3,   101/1615] loss: 1.993 acc: 21.125 lr: 0.00100
[1/3,   151/1615] loss: 1.971 acc: 22.750 lr: 0.00100
[1/3,   201/1615] loss: 1.959 acc: 22.500 lr: 0.00100
[1/3,   251/1615] loss: 1.865 acc: 23.500 lr: 0.00100
[1/3,   301/1615] loss: 1.776 acc: 28.250 lr: 0.00100
[1/3,   351/1615] loss: 1.815 acc: 27.125 lr: 0.00100
[1/3,   401/1615] loss: 1.781 acc: 26.875 lr: 0.00100
[1/3,   451/1615] loss: 1.828 acc: 28.000 lr: 0.00100
[1/3,   501/1615] loss: 1.771 acc: 27.875 lr: 0.00100
[1/3,   551/1615] loss: 1.835 acc: 25.750 lr: 0.00100
[1/3,   601/1615]

In [39]:
# Resnet like CNN
from functools import partial
from typing import Any, Callable, List, Optional, Type, Union
from torchvision.models.resnet import conv1x1, conv3x3, BasicBlock, Bottleneck, ResNet
from torch import Tensor

class MyResNet(ResNet):
    def __init__(
        self,
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        num_classes: int = 1000,
        zero_init_residual: bool = False,
        groups: int = 1,
        width_per_group: int = 64,
        replace_stride_with_dilation: Optional[List[bool]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        """
        Here we will design a model architecture MyResNet, inherited from the ResNet model.
        First check here https://github.com/pytorch/vision/blob/main/torchvision/models/resnet.py about the
        implementation of ResNet in PyTorch.
        What you need to do in this part is the remove the layer3 and layer4 and also modify the final
        fully-connected layer accordingly.
        """
        super().__init__(
            block, layers, num_classes, zero_init_residual, groups,
            width_per_group, replace_stride_with_dilation, norm_layer
        )

        ############################################################################
        # TODO: Remove the layer3 and layer4 block in the original implementation  #
        # of ResNet and modify the fully-connected layer (classifier) accordingly. #
        ############################################################################
        self.layer3 = nn.Identity()
        self.layer4 = nn.Identity()
        self.fc = nn.Linear(128 * block.expansion, num_classes)
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################

    def _forward_impl(self, x: Tensor) -> Tensor:
        logits = None
        feat_before_gap = None
        ############################################################################
        # TODO: Implement the forward pass for the ResNet-like model,              #
        # computing the class scores for x and storing them in the logits          #
        # variable. Also, store the feature map right before the global average    #
        # pooling (GAP) layer in the feat_before_gap variable for debugging        #
        # purpose only.                                                            #
        ############################################################################
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # pass through layer1 and layer2 only
        x = self.layer1(x)
        x = self.layer2(x)
        feat_before_gap = x
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        logits = self.fc(x)
        ############################################################################
        #                             END OF YOUR CODE                             #
        ############################################################################

        return logits, feat_before_gap

In [43]:
# In practice, this is a hyperparameter to tune.
# But here we use a fixed number to make the comparisons fair.
num_epochs = 3

model = MyResNet(BasicBlock, [2, 2, 2, 2], num_classes=10)
num_params = sum(p.numel() for p in model.parameters())
print('Number of parameters: {:.3f}K'.format(num_params / 1000))

############################################################################
# TODO: Set up and tune the hyper parameters.                              #
############################################################################
batch_size = 64
learning_rate = 0.001
momentum = 0.99
lr_gamma = 0.99

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr= learning_rate, momentum= momentum)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size= 1, gamma= lr_gamma)
############################################################################
#                             END OF YOUR CODE                             #
############################################################################

# set up the data loaders
# note the usage of the batch_size hyperparameter here
train_loader, val_loader, test_loader, _ = get_dataloaders(model_type='resnet', batch_size=batch_size)

model = model.cuda()
model = train_val_model(model, train_loader, val_loader, loss_fn, optimizer, lr_scheduler, num_epochs)
test_acc = test_model(model, test_loader)
print(f"testing accuracy: {test_acc:.3f}")

Number of parameters: 684.362K
hi ../datasets/fer2013
Dataset loaded: 25839 train, 2870 val, 7178 test samples
Number of classes: 7
Batch size: 64
[1/3,     1/404] loss: 0.054 acc: 3.125 lr: 0.00100
[1/3,    51/404] loss: 2.029 acc: 19.438 lr: 0.00100
[1/3,   101/404] loss: 1.910 acc: 22.625 lr: 0.00100
[1/3,   151/404] loss: 1.848 acc: 24.281 lr: 0.00100
[1/3,   201/404] loss: 1.836 acc: 24.562 lr: 0.00100
[1/3,   251/404] loss: 1.811 acc: 24.875 lr: 0.00100
[1/3,   301/404] loss: 1.821 acc: 23.906 lr: 0.00100
[1/3,   351/404] loss: 1.788 acc: 25.156 lr: 0.00100


KeyboardInterrupt: 

In [49]:
# Let's experiment with transfer learning by borrowing the weights of a ResNet model pre-trained on ImageNet.
import torchvision
imagenet_resnet50 = torchvision.models.resnet50(weights=torchvision.models.ResNet50_Weights)
model = MyResNet(BasicBlock, [2, 2, 2, 2], num_classes=10)
num_params = sum(p.numel() for p in model.parameters())
print('Number of parameters: {:.3f}K'.format(num_params / 1000))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
imagenet_resnet18 = imagenet_resnet18.to(device)

############################################################################
# TODO: Copy the appropriate weights from imagenet_resnet18 to our custom  #
# model, which shares part of the network architecture.                    #
############################################################################
model.conv1.weight.data = imagenet_resnet18.conv1.weight.data
model.bn1.weight.data = imagenet_resnet18.bn1.weight.data
model.bn1.bias.data = imagenet_resnet18.bn1.bias.data
model.layer1.load_state_dict(imagenet_resnet18.layer1.state_dict())
model.layer2.load_state_dict(imagenet_resnet18.layer2.state_dict())
############################################################################
#                             END OF YOUR CODE                             #
############################################################################

############################################################################
# TODO: Set up and tune the hyper parameters.                              #
############################################################################
learning_rate = 0.001
momentum = 0.9
lr_gamma = 0.95

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr= learning_rate, momentum= momentum)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size= 1, gamma= lr_gamma)
############################################################################
#                             END OF YOUR CODE                             #
############################################################################

# set up the data loaders
# note the usage of the batch_size hyperparameter here
train_loader, val_loader, test_loader, _ = get_dataloaders(model_type='resnet', batch_size=batch_size)

model = model.cuda()
model = train_val_model(model, train_loader, val_loader, loss_fn, optimizer, lr_scheduler, num_epochs)
test_acc = test_model(model, test_loader)
print(f"testing accuracy: {test_acc:.3f}")

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /home/parvareshrizi.a/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:01<00:00, 73.0MB/s]


Number of parameters: 684.362K
hi ../datasets/fer2013
Dataset loaded: 25839 train, 2870 val, 7178 test samples
Number of classes: 7
Batch size: 128
[1/3,     1/202] loss: 0.046 acc: 3.906 lr: 0.00100
[1/3,    51/202] loss: 2.235 acc: 15.656 lr: 0.00100
[1/3,   101/202] loss: 2.096 acc: 25.297 lr: 0.00100
[1/3,   151/202] loss: 2.014 acc: 24.500 lr: 0.00100
[1/3,   201/202] loss: 1.961 acc: 25.031 lr: 0.00100
[1/3] val acc: 25.000
[2/3,     1/202] loss: 0.040 acc: 21.094 lr: 0.00095
[2/3,    51/202] loss: 1.922 acc: 25.766 lr: 0.00095


KeyboardInterrupt: 

In [1]:
import torch
print("CUDA available:", torch.cuda.is_available())
print("MPS available:", torch.backends.mps.is_available())

CUDA available: True
MPS available: False
