# Model being implemented: CNN (on the raw extracted dataset)

In [11]:
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import correlate2d
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [4]:
print("Implementing CNN from scratch using NumPy (CPU-only)")

# Load the feature matrix and the label vector from the working directory.
X_features, y_labels = (np.load(npy_file) for npy_file in ("X_features.npy", "y_labels.npy"))

Implementing CNN from scratch using NumPy (CPU-only)


## Data Preprocessing

In [9]:
# Data preprocessing
#
# Split sample indices first -- train (60%), validation (20%), test (20%) --
# so the scaler can be fitted on the training rows only. Fitting it on the
# full dataset would leak validation/test statistics into the features.
# Splitting an index array with the same random_state/stratify arguments
# selects the same samples as splitting the data array itself.
sample_idx = np.arange(len(X_features))
train_idx, temp_idx = train_test_split(
    sample_idx, test_size=0.4, random_state=42, stratify=y_labels)
val_idx, test_idx = train_test_split(
    temp_idx, test_size=0.5, random_state=42, stratify=y_labels[temp_idx])

scaler = StandardScaler()
scaler.fit(X_features[train_idx])        # statistics from training rows only
X_scaled = scaler.transform(X_features)  # same transform applied to every split

# Reshape each feature vector to a near-square 2D "image". The vector is
# zero-padded at the end so that height * width covers all features.
n_features = X_scaled.shape[1]
height = int(np.sqrt(n_features))
width = int(np.ceil(n_features / height))
padding = height * width - n_features
X_padded = np.pad(X_scaled, ((0, 0), (0, padding)), mode='constant')
X_reshaped = X_padded.reshape(-1, 1, height, width)  # (N, C, H, W)

# Materialise the splits (X_temp / y_temp are kept for backward compatibility).
X_train, X_temp = X_reshaped[train_idx], X_reshaped[temp_idx]
y_train, y_temp = y_labels[train_idx], y_labels[temp_idx]
X_val, X_test = X_reshaped[val_idx], X_reshaped[test_idx]
y_val, y_test = y_labels[val_idx], y_labels[test_idx]

# Sanity checks: every sample lands in exactly one split.
assert len(X_train) + len(X_val) + len(X_test) == len(X_features)
assert len(X_train) == len(y_train) and len(X_val) == len(y_val) and len(X_test) == len(y_test)

