<a href="https://colab.research.google.com/github/RishitLaddha/session27/blob/main/customizedCnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Custom CNN and Transformer Models with Custom Layers

This notebook implements two custom models for demonstration:

1. **Custom CNN for MNIST**: Uses custom layer functions (convolution, ReLU, max pooling, etc.) defined from scratch.
2. **Custom Transformer (Decoder-only)**: Uses custom layer functions for a simple decoder-only Transformer model on toy text data.


In [14]:
# Clone the course repository into ./session27 (IPython shell escape; needs git and network access)
!git clone https://github.com/RishitLaddha/session27.git

# Make the fresh clone the working directory so later relative paths resolve inside it.
# NOTE(review): both the clone target and the chdir are relative, so re-running this
# cell without restarting the kernel clones and enters a nested session27/session27.
import os
os.chdir('session27')

# cnn_main() later reads MNIST from './data' with download=False, so the dataset is
# presumably shipped with the repository under ./data/MNIST/raw/... (verify after cloning)


Cloning into 'session27'...
remote: Enumerating objects: 33, done.
remote: Counting objects: 100% (33/33), done.
remote: Compressing objects: 100% (29/29), done.
remote: Total 33 (delta 8), reused 14 (delta 2), pack-reused 0 (from 0)
Receiving objects: 100% (33/33), 22.33 MiB | 23.77 MiB/s, done.
Resolving deltas: 100% (8/8), done.


# custom_layer

In [15]:
# Import necessary libraries for tensor operations and math
import torch
import math

####################################################################################################
# CUSTOM LAYER FUNCTIONS
# This section defines all the custom layer functions using basic PyTorch tensor operations.
####################################################################################################

def pad2d(input_tensor, pad):
    """
    Surrounds the two trailing (spatial) dimensions of a 4-D tensor with zeros.

    Parameters:
      input_tensor (Tensor): Input tensor of shape (N, C, H, W)
      pad (int): Width of the zero border added on every side

    Returns:
      Tensor: Tensor of shape (N, C, H+2*pad, W+2*pad). When pad is 0 the
              input tensor itself is returned (no copy is made).
    """
    if pad == 0:
        return input_tensor
    batch, channels, height, width = input_tensor.shape
    # new_zeros inherits device and dtype from the input tensor
    canvas = input_tensor.new_zeros((batch, channels, height + 2 * pad, width + 2 * pad))
    rows = slice(pad, pad + height)
    cols = slice(pad, pad + width)
    # Drop the original values into the middle of the zero canvas
    canvas[:, :, rows, cols] = input_tensor
    return canvas

def conv2d_custom(x, weight, bias, stride=1, padding=0):
    """
    Custom implementation of a 2D convolution (cross-correlation) layer.

    Every receptive field is gathered with sliding-window views and contracted
    against the kernels in a single tensor operation. This computes the same
    sums as visiting each (n, c, i, j) output element in nested Python loops,
    but without the per-element interpreter and autograd overhead.

    Parameters:
      x (Tensor): Input tensor of shape (N, C_in, H, W)
      weight (Tensor): Convolution kernel of shape (C_out, C_in, kH, kW)
      bias (Tensor): Bias tensor of shape (C_out,)
      stride (int): Stride of the convolution
      padding (int): Zero-padding to add to each side of the input

    Returns:
      Tensor: Convolved output of shape (N, C_out, H_out, W_out)

    Raises:
      ValueError: If the channel count of x differs from that of weight
    """
    # First apply manual padding
    x_padded = pad2d(x, padding)
    N, C_in, H, W = x_padded.shape
    C_out, C_w, kH, kW = weight.shape
    if C_in != C_w:
        # Without this check a single-channel operand would silently broadcast
        raise ValueError(
            f"conv2d_custom: input has {C_in} channels but weight expects {C_w}"
        )

    # Compute the output spatial dimensions
    H_out = (H - kH) // stride + 1
    W_out = (W - kW) // stride + 1

    # Degenerate sizes: nothing to compute. A negative size makes torch.zeros raise.
    if N == 0 or C_out == 0 or H_out <= 0 or W_out <= 0:
        return torch.zeros((N, C_out, H_out, W_out), device=x.device, dtype=x.dtype)

    # Sliding windows as a view: (N, C_in, H_out, W_out, kH, kW)
    patches = x_padded.unfold(2, kH, stride).unfold(3, kW, stride)
    # Multiply each window by each kernel and sum over (C_in, kH, kW)
    out = torch.einsum("nchwij,ocij->nohw", patches, weight)
    # Broadcast one bias value per output channel
    return out + bias.view(1, -1, 1, 1)

