In [219]:
import numpy as np
import pandas as pd
from neural_network import NeuralNetwork
from dense_layer import DenseLayer
from conv_layer import Convolutional2D
from reshape import Reshape
from maxpooling import MaxPooling2D
from activations import *

In [248]:
data = pd.read_csv("mnist_train.csv")
data = np.array(data)
m, n = data.shape

data_dev = data[0:1000].T
Y_dev = data_dev[0]
X_dev = data_dev[1:n]
X_dev = X_dev / 255.
X_dev = X_dev.T

data_train = data[1000:7000].T
Y_train = data_train[0]
X_train = data_train[1:n]
X_train = X_train / 255.
X_train = X_train.T
_,m_train = X_train.shape

In [249]:
X_train_reshaped = X_train.reshape(-1, 28, 28)

# Step 2: Add padding to convert 28x28 to 32x32
X_train_padded = np.pad(X_train_reshaped, ((0, 0), (2, 2), (2, 2)), 'constant')

# Step 3 (optional): Flatten the padded images back to 1D (if needed)
X_train = X_train_padded.reshape(-1, 32*32)

In [250]:
X_dev_reshaped = X_dev.reshape(-1, 28, 28)

# Step 2: Add padding to convert 28x28 to 32x32
X_dev_padded = np.pad(X_dev_reshaped, ((0, 0), (2, 2), (2, 2)), 'constant')

# Step 3 (optional): Flatten the padded images back to 1D (if needed)
X_dev = X_dev_padded.reshape(-1, 32*32)

In [251]:
import numpy as np
from scipy import signal

class Convolutional2D:
    def __init__(self, kernel_size, num_kernels, padding=0, stride=1, kernels=None):
        self.input_shape = None
        self.stride = stride
        self.padding = padding
        self.kernel_size = kernel_size
        self.num_kernels = num_kernels
        self.biases = np.random.randn(num_kernels)
        self.kernels = kernels
    
    def _init_params(self, input_shape):
        self.input_shape = input_shape[1:]
        input_height, input_width, input_channels = self.input_shape[0], self.input_shape[1], self.input_shape[2]
        self.output_height = (input_height + 2 * self.padding - self.kernel_size) // self.stride + 1
        self.output_width = (input_width + 2 * self.padding - self.kernel_size) // self.stride + 1
        
        self.kernels_shape = (self.kernel_size, self.kernel_size, input_channels, self.num_kernels)
        if self.kernels is None:
            self.kernels = np.random.randn(self.kernel_size, self.kernel_size, input_channels, self.num_kernels) * 0.01

    def _pad_input(self, input):
        if self.padding > 0:
            padded_input = np.pad(
                input,
                ((0, 0),
                 (self.padding, self.padding),
                 (self.padding, self.padding),
                 (0, 0)),
                mode='constant'
            )
            return padded_input
        return input

    def forward(self, input):
        if self.input_shape is None:
            self._init_params(input.shape)
        self.batch_size = input.shape[0]

        self.input = self._pad_input(input)
        self.output = np.zeros((self.batch_size, self.output_height, self.output_width, self.num_kernels))
        
        for i in range(self.batch_size):
            for k in range(self.num_kernels):
                conv_result = np.sum([
                    signal.correlate2d(self.input[i, :, :, c], self.kernels[:, :, c, k], mode='valid')
                    for c in range(self.input.shape[3])
                ], axis=0)
                self.output[i, :, :, k] = conv_result[::self.stride, ::self.stride] + self.biases[k]
        
        return self.output

    def backward(self, output_gradient, learning_rate):
        kernels_gradient = np.zeros_like(self.kernels)
        input_gradient = np.zeros_like(self.input)

        for i in range(self.batch_size):
            for k in range(self.num_kernels):
                upsampled_gradient = np.zeros((
                    (output_gradient.shape[1] - 1) * self.stride + 1,
                    (output_gradient.shape[2] - 1) * self.stride + 1
                ))
                upsampled_gradient[::self.stride, ::self.stride] = output_gradient[i, :, :, k]
                
                for c in range(self.input.shape[3]):
                    kernels_gradient[:, :, c, k] += signal.correlate2d(
                        self.input[i, :, :, c], upsampled_gradient, mode='valid'
                    )
                    input_gradient[i, :, :, c] += signal.convolve2d(
                        upsampled_gradient, self.kernels[:, :, c, k], mode='full'
                    )
        
        self.kernels -= learning_rate * kernels_gradient / self.batch_size
        self.biases -= learning_rate * np.sum(output_gradient, axis=(0, 1, 2)) / self.batch_size
        
        return input_gradient

In [262]:
import numpy as np
# from loss_function import LossFunction

