In [17]:
import pickle
import csv
import numpy as np
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

In [18]:
class ReLU:
    """Rectified linear unit activation, applied element-wise."""

    def forward(self, z):
        """Clamp negative entries of ``z`` to zero and cache the result for ``backward``."""
        activated = np.maximum(0, z)
        self.output = activated
        return activated

    def backward(self, grad):
        """Let ``grad`` through only where the cached forward output was positive."""
        passthrough = self.output > 0
        return grad * passthrough

In [19]:
class Softmax:
    """Row-wise softmax activation."""

    def forward(self, x):
        """Return softmax probabilities along axis 1 and cache them in ``self.output``."""
        # Subtract each row's maximum first; otherwise np.exp can overflow.
        row_max = np.max(x, axis=1, keepdims=True)
        exps = np.exp(x - row_max)
        row_totals = np.sum(exps, axis=1, keepdims=True)
        self.output = exps / row_totals
        return self.output

    def backward(self, grad):
        """Return ``grad`` unchanged; no softmax Jacobian is applied here."""
        return grad

Parameters for Batch Normalization

In [20]:
class Batch_params:
    """Hyper-parameters and learnable affine parameters of a batch-normalization layer.

    ``gamma`` (scale) and ``beta`` (shift) stay ``None`` until ``build`` is called.
    """

    def __init__(self, momentum=0.9, epsilon=1e-5):
        self.momentum, self.epsilon = momentum, epsilon
        self.gamma = self.beta = None

    def build(self, units):
        """Allocate ``gamma`` as ones and ``beta`` as zeros, each of shape ``(1, units)``."""
        shape = (1, units)
        self.gamma = np.ones(shape)
        self.beta = np.zeros(shape)

    def update_params(self, learning_rate, grad_gamma, grad_beta):
        """Take one plain gradient-descent step on ``gamma`` and ``beta`` (updated in place)."""
        gamma_step = learning_rate * grad_gamma
        beta_step = learning_rate * grad_beta
        self.gamma -= gamma_step
        self.beta -= beta_step

Batch Normalization Statistics

In [21]:
class Batch_Stats:
    """Holds the mean and variance of the most recent training mini-batch."""

    def __init__(self):
        self.batch_mean, self.batch_var = 0, 0

    def update_training(self, x):
        """Recompute mean and variance of ``x`` along axis 0, keeping that axis (keepdims)."""
        reduce_over_rows = dict(axis=0, keepdims=True)
        self.batch_mean = np.mean(x, **reduce_over_rows)
        self.batch_var = np.var(x, **reduce_over_rows)
	

In [22]:
class Batch_running:
    """Exponential moving averages of the batch mean and variance, used at inference time."""

    def __init__(self):
        self.running_mean = self.running_var = None

    def build(self, units):
        """Start the running mean at zeros and the running variance at ones, shape ``(1, units)``."""
        self.running_var = np.ones((1, units))
        self.running_mean = np.zeros((1, units))

    def update_training(self, params, batch_stats):
        """Blend the latest batch statistics into the running ones.

        ``params.momentum`` weights the previous running value; the remainder
        weights ``batch_stats.batch_mean`` / ``batch_stats.batch_var``.
        """
        keep = params.momentum
        fresh = 1 - params.momentum
        self.running_mean = keep * self.running_mean + fresh * batch_stats.batch_mean
        self.running_var = keep * self.running_var + fresh * batch_stats.batch_var