def relu(x):
    """
    Rectified linear unit: keeps non-negative entries and zeroes the rest.

    Parameters:
      x (Tensor): Input tensor

    Returns:
      Tensor: Tensor of the same shape with every negative entry replaced by 0
    """
    return x.clamp(min=0)

def max_pool2d_custom(x, kernel_size=2, stride=2):
    """
    Custom implementation of 2D max pooling.

    The pooling windows are exposed as a strided view and reduced in a single
    call rather than visited one output element at a time in Python loops.
    The result is the same. The per-window maximum spreads its gradient
    evenly over tied entries, as the full reduction ``torch.max(patch)`` does.

    Parameters:
      x (Tensor): Input tensor of shape (N, C, H, W)
      kernel_size (int): Size of the (square) pooling window
      stride (int): Stride for the pooling operation

    Returns:
      Tensor: Pooled output of shape (N, C, H_out, W_out)
    """
    N, C, H, W = x.shape
    H_out = (H - kernel_size) // stride + 1
    W_out = (W - kernel_size) // stride + 1

    # Degenerate sizes: nothing to pool. A negative size makes torch.zeros raise.
    if N == 0 or C == 0 or H_out <= 0 or W_out <= 0:
        return torch.zeros((N, C, H_out, W_out), device=x.device, dtype=x.dtype)

    # Sliding windows as a view: (N, C, H_out, W_out, kernel_size, kernel_size)
    windows = x.unfold(2, kernel_size, stride).unfold(3, kernel_size, stride)
    # Reduce each window to its maximum
    return windows.amax(dim=(-2, -1))

def flatten(x):
    """
    Flattens the input tensor except for the batch dimension.

    Uses reshape rather than view so that non-contiguous inputs (for example
    the result of a transpose or permute) are accepted. When the input is
    contiguous the result is still a view of the same storage.

    Parameters:
      x (Tensor): Input tensor whose first dimension is the batch dimension

    Returns:
      Tensor: Flattened tensor of shape (N, -1)
    """
    return x.reshape(x.shape[0], -1)

def linear_custom(x, weight, bias):
    """
    Affine (fully connected) transform: matrix product followed by a bias add.

    Parameters:
      x (Tensor): Input tensor of shape (..., in_features); leading
                  dimensions are treated as batch dimensions by matmul
      weight (Tensor): Weight matrix of shape (in_features, out_features)
      bias (Tensor or number): Bias of shape (out_features,), or any value
                  that broadcasts against the product (e.g. the scalar 0)

    Returns:
      Tensor: Output tensor of shape (..., out_features)
    """
    projected = torch.matmul(x, weight)
    return projected + bias

def softmax(x, dim):
    """
    Numerically stable softmax along one dimension.
    The per-slice maximum is subtracted before exponentiating so that the
    largest exponent is exp(0) = 1, then the exponentials are normalized.

    Parameters:
      x (Tensor): Input tensor
      dim (int): Dimension along which to apply softmax

    Returns:
      Tensor: Tensor of the same shape whose entries sum to 1 along `dim`
    """
    peak = x.max(dim=dim, keepdim=True).values
    numer = (x - peak).exp()
    denom = numer.sum(dim=dim, keepdim=True)
    return numer / denom

def gelu(x):
    """
    Gaussian Error Linear Unit (GELU) activation, tanh approximation.
    Formula: GELU(x) = 0.5 * x * (1 + tanh(sqrt(2/pi) * (x + 0.044715 * x^3)))

    Parameters:
      x (Tensor): Input tensor

    Returns:
      Tensor: Tensor of the same shape with GELU applied element-wise
    """
    scale = math.sqrt(2 / math.pi)
    cubic = x + 0.044715 * x**3
    gate = 1 + torch.tanh(scale * cubic)
    return x * 0.5 * gate

