In [3]:
# Import libraries
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import pandas as pd

# We use pickle to load the CIFAR-10 dataset
import pickle

# We use os for filesystem paths and urllib to download the dataset
import os
import urllib.request

# We use tarfile to extract the downloaded tar.gz file
import tarfile

In [None]:
# Load CIFAR-10 dataset manually with NumPy
def load_cifar10_batch(file_path):
    """Read one pickled CIFAR-10 batch file and return its images and labels.

    Args:
        file_path: Path of the batch file; it is opened in binary mode.

    Returns:
        A tuple ``(images, labels)``. ``images`` is a float32 array of shape
        (N, 32, 32, 3) holding the stored pixel values (only cast, not
        rescaled). ``labels`` is a NumPy array built from the ``b'labels'``
        entry of the batch.
    """
    # NOTE(review): unpickling can execute arbitrary code, so only point this
    # at batch files that come from a trusted source.
    with open(file_path, 'rb') as handle:
        batch = pickle.load(handle, encoding='bytes')

    # Unflatten every row to (3, 32, 32), then move the channel axis last
    # so the result is laid out as (N, H, W, C).
    channel_first = batch[b'data'].reshape(-1, 3, 32, 32)
    channel_last = channel_first.transpose(0, 2, 3, 1)

    images = np.asarray(channel_last, dtype=np.float32)
    labels = np.array(batch[b'labels'])
    return images, labels

# Data augmentation functions
def random_crop(img, crop_size=32, padding=4):
    """Zero-pad an (H, W, C) image and cut a random square window out of it.

    Args:
        img: Image array with height and width on the first two axes and a
            third (channel) axis that is left unpadded.
        crop_size: Side length of the returned window.
        padding: Number of zero pixels added on each side of height and width.

    Returns:
        A ``crop_size`` x ``crop_size`` slice of the padded image.
    """
    border = (padding, padding)
    padded = np.pad(img, (border, border, (0, 0)), mode='constant', constant_values=0)
    padded_h, padded_w = padded.shape[:2]

    # The row offset is drawn before the column offset.
    row = np.random.randint(0, padded_h - crop_size + 1)
    col = np.random.randint(0, padded_w - crop_size + 1)

    return padded[row:row + crop_size, col:col + crop_size]

def random_horizontal_flip(img):
    """Mirror the image along axis 1 when a uniform draw exceeds 0.5.

    Args:
        img: Image array whose axis 1 is the width axis.

    Returns:
        ``img`` itself when the draw is at most 0.5, otherwise the result of
        flipping it along axis 1.
    """
    keep_as_is = np.random.rand() <= 0.5
    if keep_as_is:
        return img
    return np.flip(img, axis=1)

def random_erasing(img, p=0.5, sl=0.02, sh=0.4, r1=0.3):
    """Zero out a random rectangle of the image with probability ``p``.

    The input array is never modified: when a rectangle is erased, the work is
    done on a copy. (Writing into ``img`` directly would also overwrite the
    caller's data whenever ``img`` is a view of a larger array.)

    Args:
        img: Image array with height and width on the first two axes.
        p: The image is returned untouched when a uniform draw exceeds ``p``.
        sl: Lower bound of the erased area as a fraction of the image area.
        sh: Upper bound of the erased area as a fraction of the image area.
        r1: The rectangle's aspect factor is drawn uniformly from
            ``[r1, 1 / r1]``.

    Returns:
        ``img`` itself when erasing is skipped, otherwise a copy of ``img``
        with the chosen rectangle set to 0.
    """
    if np.random.rand() > p:
        return img

    img_h, img_w = img.shape[:2]
    area = np.random.uniform(sl, sh) * img_h * img_w
    aspect = np.random.uniform(r1, 1 / r1)
    w = int(np.sqrt(area / aspect))
    h = int(np.sqrt(area * aspect))

    # A rectangle at least as large as the image is anchored at 0; the slice
    # below then simply clips it to the image bounds.
    left = np.random.randint(0, img_w - w + 1) if w < img_w else 0
    top = np.random.randint(0, img_h - h + 1) if h < img_h else 0

    erased = img.copy()
    erased[top:top + h, left:left + w] = 0
    return erased

