<a href="https://colab.research.google.com/github/datapirate09/Neural-Network-Builder/blob/main/NeuralNetwork.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import time
import random

In [22]:
class NeuralNetwork:
  """Fully connected feed-forward network trained with mini-batch gradient descent.

  Weights and biases are drawn from a standard normal distribution.  Samples
  are processed one at a time as column vectors; 1-D samples and labels are
  converted to columns before use so that NumPy broadcasting cannot silently
  produce square matrices.
  """

  def __init__(self, layers, activation='sigmoid'):
    """Create the network.

    Args:
      layers: list where each element is the number of neurons in that layer
        (input layer first, output layer last).
      activation: name of the activation function; only 'sigmoid' is
        implemented.
    """
    self.no_of_layers = len(layers)
    self.sizes = layers
    # One (n_out, n_in) weight matrix and one (n_out, 1) bias per layer transition.
    self.weights = [np.random.randn(y, x) for x, y in zip(layers[:-1], layers[1:])]
    self.biases = [np.random.randn(y, 1) for y in layers[1:]]
    self.activation = activation

  @staticmethod
  def _as_column(values):
    """Return 1-D input as an (n, 1) column; leave every other rank untouched."""
    arr = np.asarray(values)
    return arr.reshape(-1, 1) if arr.ndim == 1 else arr

  def get_shape_of_weights(self):
    """Print the shape of every weight matrix."""
    for i, layer in enumerate(self.weights):
      print(f"Layer {i} has shape {layer.shape}")

  def get_shape_of_bias(self):
    """Print the shape of every bias vector."""
    for i, layer in enumerate(self.biases):
      print(f"Layer {i} has shape {layer.shape}")

  def sigmoid(self, z):
    """Return the element-wise logistic function of z."""
    return 1.0/(1.0+np.exp(-z))

  def sigmoid_derivative(self, z):
    """Return the element-wise derivative of the logistic function at z."""
    s = self.sigmoid(z)  # evaluate once instead of twice
    return s*(1-s)

  def activation_function(self, activation_input):
    """Apply the configured activation.

    Raises:
      ValueError: if self.activation names an activation that is not implemented.
    """
    if self.activation == 'sigmoid':
      return self.sigmoid(activation_input)
    raise ValueError(f"Unsupported activation: {self.activation!r}")

  def activation_function_derivative(self, activation_input):
    """Apply the derivative of the configured activation.

    Raises:
      ValueError: if self.activation names an activation that is not implemented.
    """
    if self.activation == 'sigmoid':
      return self.sigmoid_derivative(activation_input)
    raise ValueError(f"Unsupported activation: {self.activation!r}")

  def forward_propagation(self, a):
    """Return the network output for input a.

    A 1-D input is treated as a single column vector, matching what
    get_activations does during training; 2-D input is passed through as is.
    """
    a = self._as_column(a)
    for weight, bias in zip(self.weights, self.biases):
      a = self.activation_function(np.dot(weight, a) + bias)
    return a

  def update_weights_and_biases(self, input_data, batch_size=10, learning_rate=3.0):
    """Run one pass over input_data in shuffled mini-batches.

    Args:
      input_data: list of (x, y) pairs; it is shuffled in place.
      batch_size: number of samples per mini-batch.
      learning_rate: step size forwarded to update_mini_batch.
    """
    random.shuffle(input_data)
    mini_batches = [input_data[k:k+batch_size] for k in range(0, len(input_data), batch_size)]
    for batch in mini_batches:
      self.update_mini_batch(batch, learning_rate)

  def update_mini_batch(self, mini_batch, learning_rate=3.0):
    """Apply one gradient step using the gradients summed over mini_batch.

    An empty mini_batch is a no-op (it would otherwise divide by zero).
    """
    if not mini_batch:
      return
    b_diff = [np.zeros(b.shape) for b in self.biases]
    w_diff = [np.zeros(w.shape) for w in self.weights]
    for x, y in mini_batch:  # training input and label
      activations, z_vector = self.get_activations(x)
      b_error, w_error = self.back_propagation(activations, z_vector, y)
      b_diff = [bd + be for bd, be in zip(b_diff, b_error)]
      w_diff = [wd + we for wd, we in zip(w_diff, w_error)]

    # Average the summed gradients over the batch before stepping.
    step = learning_rate / len(mini_batch)
    self.weights = [w - step * nw for w, nw in zip(self.weights, w_diff)]
    self.biases = [b - step * nb for b, nb in zip(self.biases, b_diff)]

  def get_activations(self, input_data):
    """Run a forward pass and record every layer's state.

    Returns:
      (activations, z_vector): activations holds the input column followed by
      each layer's output; z_vector holds each layer's pre-activation values.
    """
    activation = np.asarray(input_data).reshape(-1, 1)  # Ensure it's a column vector
    activations = [activation]
    z_vector = []

    for b, w in zip(self.biases, self.weights):
      z = np.dot(w, activation) + b
      z_vector.append(z)
      activation = self.activation_function(z)
      activations.append(activation)

    return activations, z_vector

  def cost_derivative(self, output_activations, y):
    """Return output_activations - y, the error term used at the output layer."""
    return (output_activations-y)

  def back_propagation(self, activations, z_vector, y):
    """Compute per-layer gradients for one sample.

    Args:
      activations: layer activations as returned by get_activations.
      z_vector: pre-activation values as returned by get_activations.
      y: target for the sample; a 1-D target is converted to a column so it
        lines up with the column-shaped output.

    Returns:
      (b_error, w_error): lists shaped like self.biases and self.weights.
    """
    y = self._as_column(y)
    b_error = [np.zeros(b.shape) for b in self.biases]
    w_error = [np.zeros(w.shape) for w in self.weights]
    delta = self.cost_derivative(activations[-1], y) * self.activation_function_derivative(z_vector[-1])
    b_error[-1] = delta
    w_error[-1] = np.dot(delta, activations[-2].transpose())
    # Walk backwards from the second-to-last layer; negative indices count from the output.
    for layer in range(2, self.no_of_layers):
      z = z_vector[-layer]
      sp = self.activation_function_derivative(z)
      delta = np.dot(self.weights[-layer+1].transpose(), delta) * sp
      b_error[-layer] = delta
      w_error[-layer] = np.dot(delta, activations[-layer-1].transpose())
    return (b_error, w_error)

  def fit(self, training_data, training_labels, validation_data=None, validation_labels=None, epochs=20, batch_size=10, learning_rate=3.0, callbacks=None):
    """Train the network and print one progress line per epoch.

    Args:
      training_data, training_labels: equally long sequences of samples and targets.
      validation_data, validation_labels: optional held-out set; metrics for it
        are reported only when both are given.  Lists and NumPy arrays are
        both accepted.
      epochs: maximum number of passes over the training data.
      batch_size: mini-batch size.
      learning_rate: initial step size; a ReduceLROnPlateau callback may lower it.
      callbacks: optional list of EarlyStopping / ModelCheckPoint /
        ReduceLROnPlateau instances.  If several of one kind are given, the
        last one is used.
    """
    n = len(training_data)
    training_data_combined = list(zip(training_data, training_labels))
    # Identity checks: `!= None` on a NumPy array is element-wise and cannot be used in `and`.
    has_validation = validation_data is not None and validation_labels is not None
    validation_data_combined = None
    if has_validation:
      validation_data_combined = list(zip(validation_data, validation_labels))

    early_stopping = None
    model_check_point = None
    reduce_lr_on_plateau = None
    for cb in callbacks or []:
      if isinstance(cb, EarlyStopping):
        early_stopping = cb
      if isinstance(cb, ModelCheckPoint):
        model_check_point = cb
      if isinstance(cb, ReduceLROnPlateau):
        reduce_lr_on_plateau = cb

    for iteration in range(epochs):
      start_time = time.time()
      self.update_weights_and_biases(training_data_combined, batch_size, learning_rate)
      accuracy, loss = self.evaluate(training_data_combined)
      logs = {"accuracy": accuracy, "loss": loss}
      duration = time.time() - start_time
      summary = f"{n}/{n} ━━━━━━━━━━━━━━━━━━━━ {duration:.0f}s  - accuracy: {accuracy:.4f} - loss: {loss:.4f}"
      if has_validation:
        val_accuracy, val_loss = self.evaluate(validation_data_combined)
        logs["val_accuracy"] = val_accuracy
        logs["val_loss"] = val_loss
        summary += f" - val_accuracy: {val_accuracy:.4f} - val_loss: {val_loss:.4f}"
      print(summary)

      if early_stopping is not None:
        early_stopping.on_epoch_end(iteration, logs)
      if model_check_point is not None:
        model_check_point.on_epoch_end(iteration, logs)
        if model_check_point.show_check_point:
          best_epoch_details = model_check_point.get_best_epoch_details()
          # Print only the metrics that were recorded, so a run without
          # validation data does not fail on the missing val_* keys.
          print(" - ".join(
              f"{key}: {best_epoch_details[key]:.4f}"
              for key in ("accuracy", "loss", "val_accuracy", "val_loss")
              if key in best_epoch_details))
      if reduce_lr_on_plateau is not None:
        factor = reduce_lr_on_plateau.on_epoch_end(iteration, logs)
        if factor is not None:
          new_lr = max(learning_rate * factor, reduce_lr_on_plateau.min_lr)
          print(f"Reducing learning rate from {learning_rate:.5f} to {new_lr:.5f}")
          learning_rate = new_lr
      # Stop only after every callback has seen this epoch.
      if early_stopping is not None and early_stopping.stop_training:
        break

  def evaluate(self, data):
    """Return (accuracy, average loss) over data.

    Args:
      data: sized collection of (x, y) pairs.  The prediction is the argmax of
        the network output, the true class is the argmax of y, and the
        per-sample loss is the sum of squared differences.
    """
    correct = 0
    total_loss = 0
    for x, y in data:
      output = self.forward_propagation(x)
      target = self._as_column(y)  # keep the subtraction element-wise for 1-D labels
      if np.argmax(output) == np.argmax(target):
        correct += 1
      total_loss += np.sum((output - target) ** 2)
    accuracy = correct / len(data)
    avg_loss = total_loss / len(data)
    return accuracy, avg_loss

  def predict(self, data):
    """Return a list with the network output for every item in data."""
    return [self.forward_propagation(item) for item in data]


