<a href="https://colab.research.google.com/github/Druvith/NN_from_scratch2023/blob/main/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Implementation of a three-layer CNN from scratch



In [None]:
import os
import gzip
import numpy as np
import requests
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [None]:
class FashionMNIST(Dataset):
    """Map-style dataset pairing each image with the label stored at the same index."""

    def __init__(self, images, labels):
        # Both containers are kept as given; nothing is copied or validated here.
        self.labels = labels
        self.images = images

    def __len__(self):
        """Return how many images the dataset holds."""
        n_samples = len(self.images)
        return n_samples

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair found at position ``idx``."""
        sample = self.images[idx]
        target = self.labels[idx]
        return sample, target

In [None]:
def download_mnist(url, path):
  """Download ``url`` and store the response body at ``path``.

  The body is streamed to a temporary sibling file (``<path>.part``) and only
  moved onto ``path`` once the whole download succeeded. This way a failed or
  interrupted download never leaves a broken file that a later
  "does the file already exist?" check would mistake for a valid archive.

  Args:
    url: Address of the file to fetch.
    path: Destination file path.

  Raises:
    requests.HTTPError: If the server answers with a 4xx/5xx status.
    requests.RequestException: On connection problems or timeouts.
    OSError: If the destination cannot be written.
  """
  tmp_path = f"{path}.part"
  try:
    # stream=True avoids loading the whole body into memory before writing it.
    with requests.get(url, stream=True, timeout=30) as response:
      # Fail loudly instead of saving an HTML error page as the archive.
      response.raise_for_status()
      with open(tmp_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
          if chunk:
            f.write(chunk)
    os.replace(tmp_path, path)
  finally:
    # After a successful os.replace the temp file is gone; otherwise clean it up.
    if os.path.exists(tmp_path):
      os.remove(tmp_path)

In [None]:
# Purpose: download the Fashion-MNIST archives, extract them, and convert the data to tensors for training a CNN

def load_fashion_mnist():
  """Fetch the four Fashion-MNIST archives if needed and wrap them in datasets.

  Archives that are not yet present in the local ``fashion_mnist_data``
  directory are downloaded with ``download_mnist``. The image and label
  archives are then decoded with ``extract_images`` / ``extract_labels``.

  Returns:
    A ``(train_data, test_data)`` tuple of ``FashionMNIST`` datasets.
  """
  base_url = 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/'
  data_dir = 'fashion_mnist_data'
  archive_names = {
      'train_images': 'train-images-idx3-ubyte.gz',
      'train_labels': 'train-labels-idx1-ubyte.gz',
      'test_images': 't10k-images-idx3-ubyte.gz',
      'test_labels': 't10k-labels-idx1-ubyte.gz',
  }

  if not os.path.exists(data_dir):
    os.makedirs(data_dir)

  # Local location of every archive, keyed the same way as archive_names.
  local_paths = {
      key: os.path.join(data_dir, name) for key, name in archive_names.items()
  }

  # Only fetch what is not already on disk.
  for key, target in local_paths.items():
    if os.path.exists(target):
      continue
    download_mnist(base_url + archive_names[key], target)

  train_data = FashionMNIST(
      extract_images(local_paths['train_images']),
      extract_labels(local_paths['train_labels']),
  )
  test_data = FashionMNIST(
      extract_images(local_paths['test_images']),
      extract_labels(local_paths['test_labels']),
  )
  return train_data, test_data

In [None]:
def extract_images(file_path):
  """Decode a gzip-compressed image archive into a float32 tensor.

  The first 16 bytes of the decompressed stream are skipped. The remaining
  bytes are read as unsigned 8-bit values, reshaped to ``(-1, 1, 28, 28)`` and
  divided by 255.

  Args:
    file_path: Path of the ``.gz`` archive.

  Returns:
    A ``torch`` tensor of shape ``(N, 1, 28, 28)`` holding the scaled values.
  """
  header_bytes = 16
  with gzip.open(file_path, 'rb') as archive:
    raw = archive.read()
  pixels = np.frombuffer(raw, dtype=np.uint8, offset=header_bytes)
  images = pixels.reshape(-1, 1, 28, 28)
  images = images.astype(np.float32)
  images = images / 255.0
  return torch.tensor(images)

In [None]:
def extract_labels(file_path):
  """Decode a gzip-compressed label archive into an int64 tensor.

  The first 8 bytes of the decompressed stream are skipped; every remaining
  byte is read as one unsigned 8-bit label.

  Args:
    file_path: Path of the ``.gz`` archive.

  Returns:
    A 1-D ``torch.long`` tensor with one entry per label byte.
  """
  header_bytes = 8
  with gzip.open(file_path, 'rb') as archive:
    raw = archive.read()
  labels = np.frombuffer(raw, dtype=np.uint8, offset=header_bytes)
  return torch.tensor(labels, dtype=torch.long)

In [None]:
# Build the training and test datasets; archives missing from the local data
# directory are downloaded first.
train_data, test_data = load_fashion_mnist()

In [None]:
# Sanity check: pull one training sample and display the shapes of its image
# and label tensors.
image, label = train_data[10]
image.shape, label.shape

(torch.Size([1, 28, 28]), torch.Size([]))

In [None]:
class DataLoader:
  """Minimal batch iterator over an indexable dataset of ``(image, label)`` tensor pairs.

  Note: this definition rebinds the name ``DataLoader`` and therefore shadows
  ``torch.utils.data.DataLoader`` imported at the top of the notebook.

  The loader is its own iterator: ``iter(loader)`` rewinds it (reshuffling the
  index order when ``shuffle`` is set) and ``next(loader)`` returns the next
  batch. Because the position lives on the instance, two simultaneous
  iterations over the same loader interfere with each other.

  Args:
    dataset: Object supporting ``len()`` and integer indexing, whose items are
      ``(image, label)`` pairs of tensors that can be combined with ``torch.stack``.
    batch_size: Number of samples per batch; the last batch may be smaller.
    shuffle: Whether to reshuffle the sample order each time iteration starts.

  Raises:
    ValueError: If ``batch_size`` is smaller than 1.
  """

  def __init__(self, dataset, batch_size=1, shuffle=False):
    if batch_size < 1:
      raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    self.dataset = dataset
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.indices = np.arange(len(dataset))
    # Defined up front so that next() before iter() works instead of raising
    # AttributeError; iteration then starts from the current index order.
    self.current_idx = 0

  def __len__(self):
    """Return the number of batches per pass (ceil of samples / batch_size)."""
    return -(-len(self.indices) // self.batch_size)

  def __iter__(self):
    """Rewind to the first batch, reshuffling first when ``shuffle`` is set."""
    if self.shuffle:
      np.random.shuffle(self.indices)
    self.current_idx = 0
    return self

  def __next__(self):
    """Return the next ``(batch_images, batch_labels)`` pair of stacked tensors."""
    if self.current_idx >= len(self.indices):
      raise StopIteration
    start = self.current_idx
    batch_indices = self.indices[start:start + self.batch_size]
    # int() turns the numpy integers into plain Python ints for the dataset.
    images, labels = zip(*(self.dataset[int(i)] for i in batch_indices))
    self.current_idx = start + self.batch_size
    return torch.stack(images), torch.stack(labels)


In [None]:
# Pick the compute backend: CUDA first, then Apple MPS, with CPU as the fallback.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using {device}")

Using cuda


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.init as init

class ConvLayer(nn.Module):
    """2-D convolution (stride 1, no padding, no bias) with a hand-written backward pass.

    The forward pass delegates to ``F.conv2d``. ``backward`` computes the
    gradients manually and applies a plain SGD step to the filters.

    Args:
        in_channels: Number of channels of the input.
        out_channels: Number of filters / output channels.
        kernel_size: Side length of the square kernel.
    """

    def __init__(self, in_channels, out_channels, kernel_size):
        super(ConvLayer, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        # Filter bank of shape (out_channels, in_channels, k, k). The randn
        # values are placeholders that the Kaiming initialisation overwrites.
        self.filter = nn.Parameter(torch.randn(out_channels, in_channels, kernel_size, kernel_size))
        init.kaiming_normal_(self.filter, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        """Convolve ``x`` (batch, in_channels, H, W) with the filters."""
        # Use PyTorch's built-in function for forward pass convolution
        return F.conv2d(x, self.filter, stride=1, padding=0)

    def backward(self, grad_out, x, learning_rate):
        """Backpropagate through the convolution and update the filters.

        Args:
            grad_out: Gradient of the loss w.r.t. this layer's output,
                shape (batch, out_channels, H - k + 1, W - k + 1).
            x: The input that was passed to ``forward``, shape (batch, in_channels, H, W).
            learning_rate: Step size of the SGD update applied to the filters.

        Returns:
            Gradient of the loss w.r.t. ``x`` (same shape as ``x``), computed
            with the filters as they were before the update.
        """
        k = self.kernel_size
        _, _, h, w = x.shape
        out_h = h - k + 1
        out_w = w - k + 1

        # Manual gradient computation must not be recorded by autograd;
        # otherwise every accumulation below would extend a graph through
        # self.filter and the returned gradient would itself require grad.
        with torch.no_grad():
            grad_filter = torch.zeros_like(self.filter)
            grad_x = torch.zeros_like(x)

            for i in range(out_h):
                for j in range(out_w):
                    region = x[:, :, i:i + k, j:j + k]   # (B, C_in, k, k)
                    upstream = grad_out[:, :, i, j]       # (B, C_out)

                    # Filter gradient: sum over the batch of upstream * input patch,
                    # for all output channels at once.
                    grad_filter += torch.einsum('bo,bchw->ochw', upstream, region)

                    # Input gradient: every filter contributes to the patch it saw.
                    grad_x[:, :, i:i + k, j:j + k] += torch.einsum('bo,ochw->bchw', upstream, self.filter)

            # Update the filters using the computed gradients
            self.filter.data -= learning_rate * grad_filter
        return grad_x


> To create a max-pool operation (as an `nn.Module` subclass), the forward pass needs to produce two outputs:  
1. Output tensor - the max value of each pooling window.
2. Indices - the position of the max value within each window (needed for the backward pass).

> For the backward pass:  
1. Create a gradient tensor with the same shape as the input.
2. The local gradient of max pooling is 1 at the positions that produced the max values and 0 everywhere else (since d(x)/dx = 1).
3. Multiply it by the gradient flowing back from the next layer, because of the chain rule.

In [None]:
class MaxPoolLayer(nn.Module):
  """Max pooling over square windows with a hand-written backward pass.

  Args:
    kernel_size: Side length of the pooling window.
    stride: Step between windows; defaults to ``kernel_size``.
    padding: Number of rows/columns of ``-inf`` added on every side of the
      input before pooling (0 disables padding).

  Attributes set by ``forward`` and consumed by ``backward``:
    max_indices: For every output position, the flattened (row-major) index of
      the maximum inside its window, shape (batch, channels, out_h, out_w).
    input_shape: Shape of the most recent (unpadded) input.
  """

  def __init__(self, kernel_size, stride=None, padding=0):
    super(MaxPoolLayer, self).__init__()
    self.kernel_size = kernel_size
    self.stride = stride if stride is not None else kernel_size
    self.padding = padding
    self.max_indices = None
    self.input_shape = None

  def forward(self, x):
    """Return the window-wise maxima of ``x`` (batch, channels, H, W)."""
    k, s, p = self.kernel_size, self.stride, self.padding
    self.input_shape = x.shape
    if p > 0:
      # -inf padding can never win the max, so it only affects the window layout.
      x = nn.functional.pad(x, (p, p, p, p), value=float('-inf'))

    # (B, C, out_h, out_w, k, k): one k x k window per output position.
    windows = x.unfold(2, k, s).unfold(3, k, s)
    flat_windows = windows.reshape(*windows.shape[:4], k * k)
    out, self.max_indices = flat_windows.max(dim=-1)
    return out

  def backward(self, grad_out):
    """Route ``grad_out`` back to the input positions that produced each maximum.

    Every other input position receives zero gradient. When windows overlap and
    one input element is the maximum of several windows, their gradients are summed.

    Args:
      grad_out: Gradient w.r.t. the output of the last ``forward`` call,
        shape (batch, channels, out_h, out_w).

    Returns:
      Gradient w.r.t. the input of the last ``forward`` call (same shape as that input).

    Raises:
      RuntimeError: If called before ``forward``.
    """
    if self.max_indices is None:
      raise RuntimeError("MaxPoolLayer.backward() called before forward()")

    k, s, p = self.kernel_size, self.stride, self.padding
    batch_size, channels, out_h, out_w = grad_out.shape
    # Size of the (padded) tensor the windows were taken from.
    in_h = self.input_shape[2] + 2 * p
    in_w = self.input_shape[3] + 2 * p

    # Turn window-local flat indices into (row, col) offsets inside the window ...
    rows = torch.div(self.max_indices, k, rounding_mode='floor')
    cols = self.max_indices % k
    # ... and then into flat positions in the padded input plane.
    row_start = torch.arange(out_h, device=grad_out.device).view(1, 1, -1, 1) * s
    col_start = torch.arange(out_w, device=grad_out.device).view(1, 1, 1, -1) * s
    flat_positions = (row_start + rows) * in_w + (col_start + cols)

    # Scatter into a freshly allocated contiguous buffer so the writes really
    # land in the returned tensor; scatter_add_ accumulates overlapping windows.
    grad_x = torch.zeros((batch_size, channels, in_h * in_w),
                         dtype=grad_out.dtype, device=grad_out.device)
    grad_x.scatter_add_(2,
                        flat_positions.reshape(batch_size, channels, -1),
                        grad_out.reshape(batch_size, channels, -1))
    grad_x = grad_x.view(batch_size, channels, in_h, in_w)

    if p > 0:
      # Drop the gradient of the padding border.
      grad_x = grad_x[:, :, p:in_h - p, p:in_w - p]
    return grad_x

In [None]:
import math

class Linear(nn.Module):
  """Fully connected layer ``y = x @ W.T + b`` with a hand-written backward pass.

  Args:
    in_features: Size of each input row.
    out_features: Size of each output row.
    device: Device on which the parameters are created; defaults to the
      notebook-level ``device``.
  """

  def __init__(self, in_features, out_features, device = device):
    super(Linear, self).__init__()
    self.in_features = in_features
    self.out_features = out_features
    # He initialisation: standard deviation sqrt(2 / fan_in).
    self.std = math.sqrt(2 / in_features)
    # Create the tensors on the target device first and wrap them afterwards.
    # Calling .to(device) on an nn.Parameter returns a plain non-leaf tensor
    # when a device change is needed, so the weights would not be registered
    # and model.parameters() / the optimizer would never see them.
    self.weight = nn.Parameter(torch.randn(out_features, in_features, device=device) * self.std)
    self.bias = nn.Parameter(torch.zeros(out_features, device=device))

  def forward(self, x):
    """Apply the affine map to ``x`` of shape (batch, in_features)."""
    return x @ self.weight.T + self.bias

  def backward(self, grad_out, x, learning_rate):
    """Backpropagate through the layer and apply an SGD step to weight and bias.

    Args:
      grad_out: Gradient w.r.t. the layer output, shape (batch, out_features).
      x: The input that was passed to ``forward``, shape (batch, in_features).
      learning_rate: Step size of the SGD update.

    Returns:
      Gradient w.r.t. ``x``, computed with the weights as they were before the update.
    """
    # Keep the manual gradient computation out of the autograd graph.
    with torch.no_grad():
      grad_x = grad_out @ self.weight
      grad_weight = grad_out.T @ x
      grad_bias = torch.sum(grad_out, dim=0)

      self.weight.data -= learning_rate * grad_weight
      self.bias.data -= learning_rate * grad_bias

    return grad_x


In [None]:
class Flatten(nn.Module):     # backward pass is just resizing to the input shape
  """Collapse every dimension after the batch dimension into one."""

  def forward(self, x):
    """Return ``x`` reshaped to (batch, -1) and remember its original shape."""
    self.input_shape = x.size()  # retain the original size
    # reshape behaves like view for contiguous inputs but also accepts
    # non-contiguous ones (where view would raise).
    return x.reshape(x.size(0), -1)  # (B, C * H * W)

  def backward(self, grad_out, x):
    """Reshape ``grad_out`` back to the shape seen by the last ``forward`` call.

    ``x`` is accepted for signature parity with the other layers and is not used.
    """
    return grad_out.reshape(self.input_shape)


In [None]:
class ReLU(nn.Module):
  """Rectified linear unit with an explicit backward pass."""

  def forward(self, x):
    """Replace every negative entry of ``x`` with zero."""
    return x.clamp(min=0)

  def backward(self, grad_out, x):
    """Return a copy of ``grad_out`` that is zeroed wherever ``x <= 0``.

    ``grad_out`` itself is left untouched.
    """
    blocked = x <= 0
    return grad_out.masked_fill(blocked, 0)

In [None]:
class CrossEntropy(nn.Module):
  """Softmax cross-entropy loss with an explicit gradient w.r.t. the logits."""

  def forward(self, y_pred, y_true):
    """Compute the mean cross-entropy of a batch.

    Args:
      y_pred: Raw logits, shape (batch, num_classes).
      y_true: Integer class indices, shape (batch,).

    Returns:
      ``(loss, probs)``: the scalar mean loss and the softmax probabilities
      (batch, num_classes). ``probs`` is what ``backward`` expects.
    """
    log_probs = torch.log_softmax(y_pred, dim=1)
    # Pick the log-probability of the true class for every sample; this works
    # for any number of classes, not just 10.
    picked = log_probs.gather(1, y_true.unsqueeze(1)).squeeze(1)
    loss = -picked.mean()
    # Return actual probabilities: the gradient formula in backward
    # (softmax - one_hot) is only valid for probabilities, not log-probabilities.
    return loss, log_probs.exp()

  def backward(self, probs, y_true):
    """Gradient of the mean loss w.r.t. the logits: ``(probs - one_hot(y_true)) / n``.

    Args:
      probs: Softmax probabilities as returned by ``forward``.
      y_true: Integer class indices, shape (batch,).

    Returns:
      A new tensor with the shape of ``probs``; ``probs`` is not modified.
    """
    n = probs.shape[0]
    grad_out = probs.detach().clone()     # work on a copy so the caller's probs stay intact
    grad_out[torch.arange(n, device=probs.device), y_true] -= 1   # subtract 1 at the true-class index
    grad_out /= n
    return grad_out

In [None]:
class NeuralNetwork(nn.Module):
    """Small CNN: conv -> ReLU -> conv -> ReLU -> max-pool -> flatten -> fc -> ReLU -> fc.

    ``forward`` caches every intermediate activation on the instance so that
    ``backward`` can run the hand-written backward passes of the layers.
    The first linear layer expects 64 * 12 * 12 features, i.e. 1 x 28 x 28 inputs.
    """

    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv1 = ConvLayer(1, 32, 3)
        self.conv2 = ConvLayer(32, 64, 3)
        self.relu = ReLU()
        self.maxpool = MaxPoolLayer(2, stride=2)
        self.flatten = Flatten()
        self.fc1 = Linear(64 * 12 * 12, 128)
        self.fc2 = Linear(128, 10)
        self.loss_fn = CrossEntropy()

    def forward(self, x):
        """Return the class logits for a batch ``x`` and cache all activations."""
        self.x1 = self.conv1(x)
        self.x2 = self.relu(self.x1)
        self.x3 = self.conv2(self.x2)
        self.x4 = self.relu(self.x3)
        self.x5 = self.maxpool(self.x4)
        self.x6 = self.flatten(self.x5)
        self.x7 = self.fc1(self.x6)
        self.x8 = self.relu(self.x7)
        self.logits = self.fc2(self.x8)
        return self.logits

    def backward(self, logits, labels,x, learning_rate):
        """Run the manual backward pass and update every trainable layer.

        Must be called after ``forward`` on the same batch, because it relies on
        the activations cached there.

        Args:
            logits: Output of ``forward`` for the batch.
            labels: Integer class labels of the batch.
            x: The batch that was passed to ``forward``.
            learning_rate: Step size handed to each layer's update.

        Returns:
            The loss of the batch as computed by ``self.loss_fn``.
        """
        loss, probs = self.loss_fn(logits, labels)
        grad_out = self.loss_fn.backward(probs, labels)

        # Walk the layers in reverse, giving each one the input it saw in forward.
        grad_out = self.fc2.backward(grad_out, self.x8, learning_rate)
        grad_out = self.relu.backward(grad_out, self.x7)
        grad_out = self.fc1.backward(grad_out, self.x6, learning_rate)
        grad_out = self.flatten.backward(grad_out, self.x5)
        grad_out = self.maxpool.backward(grad_out)
        grad_out = self.relu.backward(grad_out, self.x3)
        grad_out = self.conv2.backward(grad_out, self.x2, learning_rate)
        grad_out = self.relu.backward(grad_out, self.x1)
        # grad_out, the layer input and the learning rate are three separate arguments.
        grad_out = self.conv1.backward(grad_out, x, learning_rate)

        return loss


In [None]:
# Instantiate the network and move its registered parameters to the selected device.
model = NeuralNetwork().to(device)

In [None]:
# Show which device was selected earlier in the notebook.
print(device)

cuda


In [None]:
# Batches of 32 samples; only the training order is shuffled.
# `DataLoader` here resolves to the class defined in this notebook, which
# shadows the torch.utils.data import of the same name.
train_data_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_data_loader = DataLoader(test_data, batch_size=32, shuffle=False)

In [None]:
# Train with autograd + Adam (the layers' manual backward() methods are not used here).
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = CrossEntropy()  # not used below: the loop evaluates model.loss_fn
epochs = 10

model.train()
for epoch in range(epochs):
  total_loss = 0.0
  num_batches = 0
  for X, y in train_data_loader:
    X, y = X.to(device), y.to(device)
    logits = model(X)
    loss, _ = model.loss_fn(logits, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    total_loss += loss.item()
    num_batches += 1
  # Report the mean loss over the epoch rather than the loss of whichever
  # batch happened to come last.
  mean_loss = total_loss / max(num_batches, 1)
  print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss:.4f}")

Epoch 1/10, Loss: 0.4837
Epoch 2/10, Loss: 0.7472
Epoch 3/10, Loss: 0.7720
Epoch 4/10, Loss: 0.3256
Epoch 5/10, Loss: 0.7576
Epoch 6/10, Loss: 0.7845
Epoch 7/10, Loss: 0.4341
Epoch 8/10, Loss: 0.2821
Epoch 9/10, Loss: 0.4203
Epoch 10/10, Loss: 0.2991


In [None]:
# Evaluate on the whole test set and report one aggregate loss / accuracy
# instead of printing a line for every batch.
model.eval()
test_loss_sum = 0.0
test_correct = 0
test_samples = 0
with torch.no_grad():
  for X, y in test_data_loader:
    X, y = X.to(device), y.to(device)
    logits = model(X)
    loss, _ = model.loss_fn(logits, y)
    batch_len = y.shape[0]
    # The per-batch loss is a mean, so weight it by the batch size
    # (the last batch may be smaller than the others).
    test_loss_sum += loss.item() * batch_len
    test_correct += (logits.argmax(dim=1) == y).sum().item()
    test_samples += batch_len

print(f"Test Loss: {test_loss_sum / max(test_samples, 1):.4f}")
print(f"Test Accuracy: {test_correct / max(test_samples, 1):.4f}")


Test Loss: 0.5973
Test Loss: 0.5286
Test Loss: 0.4212
Test Loss: 0.2138
Test Loss: 0.5401
Test Loss: 0.2563
Test Loss: 0.3152
Test Loss: 0.5231
Test Loss: 0.2496
Test Loss: 0.3885
Test Loss: 0.4578
Test Loss: 0.3588
Test Loss: 0.2452
Test Loss: 0.1879
Test Loss: 0.7333
Test Loss: 0.3136
Test Loss: 0.3138
Test Loss: 0.6727
Test Loss: 0.4683
Test Loss: 0.6434
Test Loss: 0.6153
Test Loss: 0.5464
Test Loss: 0.5384
Test Loss: 0.5364
Test Loss: 0.4255
Test Loss: 0.4873
Test Loss: 0.2990
Test Loss: 0.3574
Test Loss: 0.7512
Test Loss: 0.4500
Test Loss: 1.0246
Test Loss: 0.5639
Test Loss: 0.4548
Test Loss: 0.3760
Test Loss: 0.6025
Test Loss: 0.3542
Test Loss: 0.5193
Test Loss: 0.5195
Test Loss: 0.6590
Test Loss: 0.4839
Test Loss: 0.3324
Test Loss: 0.5411
Test Loss: 0.3482
Test Loss: 0.5069
Test Loss: 0.2894
Test Loss: 0.6433
Test Loss: 0.3550
Test Loss: 0.4446
Test Loss: 0.4157
Test Loss: 0.3178
Test Loss: 0.3936
Test Loss: 0.6566
Test Loss: 0.2970
Test Loss: 0.4231
Test Loss: 0.5133
Test Loss:

In [None]:
# Human-readable name for each class index.
classes = [
    "T-shirt/top",
    "Trouser",
    "Pullover",
    "Dress",
    "Coat",
    "Sandal",
    "Shirt",
    "Sneaker",
    "Bag",
    "Ankle boot",
]

model.eval()
# Show the prediction for ten test samples (indices 20 through 29).
with torch.no_grad():
    for i in range(20, 30):
        x, y = test_data[i]  # Unpack the input and label for each sample
        x = x.to(device).unsqueeze(0)  # Add a batch dimension
        pred = model(x)
        # Convert the tensors to plain ints before indexing the Python list.
        predicted = classes[pred[0].argmax().item()]
        actual = classes[int(y)]
        print(f'Predicted: "{predicted}", Actual: "{actual}"')

Predicted: "Pullover", Actual: "Pullover"
Predicted: "Sneaker", Actual: "Sandal"
Predicted: "Sneaker", Actual: "Sneaker"
Predicted: "Sandal", Actual: "Ankle boot"
Predicted: "Trouser", Actual: "Trouser"
Predicted: "Pullover", Actual: "Coat"
Predicted: "Shirt", Actual: "Shirt"
Predicted: "T-shirt/top", Actual: "T-shirt/top"
Predicted: "Ankle boot", Actual: "Ankle boot"
Predicted: "Dress", Actual: "Dress"


In [None]:
import gc
# Drop the notebook's reference to the model, run the garbage collector and
# ask PyTorch to release its cached CUDA memory.
# NOTE: after this cell `model` no longer exists; re-run the model-creation
# cell before executing any cell that uses it.
del model
gc.collect()
torch.cuda.empty_cache()