# ANN Class
class NeuralNetwork:
    def __init__(self):
        self.layers = []
        self.loss_functions = {
            'mse': (LossFunction.mse, LossFunction.mse_derivative),
            'log_loss': (LossFunction.log_loss, LossFunction.log_loss_derivative)
        }
    
    def add_layer(self, layer):
        self.layers.append(layer)
    
    def compile(self, loss):
        self.loss = loss
        if loss not in self.loss_functions:
            raise ValueError("Loss function not supported.")
        self.loss_function, self.loss_derivative = self.loss_functions[loss]
    
    def forward(self, input):
        output = input
        for layer in self.layers:
            output = layer.forward(output)
        return output
    
    def backward(self, output):
        for layer in reversed(self.layers):
            output = layer.backward(output, self.learning_rate)
    
    def train(self, X, Y, epochs = 10000, learning_rate = 0.1, batch_size = 50, softmax_logloss=False, isOne_hot=False):
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        n_batches = int(np.ceil(X.shape[0] / batch_size))
        y = Y.copy()
        if isOne_hot:
            Y = one_hot(Y)

        for epoch in range(epochs):
            total_error = 0
            for batch_idx in range(n_batches):
                start = batch_idx * batch_size
                end = min(start + batch_size, X.shape[0])
                
                X_batch = X[start:end]
                y_batch = Y[start:end]

                # Forward pass for the entire batch
                output = self.forward(X_batch)
                
                # Calculate batch loss and gradients
                error = self.loss_function(y_batch, output)
                if softmax_logloss:
                    grad = (output - y_batch) / batch_size
                else:
                    grad = self.loss_derivative(y_batch, output)
                
                # Backward pass
                self.backward(grad)
                
                total_error += error
            total_error /= n_batches
            output = self.forward(X)
            pred = get_predictions(output)
            print(f'Epoch {epoch}, Loss: {total_error}')
            if (one_hot):
                print(f'Epoch {epoch}, accuracy: {get_accuracy(pred, y)}')
            # if (epoch % 10 == 0):
            #     output = self.forward(X)
            #     pred = get_predictions(output)
            #     print(f'Epoch {epoch}, Loss: {total_error}')
            #     if (one_hot):
            #         print(f'Epoch {epoch}, accuracy: {get_accuracy(pred, y)}')
    
    def predict(self, X):
        return self.forward(X)

def print_parameters(layer, num_elements=5):
    print(f"First {num_elements} weights: {layer.weights.flatten()[:num_elements]}")
    print(f"First {num_elements} biases: {layer.biases.flatten()[:num_elements]}")

def get_predictions(A2):
    return np.argmax(A2, axis=1)

def get_accuracy(predictions, Y):
    print(predictions[:10])
    print(Y[:10])
    return np.sum(predictions == Y) / Y.size

def one_hot(Y):
    one_hot_Y = np.zeros((Y.size, Y.max() + 1))
    one_hot_Y[np.arange(Y.size), Y] = 1
    return one_hot_Y

import numpy as np

class LossFunction:
    @staticmethod
    def mse(y_true, y_pred):
        return np.mean((y_true - y_pred) ** 2)
    
    @staticmethod
    def mse_derivative(y_true, y_pred):
        return 2 * (y_pred - y_true) / y_true.size
    
    @staticmethod
    def log_loss(y_true, y_pred, epsilon=1e-15):
        # Clip the predictions to avoid log(0)
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
        return np.mean(-y_true * np.log(y_pred) - (1 - y_true) * np.log(1 - y_pred))
    
    @staticmethod
    def log_loss_derivative(y_true, y_pred, epsilon=1e-15):
        # Clip the predictions to avoid division by 0
        y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
        return ((1 - y_true) / (1 - y_pred) - y_true / y_pred) / np.size(y_true)

In [263]:
import numpy as np

class Reshape:
    def __init__(self, output_shape):
        self.input_shape = None
        self.output_shape = output_shape

    def forward(self, input):
        if self.input_shape is None:
            self.input_shape = input.shape[1:]
        self.batch_size = input.shape[0]
        if type(self.output_shape) is int:
            self.output_shape = (self.output_shape,)
        return np.reshape(input, (self.batch_size, *self.output_shape))

    def backward(self, output_gradient, learning_rate):
        return np.reshape(output_gradient, (self.batch_size, *self.input_shape))

In [264]:
import numpy as np