def layer_norm(x, eps=1e-5):
    """
    Layer normalization over the last dimension, without learnable scale or shift.
    Each slice along the last dimension is shifted to zero mean and scaled by
    the square root of its biased (population) variance plus eps.

    Parameters:
      x (Tensor): Input tensor of any shape
      eps (float): Small constant added to the variance for numerical stability

    Returns:
      Tensor: Normalized tensor of the same shape as x
    """
    mu = x.mean(dim=-1, keepdim=True)
    sigma2 = x.var(dim=-1, keepdim=True, unbiased=False)
    centered = x - mu
    return centered / (sigma2 + eps).sqrt()

# End of custom layer functions
####################################################################################################

# Note: The functions above will be used by both our CNN and Transformer models.

# custom_cnn

In [16]:
import torch


In [17]:
# CUSTOM CNN MODEL, TRAINING, AND EVALUATION

import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset

####################################################################################################
# CustomCNN Class
# Implements a simple CNN with two convolutional layers, ReLU activation, and max pooling.
# The network architecture:
#  - Conv1: 1 input channel, 6 output channels, kernel size 5
#  - ReLU activation
#  - Max Pooling: 2x2 window, stride 2
#  - Conv2: 6 input channels, 12 output channels, kernel size 5
#  - ReLU activation
#  - Max Pooling: 2x2 window, stride 2
#  - Flatten and Fully Connected layer: maps from 12*4*4 features to 10 classes (for MNIST)
####################################################################################################

class CustomCNN:
    """
    Small two-convolution CNN for 1x28x28 inputs, built on the custom layer functions.

    The learnable tensors are plain ``torch.nn.Parameter`` objects held as
    attributes. The same objects are listed in ``self.params``, which is what
    ``get_parameters()`` hands to the optimizer.
    """

    def __init__(self):
        """
        Initializes the Custom CNN model parameters.
        """
        # Define weights and biases for the first convolutional layer
        self.conv1_weight = torch.nn.Parameter(torch.randn(6, 1, 5, 5) * 0.1)
        self.conv1_bias   = torch.nn.Parameter(torch.zeros(6))

        # Define weights and biases for the second convolutional layer
        self.conv2_weight = torch.nn.Parameter(torch.randn(12, 6, 5, 5) * 0.1)
        self.conv2_bias   = torch.nn.Parameter(torch.zeros(12))

        # 28x28 -> conv5 -> 24x24 -> pool -> 12x12 -> conv5 -> 8x8 -> pool -> 4x4, 12 channels
        fc_in_features = 12 * 4 * 4
        fc_out_features = 10  # 10 classes for MNIST

        # Define weights and biases for the fully connected layer
        self.fc_weight = torch.nn.Parameter(torch.randn(fc_in_features, fc_out_features) * 0.1)
        self.fc_bias   = torch.nn.Parameter(torch.zeros(fc_out_features))

        # Store all parameters in a list for the optimizer
        self.params = [
            self.conv1_weight, self.conv1_bias,
            self.conv2_weight, self.conv2_bias,
            self.fc_weight, self.fc_bias
        ]
        self.device = torch.device("cpu")

    def to(self, device):
        """
        Moves model parameters to the specified device and returns self.

        The storage behind each Parameter is moved in place, so the attributes
        and the entries of ``self.params`` remain the same leaf tensors.
        Rebinding the attributes to ``param.to(device)`` would instead leave
        the optimizer updating the old tensors while the forward pass used
        detached-in-effect copies.
        """
        self.device = device
        for param in self.params:
            param.data = param.data.to(device)
            if param.grad is not None:
                # Keep any accumulated gradient alongside its parameter
                param.grad.data = param.grad.data.to(device)
        return self

    def forward(self, x):
        """
        Forward pass of the CNN model.

        Steps:
          1. Convolution with first layer and add bias
          2. Apply ReLU activation
          3. Apply max pooling
          4. Convolution with second layer, ReLU, and max pooling
          5. Flatten the feature maps
          6. Apply the fully connected layer to produce final logits

        Parameters:
          x (Tensor): Input batch of shape (N, 1, 28, 28)

        Returns:
          Tensor: Logits of shape (N, 10)
        """
        x = x.to(self.device)
        x = conv2d_custom(x, self.conv1_weight, self.conv1_bias, stride=1, padding=0)
        x = relu(x)
        x = max_pool2d_custom(x, kernel_size=2, stride=2)

        x = conv2d_custom(x, self.conv2_weight, self.conv2_bias, stride=1, padding=0)
        x = relu(x)
        x = max_pool2d_custom(x, kernel_size=2, stride=2)

        x = flatten(x)
        x = linear_custom(x, self.fc_weight, self.fc_bias)
        return x

    def get_parameters(self):
        """Returns all the model parameters as a list."""
        return self.params