In [23]:
class BatchNormalization:
    """Batch-normalization layer with a learnable scale (gamma) and shift (beta).

    While training, inputs are normalized with the statistics of the current
    batch and the running averages are refreshed; otherwise the stored running
    averages are used.
    """

    def __init__(self, momentum=0.9, epsilon=1e-5):
        self.params = Batch_params(momentum, epsilon)
        self.running_stats = Batch_running()
        self.batch_stats = Batch_Stats()

    def build(self, units):
        """Allocate the affine parameters and running statistics for ``units`` features."""
        for component in (self.params, self.running_stats):
            component.build(units)

    def forward(self, x, training=True):
        """Normalize ``x``, then scale by gamma and shift by beta.

        The centered input, inverse standard deviation and normalized input are
        cached on the instance because ``backward`` reads them.
        """
        if training:
            # Refresh the batch statistics first, then fold them into the running averages.
            self.batch_stats.update_training(x)
            self.running_stats.update_training(self.params, self.batch_stats)
            mean = self.batch_stats.batch_mean
            variance = self.batch_stats.batch_var
        else:
            # Inference: rely on the accumulated running statistics.
            mean = self.running_stats.running_mean
            variance = self.running_stats.running_var

        self.x_centered = x - mean
        self.std_inv = 1.0 / np.sqrt(variance + self.params.epsilon)
        self.x_normalized = self.x_centered * self.std_inv

        # Learnable affine transform.
        self.out = self.params.gamma * self.x_normalized + self.params.beta
        return self.out

    def backward(self, grad_output):
        """Store the gamma/beta gradients and return the gradient w.r.t. the layer input.

        Uses the values cached by the most recent ``forward`` call.
        """
        n_rows = grad_output.shape[0]
        self.update_gradients(grad_output)

        # Gradient w.r.t. the normalized input.
        d_norm = grad_output * self.params.gamma

        # Gradient w.r.t. the variance.
        d_var = np.sum(d_norm * self.x_centered * -0.5 * self.std_inv**3, axis=0, keepdims=True)

        # Gradient w.r.t. the mean (direct path plus the path through the variance).
        direct_mean_term = np.sum(d_norm * -self.std_inv, axis=0, keepdims=True)
        d_mean = direct_mean_term + d_var * np.sum(-2 * self.x_centered, axis=0, keepdims=True) / n_rows

        return d_norm * self.std_inv + d_var * 2 * self.x_centered / n_rows + d_mean / n_rows

    def update_gradients(self, grad_output):
        """Compute and store ``grad_gamma`` and ``grad_beta`` as sums over axis 0."""
        self.grad_beta = np.sum(grad_output, axis=0, keepdims=True)
        self.grad_gamma = np.sum(grad_output * self.x_normalized, axis=0, keepdims=True)

    def update_weights(self, learning_rate):
        """Apply the stored gamma/beta gradients with the given learning rate."""
        self.params.update_params(learning_rate, self.grad_gamma, self.grad_beta)


In [24]:
class Node:
    """One unit of a dense layer: a weight vector, a bias and its own Adam optimizer state."""

    def __init__(self, input_dim, node_num, learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8):
        self.node_num = node_num  # column index of this unit within its layer

        # Parameters of this unit: small random weights, zero bias.
        self.weights = np.random.randn(input_dim) * 0.01
        self.bias = 0

        # Adam moment estimates (first: m_*, second: v_*) for weights and bias.
        self.m_w = np.zeros_like(self.weights)
        self.v_w = np.zeros_like(self.weights)
        self.m_b = 0
        self.v_b = 0

        # Optimizer hyper-parameters and the last step index seen.
        self.learning_rate = learning_rate
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon
        self.t = 0

    def update(self, all_weights, all_biases):
        """Overwrite this unit's parameters with column ``node_num`` of the layer-wide arrays."""
        column = self.node_num
        self.weights = all_weights[:, column]
        self.bias = all_biases[0, column]

    def update_adam(self, grad_weights, grad_bias, t):
        """Apply one Adam step to the weights and the bias.

        ``t`` is the step index used for bias correction; the caller is expected
        to pass ``t >= 1`` (``t == 0`` makes the correction denominators zero).
        """
        self.t = t
        b1, b2 = self.beta_1, self.beta_2

        # Exponential moving averages of the gradient and the squared gradient.
        self.m_w = b1 * self.m_w + (1 - b1) * grad_weights
        self.v_w = b2 * self.v_w + (1 - b2) * np.square(grad_weights)
        self.m_b = b1 * self.m_b + (1 - b1) * grad_bias
        self.v_b = b2 * self.v_b + (1 - b2) * np.square(grad_bias)

        # Bias-corrected moment estimates.
        first_correction = 1 - b1 ** self.t
        second_correction = 1 - b2 ** self.t
        m_hat_w = self.m_w / first_correction
        v_hat_w = self.v_w / second_correction
        m_hat_b = self.m_b / first_correction
        v_hat_b = self.v_b / second_correction

        # Parameter update (the weight vector is modified in place).
        self.weights -= self.learning_rate * m_hat_w / (np.sqrt(v_hat_w) + self.epsilon)
        self.bias -= self.learning_rate * m_hat_b / (np.sqrt(v_hat_b) + self.epsilon)