class EarlyStopping:
  """Signal that training should stop once a monitored metric stops improving.

  After each epoch the monitored value is compared with the best value seen so
  far.  When more than `patience` consecutive epochs pass without a strict
  improvement, `stop_training` is set to True.
  """

  def __init__(self, monitor='val_loss', patience=0, mode='min'):
    """Configure the callback.

    Args:
      monitor: key looked up in the per-epoch metrics dict.
      patience: number of non-improving epochs tolerated before stopping.
      mode: 'min' if lower values are better (losses), 'max' if higher values
        are better (accuracies).

    Raises:
      ValueError: if mode is neither 'min' nor 'max'.
    """
    if mode not in ('min', 'max'):
      raise ValueError(f"mode must be 'min' or 'max', got {mode!r}")
    self.patience = patience
    self.current_count = 0
    self.monitor = monitor
    self.mode = mode
    self.best_value = None
    self.stop_training = False

  def on_epoch_end(self, epoch, epoch_info):
    """Update the stopping state from this epoch's metrics.

    Epochs whose metrics do not contain the monitored key are ignored.
    """
    current = epoch_info.get(self.monitor)
    if current is None:
      return
    if self.best_value is None:
      improved = True
    elif self.mode == 'min':
      improved = current < self.best_value
    else:
      improved = current > self.best_value

    if improved:
      self.best_value = current
      self.current_count = 0
    else:
      self.current_count += 1
      if self.current_count > self.patience:
        self.stop_training = True