def random_gaussian_noise(img, std=0.05):
    """Return the image plus zero-mean Gaussian noise of the same shape.

    Args:
        img: Image array.
        std: Standard deviation of the noise.

    Returns:
        A new array ``img + noise``; ``img`` is not modified.
    """
    return img + np.random.normal(loc=0, scale=std, size=img.shape)

def augment_image(img):
    """Run the augmentation pipeline on a single image.

    The steps are applied in this order: random crop, random horizontal flip,
    random erasing, random Gaussian noise (each with its default arguments).

    Args:
        img: Image array accepted by ``random_crop``.

    Returns:
        The augmented image produced by the last step.
    """
    pipeline = (
        random_crop,
        random_horizontal_flip,
        random_erasing,
        random_gaussian_noise,
    )
    for step in pipeline:
        img = step(img)
    return img

# Define data directory
data_dir = './data/cifar-10-batches-py'

# Download if not present.
# The check looks for an extracted batch file instead of the directory: the
# directory alone does not prove the data is there, and creating it before the
# download would make a failed or interrupted download look complete on the
# next run.
if not os.path.isfile(os.path.join(data_dir, 'data_batch_1')):
    os.makedirs('./data', exist_ok=True)
    url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
    archive_path = './data/cifar-10.tar.gz'
    urllib.request.urlretrieve(url, archive_path)

    # Extract the tar file. The archive comes from the network, so use the
    # 'data' extraction filter where this Python version provides it.
    with tarfile.open(archive_path, 'r:gz') as tar:
        if hasattr(tarfile, 'data_filter'):
            tar.extractall('./data', filter='data')
        else:
            tar.extractall('./data')

# Load training data (5 batches of 10,000 images each)
train_images = []
train_labels = []
for i in range(1, 6):
    images, labels = load_cifar10_batch(f'{data_dir}/data_batch_{i}')
    train_images.append(images)
    train_labels.append(labels)

# We need to concatenate all training images because they are in separate batches
train_images = np.concatenate(train_images)
train_labels = np.concatenate(train_labels)

# Load test data
test_images, test_labels = load_cifar10_batch(f'{data_dir}/test_batch')

# Normalize images (mean/std for CIFAR-10).
# mean/std are float32 so the float32 images are not silently promoted to
# float64, which would double the memory held by the dataset.
mean = np.array([0.4914, 0.4822, 0.4465], dtype=np.float32)
std = np.array([0.2023, 0.1994, 0.2010], dtype=np.float32)
train_images = (train_images / 255.0 - mean) / std
test_images = (test_images / 255.0 - mean) / std

# Split train into train/val (80/20 rule): the last 20% becomes validation
train_size = int(0.8 * len(train_images))
val_images, val_labels = train_images[train_size:], train_labels[train_size:]
train_images, train_labels = train_images[:train_size], train_labels[:train_size]

# Move channels first: (N, H, W, C) -> (N, C, H, W).
# Everything is already a NumPy array, so no further conversion is needed.
train_images = train_images.transpose(0, 3, 1, 2)
val_images = val_images.transpose(0, 3, 1, 2)
test_images = test_images.transpose(0, 3, 1, 2)

print(f'Train: {len(train_images)}, Val: {len(val_images)}, Test: {len(test_images)}')

  data = pickle.load(f, encoding='bytes') # Load the batch


Train: 40000, Val: 10000, Test: 10000


In [5]:
# Define layers from scratch using NumPy

