# Convolutional Neural Networks (CNNs)

Convolutional Neural Networks (CNNs) derive their name from the convolution operation that occurs within them. This type of neural network builds upon the Multi-Layer Perceptron (MLP) model. 

## Why CNNs?
The limitation of MLPs is that they only work with flattened arrays of data. For example, an image must be flattened into a one-dimensional array to be processed by an MLP. However, this flattening process often results in the loss of spatial information. Additionally, most real-world data, such as images and audio, is multi-dimensional. 

### Example:
- Images are represented as matrices (e.g., a 1020x720 image has 1020 rows and 720 columns of pixels).
- Each pixel has an RGB value, adding a third dimension to the data.

Flattening such data into a single array can be computationally expensive and inefficient. CNNs address these challenges by preserving the spatial structure of the data and efficiently processing multi-dimensional inputs.

---

## CNN Architecture

CNNs are specialized neural networks designed for processing data with a grid-like topology, such as images. They consist of the following layers:

### 1. **Convolutional Layers**
- These layers apply convolutional filters (kernels) to the input data to extract local features such as edges, textures, and patterns.
- The kernels contain weights that are learned through backpropagation, similar to the MLP model.

### 2. **Pooling Layers**
- Pooling layers perform sub-sampling or down-sampling, reducing the dimensions of the input data. This helps the network recognize objects even when they are deformed or appear in different lighting conditions.
- **Max Pooling** is a common pooling technique. It extracts the maximum value within a selected region of the feature map.

#### Example of Max Pooling:
**Input Feature Map**:
\[
\begin{bmatrix}
1 & 3 & 2 & 4 \\
5 & 6 & 7 & 8 \\
9 & 2 & 4 & 3 \\
6 & 7 & 8 & 9
\end{bmatrix}
\]

If we perform max pooling with a stride of 2, the output feature map will be:

**Output Feature Map**:
\[
\begin{bmatrix}
6 & 8 \\
9 & 9
\end{bmatrix}
\]

### 3. **Fully Connected Layers**
- These layers are similar to those in MLPs and are typically used for the final output.
- They are used for tasks such as:
    - **Classification**: Predicting categories.
    - **Regression**: Predicting continuous values.
    - **Probability Estimation**: Outputting probabilities for different classes.

### 4. **Activation Layers**
- Activation layers, such as ReLU (Rectified Linear Unit), introduce non-linearity into the model.
- They can down-sample the output from previous layers into a range (e.g., 0 to 1) or compute binary values, depending on the task.

---

CNNs are powerful tools for processing multi-dimensional data, especially images, and have become a cornerstone of modern deep learning applications.

In [1]:
import numpy as np
import pandas as pd
import scipy
import scipy.signal
import pickle
import os
from typing import Union, List, Tuple, Dict, Any, Optional

# Base Layer Class for all layer types
class Layer:
    def __init__(self):
        self.input = None
        self.output = None
        
    def forward(self, input_data):
        # Forward pass - to be implemented by subclasses
        raise NotImplementedError
    
    def backward(self, output_gradient, learning_rate):
        # Backward pass - to be implemented by subclasses
        raise NotImplementedError