class ModelCheckPoint:
  """Remember the metrics of every epoch that improved the monitored value.

  An epoch counts as an improvement when its monitored value is strictly lower
  than the best value seen so far.  `show_check_point` tells the caller
  whether the most recent epoch was such an improvement.
  """

  def __init__(self, monitor='val_loss'):
    """Configure the callback.

    Args:
      monitor: key looked up in the per-epoch metrics dict.
    """
    self.monitor = monitor
    self.show_check_point = False
    self.previous_best = None
    self.improved_epoch_details = []

  def on_epoch_end(self, epoch, epoch_info):
    """Record this epoch's metrics if the monitored value improved."""
    current = epoch_info.get(self.monitor)
    if current is None:
      # The metric was not reported this epoch: nothing to compare, so it
      # must not be treated as a new best.
      self.show_check_point = False
      return
    if self.previous_best is None or current < self.previous_best:
      self.previous_best = current
      self.store_epoch_details(epoch_info)
      self.show_check_point = True
    else:
      self.show_check_point = False

  def store_epoch_details(self, epoch_info):
    """Append a snapshot of epoch_info to the list of improving epochs."""
    # Copy so later mutation of the caller's dict cannot change the record.
    self.improved_epoch_details.append(dict(epoch_info))

  def get_best_epoch_details(self):
    """Return the metrics of the most recent improving epoch.

    Raises:
      IndexError: if no improving epoch has been recorded yet.
    """
    return self.improved_epoch_details[-1]