class Conv2DLayer:
    """2-D convolution layer over (batch, channels, height, width) arrays.

    The Python loops run over output positions only; every image in the batch
    and every output channel is handled at once with ``np.tensordot``. This
    computes the same sums as a loop over batch and output channel, but without
    one Python-level iteration per (image, channel, pixel).

    Attributes:
        weights: float32 array of shape (out_channels, in_channels, k, k).
        biases: float32 array of shape (out_channels,).
        dweights, dbiases: Gradients filled in by ``backward``; same shapes as
            ``weights`` and ``biases``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0):
        """Store the layer geometry and create parameters and gradient buffers.

        Args:
            in_channels: Number of input channels.
            out_channels: Number of filters (output channels).
            kernel_size: Side length of the square kernel.
            stride: Step between neighbouring kernel windows.
            padding: Zero padding added on each side of height and width.
        """
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding

        # Weights are drawn from a standard normal scaled by 0.1; biases start at 0.
        self.weights = np.random.randn(out_channels, in_channels, kernel_size, kernel_size).astype(np.float32) * 0.1
        self.biases = np.zeros(out_channels, dtype=np.float32)

        # Gradient buffers
        self.dweights = np.zeros_like(self.weights)
        self.dbiases = np.zeros_like(self.biases)

    def _pad(self, x):
        """Zero-pad height and width of ``x``; return ``x`` itself if padding is 0."""
        if self.padding > 0:
            p = self.padding
            # np.pad takes one (before, after) pair per axis; batch and channel
            # axes are left unpadded.
            return np.pad(x, ((0, 0), (0, 0), (p, p), (p, p)), mode='constant')
        return x

    # Forward propagation
    def forward(self, x):
        """Convolve ``x`` with the layer's filters.

        Args:
            x: Array of shape (batch_size, in_channels, in_height, in_width).
                It is stored on the layer for use by ``backward``.

        Returns:
            float32 array of shape (batch_size, out_channels, out_h, out_w).
        """
        self.input = x
        batch_size, _, in_h, in_w = x.shape

        out_h = (in_h + 2 * self.padding - self.kernel_size) // self.stride + 1
        out_w = (in_w + 2 * self.padding - self.kernel_size) // self.stride + 1
        output = np.zeros((batch_size, self.out_channels, out_h, out_w), dtype=np.float32)

        x_padded = self._pad(x)

        for i in range(out_h):
            h_start = i * self.stride
            h_end = h_start + self.kernel_size
            for j in range(out_w):
                w_start = j * self.stride
                w_end = w_start + self.kernel_size

                # Window under the kernel for every image: (batch, in_c, k, k)
                region = x_padded[:, :, h_start:h_end, w_start:w_end]

                # Contract (in_c, k, k) against every filter -> (batch, out_c)
                output[:, :, i, j] = np.tensordot(
                    region, self.weights, axes=([1, 2, 3], [1, 2, 3])
                ) + self.biases
        return output

    # Backward propagation
    def backward(self, d_out):
        """Compute parameter gradients and the gradient with respect to the input.

        ``self.dweights`` and ``self.dbiases`` are reset and then filled with
        the gradients summed over the batch.

        Args:
            d_out: Gradient with respect to the output of ``forward``, shape
                (batch_size, out_channels, out_h, out_w).

        Returns:
            Gradient with respect to the input passed to the last ``forward``
            call, with the same shape as that input.
        """
        _, _, out_h, out_w = d_out.shape

        x_padded = self._pad(self.input)
        # Accumulate in padded coordinates; the border is cut off at the end.
        d_input_padded = np.zeros_like(x_padded)

        # Reset gradients
        self.dweights.fill(0)
        self.dbiases.fill(0)

        for i in range(out_h):
            h_start = i * self.stride
            h_end = h_start + self.kernel_size
            for j in range(out_w):
                w_start = j * self.stride
                w_end = w_start + self.kernel_size

                region = x_padded[:, :, h_start:h_end, w_start:w_end]  # (batch, in_c, k, k)
                grad = d_out[:, :, i, j]                               # (batch, out_c)

                # Input gradient: sum over output channels of filter * grad
                d_input_padded[:, :, h_start:h_end, w_start:w_end] += np.tensordot(
                    grad, self.weights, axes=([1], [0])
                )
                # Weight gradient: sum over the batch of grad * window
                self.dweights += np.tensordot(grad, region, axes=([0], [0]))

        # Each bias receives every gradient entry of its output channel
        self.dbiases += d_out.sum(axis=(0, 2, 3))

        # Remove padding from the input gradient if it was applied
        if self.padding > 0:
            p = self.padding
            return d_input_padded[:, :, p:-p, p:-p]
        return d_input_padded

# Activation Function Layer
class ReLU:
    """Rectified linear unit: passes positive values, zeroes the rest."""

    def forward(self, x):
        """Return the element-wise maximum of 0 and ``x``; remembers ``x``."""
        self.input = x
        rectified = np.maximum(0, x)
        return rectified

    def backward(self, d_out):
        """Pass ``d_out`` through only where the stored input was positive."""
        was_positive = self.input > 0
        return d_out * was_positive

# Pooling Layer
class MaxPool2D:
    """Max pooling over square windows of a (batch, channels, H, W) array."""

    def __init__(self, kernel_size, stride):
        """Store the window side length and the step between windows."""
        self.kernel_size = kernel_size
        self.stride = stride

    def forward(self, x):
        """Take the maximum of every pooling window.

        Args:
            x: Array of shape (batch_size, channels, in_h, in_w). It is stored
                on the layer for use by ``backward``.

        Returns:
            Array of shape (batch_size, channels, out_h, out_w) with the window
            maxima. The flat position of each maximum inside its window is
            kept in ``self.max_indices``.
        """
        self.input = x
        batch_size, channels, in_h, in_w = x.shape
        k, s = self.kernel_size, self.stride

        # Same output-size formula as a convolution without padding
        out_h = (in_h - k) // s + 1
        out_w = (in_w - k) // s + 1
        output = np.zeros((batch_size, channels, out_h, out_w))
        self.max_indices = np.zeros_like(output, dtype=int)

        for b, c, i, j in np.ndindex(batch_size, channels, out_h, out_w):
            top, left = i * s, j * s
            window = x[b, c, top:top + k, left:left + k]
            output[b, c, i, j] = window.max()
            # Flat (row-major) index of the maximum, needed for backpropagation
            self.max_indices[b, c, i, j] = window.argmax()
        return output

    def backward(self, d_out):
        """Route each output gradient to the input element that was the maximum.

        Args:
            d_out: Gradient with respect to the output of ``forward``.

        Returns:
            Gradient with respect to the pooling layer's input; zero everywhere
            except at the recorded maximum positions, where contributions are
            accumulated.
        """
        k, s = self.kernel_size, self.stride
        d_input = np.zeros_like(self.input)

        for b, c, i, j in np.ndindex(*d_out.shape):
            # Turn the stored flat index back into (row, col) inside the window
            row, col = divmod(self.max_indices[b, c, i, j], k)
            d_input[b, c, i * s + row, j * s + col] += d_out[b, c, i, j]

        return d_input

# Fully Connected Layer
class FC:
    """Fully connected layer computing ``x @ weights.T + biases``."""

    def __init__(self, in_features, out_features):
        """Create float32 parameters and matching gradient buffers.

        Weights are standard-normal draws scaled by 0.1 with shape
        (out_features, in_features); biases start at zero.
        """
        self.weights = 0.1 * np.random.randn(out_features, in_features).astype(np.float32)
        self.biases = np.zeros(out_features, dtype=np.float32)
        self.dweights = np.zeros_like(self.weights)
        self.dbiases = np.zeros_like(self.biases)

    def forward(self, x):
        """Apply the affine map to ``x`` and remember ``x`` for ``backward``."""
        self.input = x
        projected = x.dot(self.weights.T)
        return projected + self.biases

    def backward(self, d_out):
        """Store parameter gradients and return the gradient w.r.t. the input.

        Args:
            d_out: Gradient with respect to the output of ``forward``.

        Returns:
            ``d_out`` multiplied by the weight matrix.
        """
        self.dweights = d_out.T.dot(self.input)
        self.dbiases = d_out.sum(axis=0)
        return d_out.dot(self.weights)

# Dropout Layer
class Dropout:
    """Inverted dropout: surviving activations are scaled by 1 / (1 - p)."""

    def __init__(self, p=0.5):
        """Store the drop probability; the layer starts in training mode."""
        self.p = p
        self.mask = None
        self.training = True  # Set to False for inference

    def _is_active(self):
        """Dropout only acts in training mode with a positive drop rate."""
        return self.training and self.p > 0

    def forward(self, x):
        """Randomly zero entries of ``x`` while active; otherwise return ``x``."""
        if not self._is_active():
            return x
        # Entries whose uniform draw exceeds p are kept
        self.mask = np.random.rand(*x.shape) > self.p
        return x * self.mask / (1 - self.p)

    def backward(self, d_out):
        """Apply the mask from the last forward pass to the gradient."""
        if not self._is_active():
            return d_out
        return d_out * self.mask / (1 - self.p)

class CustomCNN:
    """Small CNN: two conv -> ReLU -> max-pool stages, dropout, then a linear classifier.

    Dropout acts on the flattened features that feed the classifier, not on
    the class scores. Dropping the scores themselves would randomly zero
    logits (including the true class) and corrupt both the loss and the
    training predictions.
    """

    def __init__(self, dropout_rate=0.5):
        """Build the layers.

        Args:
            dropout_rate: Drop probability of the dropout layer placed in
                front of the fully connected layer.
        """
        self.conv1 = Conv2DLayer(3, 32, 3, padding=1)
        self.relu1 = ReLU()
        self.pool1 = MaxPool2D(2, 2)
        self.conv2 = Conv2DLayer(32, 64, 3, padding=1)
        self.relu2 = ReLU()
        self.pool2 = MaxPool2D(2, 2)
        self.fc = FC(64 * 8 * 8, 10)
        self.dropout = Dropout(dropout_rate)
        # Shape of the feature maps before flattening; recorded by forward()
        self._feature_shape = None

    # Forward propagation
    def forward(self, x):
        """Compute the class scores (logits) for a batch.

        Args:
            x: Input batch; the first axis is the batch axis.

        Returns:
            The output of the fully connected layer, one row per sample.
        """
        x = self.pool1.forward(self.relu1.forward(self.conv1.forward(x)))
        x = self.pool2.forward(self.relu2.forward(self.conv2.forward(x)))

        # Remember the feature-map shape so backward() can undo the flatten
        self._feature_shape = x.shape
        x = x.reshape(x.shape[0], -1)

        x = self.dropout.forward(x)
        return self.fc.forward(x)

    # Backward propagation
    def backward(self, d_out):
        """Backpropagate through the layers in reverse order of ``forward``.

        Every layer stores its own parameter gradients. Must be called after
        ``forward`` on the same batch.

        Args:
            d_out: Gradient of the loss with respect to the logits.

        Returns:
            The gradient with respect to the network input.
        """
        d_out = self.fc.backward(d_out)
        d_out = self.dropout.backward(d_out)
        d_out = d_out.reshape(self._feature_shape)
        d_out = self.pool2.backward(d_out)
        d_out = self.conv2.backward(self.relu2.backward(d_out))
        d_out = self.pool1.backward(d_out)
        return self.conv1.backward(self.relu1.backward(d_out))

    # Update parameters with learning rate (gradient descent) and weight decay
    def update_params(self, lr, weight_decay=0):
        """Take one gradient-descent step on every trainable layer.

        Args:
            lr: Learning rate.
            weight_decay: Coefficient of the weight-decay term added to the
                weight gradients; biases are not decayed.
        """
        for layer in (self.conv1, self.conv2, self.fc):
            layer.weights -= lr * (layer.dweights + weight_decay * layer.weights)
            layer.biases -= lr * layer.dbiases

    def set_training(self, training=True):
        """Switch dropout between training (True) and inference (False) mode."""
        self.dropout.training = training

In [6]:
# Define loss function (Cross-Entropy)
def cross_entropy_loss(y_pred, y_true):
    """Mean negative log-probability assigned to the true classes.

    Args:
        y_pred: Array of predicted probabilities, one row per sample.
        y_true: Integer class index for each row.

    Returns:
        The mean over the batch of ``-log(p_true)``.
    """
    n_samples = y_pred.shape[0]
    eps = 1e-7

    # Keep probabilities away from 0 and 1 so the logarithm stays finite
    safe_probs = np.clip(y_pred, eps, 1 - eps)
    true_class_probs = safe_probs[np.arange(n_samples), y_true]
    return np.mean(-np.log(true_class_probs))

# Softmax function
def softmax(x):
    """Row-wise softmax of a 2-D array of scores.

    The row maximum is subtracted before exponentiating so that large scores
    do not overflow.
    """
    shifted = x - np.max(x, axis=1, keepdims=True)
    unnormalized = np.exp(shifted)
    row_totals = np.sum(unnormalized, axis=1, keepdims=True)
    return unnormalized / row_totals

# Gradient of loss
def d_cross_entropy_loss(y_pred, y_true):
    """Gradient of the batch-averaged loss: ``(y_pred - one_hot(y_true)) / N``.

    Args:
        y_pred: Array of predicted probabilities (softmax output), one row per
            sample. It is not modified.
        y_true: Integer class index for each row.

    Returns:
        Array shaped like ``y_pred``.
    """
    n_samples = y_pred.shape[0]
    grad = y_pred.copy()

    # Subtracting 1 at the true-class positions is the same as subtracting the
    # one-hot encoding of the labels.
    grad[np.arange(n_samples), y_true] -= 1
    return grad / n_samples

In [7]:
# Training loop with fixed hyperparameters
import random
import time
np.random.seed(42)  # For reproducibility

# Fixed hyperparameters for performance
lr = 0.01  # Learning rate
bs = 128  # Batch size
dr = 0.5  # Dropout rate
wd = 1e-4  # Weight decay

model = CustomCNN(dropout_rate=dr)

num_epochs = 5  # Number of epochs
train_losses, val_losses, train_accs, val_accs = [], [], [], []

# One pass over the shuffled training set, then one pass over the validation set
for epoch in range(num_epochs):
    epoch_start = time.time()

    # Draw a fresh ordering of the training samples for this epoch
    indices = np.random.permutation(len(train_images))
    train_images_shuf = train_images[indices]
    train_labels_shuf = train_labels[indices]

    running_loss, correct, total = 0.0, 0, 0

    for start in range(0, len(train_images_shuf), bs):
        stop = start + bs
        batch_y = train_labels_shuf[start:stop]

        # Augmentation works on (H, W, C) images, the model on (C, H, W):
        # convert, augment, convert back
        batch_x = np.array([
            augment_image(img.transpose(1, 2, 0)).transpose(2, 0, 1)
            for img in train_images_shuf[start:stop]
        ])

        # Forward
        probs = softmax(model.forward(batch_x))
        loss = cross_entropy_loss(probs, batch_y)

        # Backward
        model.backward(d_cross_entropy_loss(probs, batch_y))

        # Update
        model.update_params(lr, wd)

        # Weight the batch-mean loss by the batch size so the epoch average
        # is a per-sample mean
        n_batch = len(batch_x)
        running_loss += loss * n_batch
        correct += np.sum(np.argmax(probs, axis=1) == batch_y)
        total += n_batch

    train_loss = running_loss / len(train_images)
    train_acc = 100 * correct / total
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # Validation (disable dropout, no augmentation)
    model.set_training(False)
    val_bs = 64
    val_running_loss, val_correct, val_total = 0.0, 0, 0
    for start in range(0, len(val_images), val_bs):
        val_batch_x = val_images[start:start + val_bs]
        val_batch_y = val_labels[start:start + val_bs]

        val_probs = softmax(model.forward(val_batch_x))
        val_loss = cross_entropy_loss(val_probs, val_batch_y)

        n_batch = len(val_batch_x)
        val_running_loss += val_loss * n_batch
        val_correct += np.sum(np.argmax(val_probs, axis=1) == val_batch_y)
        val_total += n_batch

    model.set_training(True)  # Re-enable for next epoch

    val_loss = val_running_loss / len(val_images)
    val_acc = 100 * val_correct / val_total
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    epoch_time = time.time() - epoch_start
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%, Time: {epoch_time:.2f}s")

KeyboardInterrupt: 

In [None]:
# Evaluate the CNN on the test dataset (unknown data)

# Load test data
test_images, test_labels = load_cifar10_batch(f'{data_dir}/test_batch')

# Normalize test images
test_images = (test_images / 255.0 - mean) / std
test_images = test_images.transpose(0, 3, 1, 2)  # (N, C, H, W)

model.set_training(False)  # Disable dropout for inference

# Evaluate on test set.
# Run the model in mini-batches, like the validation loop, so the intermediate
# activations of the whole test set never have to be held in memory at once.
test_bs = 64
test_out = np.concatenate([
    model.forward(test_images[start:start + test_bs])
    for start in range(0, len(test_images), test_bs)
])
test_probs = softmax(test_out)
test_preds = np.argmax(test_probs, axis=1)

test_acc = 100 * np.sum(test_preds == test_labels) / len(test_labels)
print(f"Test Accuracy: {test_acc:.2f}%")

# Confusion Matrix
cm = confusion_matrix(test_labels, test_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=[f'Class {i}' for i in range(10)], yticklabels=[f'Class {i}' for i in range(10)])
plt.title('Confusion Matrix on Test Set')
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

print(classification_report(test_labels, test_preds))

# Show some example predictions
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

# Denormalize for display
test_images_display = test_images.transpose(0, 2, 3, 1) * std + mean
test_images_display = np.clip(test_images_display, 0, 1)

# Select 5 random examples.
# A local generator seeded with 42 picks the same samples as seeding the
# global RNG would, without resetting the global random state as a side effect.
sample_rng = np.random.RandomState(42)
indices = sample_rng.choice(len(test_images), 5, replace=False)

fig, axes = plt.subplots(1, 5, figsize=(15, 3))
for i, idx in enumerate(indices):
    img = test_images_display[idx]
    true_label = class_names[test_labels[idx]]
    pred_label = class_names[test_preds[idx]]
    axes[i].imshow(img)
    axes[i].set_title(f'True: {true_label}\nPred: {pred_label}')
    axes[i].axis('off')
plt.show()

In [None]:
# Model Evaluation
# Plot training vs validation curves using Seaborn

# One row per epoch, one column per tracked metric
history = {
    'Epoch': range(1, len(train_losses) + 1),
    'Train Loss': train_losses,
    'Val Loss': val_losses,
    'Train Acc': train_accs,
    'Val Acc': val_accs
}
df = pd.DataFrame(history)

# Long format: one row per (epoch, curve) so seaborn can colour by curve type
df_melted_loss = df.melt(id_vars='Epoch', value_vars=['Train Loss', 'Val Loss'], var_name='Type', value_name='Loss')
df_melted_acc = df.melt(id_vars='Epoch', value_vars=['Train Acc', 'Val Acc'], var_name='Type', value_name='Accuracy')

# Loss on the left panel, accuracy on the right
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

sns.lineplot(data=df_melted_loss, x='Epoch', y='Loss', hue='Type', ax=ax_loss)
ax_loss.set_title('Loss Curves')

sns.lineplot(data=df_melted_acc, x='Epoch', y='Accuracy', hue='Type', ax=ax_acc)
ax_acc.set_title('Accuracy Curves')
plt.show()