
### Practice Notebook for ML Research Project
This is a practice notebook for you to assess your own level of comfort with the ML framework **PyTorch**, which will play an important role during the research project.  
If you haven't yet installed PyTorch, follow [the official installation instructions](https://pytorch.org/get-started/locally/). Make sure that you select the correct OS & select the version with CUDA if your computer supports it.
If you do not have an Nvidia GPU, you can install the CPU version by setting `CUDA` to `None`. Note that some experience with the package is recommended for this notebook.  
If you can successfully complete the tasks below, we think you are ready for the project.
With that, good luck!

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

### Task 1: Simple Custom Module: Dropout
This task will test your ability to set up basic custom PyTorch modules. For this task, you are asked to re-implement the Dropout module. Dropout is a form of regularization for neural networks. It works by randomly setting activations (values) to 0, each one with equal probability `p`. The values are then scaled by a factor $\frac{1}{1-p}$ to conserve their mean. Dropout effectively trains a pseudo-ensemble of models with stochastic gradient descent. During evaluation, we want to use the full ensemble and therefore have to turn off dropout. Use `self.training` to check if the model is in training or evaluation mode.  
Do not use any dropout implementation from PyTorch for this!
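
The scaling factor can be motivated by a short expectation argument. As a sketch, write $m$ for a per-element keep mask (a symbol introduced here only for illustration) that equals 1 with probability $1-p$ and 0 with probability $p$. For a fixed activation $x$:

$$
\mathbb{E}\left[\frac{m \cdot x}{1-p}\right] = \frac{x}{1-p}\,\mathbb{E}[m] = \frac{x}{1-p}\,(1-p) = x
$$

Under this assumption each activation keeps its expected value, which is why no extra rescaling is needed when dropout is turned off for evaluation.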

In [None]:
class Dropout(nn.Module):
    """
    Dropout, as described here:
    https://pytorch.org/docs/stable/nn.html#torch.nn.Dropout

    Args:
        p: float, dropout probability
    """
    def __init__(self, p: float):
        super().__init__()
        self.p = p

    def forward(self, input: torch.Tensor):
        """
        The module's forward pass.
        `input` contains the activations to apply Dropout to.
        Args:
            input: PyTorch tensor, arbitrary shape

        Returns:
            PyTorch tensor, same shape as input
        """
        # Inverted dropout, mirroring the behaviour of PyTorch's built-in nn.Dropout.
        if self.training:
            # Keep each activation with probability 1 - p.
            mask = (torch.rand_like(input) > self.p).float()
            # Rescale the surviving activations to preserve the mean.
            return (input * mask) / (1 - self.p)
        else:
            # Evaluation mode: dropout is disabled.
            return input

In [None]:
# Test dropout
test = torch.rand(10_000)
dropout = Dropout(0.2)
test_dropped = dropout(test)

# In principle, these assertions can fail due to bad luck, but
# if implemented correctly they should almost always succeed.
assert np.isclose(test_dropped.mean().item(), test.mean().item(), atol=1e-2)
assert np.isclose((test_dropped > 0).float().mean().item(), 0.8, atol=1e-2)
print("TEST SUCCESSFUL")

TEST SUCCESSFUL
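
The `self.training` flag mentioned in the task is toggled by `train()` and `eval()`, and the call is propagated to child modules. The following is a minimal sketch of that behaviour; `ModeProbe` is a hypothetical helper written only for this illustration and is not part of the notebook.

```python
import torch.nn as nn


class ModeProbe(nn.Module):
    """Hypothetical helper that reports which mode the module is in."""

    def forward(self, x):
        return "train" if self.training else "eval"


probe = ModeProbe()
model = nn.Sequential(probe)  # train()/eval() on the parent reaches the child

modes = [probe(None)]         # modules start in training mode
model.eval()
modes.append(probe(None))
model.train()
modes.append(probe(None))
print(modes)
```

A custom dropout layer that branches on `self.training` therefore needs no extra switch of its own: calling `eval()` on the enclosing model is enough.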


**Question:** Why is it not necessary to implement a custom `backward` function for the `Dropout` module? Write your answer as a comment below.

In [None]:
'''
-> It is not necessary because PyTorch's autograd system automatically computes gradients for every operation performed in the forward pass.

-> During backpropagation, the dropout mask is treated as a constant multiplier, so the gradient with respect to the input is scaled by the same mask (and by the same factor 1 / (1 - p)).

-> No explicit implementation is needed because PyTorch's computational graph keeps track of these operations and their gradients.

'''

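The claim about the gradient can be checked numerically. This is a small sketch, assuming a stand-in forward pass written with plain tensor operations (the variable names are illustrative): autograd should return the mask divided by `1 - p` as the gradient with respect to the input.

```python
import torch

torch.manual_seed(0)
p = 0.2
x = torch.rand(5, requires_grad=True)

# Stand-in for a dropout forward pass in training mode (illustrative only).
mask = (torch.rand_like(x) > p).float()  # constant with respect to x
y = x * mask / (1 - p)

# Autograd derives the backward pass from the recorded operations.
y.sum().backward()

expected_grad = mask / (1 - p)
print(torch.allclose(x.grad, expected_grad))
```
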
### Task 2: Batch Normalization
In this task, you are asked to re-implement classical Batch Normalization, which uses standardization instead of the kernel density-based method we propose in our project. It is defined as the function
$y = \frac{x - \mu_x}{\sigma_x + \epsilon} \cdot \gamma + \beta$,
where $\gamma$ and $\beta$ are learnable parameters and $\epsilon$ is a small number that avoids division by zero. The statistics $\mu_x$ and $\sigma_x$ are computed separately for each feature. Since Batch Normalization comes up most often in Convolutional Neural Networks (CNNs), write the module so that it accepts image batches as input. For the normalization formula, this means averaging **over the batch and all pixels**. Use appropriate tensor slicing to compute the statistics along the correct dimensions. [This image](https://i.sstatic.net/DLwRc.png) may be helpful.

Do not use any batch normalization implementation from PyTorch for this!
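
As a rough illustration of "averaging over the batch and all pixels", the sketch below computes per-channel statistics for a made-up image batch of shape `[N, C, H, W]` by reducing over every dimension except the channel one. The tensor sizes and variable names are assumptions chosen for the example, and the denominator uses the `sqrt(var + eps)` convention rather than `sigma + eps`.

```python
import torch

torch.manual_seed(0)
x = torch.randn(4, 3, 5, 5) * 2.0 + 1.0  # hypothetical batch [N, C, H, W]
eps = 1e-5

dims = (0, 2, 3)  # reduce over batch, height and width; keep channels
mean = x.mean(dim=dims, keepdim=True)                # shape [1, C, 1, 1]
var = x.var(dim=dims, unbiased=False, keepdim=True)  # shape [1, C, 1, 1]
x_hat = (x - mean) / torch.sqrt(var + eps)

# After normalization each channel should have mean ~0 and variance ~1.
per_channel_mean = x_hat.mean(dim=dims)
per_channel_var = x_hat.var(dim=dims, unbiased=False)
print(per_channel_mean, per_channel_var)
```

Keeping the reduced dimensions with `keepdim=True` lets the statistics broadcast back against the input without any manual reshaping.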

In [None]:
class BatchNorm(nn.Module):
    """
    Batch normalization, similar to
    https://pytorch.org/docs/stable/nn.html#torch.nn.BatchNorm1d

    Only uses batch statistics (no running mean for evaluation).
    Batch statistics are calculated for a single dimension.
    Gamma is initialized as 1, beta as 0.

    Args:
        num_features: Number of features to calculate batch statistics for.
    """
    def __init__(self, num_features):
        super().__init__()

        # TODO: Initialize the required parameters
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

    def forward(self, input):
        """
        Batch normalization over the dimension C of (N, C, L).

        Args:
            input: PyTorch tensor, shape [N, C, L]

        Return:
            PyTorch tensor, same shape as input
        """
        eps = 1e-5

        N, C, *spatial_dims = input.shape

        # Per-channel normalization, as required by the task.

        # Move the channels to the front and flatten everything else, so that the
        # statistics are computed across the batch and all spatial dimensions.
        reshaped = input.transpose(0, 1).contiguous()
        reshaped = reshaped.view(C, -1)

        # Mean and (biased) variance along the flattened batch and spatial dimensions
        mean = reshaped.mean(dim = 1, keepdim = True) # [C, 1]
        var = reshaped.var(dim = 1, keepdim = True, unbiased = False) # [C, 1]

        # Normalize
        reshaped_normalized = (reshaped - mean) / torch.sqrt(var + eps)

        # Restore the original layout [N, C, *spatial_dims]
        output = reshaped_normalized.view(C, N, *spatial_dims)
        output = output.transpose(0, 1).contiguous()

        # Reshape gamma and beta so that they broadcast across the batch and spatial dimensions
        gamma_shape = [1, C] + [1] * len(spatial_dims)
        beta_shape = [1, C] + [1] * len(spatial_dims)

        return self.gamma.view(*gamma_shape) * output + self.beta.view(*beta_shape)

In [None]:
# Tests the batch normalization implementation
torch.random.manual_seed(42)
test = torch.randn(8, 2, 4)

b1 = BatchNorm(2)
test_b1 = b1(test)

b2 = nn.BatchNorm1d(2, affine=False, track_running_stats=False)
test_b2 = b2(test)

assert torch.allclose(test_b1, test_b2, rtol=0.02)

**Question:** Which feature is this implementation missing compared to PyTorch's `nn.BatchNorm1d` module? What is the advantage of having that feature? Write your answer as a comment below.

In [None]:
# This implementation lacks nn.BatchNorm1d's tracking of running statistics, which is essential for inference.
'''-> Without running statistics, normalization depends solely on the current batch, making inference inconsistent across varying batch sizes, impossible with single samples, and vulnerable to batch-specific
      variations. These are critical limitations for real-world model deployment.
'''

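
Running statistics are typically maintained as an exponential moving average of the batch statistics. The sketch below shows that update in plain Python; the function name is hypothetical, and the `momentum=0.1` default and the update rule follow the convention documented for PyTorch's batch normalization layers.

```python
def update_running(running, batch_stat, momentum=0.1):
    """Exponential moving average: new = (1 - momentum) * old + momentum * batch."""
    return (1 - momentum) * running + momentum * batch_stat


# Three training batches whose mean happens to be 2.0 each time.
running_mean = 0.0
for batch_mean in [2.0, 2.0, 2.0]:
    running_mean = update_running(running_mean, batch_mean)

print(running_mean)
```

At inference time the stored running value would be used in place of the batch statistic, so the output for one sample no longer depends on the other samples in the batch.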

### Task 3: Model Training
In this task, you are asked to train a model which uses our kernel density normalization method. Be sure to follow good data science practices and report the model's performance in the end. Note that your task is not to create the model (you should use the `MLP_LN_LogReg` model we provide below), but to demonstrate your knowledge about the training process itself (dataset loading, optimization, etc.)

In [None]:
# The normalization module to be used with the Torch.NN model framework
import torch
import torch.nn as nn
import numpy as np
from typing import Type, Union
from torch.nn import functional as F
from math import prod
ArrayLike = Union[torch.Tensor, np.ndarray, float]
from abc import ABC, abstractmethod

class DensityKernel(ABC):
    """Abstract base class for kernel functions used in Kernel Density Estimation"""

    @abstractmethod
    def evaluate(x: torch.Tensor) -> torch.Tensor:
        """Evaluate the kernel at all points in x"""
        pass

    @abstractmethod
    def cdf(x: torch.Tensor) -> torch.Tensor:
        """Evaluate the CDF of the kernel at all points in x"""
        pass

    @abstractmethod
    def ppf(x: torch.Tensor) -> torch.Tensor:
        """Evaluate the inverse CDF of the kernel at all points in x"""
        pass

class GaussianKernel(DensityKernel):
    """Gaussian kernel for density estimation"""

    def evaluate(x: torch.Tensor) -> torch.Tensor:
        return torch.exp(-0.5 * torch.square(x)) / (np.sqrt(2 * np.pi))

    def cdf(x: torch.Tensor) -> torch.Tensor:
        return 0.5 * (1 + torch.erf(x / np.sqrt(2)))

    def ppf(x: torch.Tensor) -> torch.Tensor:
        return torch.erfinv(2*x - 1) * np.sqrt(2)

class BandwidthHeuristic:
    @staticmethod
    def scott(x: torch.Tensor) -> torch.Tensor:
        return 1.059 * x.std() * x.numel() ** (-1/5)

    @staticmethod
    def silverman(x: torch.Tensor) -> torch.Tensor:
        return 0.9 * x.std() * x.numel() ** (-1/5)

class KernelDensityEstimator:
    """Implements Kernel Density Estimation (KDE) with a given kernel"""

    def __init__(self, data: torch.Tensor, kernel: Type[DensityKernel] = GaussianKernel, bandwidth: ArrayLike = None, bandwidth_heuristic: Type[BandwidthHeuristic] = BandwidthHeuristic.silverman):
        self.data = data
        self.kernel = kernel
        if data is not None:
            self.bandwidth = bandwidth if bandwidth is not None else bandwidth_heuristic(data)

    def estimate(self, x: torch.Tensor) -> torch.Tensor:
        """Estimate the density at points x using the given kernel"""
        scaled_x = (x[:,np.newaxis] - self.data[np.newaxis,:]) / self.bandwidth
        return torch.mean(self.kernel.evaluate(scaled_x), dim=1) / self.bandwidth

    def cdf(self, x: torch.Tensor) -> torch.Tensor:
        """Estimate the CDF at points x using the given kernel"""
        scaled_x = (x[:,np.newaxis] - self.data[np.newaxis,:]) / self.bandwidth
        return torch.mean(self.kernel.cdf(scaled_x), dim=1)

    def normalize(self) -> torch.Tensor:
        """Normalize the data using the estimated CDF"""
        cdf = self.cdf(self.data)
        return self.kernel.ppf(cdf)

    def normalize_data(self, data: torch.Tensor, bandwidth: ArrayLike = None, bandwidth_heuristic: Type[BandwidthHeuristic] = BandwidthHeuristic.silverman) -> torch.Tensor:
        """Normalize the given data using the estimated CDF"""
        self.bandwidth = bandwidth if bandwidth is not None else bandwidth_heuristic(data)
        self.data = data
        cdf = self.cdf(data)
        return self.kernel.ppf(cdf)

class KDLayerNorm(nn.Module):
    """
    Kernel Density Layer Normalization.

    Applies Kernel Density Normalization over a mini-batch of inputs.
    Statistics are computed over the last `ndim` dimensions.
    """

    def __init__(self,
                 normalized_size: ArrayLike,
                 bias: bool = True,
                 kernel: Type[DensityKernel] = GaussianKernel,
                 bandwidth: ArrayLike = None,
                 bandwidth_heuristic: Type[BandwidthHeuristic] = BandwidthHeuristic.silverman,
                 ):
        super(KDLayerNorm, self).__init__()
        self.kde = KernelDensityEstimator(None, kernel, bandwidth, bandwidth_heuristic)
        self.explicit_bandwidth = bandwidth
        self.bandwidth_heuristic = bandwidth_heuristic
        self.norm_size = tuple(normalized_size) if type(normalized_size) is not int else (normalized_size,)
        self.ndim = len(self.norm_size)
        self.weight = nn.Parameter(torch.ones(normalized_size))
        self.bias = nn.Parameter(torch.zeros(normalized_size)) if bias else None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply Kernel Density Normalization to the input tensor"""
        self.extra_size = x.shape[:-self.ndim]
        slice_shape = [prod(self.extra_size), prod(self.norm_size)]
        x = x.reshape(slice_shape)
        x = torch.stack([
            self.kde.normalize_data(slice, bandwidth=self.explicit_bandwidth, bandwidth_heuristic=self.bandwidth_heuristic) for slice in x
        ])
        x = x.reshape(self.extra_size + self.norm_size)
        x = x * self.weight
        if self.bias is not None:
            x = x + self.bias
        return x
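
The core idea of the cell above (map each value through the estimated CDF, then through the inverse CDF of a standard normal) can be sketched without PyTorch. The following is a simplified, standard-library illustration under that reading of the code, not a replacement for `KernelDensityEstimator`; the function names and sample values are made up for the example.

```python
from math import erf, sqrt
from statistics import NormalDist, stdev


def gaussian_cdf(z):
    """CDF of the standard normal distribution."""
    return 0.5 * (1.0 + erf(z / sqrt(2.0)))


def kde_normalize(data):
    """Map each value through the Gaussian-KDE CDF, then the normal inverse CDF."""
    n = len(data)
    # Same heuristic form as `silverman` above: 0.9 * std * n^(-1/5).
    bandwidth = 0.9 * stdev(data) * n ** (-1 / 5)
    std_normal = NormalDist()
    normalized = []
    for x in data:
        cdf = sum(gaussian_cdf((x - d) / bandwidth) for d in data) / n
        normalized.append(std_normal.inv_cdf(cdf))
    return normalized


values = [0.1, 0.4, 0.5, 2.0, 9.0]  # skewed toy data with one outlier
normalized = kde_normalize(values)
print(normalized)
```

Because the estimated CDF is strictly increasing, the transformation preserves the ordering of the values while pulling outliers towards a roughly Gaussian range.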

The cell below contains the model for you to train on the dataset.

In [None]:
'''class MLP_LN_LogReg(nn.Module): # Use this model for training
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU()
        ) # Affine layer w/ ReLU activation
        "---This is where we use our KDLayerNorm---"
        self.layer_norm = KDLayerNorm(hidden_dim) # Our KDLayerNorm
        "------------------------------------------"
        self.log_reg = nn.Linear(hidden_dim, 1)  # Logistic regression layer

    def forward(self, x):
        x = self.mlp(x)
        x = self.layer_norm(x)
        x = self.log_reg(x)
        return torch.sigmoid(x)

'''

In [None]:
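class RMSNorm(nn.Module):
  """Rescales features along the last dimension with a learnable per-feature scale."""
  def __init__(self, dim, eps = 1e-8):
    super().__init__()
    self.scale = nn.Parameter(torch.ones(dim))
    self.eps = eps

  def forward(self, x):
    # Note: this divides by the L2 norm of the last dimension. A textbook
    # root-mean-square normalization would divide by norm / sqrt(dim) instead.
    norm = x.norm(2, dim = -1, keepdim = True)
    return x * self.scale / (norm + self.eps)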
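
The `RMSNorm` cell divides by the L2 norm of the last dimension. For comparison, the sketch below contrasts L2 scaling with root-mean-square scaling on a small hand-picked vector; the helper names are hypothetical and the example uses plain Python to keep the arithmetic visible.

```python
from math import sqrt


def l2_norm(v):
    """Euclidean length of the vector."""
    return sqrt(sum(a * a for a in v))


def rms(v):
    """Root mean square: the L2 norm divided by sqrt(len(v))."""
    return sqrt(sum(a * a for a in v) / len(v))


v = [3.0, 4.0, 0.0, 0.0]
l2_scaled = [a / l2_norm(v) for a in v]  # resulting vector has L2 norm 1
rms_scaled = [a / rms(v) for a in v]     # resulting vector has RMS 1

print(l2_norm(v), rms(v))
print(l2_norm(l2_scaled), l2_norm(rms_scaled))
```

The two variants differ by a constant factor of `sqrt(dim)`, so the activations produced by L2 scaling are smaller in magnitude for wide layers.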

In [None]:
class MLP_LN_LogReg(nn.Module): # Use this model for training
    def __init__(self, input_dim, hidden_dim = 256):
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            RMSNorm(hidden_dim),
            nn.Mish(),
            nn.Dropout(0.02),

            nn.Linear(hidden_dim, hidden_dim),
            RMSNorm(hidden_dim),
            nn.Mish(),
            nn.Dropout(0.02),

            nn.Linear(hidden_dim, hidden_dim // 2),
            RMSNorm(hidden_dim // 2),
            nn.Mish(),
            nn.Dropout(0.02),
        )

        #self.layer_norm = KDLayerNorm(hidden_dim // 2)
        self.log_reg = nn.Linear(hidden_dim // 2, 1)

    def forward(self, x):
        x = self.mlp(x)
        # x = self.layer_norm(x)
        x = self.log_reg(x)
        return torch.sigmoid(x)

In [None]:
# The following imports may be useful
from sklearn.datasets import load_breast_cancer
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# TODO: your code for model training

# Step 1: load and preprocess the data
data = load_breast_cancer()
X, y = data.data, data.target

# Standardize the features with StandardScaler.
# Note: the scaler is fit on the full dataset here; fitting it on the training
# split only would avoid leaking test-set statistics into the preprocessing.
scaler = StandardScaler()
X = scaler.fit_transform(X)

# Convert to PyTorch tensors
X = torch.tensor(X, dtype = torch.float32)
y = torch.tensor(y, dtype = torch.float32).view(-1, 1) # reshape y to [N, 1] so it matches the model output and avoids a shape mismatch in the loss

# Split the dataset into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)

# Step 2: initialize the model and the training components
input_dim = X_train.shape[1] # number of features
#hidden_dim = 256 # size of the hidden layer

model = MLP_LN_LogReg(input_dim)
criterion = nn.BCELoss() # binary cross-entropy loss
optimizer = torch.optim.AdamW(model.parameters(), lr = 0.001, weight_decay = 1e-5) # weight decay added for regularization
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size = 10, gamma = 0.5)

# Step 3: train the model
epochs = 50
batch_size = 64
num_batches = len(X_train) // batch_size # floor division: a trailing partial batch is skipped

for epoch in range(epochs):
  model.train() # set the model to training mode
  epoch_loss = 0.0

  for i in range(num_batches):
    start = i * batch_size
    end = min(start + batch_size, len(X_train)) # guard against slicing past the end of the data
    X_batch = X_train[start:end]
    y_batch = y_train[start:end]

    optimizer.zero_grad()
    y_pred = model(X_batch)
    loss = criterion(y_pred, y_batch)
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    optimizer.step()

    epoch_loss += loss.item()

  scheduler.step()


  # average loss over the epoch
  avg_loss = epoch_loss / num_batches

  # print the loss every 10 epochs
  if (epoch + 1) % 10 == 0:
    print(f"Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}")


# Final step: evaluate the model
model.eval() # switch to evaluation mode so that dropout is disabled
with torch.no_grad():
  y_test_pred = model(X_test)
  y_test_pred = (y_test_pred > 0.5).float() # convert the probabilities to binary predictions
  accuracy = (y_test_pred == y_test).float().mean().item()

print(f"Test Accuracy: {accuracy * 100:.2f}%")

Epoch [10/50], Loss: 0.3859
Epoch [20/50], Loss: 0.3024
Epoch [30/50], Loss: 0.2686
Epoch [40/50], Loss: 0.2507
Epoch [50/50], Loss: 0.2429
Test Accuracy: 99.12%
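
The training loop derives its batch count with floor division, so any samples beyond the last full batch are never visited. The sketch below compares that scheme with a ceiling-based one; `batch_ranges` is a hypothetical helper, and the figure of 455 training samples is an assumption based on an 80/20 split of the 569-sample breast cancer dataset.

```python
from math import ceil


def batch_ranges(n_samples, batch_size, drop_last=False):
    """Return (start, end) index pairs for consecutive mini-batches."""
    if drop_last:
        n_batches = n_samples // batch_size
    else:
        n_batches = ceil(n_samples / batch_size)
    return [
        (i * batch_size, min((i + 1) * batch_size, n_samples))
        for i in range(n_batches)
    ]


# Assumed training-set size: 455 samples, batch size 64 as in the notebook.
floor_batches = batch_ranges(455, 64, drop_last=True)
all_batches = batch_ranges(455, 64)

print(len(floor_batches), floor_batches[-1])
print(len(all_batches), all_batches[-1])
```

With the ceiling-based count the final, shorter batch is included. `torch.utils.data.DataLoader` offers the same choice through its `drop_last` argument and can also shuffle the data each epoch.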


### Bonus exercise for extra points: Squeeze out all the juice!
For extra points, edit the model architecture so that when you train it **with the same number of training iterations**, the model achieves a higher _test accuracy_ (higher _training accuracy_ alone does not count). For this task, you may edit the model directly within the cell we provided.
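
To keep such a comparison fair, it can help to pin down the iteration budget explicitly. The sketch below counts optimizer steps for a loop that uses floor division for the number of batches; the helper name is hypothetical, the epoch count and batch size match the training cell above, and the 455 training samples are an assumption based on an 80/20 split of the 569-sample dataset.

```python
def training_iterations(n_train, batch_size, epochs):
    """Optimizer steps taken when the batch count uses floor division."""
    return epochs * (n_train // batch_size)


budget = training_iterations(n_train=455, batch_size=64, epochs=50)
print(budget)
```

Any architectural variant can then be trained for the same number of steps before its test accuracy is compared.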