# Convolutional Layer
class Conv2D(Layer):
    def __init__(self, input_shape, kernel_size, depth):
        """
        Initialize convolutional layer
        
        :param input_shape: (height, width, channels)
        :param kernel_size: Size of the convolution kernel (height, width)
        :param depth: Number of kernels/filters
        """
        super().__init__()
        self.input_shape = input_shape
        self.input_height, self.input_width, self.input_channels = input_shape
        self.kernel_size = kernel_size
        self.depth = depth
        
        # Initialize filters with Xavier/Glorot initialization
        self.kernels_shape = (kernel_size[0], kernel_size[1], self.input_channels, depth)
        limit = np.sqrt(6 / (np.prod(kernel_size) * self.input_channels + np.prod(kernel_size) * depth))
        self.kernels = np.random.uniform(-limit, limit, self.kernels_shape)
        self.biases = np.zeros(depth)
        
        # For Adam optimizer
        self.m_kernels = np.zeros_like(self.kernels)
        self.v_kernels = np.zeros_like(self.kernels)
        self.m_biases = np.zeros_like(self.biases)
        self.v_biases = np.zeros_like(self.biases)
        
        # Calculate output dimensions
        self.output_shape = (
            self.input_height - kernel_size[0] + 1,
            self.input_width - kernel_size[1] + 1,
            depth
        )
    
    def forward(self, input_data):
        """
        Forward pass for convolutional layer
        
        :param input_data: Input data of shape (batch_size, height, width, channels)
        :return: Output of shape (batch_size, new_height, new_width, depth)
        """
        self.input = input_data
        batch_size = input_data.shape[0]
        
        # Initialize output array
        self.output = np.zeros((batch_size, *self.output_shape))
        
        # Perform convolution for each sample in batch
        for i in range(batch_size):
            for d in range(self.depth):
                for c in range(self.input_channels):
                    # Convolve each channel with corresponding kernel
                    self.output[i, :, :, d] += scipy.signal.convolve2d(
                        self.input[i, :, :, c], 
                        self.kernels[:, :, c, d], 
                        mode='valid'
                    )
                # Add bias
                self.output[i, :, :, d] += self.biases[d]
        
        return self.output
    
    def backward(self, output_gradient, learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8, t=1):
        """
        Backward pass for convolutional layer using Adam optimizer
        
        :param output_gradient: Gradient from next layer of shape (batch_size, height, width, depth)
        :param learning_rate: Learning rate for optimizer
        :param beta1: Exponential decay rate for 1st moment estimates
        :param beta2: Exponential decay rate for 2nd moment estimates
        :param epsilon: Small constant for numerical stability
        :param t: Timestep (for bias correction in Adam)
        :return: Gradient with respect to input
        """
        batch_size = output_gradient.shape[0]
        kernels_gradient = np.zeros_like(self.kernels)
        biases_gradient = np.zeros_like(self.biases)
        input_gradient = np.zeros_like(self.input)
        
        # Calculate gradients for each sample in batch
        for i in range(batch_size):
            for d in range(self.depth):
                # Gradient for biases - simple sum over height and width dimensions
                biases_gradient[d] += np.sum(output_gradient[i, :, :, d])
                
                for c in range(self.input_channels):
                    # Gradient for kernels - correlation between input and output gradient
                    kernels_gradient[:, :, c, d] += scipy.signal.correlate2d(
                        self.input[i, :, :, c],
                        output_gradient[i, :, :, d],
                        mode='valid'
                    )
                    
                    # Gradient for input - full convolution with rotated kernel
                    rotated_kernel = np.rot90(self.kernels[:, :, c, d], 2)
                    input_gradient[i, :, :, c] += scipy.signal.convolve2d(
                        output_gradient[i, :, :, d],
                        rotated_kernel,
                        mode='full'
                    )
        
        # Update kernels and biases using Adam optimizer
        # For kernels
        self.m_kernels = beta1 * self.m_kernels + (1 - beta1) * kernels_gradient
        self.v_kernels = beta2 * self.v_kernels + (1 - beta2) * (kernels_gradient ** 2)
        m_hat_kernels = self.m_kernels / (1 - beta1 ** t)
        v_hat_kernels = self.v_kernels / (1 - beta2 ** t)
        self.kernels -= learning_rate * m_hat_kernels / (np.sqrt(v_hat_kernels) + epsilon)
        
        # For biases
        self.m_biases = beta1 * self.m_biases + (1 - beta1) * biases_gradient
        self.v_biases = beta2 * self.v_biases + (1 - beta2) * (biases_gradient ** 2)
        m_hat_biases = self.m_biases / (1 - beta1 ** t)
        v_hat_biases = self.v_biases / (1 - beta2 ** t)
        self.biases -= learning_rate * m_hat_biases / (np.sqrt(v_hat_biases) + epsilon)
        
        return input_gradient