class MaxPooling2D:
    def __init__(self, pool_size=(2, 2), stride=(2, 2)):
        self.pool_size = pool_size
        self.stride = stride

    def forward(self, input):
        self.batch_size, self.input_height, self.input_width, self.input_channels = input.shape
        self.input = input

        self.output_height = (self.input_height - self.pool_size[0]) // self.stride[0] + 1
        self.output_width = (self.input_width - self.pool_size[1]) // self.stride[1] + 1

        # Initialize output tensor
        output = np.zeros((self.batch_size, self.output_height, self.output_width, self.input_channels))

        for i in range(self.output_height):
            for j in range(self.output_width):
                h_start = i * self.stride[0]
                h_end = h_start + self.pool_size[0]
                w_start = j * self.stride[1]
                w_end = w_start + self.pool_size[1]

                region = self.input[:, h_start:h_end, w_start:w_end, :]
                output[:, i, j, :] = np.max(region, axis=(1, 2))
        
        return output

    def backward(self, output_gradient, learning_rate):
        input_gradient = np.zeros_like(self.input)

        for i in range(self.output_height):
            for j in range(self.output_width):
                h_start = i * self.stride[0]
                h_end = h_start + self.pool_size[0]
                w_start = j * self.stride[1]
                w_end = w_start + self.pool_size[1]

                region = self.input[:, h_start:h_end, w_start:w_end, :]
                max_mask = (region == np.max(region, axis=(1, 2), keepdims=True))
                
                input_gradient[:, h_start:h_end, w_start:w_end, :] += max_mask * output_gradient[:, i, j, :][:, None, None, :]
        
        return input_gradient


In [265]:
# import tensorflow as tf
# from keras import layers, models, optimizers

# print(X_train.shape)
# X_train_reshaped = np.reshape(X_train, (X_train.shape[0], 32, 32, 1))
# X_test_reshaped = np.reshape(X_dev, (X_dev.shape[0], 32, 32, 1))

# # Build the LeNet-5 model
# model = models.Sequential([
#     layers.InputLayer(input_shape=(32, 32, 1)),
    
#     # C1 Convolutional Layer
#     layers.Conv2D(filters=6, kernel_size=(5, 5), activation='tanh', padding='same'),
    
#     # S2 Subsampling Layer (Average Pooling)
#     layers.MaxPooling2D(pool_size=(2, 2), strides=2),
    
#     # C3 Convolutional Layer
#     layers.Conv2D(filters=16, kernel_size=(5, 5), activation='tanh'),
    
#     # S4 Subsampling Layer (Average Pooling)
#     layers.MaxPooling2D(pool_size=(2, 2), strides=2),
    
#     # C5 Fully Connected Convolutional Layer
#     layers.Conv2D(filters=120, kernel_size=(5, 5), activation='tanh'),
    
#     # Flatten the output for the fully connected layers
#     layers.Flatten(),
    
#     # F6 Fully Connected Layer
#     layers.Dense(units=84, activation='tanh'),
    
#     # Output Layer
#     layers.Dense(units=10, activation='softmax')
# ])

# # Compile the model
# model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# # Summary of the model
# history = model.fit(X_train_reshaped, Y_train, epochs=10, batch_size=32, validation_data=(X_test_reshaped, Y_dev))

In [266]:
y_pred = model.predict(X_test_reshaped)
y_pred = get_predictions(y_pred)
print(get_accuracy(y_pred, Y_dev))

[1m32/32[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step
[3 0 4 1 9 2 1 3 1 4]
[5 0 4 1 9 2 1 3 1 4]
0.904


In [268]:
# ann = NeuralNetwork()
# ann.add_layer(DenseLayer(10, init="Xavier"))
# ann.add_layer(Tanh())
# ann.add_layer(DenseLayer(10, init="Xavier"))
# ann.add_layer(Softmax(softmax_logloss=True))
# ann.compile(loss="log_loss")
# ann.train(X_train, Y_train, epochs=500, learning_rate=1, batch_size=50, softmax_logloss=True, isOne_hot=True)

In [269]:
ann = NeuralNetwork()
ann.add_layer(Reshape((32, 32, 1)))
ann.add_layer(Convolutional2D(5, 5))
ann.add_layer(Tanh())
ann.add_layer(MaxPooling2D(pool_size=(2,2), stride=(2,2)))
ann.add_layer(Convolutional2D(5, 16))
ann.add_layer(Tanh())
ann.add_layer(MaxPooling2D(pool_size=(2,2), stride=(2,2)))
ann.add_layer(Convolutional2D(5, 120))
ann.add_layer(Tanh())
ann.add_layer(Reshape(120))
ann.add_layer(DenseLayer(84, init="Xavier"))
ann.add_layer(Tanh())
ann.add_layer(DenseLayer(10, init="Xavier"))
ann.add_layer(Softmax(softmax_logloss=True))
ann.compile(loss="log_loss")
ann.train(X_train, Y_train, epochs=10, learning_rate=0.1, batch_size=32, softmax_logloss=True, isOne_hot=True)

Epoch 0, Loss: 0.32583533809614645
[1 1 1 1 1 1 1 1 1 1]
[0 7 1 1 4 9 4 3 4 8]
Epoch 0, accuracy: 0.11133333333333334


KeyboardInterrupt: 