class ReduceLROnPlateau:
  """Ask for a learning-rate reduction when a monitored metric plateaus.

  The callback does not change the learning rate itself: on_epoch_end returns
  the multiplicative factor the caller should apply, or None when no change
  is due.
  """

  def __init__(self, monitor='val_loss', factor=0.1, patience=10, mode='min', min_lr=0.0):
    """Configure the callback.

    Args:
      monitor: key looked up in the per-epoch metrics dict.
      factor: value returned when a reduction is due.
      patience: number of non-improving epochs tolerated before a reduction.
      mode: 'min' if lower values are better, 'max' if higher values are better.
      min_lr: lower bound for the learning rate; stored for the caller to apply.
    """
    self.monitor = monitor
    self.factor = factor
    self.patience = patience
    self.mode = mode
    self.min_lr = min_lr
    self.current_count = 0
    self.optimal_val = None

  def on_epoch_end(self, epoch, epoch_info):
    """Return the reduction factor if the metric has plateaued, else None.

    Epochs whose metrics do not contain the monitored key are ignored.
    """
    current = epoch_info.get(self.monitor)
    if current is None:
      return None
    if self.optimal_val is None or self.check_optimal_condition(current):
      self.optimal_val = current
      self.current_count = 0
      return None

    self.current_count += 1
    if self.current_count > self.patience:
      # Start a fresh patience window; otherwise the factor would be returned
      # again on every following non-improving epoch.
      self.current_count = 0
      return self.factor
    return None

  def check_optimal_condition(self, value):
    """Return True if value is strictly better than the best value so far.

    An unrecognised mode never counts as an improvement.
    """
    if self.mode == 'min':
      return bool(value < self.optimal_val)
    if self.mode == 'max':
      return bool(value > self.optimal_val)
    return False


In [16]:
# Notebook shell command: download mnist.pkl.gz from the
# mnielsen/neural-networks-and-deep-learning repository into the working directory.
!wget https://github.com/mnielsen/neural-networks-and-deep-learning/raw/master/data/mnist.pkl.gz -O mnist.pkl.gz


--2025-04-08 14:39:54--  https://github.com/mnielsen/neural-networks-and-deep-learning/raw/master/data/mnist.pkl.gz
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/mnielsen/neural-networks-and-deep-learning/master/data/mnist.pkl.gz [following]
--2025-04-08 14:39:54--  https://raw.githubusercontent.com/mnielsen/neural-networks-and-deep-learning/master/data/mnist.pkl.gz
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17051982 (16M) [application/octet-stream]
Saving to: ‘mnist.pkl.gz’


2025-04-08 14:39:55 (131 MB/s) - ‘mnist.pkl.gz’ saved [17051982/17051982]



In [17]:
import gzip
import pickle

# Location of the gzipped MNIST pickle in the working directory.
file_path = "mnist.pkl.gz"

# NOTE(review): unpickling can execute arbitrary code; only load archives
# obtained from a source you trust.
with gzip.open(file_path, 'rb') as f:
    mnist_data = pickle.load(f, encoding='latin1')

# The archive unpacks into three splits; element 0 and element 1 of the
# training split are printed as a sanity check on their shapes.
training_data, validation_data, test_data = mnist_data
print(training_data[0].shape, training_data[1].shape, sep="\n")

def vectorized_label(j, num_classes=10):
    """Return a one-hot column vector for class index j.

    Args:
        j: index of the class to mark.
        num_classes: length of the vector; defaults to 10.

    Returns:
        Array of shape (num_classes, 1) holding 1.0 at row j and 0.0 elsewhere.

    Raises:
        IndexError: if j is out of bounds for num_classes.
    """
    e = np.zeros((num_classes, 1))
    e[j] = 1.0
    return e

# Turn every training sample into a column vector and every label into a
# one-hot column, the per-sample format NeuralNetwork.fit is given below.
X_train = [sample.reshape(-1, 1) for sample in training_data[0]]
y_train = list(map(vectorized_label, training_data[1]))

# Same conversion for the validation split.
X_val = [sample.reshape(-1, 1) for sample in validation_data[0]]
y_val = list(map(vectorized_label, validation_data[1]))

(50000, 784)
(50000,)


In [24]:
# Network with 784 input neurons, one hidden layer of 30 neurons and 10 output neurons.
model = NeuralNetwork(layers=[784, 30, 10])