# MaxPooling Layer
class MaxPool2D(Layer):
    def __init__(self, pool_size=(2, 2), stride=None):
        """
        Initialize max pooling layer
        
        :param pool_size: Size of the pooling window (height, width)
        :param stride: Stride of the pooling operation, defaults to pool_size
        """
        super().__init__()
        self.pool_size = pool_size
        self.stride = stride if stride is not None else pool_size
        self.max_indices = None  # To store indices of max values for backprop
    
    def forward(self, input_data):
        """
        Forward pass for max pooling layer
        
        :param input_data: Input data of shape (batch_size, height, width, channels)
        :return: Output after max pooling
        """
        self.input = input_data
        batch_size, h_in, w_in, channels = input_data.shape
        h_pool, w_pool = self.pool_size
        h_stride, w_stride = self.stride
        
        # Calculate output dimensions
        h_out = (h_in - h_pool) // h_stride + 1
        w_out = (w_in - w_pool) // w_stride + 1
        
        output = np.zeros((batch_size, h_out, w_out, channels))
        self.max_indices = np.zeros((batch_size, h_out, w_out, channels, 2), dtype=int)
        
        # Perform max pooling
        for b in range(batch_size):
            for i in range(h_out):
                for j in range(w_out):
                    for c in range(channels):
                        h_start = i * h_stride
                        h_end = h_start + h_pool
                        w_start = j * w_stride
                        w_end = w_start + w_pool
                        
                        # Get the region to pool from
                        pool_region = input_data[b, h_start:h_end, w_start:w_end, c]
                        
                        # Find max value and its position within the pool region
                        max_val = np.max(pool_region)
                        max_pos = np.unravel_index(np.argmax(pool_region), pool_region.shape)
                        
                        # Store max value and its position for backprop
                        output[b, i, j, c] = max_val
                        self.max_indices[b, i, j, c] = max_pos
        
        self.output = output
        return output
    
    def backward(self, output_gradient, learning_rate=None, **kwargs):
        """
        Backward pass for max pooling layer
        
        :param output_gradient: Gradient from next layer
        :param learning_rate: Not used for pooling layer
        :return: Gradient with respect to input
        """
        batch_size, h_out, w_out, channels = output_gradient.shape
        h_in, w_in = self.input.shape[1:3]
        h_pool, w_pool = self.pool_size
        h_stride, w_stride = self.stride
        
        input_gradient = np.zeros_like(self.input)
        
        # Distribute gradient only to max elements
        for b in range(batch_size):
            for i in range(h_out):
                for j in range(w_out):
                    for c in range(channels):
                        h_start = i * h_stride
                        w_start = j * w_stride
                        h_max, w_max = self.max_indices[b, i, j, c]
                        
                        # Add gradient to the position where the max was found
                        input_gradient[b, h_start + h_max, w_start + w_max, c] += output_gradient[b, i, j, c]
        
        return input_gradient

# Flatten Layer
class Flatten(Layer):
    def __init__(self):
        super().__init__()
        self.input_shape = None
    
    def forward(self, input_data):
        """
        Forward pass for flatten layer
        
        :param input_data: Input data of shape (batch_size, height, width, channels)
        :return: Flattened data of shape (batch_size, height*width*channels)
        """
        self.input = input_data
        self.input_shape = input_data.shape
        batch_size = input_data.shape[0]
        flattened_dim = np.prod(input_data.shape[1:])
        
        self.output = input_data.reshape(batch_size, flattened_dim)
        return self.output
    
    def backward(self, output_gradient, learning_rate=None, **kwargs):
        """
        Backward pass for flatten layer
        
        :param output_gradient: Gradient from next layer
        :param learning_rate: Not used for flatten layer
        :return: Gradient with respect to input
        """
        return output_gradient.reshape(self.input_shape)

