In [1]:
# 🚀 **Importing Helper Modules**

import torch  # 🧠 Core PyTorch library for tensor operations and neural networks
import torch.nn as nn  # 🏗️ Neural network components (layers, loss functions)
import torch.optim as optim  # ⚙️ Optimization algorithms (SGD, Adam, etc.)
import torchvision  # 🎨 Computer vision utilities and datasets
import torchvision.transforms as transforms  # 🖼️ Data transformations (normalization, augmentation)
from torch.utils.data import DataLoader  # 🚚 For loading and batching data
import matplotlib.pyplot as plt  # 📊 Visualization for losses and accuracies
from torchvision.transforms import ToTensor
from torchvision import datasets
# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print('Using device:', device)
# Ask PyTorch to release any cached CUDA memory before the notebook starts allocating.
torch.cuda.empty_cache()

Using device: cuda


In [2]:
from torchvision import transforms, datasets
from torch.utils.data import DataLoader

def load_mnist_data(batch_size=64):
    """
    Build the MNIST training and test data loaders.

    Every image is resized to 224x224, converted to a tensor, expanded from
    one channel to three by repeating it, and normalized per channel with
    mean 0.1307 and std 0.3081 (presumably the usual MNIST statistics).
    The dataset is downloaded into ``./data`` when it is not already there.

    Args:
        batch_size: Number of samples per batch for both loaders.

    Returns:
        A ``(train_loader, test_loader)`` tuple. Only the training loader
        shuffles its samples.
    """
    channel_mean = (0.1307, 0.1307, 0.1307)
    channel_std = (0.3081, 0.3081, 0.3081)

    preprocessing_steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        # Tile the single grayscale channel three times to get an RGB-shaped tensor.
        transforms.Lambda(lambda image: image.repeat(3, 1, 1)),
        transforms.Normalize(channel_mean, channel_std),
    ]
    preprocess = transforms.Compose(preprocessing_steps)

    # Both splits share the same storage root and preprocessing pipeline.
    shared_options = dict(root='./data', transform=preprocess, download=True)
    training_set = datasets.MNIST(train=True, **shared_options)
    held_out_set = datasets.MNIST(train=False, **shared_options)

    training_batches = DataLoader(training_set, batch_size=batch_size, shuffle=True)
    held_out_batches = DataLoader(held_out_set, batch_size=batch_size, shuffle=False)

    return training_batches, held_out_batches


In [3]:
# 🚀 **Part 2: Custom Dropout Implementation**

class CustomDropout(nn.Module):
    """
    Inverted dropout layer.

    During training each element of the input is zeroed independently with
    probability ``p`` and the survivors are scaled by ``1 / (1 - p)`` so the
    expected activation is unchanged. In evaluation mode the input is
    returned untouched.

    Args:
        p: Probability of dropping an element; must lie in ``[0, 1]``.

    Raises:
        ValueError: If ``p`` is outside ``[0, 1]``.
    """

    def __init__(self, p=0.5):
        super(CustomDropout, self).__init__()
        # Reject invalid probabilities up front instead of failing later inside forward().
        if not 0.0 <= p <= 1.0:
            raise ValueError(f"dropout probability has to be between 0 and 1, but got {p}")
        self.p = p

    def forward(self, x):
        """Apply dropout to ``x`` when the module is in training mode."""
        # Nothing to drop in eval mode or when the drop probability is zero.
        if not self.training or self.p == 0.0:
            return x

        # With p == 1 the rescale factor 1 / (1 - p) is a division by zero and
        # would turn every element into NaN; the correct result is all zeros.
        if self.p == 1.0:
            return x * 0.0

        keep_prob = 1 - self.p
        # Bernoulli(keep_prob) mask: 1 keeps the unit, 0 drops it.
        mask = torch.bernoulli(torch.ones_like(x) * keep_prob)
        return x * mask / keep_prob


In [4]:
# 🚀 **Part 3: Custom BatchNorm2d Implementation**