class Dense:
    """Fully connected layer assembled from per-unit ``Node`` objects.

    The forward pass applies, in order: the affine map, optional batch
    normalization, optional inverted dropout (training only) and the optional
    activation. ``backward`` walks those steps in reverse and stores the
    weight/bias gradients; ``update_weights`` then applies one Adam step per
    node and a gradient step on the batch-normalization parameters.

    Parameters
    ----------
    units : number of output units.
    activation : ``'relu'``, ``'softmax'`` or anything else for no activation.
    batch_normalization : whether to insert a ``BatchNormalization`` step.
    dropout_rate : fraction of outputs zeroed while training; must be in ``[0, 1)``.
    learning_rate, beta_1, beta_2, epsilon : Adam hyper-parameters handed to each node.
    """

    def __init__(self, units, activation=None, batch_normalization=True, dropout_rate=0.0, learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-8):
        if not 0.0 <= dropout_rate < 1.0:
            # A rate of 1 would divide by zero when rescaling the kept activations.
            raise ValueError(f"dropout_rate must be in [0, 1), got {dropout_rate}")

        self.units = units
        self.activation = activation
        self.activation_fn = None
        self.input_dim = None  # determined on the first forward pass (or by an explicit build())
        self.batch_norm = BatchNormalization() if batch_normalization else None

        self.learning_rate = learning_rate
        self.beta_1 = beta_1
        self.beta_2 = beta_2
        self.epsilon = epsilon
        self.t = 0  # number of optimizer steps taken so far

        self.dropout_rate = dropout_rate
        self.dropout_mask = None
        # Dedicated, seeded generator for dropout. Re-seeding the global RNG on every
        # forward pass (as was done before) produced the *same* mask for every batch
        # and clobbered the global random state.
        self._dropout_rng = np.random.RandomState(246)
        self.nodes = []

    def build(self, input_dim):
        """Create the nodes, the activation object and the batch-norm buffers.

        Note: this seeds NumPy's global RNG so that node initialization is
        reproducible (``Node`` draws its weights from the global generator).
        """
        self.input_dim = input_dim
        np.random.seed(296)
        self.nodes = [
            Node(input_dim, i, self.learning_rate, self.beta_1, self.beta_2, self.epsilon)
            for i in range(self.units)
        ]

        if self.activation == 'relu':
            self.activation_fn = ReLU()
        elif self.activation == 'softmax':
            self.activation_fn = Softmax()
        else:
            self.activation_fn = None

        if self.batch_norm is not None:
            self.batch_norm.build(self.units)

    def _assembled_weights(self):
        """Stack the per-node weight vectors into an ``(input_dim, units)`` matrix."""
        return np.array([node.weights for node in self.nodes]).T

    def forward(self, x, training=True):
        """Run the layer on ``x`` (rows are samples) and return the activations."""
        if self.input_dim is None:
            self.build(x.shape[1])  # lazily size the layer from the first input
        self.input = x

        assembled_bias = np.array([node.bias for node in self.nodes]).reshape(1, -1)
        self.output = np.dot(x, self._assembled_weights()) + assembled_bias

        if self.batch_norm is not None:
            self.output = self.batch_norm.forward(self.output, training=training)

        if training and self.dropout_rate > 0:
            keep = self._dropout_rng.rand(*self.output.shape) > self.dropout_rate
            self.dropout_mask = keep.astype(np.float32)
            # Inverted dropout: rescale so the expected activation is unchanged.
            self.output = self.output * self.dropout_mask / (1 - self.dropout_rate)
        else:
            # No dropout was applied on this pass, so backward must not apply a stale mask.
            self.dropout_mask = None

        if self.activation_fn:
            self.output = self.activation_fn.forward(self.output)
        return self.output

    def backward(self, grad):
        """Back-propagate ``grad`` through the layer.

        Stores ``grad_weights`` / ``grad_bias`` (averaged over the batch) and
        returns the gradient with respect to the layer input.
        """
        if self.activation_fn:
            grad = self.activation_fn.backward(grad)

        # Undo dropout only if the matching forward pass actually applied it.
        if self.dropout_mask is not None:
            grad = grad * self.dropout_mask / (1 - self.dropout_rate)

        if self.batch_norm is not None:
            grad = self.batch_norm.backward(grad)

        batch_size = self.input.shape[0]
        self.grad_weights = np.dot(self.input.T, grad) / batch_size
        self.grad_bias = np.sum(grad, axis=0, keepdims=True) / batch_size
        return np.dot(grad, self._assembled_weights().T)

    def update_weights(self, learning_rate=None):
        """Apply one optimizer step using the gradients stored by ``backward``.

        ``learning_rate`` overrides the rate given at construction when it is not
        ``None`` (this is how the rate passed to ``Sequential.compile`` reaches
        the nodes); with ``None`` the layer keeps using its current rate.
        """
        if learning_rate is not None:
            self.learning_rate = learning_rate

        self.t += 1
        for i, node in enumerate(self.nodes):
            node.learning_rate = self.learning_rate
            node.update_adam(self.grad_weights[:, i], self.grad_bias[:, i], self.t)

        if self.batch_norm is not None:
            self.batch_norm.update_weights(self.learning_rate)