####################################################################################################
# Training and Evaluation Functions for the Custom CNN
####################################################################################################

def train_cnn(model, train_loader, epochs=5, lr=0.01, device='cpu'):
    """
    Runs SGD training of the Custom CNN with a hand-written cross-entropy loss.

    Progress is printed each time another 6400 samples of the current epoch
    have been processed.

    Parameters:
      model (CustomCNN): Model exposing forward() and get_parameters()
      train_loader (DataLoader): Yields (images, labels) batches; its
                                 .dataset length is used for progress output
      epochs (int): Number of passes over train_loader
      lr (float): Learning rate for the SGD optimizer
      device (str): Device the batches are moved to ('cpu' or 'cuda')

    Returns:
      List of log strings, one per epoch
    """
    sgd = optim.SGD(model.get_parameters(), lr=lr)
    epoch_logs = []
    dataset_size = len(train_loader.dataset)
    report_every = 6400  # samples between two progress lines

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        print("-" * 60)
        seen = 0
        report_at = report_every
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            batch_size = images.shape[0]

            # Cross-entropy: mean over the batch of -log p(true class)
            logits = model.forward(images)
            log_probs = torch.log(softmax(logits, dim=1) + 1e-8)
            target_mask = torch.zeros_like(log_probs)
            target_mask.scatter_(1, labels.view(-1, 1), 1)
            loss = -torch.sum(target_mask * log_probs) / batch_size

            loss.backward()
            sgd.step()
            sgd.zero_grad()

            seen += batch_size
            if seen < report_at:
                continue
            percent = (seen / dataset_size) * 100
            print(f"Train Epoch: {epoch} [{seen}/{dataset_size} ({percent:.0f}%)]\tLoss: {loss.item():.6f}")
            report_at += report_every
        epoch_logs.append(f"Epoch {epoch+1}: Completed {seen}/{dataset_size} samples")
    return epoch_logs


def evaluate_cnn(model, test_loader, device='cpu', epoch=None):
    """
    Evaluates the Custom CNN on the test dataset.

    The whole pass runs under torch.no_grad(), so no autograd graph is built.
    The reported loss is the mean over samples (not the mean of per-batch
    means), so a smaller final batch does not skew it.

    Parameters:
      model (CustomCNN): The CNN model instance
      test_loader (DataLoader): DataLoader for the test data
      device (str): Device to run evaluation on
      epoch (int, optional): Epoch number for logging

    Returns:
      float: Accuracy percentage on the test dataset

    Raises:
      ValueError: If test_loader yields no samples
    """
    loss_sum = 0.0       # summed per-sample loss over the whole loader
    total_correct = 0
    total_samples = 0

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            # Forward pass and summed cross-entropy for this batch
            outputs = model.forward(images)
            log_probs = torch.log(softmax(outputs, dim=1) + 1e-8)
            one_hot = torch.zeros_like(log_probs)
            one_hot.scatter_(1, labels.view(-1, 1), 1)
            batch_loss_sum = -torch.sum(one_hot * log_probs)

            # Compute predictions and update accuracy counts
            preds = torch.argmax(outputs, dim=1)
            total_correct += torch.sum(preds == labels).item()
            loss_sum += batch_loss_sum.item()
            total_samples += images.shape[0]

    if total_samples == 0:
        raise ValueError("evaluate_cnn: test_loader yielded no samples")

    avg_loss = loss_sum / total_samples
    accuracy = (total_correct / total_samples) * 100

    if epoch is not None:
        print(f"Epoch {epoch} Test set: Average loss: {avg_loss:.4f}, Accuracy: {total_correct}/{total_samples} ({accuracy:.2f}%)")
    else:
        print(f"Test set: Average loss: {avg_loss:.4f}, Accuracy: {total_correct}/{total_samples} ({accuracy:.2f}%)")
    return accuracy

####################################################################################################
# Main Function for CNN Training
####################################################################################################