class CustomBatchNorm2d(nn.Module):
    """
    Batch normalization over the channel dimension of a 4D ``(N, C, H, W)`` input.

    In training mode each channel is normalized with the mean and (biased)
    variance of the current batch, and exponential running estimates of both
    are updated. In evaluation mode the stored running estimates are used
    instead. A learnable per-channel scale (``gamma``) and shift (``beta``)
    are applied after normalization.

    Args:
        num_features: Number of channels ``C`` of the expected input.
        eps: Small constant added to the variance for numerical stability.
        momentum: Weight given to the current batch statistics when updating
            the running estimates.
    """

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super(CustomBatchNorm2d, self).__init__()
        self.num_features = num_features
        self.eps = eps
        self.momentum = momentum

        # Learnable affine parameters: scale starts at 1, shift at 0.
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

        # Buffers are saved in the state dict and moved with .to(), but are not trained.
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))

    def forward(self, x):
        """
        Normalize ``x`` per channel and apply the affine transform.

        Raises:
            ValueError: If ``x`` is not a 4D tensor.
        """
        if x.dim() != 4:
            raise ValueError(f"expected 4D input (N, C, H, W), got {x.dim()}D input")

        if self.training:
            # Statistics over batch and spatial dimensions, one value per channel.
            batch_mean = x.mean(dim=[0, 2, 3], keepdim=True)
            batch_var = x.var(dim=[0, 2, 3], unbiased=False, keepdim=True)
            self._update_running_stats(x, batch_mean, batch_var)
            mean, var = batch_mean, batch_var
        else:
            # Inference uses the statistics accumulated during training.
            mean = self.running_mean.view(1, -1, 1, 1)
            var = self.running_var.view(1, -1, 1, 1)

        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return self.gamma.view(1, -1, 1, 1) * x_norm + self.beta.view(1, -1, 1, 1)

    def _update_running_stats(self, x, batch_mean, batch_var):
        """Fold the current batch statistics into the running estimates in place."""
        samples_per_channel = x.size(0) * x.size(2) * x.size(3)
        # The running variance tracks the unbiased estimate (Bessel's correction),
        # matching nn.BatchNorm2d; skip the correction when it is undefined.
        if samples_per_channel > 1:
            bessel = samples_per_channel / (samples_per_channel - 1)
        else:
            bessel = 1.0

        # Update in place and outside autograd: the buffers must stay leaf tensors
        # without a grad_fn, otherwise every step would extend a graph that spans
        # all previous iterations and the module could no longer be deep-copied.
        with torch.no_grad():
            self.running_mean.mul_(1 - self.momentum).add_(
                batch_mean.reshape(-1), alpha=self.momentum
            )
            self.running_var.mul_(1 - self.momentum).add_(
                batch_var.reshape(-1) * bessel, alpha=self.momentum
            )


In [5]:
class CustomReLU(nn.Module):
    """
    Hand-written ReLU activation.

    Computes the element-wise maximum of the input and zero using plain
    tensor operations, so negative entries become 0 and all other entries
    pass through.
    """

    def forward(self, x):
        """Return ``x`` with every negative element replaced by zero."""
        # Scalar zero placed on the same device as the input.
        zero = torch.tensor(0.0, device=x.device)
        activated = torch.max(x, zero)
        return activated