Plain gradient-descent update (without Adam), kept for reference

In [25]:
# assembled_weights -= self.learning_rate * self.grad_weights
# assembled_bias = assembled_bias.astype(np.float64)
# assembled_bias -= self.learning_rate * self.grad_bias

# After updating weights and biases, update nodes with the new values
# for i, node in enumerate(self.nodes):
#     node.update(assembled_weights, assembled_bias)


In [26]:
# Loss function (Cross-Entropy Loss)
class CrossEntropyLoss:
    """Cross-entropy between predicted probabilities and target rows."""

    def forward(self, predictions, targets):
        """Return the mean over rows of ``-sum(targets * log(predictions))``.

        Predictions are clipped to ``[1e-12, 1 - 1e-12]`` before taking the log.
        """
        clipped = np.clip(predictions, 1e-12, 1 - 1e-12)
        per_sample = np.sum(targets * np.log(clipped), axis=1)
        return -np.mean(per_sample)

    def backward(self, predictions, targets):
        """Return ``predictions - targets`` as the gradient passed back to the network."""
        return predictions - targets

In [27]:
# Defining the Sequential Model class
class Sequential:
    """A linear stack of layers trained with mini-batch gradient updates.

    Each layer must provide ``forward(x, training=...)`` and ``backward(grad)``;
    layers that also provide ``update_weights(learning_rate)`` are updated after
    every batch.
    """

    def __init__(self, layers):
        self.layers = layers
        self.loss_fn = None
        self.learning_rate = None

    def compile(self, loss, optimizer, learning_rate):
        """Configure the loss and learning rate.

        ``loss`` is either the string ``'cross_entropy'`` or an object exposing
        ``forward(predictions, targets)`` and ``backward(predictions, targets)``.
        ``optimizer`` is only stored on the instance; the update rule itself
        lives in the layers.

        Raises
        ------
        ValueError : if ``loss`` is neither of the supported forms (previously
            this silently left ``loss_fn`` as ``None``).
        """
        if isinstance(loss, str):
            if loss != 'cross_entropy':
                raise ValueError(f"Unsupported loss: {loss!r}")
            self.loss_fn = CrossEntropyLoss()
        elif hasattr(loss, 'forward') and hasattr(loss, 'backward'):
            self.loss_fn = loss
        else:
            raise ValueError(f"Unsupported loss: {loss!r}")
        self.learning_rate = learning_rate
        self.optimizer = optimizer

    def forward(self, x, training=True):
        """Feed ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x, training=training)
        return x

    def backward(self, grad):
        """Propagate ``grad`` through the layers in reverse order."""
        for layer in reversed(self.layers):
            grad = layer.backward(grad)

    def update_weights(self):
        """Ask every layer that supports it to apply its stored gradients."""
        for layer in self.layers:
            if hasattr(layer, 'update_weights'):
                layer.update_weights(self.learning_rate)

    def fit(self, X, y, epochs, batch_size):
        """Train for ``epochs`` passes over ``X`` / ``y`` and print loss and accuracy per epoch.

        ``y`` is expected to hold one row per sample with the true class at the
        arg-max position (e.g. one-hot rows).

        Raises
        ------
        RuntimeError : if no loss function has been configured.
        ValueError : if ``X`` contains no samples.
        """
        if self.loss_fn is None:
            raise RuntimeError("No loss function configured; call compile() before fit().")
        num_samples = X.shape[0]
        if num_samples == 0:
            raise ValueError("Cannot fit on an empty dataset.")

        # One seeded generator for the whole run: reproducible, yet each epoch gets a
        # different order. Re-seeding inside the loop repeated the same permutation
        # every epoch and also reset NumPy's global RNG.
        shuffle_rng = np.random.RandomState(148)

        for epoch in range(epochs):
            indices = np.arange(num_samples)
            shuffle_rng.shuffle(indices)
            X_shuffled = X[indices]
            y_shuffled = y[indices]
            loss_total = 0.0
            correct_predictions = 0

            for start in range(0, num_samples, batch_size):
                X_batch = X_shuffled[start:start + batch_size]
                y_batch = y_shuffled[start:start + batch_size]

                predictions = self.forward(X_batch, training=True)

                # Weight each batch loss by its size so a short final batch is averaged correctly.
                loss = self.loss_fn.forward(predictions, y_batch)
                loss_total += loss * X_batch.shape[0]

                predicted_labels = np.argmax(predictions, axis=1)
                true_labels = np.argmax(y_batch, axis=1)
                correct_predictions += np.sum(predicted_labels == true_labels)

                grad = self.loss_fn.backward(predictions, y_batch)
                self.backward(grad)
                self.update_weights()

            # Dividing by (num_samples // batch_size) miscounted partial batches and
            # divided by zero when num_samples < batch_size.
            epoch_loss = loss_total / num_samples
            accuracy = correct_predictions / num_samples

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}")

    def predict(self, X):
        """Return the arg-max class index per row, running the layers in inference mode."""
        predictions = self.forward(X, training=False)
        return np.argmax(predictions, axis=1)


In [28]:
# Defining three model architectures
def build_model(config):
    """Return a fresh ``Sequential`` model for architecture ``config`` (1, 2 or 3).

    Every architecture is a stack of ReLU ``Dense`` layers with dropout followed
    by a 10-unit softmax ``Dense`` output layer.

    Raises
    ------
    ValueError : if ``config`` is not a known architecture (previously the
        function silently returned ``None``).
    """
    # (units, dropout_rate) of the hidden layers for each architecture.
    hidden_layers = {
        1: [(128, 0.4), (64, 0.3), (32, 0.2)],
        2: [(256, 0.5), (128, 0.4)],
        3: [(256, 0.5), (64, 0.2), (32, 0.2), (16, 0.1)],
    }
    if config not in hidden_layers:
        raise ValueError(
            f"Unknown model configuration {config!r}; expected one of {sorted(hidden_layers)}"
        )

    layers = [
        Dense(units=units, activation='relu', dropout_rate=rate)
        for units, rate in hidden_layers[config]
    ]
    layers.append(Dense(units=10, activation='softmax'))
    return Sequential(layers)


In [29]:
# Load the header-less CSV files and keep their contents as NumPy arrays
import pandas as pd


def _load_csv_matrix(path):
    """Read a CSV file that has no header row and return its values as a NumPy array."""
    frame = pd.read_csv(path, header=None)
    return frame.values


x_train = _load_csv_matrix("x_train.csv")
y_train = _load_csv_matrix("y_train.csv")
x_test = _load_csv_matrix("x_test.csv")
y_test = _load_csv_matrix("y_test.csv")

In [None]:
# Hold out 10,000 samples of the training data as a validation set.
# NOTE(review): this rebinds x_train / y_train, so re-running this cell without
# re-running the CSV-loading cell first would split the already-reduced data again.
np.random.seed(120)
split_parts = train_test_split(x_train, y_train, test_size=10000, random_state=12)
x_train, x_val, y_train, y_val = split_parts
tuple(part.shape for part in (x_train, x_val, y_train, y_val))

In [None]:
# Grid search: train every architecture with every learning rate, score it on the
# validation split, write its confusion matrix and per-class metrics to CSV, and
# remember the best-scoring model.
learning_rates = [0.001, 0.005, 0.01, 0.02]

# Start below any attainable accuracy so the first trained model always becomes the
# incumbent; starting at 0 with a strict '>' could leave best_model / best_config as None.
best_accuracy = -1.0
best_model = None
best_config = None

# The validation labels are identical for every run, so decode them once.
true_labels = np.argmax(y_val, axis=1)

for config_num in range(1, 4):
    for lr in learning_rates:
        model = build_model(config_num)
        model.compile(loss='cross_entropy', optimizer='Adam', learning_rate=lr)
        model.fit(x_train, y_train, epochs=10, batch_size=64)

        predictions = model.predict(x_val)
        predicted_labels = predictions
        # Accuracy on the validation split (not the test set).
        val_accuracy = np.mean(predicted_labels == true_labels)

        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_model = model
            best_config = (config_num, lr)

        conf_matrix = confusion_matrix(true_labels, predicted_labels)
        conf_matrix_filename = f'confusion_matrix_model_{config_num}_lr_{lr}.csv'
        np.savetxt(conf_matrix_filename, conf_matrix, delimiter=',')

        # Per-class scores.
        precision = precision_score(true_labels, predicted_labels, average=None)
        recall = recall_score(true_labels, predicted_labels, average=None)
        f1 = f1_score(true_labels, predicted_labels, average=None)

        # Macro-averaged scores.
        precision_macro = precision_score(true_labels, predicted_labels, average='macro')
        recall_macro = recall_score(true_labels, predicted_labels, average='macro')
        f1_macro = f1_score(true_labels, predicted_labels, average='macro')

        metrics_filename = f'metrics_model_{config_num}_lr_{lr}.csv'
        with open(metrics_filename, 'w', newline='') as file:
            writer = csv.writer(file)
            # Header row
            writer.writerow(['Class', 'Precision', 'Recall', 'F1-score'])

            # One row per class
            for i, (p, r, f) in enumerate(zip(precision, recall, f1)):
                writer.writerow([f'Class {i}', p, r, f])

            # Macro averages
            writer.writerow(['Macro Average', precision_macro, recall_macro, f1_macro])

# the best model configuration and accuracy
print(f"Best model configuration: Model {best_config[0]} with learning rate {best_config[1]}")
print(f"Best accuracy: {best_accuracy:.4f}")


In [None]:
# Stack three ReLU hidden layers (each with dropout) under a 10-way softmax output
hidden_spec = [(128, 0.4), (64, 0.3), (32, 0.2)]  # (units, dropout_rate) per hidden layer
layers = [Dense(units=units, activation='relu', dropout_rate=rate) for units, rate in hidden_spec]
layers.append(Dense(units=10, activation='softmax'))
model = Sequential(layers)

model.compile(loss='cross_entropy', optimizer='Adam', learning_rate=0.005)

# Train on the training split
model.fit(x_train, y_train, epochs=10, batch_size=64)

# Score the trained model on the test set
predictions = model.predict(x_test)
test_accuracy = (predictions == np.argmax(y_test, axis=1)).mean()
print(f"Test Accuracy: {test_accuracy:.4f}")


In [None]:
# Decode the test labels and pair them with the model's predictions
true_labels = np.argmax(y_test, axis=1)
predicted_labels = predictions

# Confusion matrix
conf_matrix = confusion_matrix(true_labels, predicted_labels)
print("Confusion Matrix:\n", conf_matrix)

# Per-class precision, recall and F1-score
precision, recall, f1 = (
    score(true_labels, predicted_labels, average=None)
    for score in (precision_score, recall_score, f1_score)
)

print("\nPrecision per class:", precision)
print("Recall per class:", recall)
print("F1-score per class:", f1)

# Macro averages over the classes
precision_macro, recall_macro, f1_macro = (
    score(true_labels, predicted_labels, average='macro')
    for score in (precision_score, recall_score, f1_score)
)

print("\nOverall Precision (Macro):", precision_macro)
print("Overall Recall (Macro):", recall_macro)
print("Overall F1-score (Macro):", f1_macro)

In [34]:
# Gather everything needed to rebuild the best model: per-node weights/biases and,
# for layers that use it, the batch-normalization parameters and running statistics.
if best_model is None:
    raise RuntimeError("best_model is not set; run the model-selection cell first.")

model = best_model
weights_data = []
for layer in model.layers:
    bn = layer.batch_norm  # None when the layer was built with batch_normalization=False
    weights_data.append({
        'weights': [node.weights for node in layer.nodes],
        'biases': [node.bias for node in layer.nodes],
        # Batch-norm entries are stored as None for layers without batch normalization,
        # instead of failing with an AttributeError.
        'gamma': bn.params.gamma if bn is not None else None,
        'beta': bn.params.beta if bn is not None else None,
        'r_mean': bn.running_stats.running_mean if bn is not None else None,
        'r_var': bn.running_stats.running_var if bn is not None else None,
        'epsilon': bn.params.epsilon if bn is not None else None,
    })
    

In [35]:
# Bundle the collected layer parameters with the winning (config, learning rate) pair
# and serialize the bundle with pickle
model_data = dict(weights_data=weights_data, best_config=best_config)

with open('model_1905012.pkl', 'wb') as file:
    pickle.dump(model_data, file)

In [36]:
# Loading weights
# SECURITY NOTE: pickle.load can execute arbitrary code. Only load a file that this
# notebook (or another trusted source) produced.
with open('model_1905012.pkl', 'rb') as file:
    model_data = pickle.load(file)

weights_data = model_data['weights_data']
best_config = model_data['best_config']

# Rebuild the best architecture, then overwrite its parameters with the stored ones
model_now = build_model(best_config[0])
model_now.compile(loss='cross_entropy', optimizer='Adam', learning_rate=best_config[1])

if len(model_now.layers) != len(weights_data):
    # zip() would silently truncate and leave some layers at their random initialization.
    raise ValueError(
        f"Saved model has {len(weights_data)} layers but the rebuilt architecture "
        f"has {len(model_now.layers)}"
    )

for layer, layer_data in zip(model_now.layers, weights_data):
    # Each stored weight vector has one entry per input feature, so the layer's input
    # size is read from the saved data rather than hard-coded.
    input_dim = len(layer_data['weights'][0])
    layer.build(input_dim)    # Build layer to initialize nodes

    if len(layer.nodes) != len(layer_data['weights']):
        raise ValueError(
            f"Saved layer has {len(layer_data['weights'])} nodes but the rebuilt layer "
            f"has {len(layer.nodes)}"
        )

    for node, weights, bias in zip(layer.nodes, layer_data['weights'], layer_data['biases']):
        node.weights = np.array(weights)
        node.bias = np.array(bias)

    # Restore the batch-normalization parameters and running statistics
    if layer.batch_norm is not None:
        layer.batch_norm.params.gamma = np.array(layer_data['gamma'])
        layer.batch_norm.params.beta = np.array(layer_data['beta'])
        layer.batch_norm.running_stats.running_mean = np.array(layer_data['r_mean'])
        layer.batch_norm.running_stats.running_var = np.array(layer_data['r_var'])
        layer.batch_norm.params.epsilon = layer_data['epsilon']


In [None]:
# Evaluate the restored model on the test set
predictions = model_now.predict(x_test)
test_accuracy = (predictions == np.argmax(y_test, axis=1)).mean()
print(f"Test Accuracy: {test_accuracy:.4f}")