# Dense (Fully Connected) Layer
class Dense(Layer):
    def __init__(self, input_size, output_size, activation='relu'):
        """
        Initialize dense (fully connected) layer
        
        :param input_size: Number of input features
        :param output_size: Number of output features
        :param activation: Activation function ('sigmoid', 'relu', 'leaky_relu', 'linear')
        """
        super().__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.activation_type = activation
        
        # Xavier/Glorot initialization
        limit = np.sqrt(6 / (input_size + output_size))
        self.weights = np.random.uniform(-limit, limit, (input_size, output_size))
        self.biases = np.zeros(output_size)
        
        # For Adam optimizer
        self.m_weights = np.zeros_like(self.weights)
        self.v_weights = np.zeros_like(self.weights)
        self.m_biases = np.zeros_like(self.biases)
        self.v_biases = np.zeros_like(self.biases)
        
        # Set activation function
        self.activation_func = self._get_activation(self.activation_type)
        self.activation_derivative = self._get_activation_derivative(self.activation_type)
    
    def _get_activation(self, name):
        if name == 'sigmoid':
            return lambda x: 1 / (1 + np.exp(-np.clip(x, -500, 500)))
        elif name == 'relu':
            return lambda x: np.maximum(0, x)
        elif name == 'leaky_relu':
            return lambda x: np.where(x > 0, x, x * 0.01)
        elif name == 'softmax':
            def softmax(x):
                exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
                return exp_x / (np.sum(exp_x, axis=-1, keepdims=True) + 1e-9)
            return softmax
        elif name == 'linear':
            return lambda x: x
        else:
            raise ValueError(f"Unknown activation function: '{name}'")
    
    def _get_activation_derivative(self, name):
        if name == 'sigmoid':
            return lambda x: x * (1 - x)
        elif name == 'relu':
            return lambda x: np.where(x > 0, 1, 0)
        elif name == 'leaky_relu':
            return lambda x: np.where(x > 0, 1, 0.01)
        elif name == 'linear':
            return lambda x: np.ones_like(x)
        elif name == 'softmax':
            return lambda x: x * (1 - x)  # Simplified for when used with cross-entropy
        else:
            raise ValueError(f"Unknown activation function derivative: '{name}'")
    
    def forward(self, input_data):
        """
        Forward pass for dense layer
        
        :param input_data: Input data of shape (batch_size, input_size)
        :return: Output after dense layer and activation
        """
        self.input = input_data
        self.z = np.dot(input_data, self.weights) + self.biases
        self.output = self.activation_func(self.z)
        return self.output
    
    def backward(self, output_gradient, learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8, t=1):
        """
        Backward pass for dense layer using Adam optimizer
        
        :param output_gradient: Gradient from next layer
        :param learning_rate: Learning rate for optimizer
        :param beta1: Exponential decay rate for 1st moment estimates
        :param beta2: Exponential decay rate for 2nd moment estimates
        :param epsilon: Small constant for numerical stability
        :param t: Timestep (for bias correction in Adam)
        :return: Gradient with respect to input
        """
        # Calculate gradient through activation function
        if self.activation_type == 'softmax':
            # Special case for softmax (assuming cross-entropy loss)
            delta = output_gradient
        else:
            delta = output_gradient * self.activation_derivative(self.output)
        
        # Calculate gradients for weights and biases
        weights_gradient = np.dot(self.input.T, delta)
        biases_gradient = np.sum(delta, axis=0)
        
        # Calculate gradient to pass to previous layer
        input_gradient = np.dot(delta, self.weights.T)
        
        # Update weights and biases using Adam optimizer
        # For weights
        self.m_weights = beta1 * self.m_weights + (1 - beta1) * weights_gradient
        self.v_weights = beta2 * self.v_weights + (1 - beta2) * (weights_gradient ** 2)
        m_hat_weights = self.m_weights / (1 - beta1 ** t)
        v_hat_weights = self.v_weights / (1 - beta2 ** t)
        self.weights -= learning_rate * m_hat_weights / (np.sqrt(v_hat_weights) + epsilon)
        
        # For biases
        self.m_biases = beta1 * self.m_biases + (1 - beta1) * biases_gradient
        self.v_biases = beta2 * self.v_biases + (1 - beta2) * (biases_gradient ** 2)
        m_hat_biases = self.m_biases / (1 - beta1 ** t)
        v_hat_biases = self.v_biases / (1 - beta2 ** t)
        self.biases -= learning_rate * m_hat_biases / (np.sqrt(v_hat_biases) + epsilon)
        
        return input_gradient