def cnn_main():
    """
    End-to-end CNN run: load MNIST from ./data, train for 5 epochs, evaluate
    on the test split, and write the per-epoch training logs to README_CNN.txt.
    """
    # Fix the seed so parameter initialization and shuffling are repeatable
    torch.manual_seed(42)

    # Convert images to tensors and normalize with the given mean / std
    mnist_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # The files must already exist under ./data (download is disabled)
    mnist_train = torchvision.datasets.MNIST('./data', train=True, download=False, transform=mnist_transform)
    mnist_test = torchvision.datasets.MNIST('./data', train=False, download=False, transform=mnist_transform)

    # Index ranges fed to the loaders; shrink them for a quicker run
    train_indices = range(60000)
    test_indices = range(10000)

    train_loader = DataLoader(Subset(mnist_train, train_indices), batch_size=32, shuffle=True)
    test_loader = DataLoader(Subset(mnist_test, test_indices), batch_size=50, shuffle=False)

    # Build the model, then place it on the GPU when one is available
    model = CustomCNN()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    print("Training Custom CNN on MNIST (training for 5 epochs)...")
    device_name = str(device)
    cnn_logs = train_cnn(model, train_loader, epochs=5, lr=0.01, device=device_name)
    evaluate_cnn(model, test_loader, device=device_name)

    # Persist the training logs, one entry per line under a header
    report_lines = ["Custom CNN Training Logs:\n"]
    report_lines.extend(entry + "\n" for entry in cnn_logs)
    with open("README_CNN.txt", "w") as f:
        f.writelines(report_lines)

# End of CNN section

# Uncomment the following line to run the CNN training directly in this cell
# cnn_main()

# custom_transformer

In [18]:
# CUSTOM TRANSFORMER MODEL, TRAINING, AND DATASET

import torch.optim as optim
from dataclasses import dataclass
from torch.utils.data import Dataset, DataLoader

####################################################################################################
# Transformer Configuration Data Class
# Holds all the configuration parameters for the Transformer model.
####################################################################################################

@dataclass
class TransformerConfig:
    """
    Hyperparameters for the toy Transformer.

    Within this notebook CustomTransformer reads only ``vocab_size`` and
    ``dim``; the remaining fields are stored but not consumed by the model.
    """
    vocab_size: int = 1000   # Vocabulary size
    max_seq_len: int = 64    # Maximum sequence length (not read by the model in this simple version)
    dim: int = 256           # Model dimension
    num_layers: int = 2      # Number of Transformer layers (not used in this simple version)
    num_heads: int = 2       # Number of attention heads (not used in this simple version)
    dropout: float = 0.1     # Dropout rate (not used in this simple version)

####################################################################################################
# Custom Transformer (Decoder-only) Model
# This simple Transformer implements a single self-attention layer with a feed-forward network.
####################################################################################################