In [20]:
class ScratchCNN:
    """Small convolutional classifier implemented with NumPy only.

    Architecture (both convolutions are 3x3, stride 1, zero-padding 1)::

        conv(8 filters) -> ReLU -> maxpool 2x2
        -> conv(16 filters) -> ReLU -> maxpool 2x2
        -> flatten -> fully connected -> softmax

    Trainable arrays live in ``self.params`` under the keys ``W1, b1, W2, b2,
    W3, b3``. ``forward`` stores its intermediate activations in
    ``self.cache``; ``backward`` reads them, so it must be called right after
    ``forward`` on the same batch.
    """

    def __init__(self, input_shape, num_classes):
        """Create randomly initialised parameters.

        Args:
            input_shape: ``(channels, height, width)`` of one input sample.
            num_classes: Number of output classes.
        """
        # Size the layers from the argument, not from notebook-level globals.
        channels, in_h, in_w = input_shape

        self.params = {}
        # Conv Layer 1: 3x3 kernel, 8 filters
        self.params['W1'] = np.random.randn(8, channels, 3, 3) * 0.1
        self.params['b1'] = np.zeros(8)
        # Conv Layer 2: 3x3 kernel, 16 filters
        self.params['W2'] = np.random.randn(16, 8, 3, 3) * 0.1
        self.params['b2'] = np.zeros(16)
        # FC Layer: the padded convolutions keep H x W, each 2x2 pool floors
        # the size by two, so the final feature map is (H // 4) x (W // 4).
        flattened_size = 16 * (in_h // 4) * (in_w // 4)
        self.params['W3'] = np.random.randn(flattened_size, num_classes) * 0.1
        self.params['b3'] = np.zeros(num_classes)

    def relu(self, x):
        """Element-wise ``max(0, x)``."""
        return np.maximum(0, x)

    def softmax(self, x):
        """Row-wise softmax of a ``(N, num_classes)`` array.

        The row maximum is subtracted before exponentiating to avoid overflow.
        """
        exps = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exps / np.sum(exps, axis=1, keepdims=True)

    def _conv2d(self, x, W, b, stride=1, pad=1):
        """Cross-correlate ``x`` (N, C, H, W) with filters ``W`` (F, C, HH, WW).

        Returns an array of shape ``(N, F, H_out, W_out)`` with the bias
        ``b`` (F,) added to every output position.
        """
        N, C, H, W_in = x.shape
        F, _, HH, WW = W.shape

        x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
        H_out = (H + 2 * pad - HH) // stride + 1
        W_out = (W_in + 2 * pad - WW) // stride + 1

        out = np.zeros((N, F, H_out, W_out))
        for i in range(H_out):
            for j in range(W_out):
                ii, jj = i * stride, j * stride
                patch = x_pad[:, :, ii:ii + HH, jj:jj + WW]  # (N, C, HH, WW)
                # Contract channel and kernel axes for all filters at once -> (N, F)
                out[:, :, i, j] = np.tensordot(patch, W, axes=([1, 2, 3], [1, 2, 3]))
        out += b[None, :, None, None]
        return out

    def _conv2d_backward(self, dout, x, W, pad=1, need_dx=True):
        """Backward pass of ``_conv2d`` for stride 1.

        Args:
            dout: Gradient w.r.t. the convolution output, ``(N, F, H_out, W_out)``.
            x: The input that was fed to the convolution, ``(N, C, H, W)``.
            W: The filters used, ``(F, C, HH, WW)``.
            pad: Zero-padding that was used in the forward pass.
            need_dx: Set to False to skip the input gradient (first layer).

        Returns:
            ``(dx, dW, db)``; ``dx`` is ``None`` when ``need_dx`` is False.
        """
        N, C, H, W_in = x.shape
        F, _, HH, WW = W.shape
        _, _, H_out, W_out = dout.shape

        x_pad = np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
        dW = np.zeros_like(W)
        db = dout.sum(axis=(0, 2, 3))
        # Accumulate in padded coordinates so border windows are not dropped.
        dx_pad = np.zeros(x_pad.shape, dtype=dout.dtype) if need_dx else None

        for i in range(H_out):
            for j in range(W_out):
                g = dout[:, :, i, j]                        # (N, F)
                patch = x_pad[:, :, i:i + HH, j:j + WW]     # (N, C, HH, WW)
                # dW[f, c] += sum_n g[n, f] * patch[n, c]
                dW += np.tensordot(g, patch, axes=([0], [0]))
                if need_dx:
                    # dx_pad[n, c, window] += sum_f g[n, f] * W[f, c]
                    # (the forward pass is a cross-correlation, so no kernel flip)
                    dx_pad[:, :, i:i + HH, j:j + WW] += np.tensordot(g, W, axes=([1], [0]))

        dx = dx_pad[:, :, pad:pad + H, pad:pad + W_in] if need_dx else None
        return dx, dW, db

    def _maxpool2d(self, x, pool_size=2, stride=2):
        """Max-pool ``x`` (N, C, H, W) over ``pool_size`` x ``pool_size`` windows.

        Trailing rows/columns that do not fill a complete window are ignored.
        """
        N, C, H, W = x.shape
        H_out = (H - pool_size) // stride + 1
        W_out = (W - pool_size) // stride + 1

        out = np.zeros((N, C, H_out, W_out))

        for i in range(H_out):
            for j in range(W_out):
                ii, jj = i * stride, j * stride
                out[:, :, i, j] = np.max(
                    x[:, :, ii:ii + pool_size, jj:jj + pool_size],
                    axis=(2, 3))
        return out

    def _maxpool2d_backward(self, dout, x, pool_size=2, stride=2):
        """Route ``dout`` (N, C, H_out, W_out) back to the max of each window.

        ``x`` is the array that was pooled in the forward pass. When several
        entries of a window tie for the maximum, only the first one receives
        the gradient, so the gradient is never counted more than once.
        """
        N, C, H_out, W_out = dout.shape
        dx = np.zeros_like(x)
        slots = np.arange(pool_size * pool_size)

        for i in range(H_out):
            for j in range(W_out):
                ii, jj = i * stride, j * stride
                window = x[:, :, ii:ii + pool_size, jj:jj + pool_size]
                flat = window.reshape(N, C, -1)
                winner = flat.argmax(axis=2)                 # (N, C)
                mask = (slots == winner[..., None])          # one-hot per window
                dx[:, :, ii:ii + pool_size, jj:jj + pool_size] += (
                    mask.reshape(window.shape) * dout[:, :, i, j][:, :, None, None])
        return dx

    def forward(self, x):
        """Run the network on ``x`` (N, C, H, W) and return class probabilities.

        Returns an ``(N, num_classes)`` array whose rows sum to one. All
        intermediate activations are stored in ``self.cache`` for ``backward``.
        """
        self.cache = {}
        self.cache['Z1'] = self._conv2d(x, self.params['W1'], self.params['b1'])
        self.cache['A1'] = self.relu(self.cache['Z1'])
        self.cache['P1'] = self._maxpool2d(self.cache['A1'])

        self.cache['Z2'] = self._conv2d(self.cache['P1'], self.params['W2'], self.params['b2'])
        self.cache['A2'] = self.relu(self.cache['Z2'])
        self.cache['P2'] = self._maxpool2d(self.cache['A2'])

        self.cache['F'] = self.cache['P2'].reshape(self.cache['P2'].shape[0], -1)
        self.cache['Z3'] = np.dot(self.cache['F'], self.params['W3']) + self.params['b3']
        self.cache['A3'] = self.softmax(self.cache['Z3'])
        return self.cache['A3']

    def compute_loss(self, outputs, y_true):
        """Mean cross-entropy of ``outputs`` (N, num_classes) against ``y_true``.

        ``y_true`` is used as an integer column index into ``outputs``; a small
        constant is added inside the log to avoid ``log(0)``.
        """
        correct_probs = outputs[np.arange(len(y_true)), y_true]
        return -np.mean(np.log(correct_probs + 1e-10))

    def backward(self, x, y_true, lr=0.001):
        """Back-propagate the mean cross-entropy loss and apply one SGD step.

        Must be called after ``forward(x)`` on the same batch. Gradients are
        clipped element-wise to ``[-1, 1]`` before the in-place update
        ``param -= lr * grad``.

        Args:
            x: The input batch passed to ``forward``, ``(N, C, H, W)``.
            y_true: Integer class indices, length N.
            lr: Learning rate.
        """
        m = len(y_true)
        grads = {}

        # Softmax + cross-entropy gradient. The 1/m of the mean loss is applied
        # here exactly once; every downstream gradient inherits it.
        dZ3 = self.cache['A3'].copy()
        dZ3[np.arange(m), y_true] -= 1
        dZ3 /= m

        # FC layer gradients
        grads['W3'] = np.dot(self.cache['F'].T, dZ3)
        grads['b3'] = np.sum(dZ3, axis=0)
        dP2 = np.dot(dZ3, self.params['W3'].T).reshape(self.cache['P2'].shape)

        # MaxPool2 -> ReLU2 -> Conv2
        dA2 = self._maxpool2d_backward(dP2, self.cache['A2'])
        dZ2 = dA2 * (self.cache['Z2'] > 0)
        dP1, grads['W2'], grads['b2'] = self._conv2d_backward(
            dZ2, self.cache['P1'], self.params['W2'])

        # MaxPool1 -> ReLU1 -> Conv1 (the gradient w.r.t. the input is not needed)
        dA1 = self._maxpool2d_backward(dP1, self.cache['A1'])
        dZ1 = dA1 * (self.cache['Z1'] > 0)
        _, grads['W1'], grads['b1'] = self._conv2d_backward(
            dZ1, x, self.params['W1'], need_dx=False)

        # Gradient clipping (to avoid explosion), then the SGD update
        for param in self.params:
            grads[param] = np.clip(grads[param], -1, 1)
            self.params[param] -= lr * grads[param]


In [None]:
# Initialize and train
# ScratchCNN draws its weights from NumPy's global RNG; seed it so that
# Restart & Run All reproduces the same run.
np.random.seed(42)
# NOTE(review): compute_loss/backward index the network output with the label
# values, so labels are assumed to be integers 0..num_classes-1 -- confirm.
num_classes = len(np.unique(y_labels))
cnn = ScratchCNN(input_shape=(1, height, width), num_classes=num_classes)

# Training loop with accuracy tracking
epochs = 10
batch_size = 32
train_losses = []
val_accuracies = []
train_accuracies = []

for epoch in range(epochs):
    # Training
    epoch_loss = 0.0
    train_preds = []

    for i in range(0, len(X_train), batch_size):
        batch_X = X_train[i:i+batch_size]
        batch_y = y_train[i:i+batch_size]

        # Forward pass
        outputs = cnn.forward(batch_X)
        loss = cnn.compute_loss(outputs, batch_y)
        # compute_loss returns the batch mean; weight it by the batch size so
        # a shorter final batch is accounted for correctly.
        epoch_loss += loss * len(batch_y)

        # Backward pass
        cnn.backward(batch_X, batch_y, lr=0.001)

        # Collect training predictions
        train_preds.extend(np.argmax(outputs, axis=1))

    # Calculate metrics: mean per-sample loss over the whole epoch.
    # (Dividing by len(X_train) // batch_size under-counts the batches when
    # there is a partial last batch and is zero for tiny training sets.)
    avg_loss = epoch_loss / len(X_train)
    train_losses.append(avg_loss)

    train_acc = accuracy_score(y_train, train_preds)
    train_accuracies.append(train_acc)

    # Validation evaluation
    val_outputs = cnn.forward(X_val)
    val_preds = np.argmax(val_outputs, axis=1)
    val_acc = accuracy_score(y_val, val_preds)
    val_accuracies.append(val_acc)

    print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}")