# Activation Layer as a separate layer
class Activation(Layer):
    def __init__(self, activation):
        """
        Initialize activation layer
        
        :param activation: Activation function name
        """
        super().__init__()
        self.activation_type = activation
        self.activation_func = self._get_activation(activation)
        self.activation_derivative = self._get_activation_derivative(activation)
    
    def _get_activation(self, name):
        if name == 'sigmoid':
            return lambda x: 1 / (1 + np.exp(-np.clip(x, -500, 500)))
        elif name == 'relu':
            return lambda x: np.maximum(0, x)
        elif name == 'leaky_relu':
            return lambda x: np.where(x > 0, x, x * 0.01)
        elif name == 'softmax':
            def softmax(x):
                exp_x = np.exp(x - np.max(x, axis=-1, keepdims=True))
                return exp_x / (np.sum(exp_x, axis=-1, keepdims=True) + 1e-9)
            return softmax
        elif name == 'linear':
            return lambda x: x
        else:
            raise ValueError(f"Unknown activation function: '{name}'")
    
    def _get_activation_derivative(self, name):
        if name == 'sigmoid':
            return lambda x: x * (1 - x)
        elif name == 'relu':
            return lambda x: np.where(x > 0, 1, 0)
        elif name == 'leaky_relu':
            return lambda x: np.where(x > 0, 1, 0.01)
        elif name == 'linear':
            return lambda x: np.ones_like(x)
        elif name == 'softmax':
            return lambda x: x * (1 - x)  # Simplified for when used with cross-entropy
        else:
            raise ValueError(f"Unknown activation function derivative: '{name}'")
    
    def forward(self, input_data):
        """
        Forward pass for activation layer
        
        :param input_data: Input data
        :return: Output after activation
        """
        self.input = input_data
        self.output = self.activation_func(input_data)
        return self.output
    
    def backward(self, output_gradient, learning_rate=None, **kwargs):
        """
        Backward pass for activation layer
        
        :param output_gradient: Gradient from next layer
        :param learning_rate: Not used for activation layer
        :return: Gradient with respect to input
        """
        if self.activation_type == 'softmax':
            # Special case for softmax (assuming cross-entropy loss)
            return output_gradient
        return output_gradient * self.activation_derivative(self.output)

# CNN Model
class CNN:
    def __init__(self, learning_rate=0.001):
        """
        Initialize CNN model
        
        :param learning_rate: Learning rate for optimizer
        """
        self.layers = []
        self.learning_rate = learning_rate
        self.t = 0  # Time step for Adam optimizer
    
    def add(self, layer):
        """
        Add a layer to the model
        
        :param layer: Layer to add
        """
        self.layers.append(layer)
    
    def predict(self, input_data):
        """
        Make predictions with the model
        
        :param input_data: Input data
        :return: Model predictions
        """
        # Ensure input data is in batch format
        if input_data.ndim == 3:  # Single image (height, width, channels)
            input_data = np.expand_dims(input_data, axis=0)
        
        output = input_data
        for layer in self.layers:
            output = layer.forward(output)
        
        return output
    
    def train(self, X_train, y_train, epochs, batch_size=32, verbose=True):
        """
        Train the model
        
        :param X_train: Training data
        :param y_train: Training labels
        :param epochs: Number of epochs
        :param batch_size: Batch size
        :param verbose: Whether to print progress
        """
        # Ensure X_train is in batch format
        if X_train.ndim == 3:  # Single image (height, width, channels)
            X_train = np.expand_dims(X_train, axis=0)
        
        # Ensure y_train is in batch format
        if y_train.ndim == 1:  # Single label
            y_train = np.expand_dims(y_train, axis=0)
        
        n_samples = X_train.shape[0]
        
        for epoch in range(epochs):
            # Shuffle the data
            indices = np.random.permutation(n_samples)
            X_shuffled = X_train[indices]
            y_shuffled = y_train[indices]
            
            loss = 0
            
            # Train in batches
            for i in range(0, n_samples, batch_size):
                # Get batch
                X_batch = X_shuffled[i:min(i + batch_size, n_samples)]
                y_batch = y_shuffled[i:min(i + batch_size, n_samples)]
                
                # Forward pass
                output = self.predict(X_batch)
                
                # Compute loss
                loss += self._compute_loss(y_batch, output)
                
                # Backward pass
                self.t += 1  # Increment time step for Adam optimizer
                grad = self._compute_loss_gradient(y_batch, output)
                
                for layer in reversed(self.layers):
                    grad = layer.backward(grad, self.learning_rate, t=self.t)
            
            # Average loss over all batches
            loss /= n_samples / batch_size
            
            if verbose and (epoch % 10 == 0 or epoch == epochs - 1):
                print(f"Epoch {epoch+1}/{epochs}, Loss: {loss:.4f}")
    
    def _compute_loss(self, y_true, y_pred):
        """
        Compute loss
        
        :param y_true: True labels
        :param y_pred: Predicted labels
        :return: Loss value
        """
        # Mean Squared Error
        return np.mean(np.sum((y_true - y_pred) ** 2, axis=1))
    
    def _compute_loss_gradient(self, y_true, y_pred):
        """
        Compute gradient of loss
        
        :param y_true: True labels
        :param y_pred: Predicted labels
        :return: Gradient of loss
        """
        # Gradient of Mean Squared Error
        return 2 * (y_pred - y_true) / y_true.shape[0]
    
    def save_model(self, filename):
        """
        Save model to file
        
        :param filename: Filename to save to
        """
        with open(filename, 'wb') as f:
            pickle.dump(self, f)
    
    @classmethod
    def load_model(cls, filename):
        """
        Load model from file
        
        :param filename: Filename to load from
        :return: Loaded model
        """
        with open(filename, 'rb') as f:
            return pickle.load(f)