class CustomTransformer:
    """
    Single-block decoder-only Transformer built on the custom layer functions.

    One single-head self-attention layer with a causal mask is followed by a
    GELU feed-forward network, each wrapped in a residual connection, and a
    final projection to vocabulary logits. Parameters are plain
    ``torch.nn.Parameter`` objects listed in ``self.params``.
    """

    def __init__(self, config: "TransformerConfig"):
        """
        Initializes the Transformer model parameters.

        Parameters:
          config (TransformerConfig): Only ``vocab_size`` and ``dim`` are read
        """
        self.config = config
        self.d_model = config.dim

        # Token embedding: maps vocabulary indices to d_model-dimensional vectors
        self.embed = torch.nn.Parameter(torch.randn(config.vocab_size, config.dim) * 0.1)

        # Define weights for the self-attention mechanism
        self.Wq = torch.nn.Parameter(torch.randn(config.dim, config.dim) * 0.1)
        self.Wk = torch.nn.Parameter(torch.randn(config.dim, config.dim) * 0.1)
        self.Wv = torch.nn.Parameter(torch.randn(config.dim, config.dim) * 0.1)
        self.Wo = torch.nn.Parameter(torch.randn(config.dim, config.dim) * 0.1)

        # Feed-forward network parameters
        self.ff_weight1 = torch.nn.Parameter(torch.randn(config.dim, config.dim * 4) * 0.1)
        self.ff_bias1 = torch.nn.Parameter(torch.zeros(config.dim * 4))
        self.ff_weight2 = torch.nn.Parameter(torch.randn(config.dim * 4, config.dim) * 0.1)
        self.ff_bias2 = torch.nn.Parameter(torch.zeros(config.dim))

        # Final projection layer: projects the Transformer output back to vocabulary size
        self.proj_weight = torch.nn.Parameter(torch.randn(config.dim, config.vocab_size) * 0.1)
        self.proj_bias = torch.nn.Parameter(torch.zeros(config.vocab_size))

        # Store all parameters for the optimizer
        self.params = [
            self.embed, self.Wq, self.Wk, self.Wv, self.Wo,
            self.ff_weight1, self.ff_bias1, self.ff_weight2, self.ff_bias2,
            self.proj_weight, self.proj_bias
        ]

    def to(self, device):
        """
        Moves model parameters to the specified device and returns self.

        Parameter storage is moved in place so the tensors in ``self.params``
        (the ones an optimizer holds) stay the tensors used by forward().
        """
        for param in self.params:
            param.data = param.data.to(device)
            if param.grad is not None:
                param.grad.data = param.grad.data.to(device)
        return self

    def attention(self, Q, K, V, causal=False):
        """
        Computes the scaled dot-product attention.

        Parameters:
          Q (Tensor): Query tensor of shape (..., T_q, d)
          K (Tensor): Key tensor of shape (..., T_k, d)
          V (Tensor): Value tensor of shape (..., T_k, d_v)
          causal (bool): When True, query position i may only attend to key
                         positions j <= i

        Returns:
          Tensor: Output of the attention layer, shape (..., T_q, d_v)
        """
        d = Q.shape[-1]
        scores = Q @ K.transpose(-2, -1) / math.sqrt(d)
        if causal:
            q_pos = torch.arange(scores.shape[-2], device=scores.device)
            k_pos = torch.arange(scores.shape[-1], device=scores.device)
            # True where the key lies in the query's future
            future = k_pos.unsqueeze(0) > q_pos.unsqueeze(1)
            # -inf scores turn into exactly zero weight after softmax
            scores = scores.masked_fill(future, float("-inf"))
        attn = softmax(scores, dim=-1)
        return attn @ V

    def forward(self, x_indices):
        """
        Forward pass of the Transformer model.

        Attention is causally masked, so the logits at position t depend only
        on tokens 0..t. Without the mask the next-token target would be
        directly visible to the position predicting it.

        Parameters:
          x_indices (Tensor): Input tensor containing token indices of shape (N, seq_len)

        Returns:
          Tensor: Logits of shape (N, seq_len, vocab_size)
        """
        # Index with a tensor that lives where the embedding table lives
        x_indices = x_indices.to(self.embed.device)

        # Convert indices to embeddings
        x = self.embed[x_indices]  # Shape: (N, seq_len, d_model)

        # Compute queries, keys, and values using custom linear transformation
        Q = linear_custom(x, self.Wq, bias=0)
        K = linear_custom(x, self.Wk, bias=0)
        V = linear_custom(x, self.Wv, bias=0)

        # Compute masked self-attention output
        attn_out = self.attention(Q, K, V, causal=True)
        attn_out = linear_custom(attn_out, self.Wo, bias=0)

        # Residual connection from the embeddings
        x = x + attn_out

        # Feed-forward network block
        ff = linear_custom(x, self.ff_weight1, self.ff_bias1)
        ff = gelu(ff)
        ff = linear_custom(ff, self.ff_weight2, self.ff_bias2)
        x = x + ff  # Residual connection

        # Project the output to the vocabulary size
        logits = linear_custom(x, self.proj_weight, self.proj_bias)
        return logits

    def get_parameters(self):
        """Returns all the model parameters for optimization."""
        return self.params

####################################################################################################
# Simple Text Dataset for Quick Test Run
# Generates random sequences of token indices to simulate text data.
####################################################################################################

class SimpleTextDataset(Dataset):
    """
    Toy next-token dataset made of uniformly random token indices.

    Each item is an (input, target) pair in which target[t] == input[t + 1]
    and the final target position is filled with token 0.
    """

    def __init__(self, vocab_size, seq_len, size=20):
        # Draw `size` random sequences of `seq_len` token indices
        tokens = torch.randint(0, vocab_size, (size, seq_len))
        # Targets are the inputs shifted left by one position
        shifted = torch.empty_like(tokens)
        shifted[:, :-1] = tokens[:, 1:]
        shifted[:, -1] = 0
        self.data = tokens
        self.targets = shifted

    def __len__(self):
        return self.data.shape[0]

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.targets[idx]
        return sample, label

