In [1]:
import torch
import torchvision
from torch.utils.data import DataLoader

import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from torch.optim.lr_scheduler import StepLR

#### **MNIST Dataset: The Classic Benchmark** 

**MNIST** is a popular dataset of **70,000 handwritten digits** (0-9) widely used for training and testing image processing systems.  

📊 **Dataset Overview:**  
- 🖼️ **60,000 Training Images**  
- 🖼️ **10,000 Test Images**  
- 🔲 **Image Size:** 28x28 pixels (grayscale)  
- 🔍 **Digits:** Size-normalized and centered  
- Check out the full details here: [📖 Wikipedia - MNIST Database](https://en.wikipedia.org/wiki/MNIST_database)  


In [None]:
# **Part 1: Data Loading and Preprocessing**
# TODO: Complete the data loading code 

def load_mnist_data(batch_size=16):
    """Build the MNIST training and test data loaders.

    Training images are lightly augmented (random rotation and translation).
    Test images are only resized and normalized, so evaluation is run on
    unmodified digits and yields the same metrics on every pass.

    Args:
        batch_size: Number of images per batch for both loaders.

    Returns:
        A ``(train_loader, test_loader)`` tuple. The training loader is
        shuffled; the test loader is not.
    """
    # Deterministic steps shared by both splits: upscale to 224x224 (the
    # resolution the CustomVGG16 classifier head, 512 * 7 * 7, is sized for),
    # convert to a tensor and standardize with the given mean / std.
    common_steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ]

    # Random augmentation belongs to the training split only; applying it to
    # the test split would perturb the evaluation data differently each epoch.
    train_transform = transforms.Compose([
        transforms.RandomRotation(10),
        transforms.RandomAffine(0, translate=(0.05, 0.05)),
        *common_steps,
    ])
    test_transform = transforms.Compose(common_steps)

    train_dataset = torchvision.datasets.MNIST(
        root='./data', train=True, transform=train_transform, download=True)
    test_dataset = torchvision.datasets.MNIST(
        root='./data', train=False, transform=test_transform, download=True)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader


#### **Understanding Dropout**

**Dropout** is a regularization technique 🛡️ used to prevent **overfitting** in deep neural networks by randomly ignoring or **"dropping out"** some layer outputs during training.  

##### 🔍 **Where is Dropout Applied?**  
Dropout can be implemented in different types of layers:  
- **Dense (Fully Connected) Layers**  
- **Convolutional Layers**  
- **Recurrent Layers**  
- It is **NOT applied** to the output layer!  

##### 🎲 **How Dropout Works**  
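The **dropout probability** `p` 🔢 is the chance that a given unit is dropped on a training step. Surviving activations are scaled by `1 / (1 - p)` so their expected value is unchanged, and dropout is switched off entirely at evaluation time. Typical choices:  
- **Input Layer:** Lower dropout probability  
- **Hidden Layers:** Higher dropout probability  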



In [None]:
# 🚀 **Part 2: Custom Dropout Implementation**

class CustomDropout(nn.Module):
    """Inverted dropout implemented with plain tensor operations.

    In training mode each element is zeroed independently with probability
    ``p`` and the survivors are scaled by ``1 / (1 - p)``, so the expected
    activation is unchanged. In evaluation mode the input is returned as is.

    Args:
        p: Probability of dropping an element; must lie in ``[0, 1]``.

    Raises:
        ValueError: If ``p`` is outside ``[0, 1]``.
    """

    def __init__(self, p=0.5):
        super(CustomDropout, self).__init__()
        if not 0.0 <= p <= 1.0:
            raise ValueError(
                f"dropout probability has to be between 0 and 1, but got {p}")
        self.p = p

    def forward(self, x):
        # Nothing to do outside training, or when no unit can be dropped.
        if not self.training or self.p == 0.0:
            return x

        # torch.rand_like samples from [0, 1), so ``>=`` keeps an element with
        # probability exactly 1 - p (always for p = 0, never for p = 1).
        # Casting the mask to x's dtype avoids promoting the activations.
        keep_mask = (torch.rand_like(x) >= self.p).to(x.dtype)

        keep_prob = 1.0 - self.p
        # For p = 1 every element is dropped; use 0 instead of dividing by 0.
        scale = 1.0 / keep_prob if keep_prob > 0.0 else 0.0
        return x * keep_mask * scale


#### **Batch Normalization for 2D Inputs**

BatchNorm2d normalizes inputs across a batch during training, accelerating training and improving generalization.

##### **Benefits**

-   Reduces internal covariate shift by normalizing activations.
-   Accelerates training.
-   Acts as a regularizer, potentially reducing the need for dropout.

##### **How It Works**

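1.  Computes the mean and variance of each channel (feature map) over the batch and both spatial dimensions.
2.  Normalizes activations by subtracting the mean and dividing by the standard deviation (with a small ε added to the variance for numerical stability).
3.  Applies learnable scale (γ) and shift (β) parameters for flexibility.
4.  Keeps an exponential moving average of the batch statistics during training and uses those running statistics, instead of the batch statistics, at evaluation time.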


In [4]:
# 🚀 **Part 3: Custom BatchNorm2d Implementation**

class CustomBatchNorm2d(nn.Module):
    """Batch normalization for 4-D ``(N, C, H, W)`` inputs.

    During training each channel is normalized with the mean and (biased)
    variance of the current batch, and exponential moving averages of the
    batch statistics are stored in the ``running_mean`` / ``running_var``
    buffers. During evaluation those running statistics are used instead.
    The normalized tensor is then scaled by ``gamma`` and shifted by ``beta``.

    Args:
        num_features: Number of channels ``C`` of the expected input.
        eps: Constant added to the variance before the square root.
        momentum: Weight of the current batch in the running averages.
    """

    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super(CustomBatchNorm2d, self).__init__()
        self.num_features = num_features
        self.momentum = momentum
        self.eps = eps

        # Buffers: saved in the state dict and moved with .to(), not trained.
        self.register_buffer('running_mean', torch.zeros(num_features))
        self.register_buffer('running_var', torch.ones(num_features))
        # Learnable per-channel scale and shift.
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

    def forward(self, x):
        if self.training:
            # Per-channel statistics over batch and spatial dimensions.
            mean = torch.mean(x, dim=(0, 2, 3))
            var = torch.var(x, dim=(0, 2, 3), unbiased=False)

            # Update the running statistics in place and outside autograd.
            # Re-assigning the buffers from graph-attached tensors would make
            # them require grad and keep every past batch's graph reachable.
            with torch.no_grad():
                count = x.size(0) * x.size(2) * x.size(3)
                # Like nn.BatchNorm2d, track the unbiased variance while the
                # biased one is used to normalize the current batch.
                if count > 1:
                    tracked_var = var * (count / (count - 1))
                else:
                    tracked_var = var
                self.running_mean.mul_(1 - self.momentum).add_(self.momentum * mean)
                self.running_var.mul_(1 - self.momentum).add_(self.momentum * tracked_var)
        else:
            mean = self.running_mean
            var = self.running_var

        # Reshape the per-channel vectors to (1, C, 1, 1) for broadcasting.
        mean = mean.view(1, self.num_features, 1, 1)
        var = var.view(1, self.num_features, 1, 1)
        gamma = self.gamma.view(1, self.num_features, 1, 1)
        beta = self.beta.view(1, self.num_features, 1, 1)

        x_norm = (x - mean) / torch.sqrt(var + self.eps)
        return gamma * x_norm + beta


#### **Activation Functions**

**Activation functions** introduce **non-linearity** into neural networks, allowing them to learn and solve **complex tasks**. Without them, the network would only be able to learn **linear relationships** (which isn’t very useful for most problems 😅).  

---

#### 🌟 **Why are Activation Functions Important?**  
- 🔄 **Adds non-linearity** to the model  
- 🚀 **Enables learning of complex patterns**  
- 🛠️ Helps the network make decisions, just like neurons in the brain 🧠  

---

#### 📚 **Common Activation Functions**  

##### 1️⃣ **ReLU (Rectified Linear Unit)** ⚡  
- **Formula:** `f(x) = max(0, x)`  
- **Use:** Most common in hidden layers of CNNs and DNNs  
- **Pros:** Simple, fast, reduces vanishing gradient problem  
- **Cons:** Can suffer from the **dying ReLU** problem 😵  

---

##### 2️⃣ **Sigmoid (Logistic Function)** 📈  
- **Formula:** `f(x) = 1 / (1 + exp(-x))`  
- **Range:** (0, 1)  
- **Use:** Good for binary classification tasks 🔢  
- **Cons:** Can cause **vanishing gradients** 😬  

---

##### 3️⃣ **Tanh (Hyperbolic Tangent)** 🔄  
- **Formula:** `f(x) = (2 / (1 + exp(-2x))) - 1`  
- **Range:** (-1, 1)  
- **Use:** Often used in recurrent networks 🔁  
- **Pros:** Centered around 0, making optimization easier  
- **Cons:** Also prone to vanishing gradients  

---

##### 4️⃣ **Softmax** 🎯  
- **Formula:** `softmax(x_i) = exp(x_i) / Σ_j exp(x_j)` — converts logits into probabilities 🔢  
- **Range:** (0, 1), sums to 1  
- **Use:** Last layer for multi-class classification problems  

---

#### 🎯 **How to Choose an Activation Function?**  
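- For **hidden layers**, use **ReLU** or its variants (like Leaky ReLU or ELU)  
- For **binary classification**, use **Sigmoid** in the output layer  
- For **multi-class classification**, use **Softmax** in the output layer (PyTorch's `nn.CrossEntropyLoss` applies it internally, so the model itself outputs raw logits)  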


In [None]:
class CustomReLU(nn.Module):
    """Element-wise rectified linear unit, ``max(x, 0)``, built on ``torch.max``."""

    def forward(self, x):
        # Derive the zero from ``x`` so it shares the input's dtype and device.
        # A bare ``torch.tensor(0.0)`` is always a float32 CPU tensor, which
        # promotes integer inputs to float and is rebuilt on the CPU each call.
        zero = x.new_zeros(())
        return torch.max(x, zero)

In [None]:
class CustomMaxPooling2d(nn.Module):
    """2-D max pooling over ``(N, C, H, W)`` inputs, built on ``Tensor.unfold``.

    No padding is applied; windows that would run past the border are
    dropped, giving an output of spatial size ``(H - k) // s + 1`` by
    ``(W - k) // s + 1``.

    Args:
        kernel_size: Side length ``k`` of the square pooling window.
        stride: Step ``s`` between consecutive windows in both directions.
    """

    def __init__(self, kernel_size=2, stride=2):
        super(CustomMaxPooling2d, self).__init__()
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, x):
        k, s = self.kernel_size, self.stride

        # Always honour the configured stride (it used to be silently replaced
        # by 1 for inputs with height <= 3, changing the output size).
        # (N, C, H, W) -> (N, C, out_h, out_w, k, k): one k x k window per
        # output position.
        windows = x.unfold(2, k, s).unfold(3, k, s)

        # Merge the two window axes and reduce over them; the leading
        # (N, C, out_h, out_w) axes already have the output layout.
        pooled, _ = windows.flatten(start_dim=-2).max(dim=-1)
        return pooled



#### **MaxPooling with `unfold`**

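`torch.Tensor.unfold(dimension, size, step)` returns a view of the tensor that contains every sliding window of length `size` along `dimension`. (It is not the same function as `torch.nn.functional.unfold`, which extracts flattened image patches.) Here's how to use it for max pooling: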

1.  **Unfold the input tensor:** This provides overlapping windows based on the `kernel_size` and `stride`.

    ```python
    # Unfold the input tensor into sliding windows
    x_unfolded = x.unfold(2, self.kernel_size, self.stride).unfold(3, self.kernel_size, self.stride)
    ```
2.  **Reshape the unfolded tensor:** Convert it to a shape that facilitates maximum computation.

    ```python
    x_unfolded = x_unfolded.contiguous().view(x.size(0), x.size(1), -1, self.kernel_size * self.kernel_size)
    ```
3.  **Compute the max along the last dimension:** This dimension represents all elements within each window.

    ```python
    x_pooled, _ = x_unfolded.max(dim=-1)
    ```
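4.  **Reshape back to the correct output size:** This ensures the output matches a typical max pooling layer.

    ```python
    out_height = (x.size(2) - self.kernel_size) // self.stride + 1
    out_width = (x.size(3) - self.kernel_size) // self.stride + 1
    x_pooled = x_pooled.view(x.size(0), x.size(1), out_height, out_width)
    ```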


#### **Implement Your Own Custom VGG16 Model**

1.  Implement both `CustomReLU` and `CustomMaxPooling2d` in the provided classes.
2.  Use only tensor operations like `torch.max` or `unfold` (no built-in `F.relu` or `nn.MaxPool2d`).
3.  Integrate them into your `CustomVGG16` model.   
4.  Check out the full details here: [VGG16 Paper](https://arxiv.org/pdf/1409.1556)


In [None]:
# 🚀 **Part 4: Custom VGG16 Model Implementation**
class CustomVGG16(nn.Module):
    """VGG16-style classifier assembled from the notebook's custom layers.

    The feature extractor has five blocks of 3x3 convolutions (2, 2, 3, 3, 3
    convolutions with 64, 128, 256, 512, 512 output channels). Each
    convolution is followed by CustomBatchNorm2d and CustomReLU, and each
    block ends with a 2x2, stride-2 CustomMaxPooling2d. The first convolution
    takes a single input channel, and the classifier expects a 512 x 7 x 7
    feature map, i.e. 224 x 224 inputs after the five halvings.

    Args:
        num_classes: Size of the final linear layer's output (10 for MNIST).
    """

    def __init__(self, num_classes=10):
        super(CustomVGG16, self).__init__()

        # (number of conv layers, output channels) for each of the five blocks.
        block_plan = [(2, 64), (2, 128), (3, 256), (3, 512), (3, 512)]

        stages = []
        in_channels = 1  # single-channel (grayscale) input
        for conv_count, out_channels in block_plan:
            for _ in range(conv_count):
                stages.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1))
                stages.append(CustomBatchNorm2d(out_channels))
                stages.append(CustomReLU())
                in_channels = out_channels
            # Every block halves the spatial resolution.
            stages.append(CustomMaxPooling2d(kernel_size=2, stride=2))
        self.features = nn.Sequential(*stages)

        head = [
            nn.Flatten(),
            CustomDropout(0.5),
            nn.Linear(512 * 7 * 7, 4096),
            CustomReLU(),
            CustomDropout(0.5),
            nn.Linear(4096, 4096),
            CustomReLU(),
            CustomDropout(0.5),
            nn.Linear(4096, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

        # Weight initialization, chosen per layer type.
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode='fan_out', nonlinearity='relu')
                if layer.bias is not None:
                    nn.init.constant_(layer.bias, 0)
            elif isinstance(layer, CustomBatchNorm2d):
                nn.init.constant_(layer.gamma, 1)
                nn.init.constant_(layer.beta, 0)
            elif isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.classifier(self.features(x))



In [None]:
# 🚀 **Part 5: Training Functions**

def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one full pass over ``train_loader``.

    Each batch is moved to ``device``, run through the model, and used for
    one optimizer step. A progress line is printed every 300 batches.

    Args:
        model: Network to optimize; it is switched to training mode.
        train_loader: Sized iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(outputs, target)``.
        optimizer: Optimizer holding the model's parameters.
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy)``: the mean of the per-batch losses and the
        accuracy over all samples seen, in percent.
    """
    model.train()
    loss_sum, correct, total = 0.0, 0, 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()

        # The predicted class is the index of the largest logit.
        predicted = outputs.detach().argmax(dim=1)
        correct += (predicted == target).sum().item()
        total += target.size(0)

        if (batch_idx + 1) % 300 == 0:
            print(f"Batch {batch_idx + 1}/{len(train_loader):<5} | "
                  f"{'Loss:':<6}{loss.item():<8.4f} | "
                  f"{'Accuracy:':<9}{(100. * correct / total):>6.2f}%")

    mean_loss = loss_sum / len(train_loader)
    accuracy = 100. * correct / total
    return mean_loss, accuracy

def evaluate(model, test_loader, criterion, device):
    """Measure loss and accuracy of ``model`` on ``test_loader``.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        test_loader: Sized iterable yielding ``(data, target)`` batches.
        criterion: Loss function called as ``criterion(outputs, target)``.
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy)``: the mean of the per-batch losses and the
        accuracy over all samples, in percent.
    """
    # eval() changes layer behaviour (dropout off, batch norm uses running
    # statistics); gradient tracking is disabled separately by no_grad below.
    model.eval()
    loss_sum, correct, total = 0, 0, 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)
            loss_sum += criterion(logits, labels).item()

            # The predicted class is the index of the largest logit.
            guesses = logits.argmax(dim=1)
            correct += (guesses == labels).sum().item()
            total += labels.size(0)

    mean_loss = loss_sum / len(test_loader)
    accuracy = 100. * correct / total
    return mean_loss, accuracy


In [None]:
# Part 6: Main Training Loop

def main():
    """Train CustomVGG16 on MNIST, report per-epoch metrics and plot them.

    Loads the data, trains for a fixed number of epochs with Adam and a
    step learning-rate schedule, evaluates on the test split after every
    epoch, prints a summary table per epoch, and finally shows the loss and
    accuracy curves side by side.
    """
    # Hyperparameters
    BATCH_SIZE = 16
    EPOCHS = 10
    LEARNING_RATE = 0.001
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_loader, test_loader = load_mnist_data(BATCH_SIZE)

    model = CustomVGG16().to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    # Multiply the learning rate by 0.1 every 2 epochs.
    scheduler = StepLR(optimizer, step_size=2, gamma=0.1)

    # Per-epoch history used for the plots.
    train_losses, test_losses = [], []
    train_accs, test_accs = [], []

    rule = "="*50
    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}/{EPOCHS}")
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
        test_loss, test_acc = evaluate(model, test_loader, criterion, DEVICE)

        train_accs.append(train_acc)
        train_losses.append(train_loss)
        test_accs.append(test_acc)
        test_losses.append(test_loss)
        scheduler.step()

        print("\n" + rule)
        print(f" Epoch {epoch+1} Training Summary ".center(50))
        print(rule)
        print(f"{'Train Loss':<20}{train_loss:>10.4f}")
        print(f"{'Train Accuracy':<20}{train_acc:>9.2f}%")
        print(f"{'Test Loss':<20}{test_loss:>10.4f}")
        print(f"{'Test Accuracy':<20}{test_acc:>9.2f}%")
        print(rule + "\n")

    plt.figure(figsize=(12, 5))

    # Left panel: loss curves, with the lowest test loss marked.
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label="Train Loss", color="blue", linewidth=2)
    plt.plot(test_losses, label="Test Loss", color="red", linewidth=2, linestyle="--")
    plt.xlabel("Epochs", fontsize=12)
    plt.ylabel("Loss", fontsize=12)
    plt.title("Training vs. Testing Loss", fontsize=14, fontweight="bold")
    plt.legend(fontsize=10)
    plt.grid(alpha=0.3)

    lowest_test_loss = min(test_losses)
    plt.scatter(test_losses.index(lowest_test_loss), lowest_test_loss,
                color="red", s=50, label="Min Test Loss")
    plt.legend()

    # Right panel: accuracy curves, with the highest test accuracy marked.
    plt.subplot(1, 2, 2)
    plt.plot(train_accs, label="Train Accuracy", color="green", linewidth=2)
    plt.plot(test_accs, label="Test Accuracy", color="orange", linewidth=2, linestyle="--")
    plt.xlabel("Epochs", fontsize=12)
    plt.ylabel("Accuracy", fontsize=12)
    plt.title("Training vs. Testing Accuracy", fontsize=14, fontweight="bold")
    plt.legend(fontsize=10)
    plt.grid(alpha=0.3)

    highest_test_acc = max(test_accs)
    plt.scatter(test_accs.index(highest_test_acc), highest_test_acc,
                color="orange", s=50, label="Max Test Accuracy")
    plt.legend()

    plt.tight_layout()
    plt.show()


In [10]:
# Entry point: run the full training and plotting routine when this code is
# executed as the main program.
if __name__ == "__main__":
    main()

🌟 Epoch 1/10
Batch 300/3750  | Loss: 1.5471   | Accuracy: 23.21%
Batch 600/3750  | Loss: 0.7590   | Accuracy: 31.62%
Batch 900/3750  | Loss: 0.7773   | Accuracy: 38.03%
Batch 1200/3750  | Loss: 0.7575   | Accuracy: 43.70%
Batch 1500/3750  | Loss: 0.3569   | Accuracy: 48.46%
Batch 1800/3750  | Loss: 0.6805   | Accuracy: 52.02%
Batch 2100/3750  | Loss: 0.4728   | Accuracy: 56.01%
Batch 2400/3750  | Loss: 0.9284   | Accuracy: 59.23%
Batch 2700/3750  | Loss: 0.2107   | Accuracy: 62.01%
Batch 3000/3750  | Loss: 0.3379   | Accuracy: 64.67%
Batch 3300/3750  | Loss: 1.0562   | Accuracy: 67.05%
Batch 3600/3750  | Loss: 0.1389   | Accuracy: 69.18%

        [1m Epoch 1 Training Summary [0m        
Train Loss              0.8547
Train Accuracy          70.16%
Test Loss               0.4182
Test Accuracy           92.01%

🌟 Epoch 2/10
Batch 300/3750  | Loss: 0.8573   | Accuracy: 92.31%
Batch 600/3750  | Loss: 0.2074   | Accuracy: 92.76%
Batch 900/3750  | Loss: 0.1559   | Accuracy: 92.70%
Batch 12