In [6]:
class CustomMaxPooling2d(nn.Module):
    """
    Hand-written 2D max pooling for ``(N, C, H, W)`` tensors.

    The input is cut into windows with ``unfold`` and the maximum of each
    window is returned. When a spatial dimension is smaller than the kernel
    or stride, the window and step are clamped to that dimension so at least
    one output element is produced.

    Args:
        kernel_size: Side length of the square pooling window.
        stride: Step between consecutive windows.
    """

    def __init__(self, kernel_size=2, stride=2):
        super(CustomMaxPooling2d, self).__init__()
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, x):
        """Return the per-window maxima of ``x``."""
        batch, channels, height, width = x.shape

        # Clamp window and step so inputs smaller than the kernel still work.
        win_h, win_w = min(self.kernel_size, height), min(self.kernel_size, width)
        step_h, step_w = min(self.stride, height), min(self.stride, width)

        # Number of windows along each spatial axis (never fewer than one).
        out_h = max(1, (height - self.kernel_size) // self.stride + 1)
        out_w = max(1, (width - self.kernel_size) // self.stride + 1)

        # After both unfolds the layout is (N, C, out_h, out_w, win_h, win_w).
        windows = x.unfold(2, win_h, step_h).unfold(3, win_w, step_w)
        flat_windows = windows.contiguous().view(batch, channels, out_h, out_w, win_h * win_w)

        window_max, _ = flat_windows.max(dim=-1)
        return window_max


In [7]:
class CustomVGG16(nn.Module):
    """
    VGG16-style classifier assembled from this notebook's custom layers.

    The feature extractor has five stages of 3x3 convolutions (2, 2, 3, 3, 3
    convolutions with 64, 128, 256, 512, 512 channels), each convolution
    followed by ``CustomBatchNorm2d`` and ``CustomReLU``, and each stage
    closed by a 2x2 ``CustomMaxPooling2d``. The classifier is three linear
    layers with ``CustomReLU`` and ``CustomDropout`` in between.

    Args:
        num_classes: Number of output logits.
    """

    # Layer plan for the feature extractor: an int is the output channel count
    # of a conv3x3 -> batch norm -> ReLU group, 'M' is a 2x2 max-pool.
    _FEATURE_CONFIG = (
        64, 64, 'M',
        128, 128, 'M',
        256, 256, 256, 'M',
        512, 512, 512, 'M',
        512, 512, 512, 'M',
    )

    def __init__(self, num_classes=10):
        super(CustomVGG16, self).__init__()
        self.features = self._make_features(self._FEATURE_CONFIG)

        # Bring the feature map to 7x7 regardless of the input resolution. For
        # 224x224 inputs the map is already 7x7, so this is an identity there;
        # it has no parameters and therefore adds nothing to the state dict.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        # Size of the flattened 512-channel 7x7 feature map fed to the classifier.
        self.flattened_size = 512 * 7 * 7

        self.classifier = nn.Sequential(
            nn.Linear(self.flattened_size, 4096),
            CustomReLU(),
            CustomDropout(p=0.5),  # use the notebook's own dropout, like the other custom layers

            nn.Linear(4096, 4096),
            CustomReLU(),
            CustomDropout(p=0.5),

            nn.Linear(4096, num_classes),  # output logits, one per class
        )

    @staticmethod
    def _make_features(config, in_channels=3):
        """Build the convolutional feature extractor described by ``config``."""
        layers = []
        for entry in config:
            if entry == 'M':
                layers.append(CustomMaxPooling2d(kernel_size=2, stride=2))
            else:
                layers.extend([
                    nn.Conv2d(in_channels, entry, kernel_size=3, padding=1),
                    CustomBatchNorm2d(entry),
                    CustomReLU(),
                ])
                in_channels = entry
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of 3-channel images to ``num_classes`` logits."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, start_dim=1)  # keep the batch dimension, flatten the rest
        return self.classifier(x)


In [8]:
# 🚀 **Part 5: Training Functions**

def train_epoch(model, train_loader, criterion, optimizer, device):
    """
    Run one optimization pass over ``train_loader``.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Sized iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(output, target)``.
        optimizer: Optimizer that updates the model parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the sum of the batch
        losses divided by the number of batches and ``accuracy`` is the
        percentage of correctly classified samples.
    """
    model.train()

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()

        # The predicted class is the index of the largest logit in each row.
        predictions = logits.max(1)[1]
        num_seen += labels.size(0)
        num_correct += (predictions == labels).sum().item()

    mean_loss = loss_sum / len(train_loader)
    accuracy = 100. * num_correct / num_seen
    return mean_loss, accuracy

def evaluate(model, test_loader, criterion, device):
    """
    Measure loss and accuracy of ``model`` on ``test_loader`` without training.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Sized iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(output, target)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(mean_loss, accuracy)`` where ``mean_loss`` is the sum of the batch
        losses divided by the number of batches and ``accuracy`` is the
        percentage of correctly classified samples.
    """
    model.eval()

    loss_sum = 0
    num_correct = 0
    num_seen = 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)
            loss_sum += criterion(logits, labels).item()

            # The predicted class is the index of the largest logit in each row.
            predictions = logits.max(1)[1]
            num_seen += labels.size(0)
            num_correct += (predictions == labels).sum().item()

    mean_loss = loss_sum / len(test_loader)
    accuracy = 100. * num_correct / num_seen
    return mean_loss, accuracy


In [9]:
# 🚀 **Part 6: Main Training Loop**

def main():
    """
    Train ``CustomVGG16`` on MNIST and plot the learning curves.

    Loads the data, trains for a fixed number of epochs with Adam and
    cross-entropy loss, evaluates on the test set after every epoch, prints
    the per-epoch metrics, and finally shows loss and accuracy plots.
    """
    # Hyperparameters
    BATCH_SIZE = 16
    EPOCHS = 3
    LEARNING_RATE = 0.001
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Data
    train_loader, test_loader = load_mnist_data(BATCH_SIZE)

    # Model, loss, and optimizer
    model = CustomVGG16().to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Per-epoch metric history used for the plots below.
    train_losses, test_losses = [], []
    train_accs, test_accs = [], []

    for epoch in range(EPOCHS):
        print(f"🌟 Epoch {epoch+1}/{EPOCHS}")

        # One pass over the training data, then a pass over the test data.
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)
        test_losses.append(test_loss)
        test_accs.append(test_acc)

        print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Train Acc = {train_acc:.2f}%, Test Loss = {test_loss:.4f}, Test Acc = {test_acc:.2f}%")

    # Learning curves: losses on the left, accuracies on the right.
    epochs_range = range(1, EPOCHS+1)
    plt.figure(figsize=(12, 5))

    # Each entry: (title, y-axis label, train series, train label, test series, test label)
    panels = [
        ('Loss over Epochs', 'Loss', train_losses, 'Train Loss', test_losses, 'Test Loss'),
        ('Accuracy over Epochs', 'Accuracy (%)', train_accs, 'Train Accuracy', test_accs, 'Test Accuracy'),
    ]
    for position, panel in enumerate(panels, start=1):
        title, y_label, train_series, train_label, test_series, test_label = panel
        plt.subplot(1, 2, position)
        plt.plot(epochs_range, train_series, label=train_label)
        plt.plot(epochs_range, test_series, label=test_label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.xticks(epochs_range)
        plt.legend()

    plt.tight_layout()
    plt.show()

In [None]:
# Entry point: run the full train / evaluate / plot pipeline only when this
# file is executed as the main module, not when it is imported.
if __name__ == '__main__':
    main()

🌟 Epoch 1/3