In [None]:
# Final evaluation on all datasets
def evaluate_model(X, y, name):
    """Score the notebook-level ``cnn`` on one dataset split.

    Runs a forward pass on ``X``, takes the arg-max class per sample, prints
    the accuracy against ``y`` prefixed with ``name`` and returns
    ``(predictions, accuracy)``.
    """
    predicted = cnn.forward(X).argmax(axis=1)
    score = accuracy_score(y, predicted)
    print(f"{name} Accuracy: {score:.4f}")
    return predicted, score

In [None]:
# Evaluate on all sets
print("\nFinal Evaluation Results:")
val_preds, val_acc = evaluate_model(X_val, y_val, "Validation")
test_preds, test_acc = evaluate_model(X_test, y_test, "Test")
# Only the first 2000 training samples are scored, for faster evaluation.
train_preds, train_acc = evaluate_model(X_train[:2000], y_train[:2000], "Train (subset)")

# Confusion matrix (rows: true class, columns: predicted class)
print("\nValidation Set Confusion Matrix:")
print(confusion_matrix(y_val, val_preds))

# Per-class precision / recall / F1
print("\nValidation Set Classification Report:")
print(classification_report(y_val, val_preds))

In [None]:
# Left panel: training loss per epoch
loss_ax = plt.subplot(1, 2, 1)
loss_ax.plot(train_losses, label='Training Loss')
loss_ax.set_xlabel('Epoch')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training Loss')
loss_ax.legend()

# Right panel: train/validation accuracy per epoch, with the final test
# accuracy drawn as a horizontal reference line
acc_ax = plt.subplot(1, 2, 2)
acc_ax.plot(train_accuracies, label='Train Accuracy')
acc_ax.plot(val_accuracies, label='Validation Accuracy')
acc_ax.axhline(y=test_acc, color='r', linestyle='--', label='Test Accuracy')
acc_ax.set_xlabel('Epoch')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_title('Model Accuracy')
acc_ax.legend()

plt.tight_layout()
plt.show()

In [None]:
# Bar chart comparing the final accuracy on each split, with the value
# written just above every bar
split_names = ['Train', 'Validation', 'Test']
split_scores = [train_acc, val_acc, test_acc]

plt.figure(figsize=(10, 5))
plt.bar(split_names, split_scores)
plt.ylim(0, 1)
plt.ylabel('Accuracy')
plt.title('Model Performance Comparison')
for bar_pos, score in enumerate(split_scores):
    plt.text(bar_pos, score + 0.02, f"{score:.4f}", ha='center')
plt.show()