# Bonus Deep Learning Project 1

In [1]:
import numpy as np
import struct
import gzip
import urllib.request
import os

In [2]:
def download_mnist(path='mnist'):
    os.makedirs(path, exist_ok=True)
    base_url = "https://storage.googleapis.com/cvdf-datasets/mnist/"
    files = {
        "train_images": "train-images-idx3-ubyte.gz",
        "train_labels": "train-labels-idx1-ubyte.gz",
        "test_images": "t10k-images-idx3-ubyte.gz",
        "test_labels": "t10k-labels-idx1-ubyte.gz"
    }
    
    for key, filename in files.items():
        filepath = os.path.join(path, filename)
        
        if not os.path.exists(filepath):
            print(f"Downloading {filename}...")
            urllib.request.urlretrieve(base_url + filename, filepath)
            
    print("MNIST dataset downloaded.")

def load_mnist_images(filename):
    with gzip.open(filename, 'rb') as f:
        magic, num, rows, cols = struct.unpack(">IIII", f.read(16))
        images = np.frombuffer(f.read(), dtype=np.uint8).reshape(num, rows, cols)
        images = images.astype(np.float32) / 255.0  # Normalize to [0,1]
        return images

def load_mnist_labels(filename):
    with gzip.open(filename, 'rb') as f:
        magic, num = struct.unpack(">II", f.read(8))
        labels = np.frombuffer(f.read(), dtype=np.uint8)
        return labels

def load_mnist(path='mnist'):
    download_mnist(path)
    train_images = load_mnist_images(os.path.join(path, "train-images-idx3-ubyte.gz"))
    train_labels = load_mnist_labels(os.path.join(path, "train-labels-idx1-ubyte.gz"))
    test_images = load_mnist_images(os.path.join(path, "t10k-images-idx3-ubyte.gz"))
    test_labels = load_mnist_labels(os.path.join(path, "t10k-labels-idx1-ubyte.gz"))
    return train_images, train_labels, test_images, test_labels

In [3]:
# Load MNIST training images (for speed, we use a small subset)
train_images, train_labels, test_images, test_labels = load_mnist()

MNIST dataset downloaded.


# Core Layer Classes

In [4]:
class FullyConnected:
    def __init__(self, in_dim, out_dim):
        self.W = np.random.randn(in_dim, out_dim) * 0.02
        self.b = np.zeros((1, out_dim))

    def forward(self, x):
        self.x = x
        self.z = x @ self.W + self.b  
        
        return self.z 

    def backward(self, grad, lr):
        # Calculate gradients
        dW = self.x.T.dot(grad)
        db = np.sum(grad, axis = 0, keepdims = True)
        dx = grad.dot(self.W.T)

        # Gradient updates
        self.W -= lr * dW
        self.b -= lr * db

        return dx

