In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/mnist-in-csv/mnist_test.csv
/kaggle/input/mnist-in-csv/mnist_train.csv


In [6]:
import numpy as np
from tqdm import tqdm
import pandas as pd
import random
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import time
import os
from scipy.signal import correlate2d
from skimage.measure import block_reduce


In [13]:

class CNN:
    def __init__(self, input_size=(28, 28), num_filters=8, filter_size=3, pool_size=2, num_classes=10, learning_rate=0.01):
        self.input_size = input_size
        self.num_filters = num_filters
        self.filter_size = filter_size
        self.pool_size = pool_size
        self.num_classes = num_classes
        self.lr = learning_rate

        # Initialize filters and weights
        self.filters = np.random.randn(num_filters, filter_size, filter_size) * 0.1
        self.fc_weights = np.random.randn(num_filters * ((input_size[0] - filter_size + 1) // pool_size) * 
                                          ((input_size[1] - filter_size + 1) // pool_size), num_classes) * 0.1
        self.fc_bias = np.zeros((1, num_classes))

    def convolve2D(self, image, kernel):
        return correlate2d(image, kernel, mode='valid')
        # kernel_size = kernel.shape[0]
        # output_size = (image.shape[0] - kernel_size + 1, image.shape[1] - kernel_size + 1)
        # output = np.zeros(output_size)

        # for i in range(output_size[0]):
        #     for j in range(output_size[1]):
        #         region = image[i:i + kernel_size, j:j + kernel_size]
        #         output[i, j] = np.sum(region * kernel)

        # return output

    def max_pooling(self, feature_map, size):
        h, w = feature_map.shape
        output_size = (h // size, w // size)
        output = np.zeros(output_size)
        self.pool_cache = {}

        for i in range(output_size[0]):
            for j in range(output_size[1]):
                region = feature_map[i * size:(i + 1) * size, j * size:(j + 1) * size]
                max_val = np.max(region)
                max_pos = np.unravel_index(np.argmax(region), region.shape)
                output[i, j] = max_val
                self.pool_cache[(i, j)] = (i * size + max_pos[0], j * size + max_pos[1])  # Store relative positions

        return output

    def sigmoid(self, x):
        x = np.clip(x, -500, 500)  # Prevent overflow
        return 1 / (1 + np.exp(-x))

    def sigmoid_derivative(self, x):
        return x * (1 - x)

    def softmax(self, x):
        exps = np.exp(x - np.max(x))
        return exps / np.sum(exps, axis=1, keepdims=True)

    def cross_entropy_loss(self, probs, label):
        probs = np.clip(probs, 1e-10, 1)  # Prevent log(0)
        return -np.log(probs[0, label])  # Corrected label indexing

    def forward(self, image):
        self.input_image = image
        self.feature_maps = np.array([self.convolve2D(image, kernel) for kernel in self.filters])
        self.sigmoid_maps = np.array([self.sigmoid(x) for x in self.feature_maps])
        self.pooled_maps = np.array([self.max_pooling(fm, self.pool_size) for fm in self.sigmoid_maps])

        self.flattened = self.pooled_maps.flatten().reshape(1, -1)
        self.scores = np.dot(self.flattened, self.fc_weights) + self.fc_bias
        self.probs = self.softmax(self.scores)

        return self.probs

    def backward(self, label):
        d_scores = self.probs.copy()  # Don't modify self.probs
        d_scores[0, label] -= 1  # Compute gradient of loss w.r.t. scores

        d_fc_weights = np.dot(self.flattened.T, d_scores)
        d_fc_bias = np.sum(d_scores, axis=0, keepdims=True)
        d_flattened = np.dot(d_scores, self.fc_weights.T)

        d_pooled = d_flattened.reshape(self.pooled_maps.shape)

        # Backprop through pooling
        d_sigmoid_maps = np.zeros_like(self.sigmoid_maps)
        for i in range(self.num_filters):
            for (py, px), (y, x) in self.pool_cache.items():
                d_sigmoid_maps[i, y, x] = d_pooled[i, py, px]  # Use stored pooling indices

        # Backprop through sigmoid
        d_feature_maps = d_sigmoid_maps * self.sigmoid_derivative(self.sigmoid_maps)

        # Backprop through convolution
        d_filters = np.zeros_like(self.filters)
        for i in range(self.num_filters):
            for y in range(self.filter_size):
                for x in range(self.filter_size):
                    region = self.input_image[y:y + d_feature_maps.shape[1], x:x + d_feature_maps.shape[2]]
                    d_filters[i, y, x] = np.sum(region * d_feature_maps[i])

        # Gradient clipping to prevent instability
        d_fc_weights = np.clip(d_fc_weights, -1, 1)
        d_fc_bias = np.clip(d_fc_bias, -1, 1)
        d_filters = np.clip(d_filters, -1, 1)

        # Update weights
        self.fc_weights -= self.lr * d_fc_weights
        self.fc_bias -= self.lr * d_fc_bias
        self.filters -= self.lr * d_filters


    def predict(self, image):
        """ Returns predicted class label """
        output = self.forward(image)
        return np.argmax(output)

    def train(self, dataset, labels, epochs=10, batch_size=32):
        num_samples = dataset.shape[0]
        history = {'loss': [], 'f1_score': [], 'training_time': []}
        
        for epoch in range(epochs):
            start_time = time.time()
            loss = 0
            num_batches = num_samples//batch_size
            all_preds = []
            all_labels = []
            
            with tqdm(total=num_batches, desc=f"Epoch {epoch + 1}/{epochs}") as pbar:
                for i in range(0, num_samples, batch_size):
                    X_batch = dataset[i:i+batch_size]
                    y_batch = labels[i:i+batch_size]
                    batch_loss = 0
            
                    for image, label in zip(X_batch, y_batch):
                        #image, label = dataset[i], labels[i]
                        output = self.forward(image)
                        batch_loss += self.cross_entropy_loss(output, label)
                        self.backward(label)

                        pred_label = np.argmax(output)
                        all_preds.append(pred_label)
                        all_labels.append(label)
                    loss+= batch_loss/len(X_batch)
                    pbar.update(1)

            avg_loss = loss / len(dataset)
            f1 = f1_score(all_labels, all_preds, average="weighted")
            epoch_time = time.time() - start_time

            history['loss'].append(avg_loss)
            history['f1_score'].append(f1)
            history['training_time'].append(epoch_time)

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {avg_loss:.4f}")
        return history


    def evaluate(self, dataset, labels):
        preds = np.array([self.predict(image) for image in dataset])
        labels = np.array(labels)
        
        accuracy = np.mean(preds == labels)
        f1 = f1_score(labels, preds, average="weighted")
        conf_matrix = confusion_matrix(labels, preds)

        print(f"Accuracy: {accuracy * 100:.2f}%")
        print(f"F1 Score: {f1:.4f}")
        print(f"Confusion Matrix:\n{conf_matrix}")

        return {"accuracy": accuracy, "f1_score": f1, "confusion_matrix": conf_matrix}



In [9]:
# Load MNIST dataset (CSV format)
train_data = pd.read_csv('/kaggle/input/mnist-in-csv/mnist_train.csv')
test_data = pd.read_csv('/kaggle/input/mnist-in-csv/mnist_test.csv')


# Split features and labels
X_train = train_data.iloc[0:10000, 1:].values  # Pixels
y_train = train_data.iloc[0:10000, 0].values   # Labels

X_test = test_data.iloc[0:10000, 1:].values
y_test = test_data.iloc[0:10000, 0].values

# Normalize pixel values (0 to 1)
X_train = X_train / 255.0
X_test = X_test / 255.0

# Reshape to (28, 28) for CNN input
X_train = X_train.reshape(-1, 28, 28)
X_test = X_test.reshape(-1, 28, 28)

# Define number of classes
num_classes = 10

# One-hot encode labels
def one_hot_encode(y, num_classes):
    encoded = np.zeros((len(y), num_classes))
    encoded[np.arange(len(y)), y] = 1
    return encoded

y_train_one_hot = one_hot_encode(y_train, num_classes)
y_test_one_hot = one_hot_encode(y_test, num_classes)



In [15]:
cnn = CNN(input_size=(28, 28), num_filters=8, filter_size=3, pool_size=2, num_classes=10, learning_rate=0.01)

# Train CNN on MNIST
history = cnn.train(X_train, y_train, epochs=10)  # Using only 5000 samples for faster training


Epoch 1/10: 313it [03:04,  1.69it/s]                         


Epoch 1/10, Loss: 0.0282


Epoch 2/10: 313it [03:06,  1.68it/s]                         


Epoch 2/10, Loss: 0.0143


Epoch 3/10: 313it [03:05,  1.69it/s]                         


Epoch 3/10, Loss: 0.0132


Epoch 4/10: 313it [03:06,  1.68it/s]                         


Epoch 4/10, Loss: 0.0126


Epoch 5/10: 313it [03:05,  1.69it/s]                         


Epoch 5/10, Loss: 0.0121


Epoch 6/10: 313it [03:06,  1.68it/s]                         


Epoch 6/10, Loss: 0.0118


Epoch 7/10: 313it [03:06,  1.68it/s]                         


Epoch 7/10, Loss: 0.0114


Epoch 8/10: 313it [03:07,  1.67it/s]                         


Epoch 8/10, Loss: 0.0110


Epoch 9/10: 313it [03:07,  1.67it/s]                         


Epoch 9/10, Loss: 0.0106


Epoch 10/10: 313it [03:06,  1.68it/s]                         

Epoch 10/10, Loss: 0.0101





In [16]:

performance = cnn.evaluate(X_test, y_test)


Accuracy: 90.52%
F1 Score: 0.9056
Confusion Matrix:
[[ 955    0   10    1    0    5    3    1    5    0]
 [   0 1098   15    3    0    1    2    0   16    0]
 [   4    5  975    5    6    1    3    7   22    4]
 [   3    0   60  898    2   14    0    6   21    6]
 [   2    7   18    1  901    1    9    0   16   27]
 [  11    6   17   45    4  758    5    3   39    4]
 [  11    3   52    1   33   17  834    1    6    0]
 [   1   20   64   11   12    2    0  888    6   24]
 [   7    8   26   18   10   16    1    2  878    8]
 [   9   11    7   14   61    6    0   17   17  867]]


In [17]:
print("History:\n", history)

History:
 {'loss': [0.028240715566814886, 0.014333379747239642, 0.013206571228700928, 0.012591525695632561, 0.012141445638313389, 0.011757770604437778, 0.011388234525467377, 0.010994051605974588, 0.010550400096438888, 0.010050936174886463], 'f1_score': [0.713698450703459, 0.8599361914492653, 0.8748145480959454, 0.8810671610034746, 0.8863219831635631, 0.8898598391662282, 0.8945812403945363, 0.8986032019612461, 0.9019000755081572, 0.9061251273591663], 'training_time': [184.7057752609253, 186.10485792160034, 185.55939626693726, 186.48927998542786, 185.32050371170044, 186.5687997341156, 186.1742067337036, 187.11037826538086, 187.37517499923706, 186.08647418022156]}


In [18]:
total = 0

for hist in history['training_time']:
    total+= hist

print(total)

1861.4948470592499