In [25]:
# Both callbacks watch the validation loss; early stopping tolerates three
# epochs without improvement.
early_stopping = EarlyStopping(patience=3, monitor='val_loss')
model_check_point = ModelCheckPoint(monitor='val_loss')

# Train with the default epochs, batch size and learning rate.
model.fit(
    X_train,
    y_train,
    validation_data=X_val,
    validation_labels=y_val,
    callbacks=[early_stopping, model_check_point],
)


50000/50000 ━━━━━━━━━━━━━━━━━━━━ 9s  - accuracy: 0.9093 - loss: 0.1502 - val_accuracy: 0.9131 - val_loss: 0.1409
accuracy: 0.9093 - loss: 0.1502 - val_accuracy: 0.9131 - val_loss: 0.1409
50000/50000 ━━━━━━━━━━━━━━━━━━━━ 8s  - accuracy: 0.9233 - loss: 0.1296 - val_accuracy: 0.9278 - val_loss: 0.1230
accuracy: 0.9233 - loss: 0.1296 - val_accuracy: 0.9278 - val_loss: 0.1230


KeyboardInterrupt: 

In [None]:
# Convert the test split to column vectors and one-hot labels, then pair them up.
X_test = [sample.reshape(-1, 1) for sample in test_data[0]]
y_test = list(map(vectorized_label, test_data[1]))
testing_data_combined = list(zip(X_test, y_test))

# Report accuracy and average loss on the test split, one value per line.
accuracy, avg_loss = model.evaluate(testing_data_combined)
print(accuracy, avg_loss, sep="\n")

# Show the raw network output for the first test sample.
predictions = model.predict(X_test)
print(predictions[0])


0.9429
0.09415478334754462
[[1.71364994e-06]
 [2.36814271e-08]
 [6.89708166e-07]
 [7.66631390e-06]
 [6.93306109e-08]
 [1.21063479e-08]
 [1.71949977e-12]
 [9.99832671e-01]
 [5.54620956e-10]
 [1.13528195e-04]]


In [None]:
import tensorflow
from tensorflow.keras.datasets import fashion_mnist
import numpy as np

def vectorized_label(j, num_classes=10):
    """Return a one-hot column vector for class index j.

    Args:
        j: index of the class to mark.
        num_classes: length of the vector; defaults to 10.

    Returns:
        Array of shape (num_classes, 1) holding 1.0 at row j and 0.0 elsewhere.

    Raises:
        IndexError: if j is out of bounds for num_classes.
    """
    e = np.zeros((num_classes, 1))
    e[j] = 1.0
    return e

# Fetch the Fashion-MNIST train and test splits through Keras.
(X_train_tf, y_train_tf), (X_test_tf, y_test_tf) = fashion_mnist.load_data()

# Flatten each image into a column vector and divide by 255.0
# (presumably mapping 0-255 pixel values into [0, 1] - confirm against the dataset docs).
X_train_tf = [image.reshape(-1, 1) / 255.0 for image in X_train_tf]
y_train_tf = list(map(vectorized_label, y_train_tf))

X_test_tf = [image.reshape(-1, 1) / 255.0 for image in X_test_tf]
# NOTE: the one-hot test labels are stored in y_test (y_test_tf keeps the raw
# labels); the evaluation cell reads y_test.
y_test = list(map(vectorized_label, y_test_tf))

# 784 inputs, 64 hidden neurons, 10 outputs; trained without validation data.
model = NeuralNetwork(layers=[784, 64, 10])
model.fit(X_train_tf, y_train_tf, epochs=10, batch_size=32, learning_rate=3.0)





60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7493 - loss: 0.3547
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7744 - loss: 0.3132
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7856 - loss: 0.3009
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7882 - loss: 0.2916
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7971 - loss: 0.2780
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.7972 - loss: 0.2803
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.8072 - loss: 0.2646
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.8101 - loss: 0.2587
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.8092 - loss: 0.2620
60000/60000 ━━━━━━━━━━━━━━━━━━━━ 17s  - accuracy: 0.8123 - loss: 0.2603


In [None]:
# Pair the Fashion-MNIST test images with their one-hot labels.
testing_data_combined = list(zip(X_test_tf, y_test))

# Report accuracy and average loss on the test split, one value per line.
accuracy, avg_loss = model.evaluate(testing_data_combined)
print(accuracy, avg_loss, sep="\n")

0.7957
0.28854674926965623