In [5]:
class Conv2D:
    def __init__(self, in_ch, out_ch, k, stride = 1, pad = 0):
        self.stride = stride
        self.pad = pad
        self.W = np.random.randn(out_ch, in_ch, k, k) * 0.02
        self.b = np.zeros((out_ch,))
        

    def forward(self, x):
        N, C, H, width = x.shape
        F, _, k, _ = self.W.shape

        h_out = ((H + (2 * self.pad) - k) // self.stride) + 1
        w_out = ((width + (2 * self.pad) - k) // self.stride) + 1

        # Now we pad x
        x_padded = np.pad(
                x,
                pad_width=((0,0), (0,0), (self.pad,self.pad), (self.pad,self.pad)),
                mode='constant',
                constant_values=0
            )

        out = np.zeros((N, self.W.shape[0], h_out, w_out), dtype = x.dtype)

        for m in range(N):
            for n in range(F):
                for i in range(h_out):
                    for j in range(w_out):
                        vertical = i * self.stride
                        horizontal = j * self.stride

                        area = x_padded[m, :, vertical : vertical + k, horizontal : horizontal + k]
                        out[m, n, i, j] = np.sum(area * self.W[n]) + self.b[n]

        self.x_padded = x_padded
        self.x_shape = x.shape
                        
        return out
                

    def backward(self, grad, lr):
        N, C, H_pad, W_pad = self.x_padded.shape
        x_padded = self.x_padded
        F, _, k, _ = self.W.shape 
        _, _, h_out, w_out = grad.shape

        # Initializing gradients
        dW = np.zeros_like(self.W)           
        db = np.zeros_like(self.b)         
        dx_padded = np.zeros_like(x_padded) 

        for m in range(N):
            for n in range(F):
                for i in range(h_out):
                    for j in range(w_out):
                        vert_start = i * self.stride
                        horiz_start = j * self.stride

                        area = x_padded[m, :, vert_start:vert_start + k, horiz_start:horiz_start + k]
                        dW[n] += grad[m, n, i, j] * area
                        db[n] += grad[m, n, i, j]

                        dx_padded[m, :, vert_start:vert_start + k, horiz_start:horiz_start + k] += grad[m, n, i, j] * self.W[n]
                        
        # Unpad
        if self.pad>0:
            dx = dx_padded[:,:,self.pad:-self.pad,self.pad:-self.pad]
        else:
            dx = dx_padded
            
        # Update Gradients
        self.W -= lr * dW
        self.b -= lr * db
        return dx

In [6]:
class AvgPool2D:
    def __init__(self, k, stride = 1, pad = 0):
        self.k = k
        self.stride = stride
        self.pad = pad

    def forward(self, x):
        N, C, H, W = x.shape
        k = self.k

        h_out = ((H + 2*self.pad - k) // self.stride) + 1
        w_out = ((W + 2*self.pad - k) // self.stride) + 1

        # Pad Input
        x_padded = np.pad(
            x,
            pad_width=((0,0), (0,0), (self.pad,self.pad), (self.pad,self.pad)),
            mode = 'constant',
            constant_values = 0
        )

        out = np.zeros((N, C, h_out, w_out), dtype = x.dtype)

        for m in range(N):
            for n in range(C):
                for i in range(h_out):
                    for j in range(w_out):
                        vert_start = i * self.stride
                        horiz_start = j * self.stride

                        region = x_padded[m, n, vert_start:vert_start+k, horiz_start:horiz_start+k]
                        out[m, n, i, j] = np.mean(region)

        self.x_padded = x_padded
        self.x_shape = x.shape

        return out

    def backward(self, grad):
        N, C, H_pad, W_pad = self.x_padded.shape
        k = self.k
        _, _, h_out, w_out = grad.shape

        dx_padded = np.zeros_like(self.x_padded)

        for m in range(N):
            for n in range(C):
                for i in range(h_out):
                    for j in range(w_out):
                        vert_start = i * self.stride
                        horiz_start = j * self.stride

                        dx_padded[m, n, vert_start:vert_start+k, horiz_start:horiz_start+k] += grad[m,n,i,j] / (k*k)

        # unpad
        if self.pad > 0:
            dx = dx_padded[:, :, self.pad:-self.pad, self.pad:-self.pad]
        else:
            dx = dx_padded

        return dx

# Activation Functions & Their Derivatives

In [7]:
def sigmoid(x): 
    return 1/(1+np.exp(-x))
    
def sigmoid_deriv(x):
    s = sigmoid(x)
    return s*(1-s)

In [8]:
def relu(x): 
    return np.maximum(0,x)
    
def relu_deriv(x): 
    return (x>0).astype(float)

In [9]:
def tanh(x): 
    return np.tanh(x)
    
def tanh_deriv(x): 
    return 1 - np.tanh(x)**2

# CNN Classifier

In [10]:
class CNN:
    def __init__(self):
        self.conv1 = Conv2D(in_ch = 1, out_ch = 2, k =3, stride = 1, pad = 1)
        self.pool = AvgPool2D(k = 2, stride = 2)
        self.conv2 = Conv2D(in_ch = 2, out_ch = 10, k = 1, stride = 1, pad = 0)

    def forward(self, x):
        self.z1 = self.conv1.forward(x)
        a1 = sigmoid(self.z1)               # (N,2,28,28)
        
        p  = self.pool.forward(a1)     # (N,2,14,14)
        z2 = self.conv2.forward(p)     # (N,10,14,14)
        
        self.logits = z2.mean(axis=(2,3)) # Global average pool to get (N, 10)

        # Softmax
        ex = np.exp(self.logits - self.logits.max(axis = 1,keepdims = True))
        self.probs = ex / ex.sum(axis = 1,keepdims = True)
        
        return self.probs

    def backward(self, y_true, lr):
        N = y_true.shape[0]

        grad_logits = (self.probs - y_true) / N
        grad_z2 = grad_logits[:, :, None, None] / (14 * 14) # Expanding the gradient across 14 x 14 spatial map

        grad_p = self.conv2.backward(grad_z2, lr) # Shape (N, 2, 14, 14)

        grad_a1 = self.pool.backward(grad_p)
        grad_z1 = grad_a1 * sigmoid_deriv(self.z1)

        _ = self.conv1.backward(grad_z1, lr) # Backpropogating through conv1

# Training Loop

In [11]:
# Binary Cross-Entropy Loss Function
def bce_loss(y_pred, y_true):
    eps = 1e-8
    return -np.mean(y_true * np.log(y_pred + eps) + (1 - y_true) * np.log(1 - y_pred + eps))

In [12]:
# Accuracy Function
def accuracy(probs, y_true):
    return np.mean(np.argmax(probs, axis=1) == np.argmax(y_true, axis=1))

In [13]:
def train(model, train_images, train_labels, test_images, test_labels, epochs = 50, batch_size = 64, lr = 1e-3):
    N = train_images.shape[0]
    
    # One-hot encode labels
    Y_train = np.eye(10)[train_labels]
    Y_test  = np.eye(10)[test_labels]

    history = {
        'train_loss': [], 'train_acc': [],
        'test_loss':  [], 'test_acc':  []
    }

    for epoch in range(1, epochs + 1):
        perm = np.random.permutation(N)
        losses, accs = [], []

        for i in range(0, N, batch_size):
            idx = perm[i:i+batch_size]
            x  = train_images[idx][:, None, :, :]  
            y  = Y_train[idx]

            probs = model.forward(x)
            loss  = bce_loss(probs, y)
            acc   = accuracy(probs, y)
            model.backward(y, lr)

            losses.append(loss)
            accs.append(acc)

        train_loss = np.mean(losses)
        train_acc  = np.mean(accs)

        probs_test = model.forward(test_images[:, None, :, :])
        test_loss  = bce_loss(probs_test, Y_test)
        test_acc   = accuracy(probs_test, Y_test)

        # Record values
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['test_loss'].append(test_loss)
        history['test_acc'].append(test_acc)

        print(f"Epoch {epoch}/{epochs}  "
              f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f}  "
              f"test_loss={test_loss:.4f}, test_acc={test_acc:.4f}")

    # Plotting
    epochs_range = np.arange(1, epochs+1)
    plt.figure(figsize=(12,4))

    plt.subplot(1,2,1)
    plt.plot(epochs_range, history['train_loss'], label='Train Loss')
    plt.plot(epochs_range, history['test_loss'],  label='Test Loss')
    plt.xlabel('Epoch'); plt.ylabel('Loss')
    plt.title('Loss'); plt.legend()

    plt.subplot(1,2,2)
    plt.plot(epochs_range, history['train_acc'], label='Train Accuracy')
    plt.plot(epochs_range, history['test_acc'],  label='Test Accuracy')
    plt.xlabel('Epoch'); plt.ylabel('Accuracy')
    plt.title('Accuracy'); plt.legend()

    plt.tight_layout()
    plt.show()

    return history

# Main

In [None]:
cnn = CNN()
history = train(
    cnn,
    train_images, train_labels,
    test_images,  test_labels,
    epochs = 100,
    batch_size = 128,
    lr = 1e-3
)