####################################################################################################
# Training Function for the Transformer
####################################################################################################

def train_transformer(model, dataloader, epochs=5, lr=0.001, device='cpu'):
    """
    Runs SGD training of the Custom Transformer with token-level cross-entropy.

    Parameters:
      model (CustomTransformer): Model exposing forward() and get_parameters()
      dataloader (DataLoader): Yields (inputs, targets) batches of token indices
      epochs (int): Number of passes over dataloader
      lr (float): Learning rate for the SGD optimizer
      device (str): Device the batches are moved to

    Returns:
      List of log strings, one per epoch, holding the token-weighted average loss
    """
    sgd = optim.SGD(model.get_parameters(), lr=lr)
    history = []
    # Built-in cross-entropy applied to the flattened projection logits
    loss_fn = torch.nn.CrossEntropyLoss()

    for epoch in range(epochs):
        print(f"Epoch {epoch+1}/{epochs}")
        print("-" * 60)
        loss_sum = 0.0
        token_sum = 0

        for data, targets in dataloader:
            data, targets = data.to(device), targets.to(device)

            # Forward pass through the Transformer
            logits = model.forward(data)
            N, seq_len, vocab_size = logits.shape
            token_count = N * seq_len

            # One row of logits per token, one target index per token
            loss = loss_fn(
                logits.view(token_count, vocab_size),
                targets.view(token_count),
            )

            # Backward pass and optimization step
            loss.backward()
            sgd.step()
            sgd.zero_grad()

            # Weight the batch-mean loss by the number of tokens it covers
            loss_sum += loss.item() * token_count
            token_sum += token_count

        avg_loss = loss_sum / token_sum
        log_entry = f"Epoch {epoch+1}: Avg Loss = {avg_loss:.4f}"
        print("[Transformer]", log_entry)
        history.append(log_entry)
    return history

####################################################################################################
# Main Function for Transformer Training
####################################################################################################

def transformer_main():
    """
    Main function to train the Custom Transformer on a toy text dataset.

    Builds the model and a random-token dataset, trains for 5 epochs, and
    writes the per-epoch logs to README_Transformer.txt.
    """
    # Set manual seed for reproducibility
    torch.manual_seed(42)

    # Define configuration for the Transformer
    config = TransformerConfig(vocab_size=1000, max_seq_len=16, dim=128, num_layers=2, num_heads=2)
    model = CustomTransformer(config)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Put the parameters on the device the batches will be sent to. The storage
    # is moved in place so the optimizer and the model keep sharing the same
    # tensors. Without this step a CUDA run mixes GPU batches with CPU weights.
    for param in model.get_parameters():
        param.data = param.data.to(device)

    # Create a simple text dataset whose sequence length follows the config
    dataset = SimpleTextDataset(vocab_size=config.vocab_size, seq_len=config.max_seq_len, size=20)
    dataloader = DataLoader(dataset, batch_size=5, shuffle=True)

    print("Training Custom Transformer on toy text data (training for 5 epochs)...")
    transformer_logs = train_transformer(model, dataloader, epochs=5, lr=0.001, device=str(device))

    # Write training logs to a file
    with open("README_Transformer.txt", "w") as f:
        f.write("Custom Transformer Training Logs:\n")
        for log in transformer_logs:
            f.write(log + "\n")

# End of Transformer section

# Uncomment the following line to run the Transformer training directly in this cell
# transformer_main()

# main function

In [None]:
# MASTER MAIN FUNCTION TO RUN BOTH MODELS

import os
import certifi
# Point SSL_CERT_FILE at certifi's CA bundle for any HTTPS requests made later in the process
os.environ['SSL_CERT_FILE'] = certifi.where()

# Define master main function that runs both CNN and Transformer training
def main():
    """Runs cnn_main() and then transformer_main(), printing a banner before each."""
    print("========== Running Custom CNN Training ==========")
    cnn_main()  # Train and evaluate the CNN model

    print("\n========== Running Custom Transformer Training ==========")
    transformer_main()  # Train the Transformer model

# Run both pipelines when this cell is executed as the main module
if __name__ == "__main__":
    main()

Training Custom CNN on MNIST (training for 5 epochs)...
Epoch 1/5
------------------------------------------------------------