# Example usage with MNIST-like data (28x28)
def generate_dummy_data(n_samples=100, img_size=(28, 28, 1)):
    # Generate dummy images (random noise)
    X = np.random.random((n_samples, *img_size))
    # Generate dummy labels (binary classification)
    y = np.random.randint(0, 2, (n_samples, 1))
    return X, y

# Create a simple CNN model for binary classification
def create_simple_cnn():
    model = CNN(learning_rate=0.001)
    
    # Add layers
    # Conv layer: input_shape=(28,28,1), kernel_size=(3,3), 8 filters
    model.add(Conv2D(input_shape=(28, 28, 1), kernel_size=(3, 3), depth=8))
    model.add(Activation('relu'))
    
    # MaxPooling layer: pool_size=(2,2)
    model.add(MaxPool2D(pool_size=(2, 2)))
    
    # Conv layer: kernel_size=(3,3), 16 filters
    model.add(Conv2D(input_shape=(13, 13, 8), kernel_size=(3, 3), depth=16))
    model.add(Activation('relu'))
    
    # MaxPooling layer: pool_size=(2,2)
    model.add(MaxPool2D(pool_size=(2, 2)))
    
    # Flatten layer
    model.add(Flatten())
    
    # Dense layer: 128 neurons
    model.add(Dense(16 * 5 * 5, 128, activation='relu'))
    
    # Output layer: 1 neuron (binary classification)
    model.add(Dense(128, 1, activation='sigmoid'))
    
    return model

# Test with dummy data
if __name__ == "__main__":
    # Generate dummy data
    X, y = generate_dummy_data(n_samples=100)
    
    # Create model
    model = create_simple_cnn()
    
    # Train model
    model.train(X, y, epochs=50, batch_size=10)
    
    # Make predictions
    predictions = model.predict(X[:5])
    print("Predictions:")
    print(predictions)
    print("Actual:")
    print(y[:5])

Epoch 1/50, Loss: 0.2587
Epoch 11/50, Loss: 0.2111
Epoch 11/50, Loss: 0.2111
Epoch 21/50, Loss: 0.1373
Epoch 21/50, Loss: 0.1373
Epoch 31/50, Loss: 0.0936
Epoch 31/50, Loss: 0.0936
Epoch 41/50, Loss: 0.0889
Epoch 41/50, Loss: 0.0889
Epoch 50/50, Loss: 0.0505
Predictions:
[[0.95822283]
 [0.15689718]
 [0.88956841]
 [0.05826617]
 [0.85981537]]
Actual:
[[1]
 [0]
 [1]
 [0]
 [1]]
Epoch 50/50, Loss: 0.0505
Predictions:
[[0.95822283]
 [0.15689718]
 [0.88956841]
 [0.05826617]
 [0.85981537]]
Actual:
[[1]
 [0]
 [1]
 [0]
 [1]]
