<a href="https://colab.research.google.com/github/Carole1998/Tech_Talk_Demo/blob/main/assignement_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tensor

In [None]:
import numpy as np
# A 'Tensor' class in which the elements, the deltas, and a shape are stored.
class Tensor:
    """Container bundling values, their optional gradients, and a shape object."""

    def __init__(self, elements, deltas=None, shape=None):
        """
        Build a Tensor from raw values.

        Args:
            elements: Values to store; converted with ``np.array``.
            deltas: Optional gradients; converted with ``np.array`` when given.
            shape: Optional shape object. When omitted, a ``Shape`` is derived
                from the dimensions of ``elements``.
        """
        self.elements = np.array(elements)
        self.deltas = None if deltas is None else np.array(deltas)

        # Fall back to a Shape inferred from the stored array
        if shape is None:
            data = self.elements
            shape = Shape(len(data)) if data.ndim == 1 else Shape(*data.shape)
        self.shape = shape

    @property
    def is_batch(self):
        """True when ``elements`` has more than one dimension."""
        return self.elements.ndim > 1

    def get_batch_size(self):
        """Return the length of the first axis for a batch, otherwise 1."""
        if self.is_batch:
            return self.elements.shape[0]
        return 1

    def get_sample(self, index):
        """Return row ``index`` as a new Tensor; a non-batch Tensor returns itself."""
        if self.is_batch:
            sample_elements = self.elements[index]
            sample_deltas = None if self.deltas is None else self.deltas[index]
            # The sample's shape drops the leading (batch) axis
            sample_shape = Shape(*self.elements.shape[1:])
            return Tensor(
                elements=sample_elements,
                deltas=sample_deltas,
                shape=sample_shape,
            )
        return self

# A Shape class that specifies the dimensions of the data
class Shape:
    """Holds the dimension sizes passed to the constructor."""

    def __init__(self, *args):
        # The positional arguments are kept together as one tuple
        self.dimensions = tuple(args)

# A layer class
class Layer:
    """Base class for layers: subclasses supply ``forward`` and ``backward``."""

    def __init__(self):
        # The concrete class name doubles as the layer's type label
        self.layer_type = type(self).__name__
        # Position inside a network; stays None until one assigns it
        self.num = None

    def forward(self, inp: Tensor) -> Tensor:
        """Compute the layer output for ``inp``; must be overridden."""
        raise NotImplementedError()

    def backward(self, grad_out: Tensor) -> Tensor:
        """Propagate ``grad_out`` back through the layer; must be overridden."""
        raise NotImplementedError()

    def calc_delta_weights(self):
        """Hook for weight-update bookkeeping; the base version does nothing."""
        return None

class InputLayer(Layer):
    """Entry layer that makes sure the data flowing onward is a Tensor."""

    def __init__(self):
        super().__init__()
        self.layer_type = "InputLayer"  # Fixed label for this layer kind

    def forward(self, inp) -> Tensor:
        """Return ``inp`` unchanged if it is a Tensor, otherwise wrap it in one."""
        if not isinstance(inp, Tensor):
            data = np.array(inp)
            # Multi-dimensional data keeps its full shape; otherwise use the length
            if data.ndim > 1:
                dims = Shape(*data.shape)
            else:
                dims = Shape(len(data))
            return Tensor(data, None, dims)
        return inp

    def backward(self, grad_out: Tensor) -> Tensor:
        """Hand the incoming gradient back untouched."""
        return grad_out

    def calc_delta_weights(self):
        """Nothing to do: this layer has no weights."""
        return None

class FullyConnectedLayer(Layer):
    """Dense layer computing ``weights @ input + biases``.

    Attributes:
        input_size: number of input neurons
        output_size: number of output neurons
        weights: Tensor of shape (output_size, input_size), drawn uniformly
            from [-0.5, 0.5], with zero-initialized deltas
        biases: Tensor of length output_size, drawn uniformly from
            [-0.5, 0.5], with zero-initialized deltas
        input: the Tensor seen by the most recent forward pass
        output: the Tensor produced by the most recent forward pass
    """

    def __init__(self, input_size: int, output_size: int):
        super().__init__()
        self.layer_type = "FullyConnectedLayer"  # Fixed label for this layer kind
        self.input_size = input_size
        self.output_size = output_size

        # Weight matrix: one row per output neuron
        weight_dims = (output_size, input_size)
        self.weights = Tensor(
            elements=np.random.uniform(-0.5, 0.5, weight_dims),
            deltas=np.zeros(weight_dims),
            shape=Shape(*weight_dims),
        )

        # Bias vector: one entry per output neuron
        self.biases = Tensor(
            elements=np.random.uniform(-0.5, 0.5, output_size),
            deltas=np.zeros(output_size),
            shape=Shape(output_size),
        )

        self.input = None
        self.output = None

    def forward(self, inp: Tensor) -> Tensor:
        """Apply the affine map to a single sample or to every row of a batch."""
        self.input = inp
        w = self.weights.elements
        b = self.biases.elements

        if inp.is_batch:
            # Rows are samples, so multiply by the transposed weight matrix
            raw = np.dot(inp.elements, w.T) + b
            out_shape = Shape(inp.get_batch_size(), self.output_size)
        else:
            raw = np.dot(w, inp.elements) + b
            out_shape = Shape(self.output_size)

        self.output = Tensor(elements=raw, deltas=np.zeros_like(raw), shape=out_shape)
        return self.output

    def backward(self, grad_out: Tensor) -> Tensor:
        """Accumulate weight/bias deltas and return the gradient w.r.t. the input."""
        g = grad_out.elements
        batched = grad_out.is_batch

        # A batched gradient only makes sense for a batched forward input
        if batched and not self.input.is_batch:
            raise ValueError("Input and gradient batch sizes must match")

        if batched:
            # Sums the per-sample outer products over the batch
            self.weights.deltas += np.dot(g.T, self.input.elements)
            self.biases.deltas += np.sum(g, axis=0)
        else:
            self.weights.deltas += np.outer(g, self.input.elements)
            self.biases.deltas += g

        # The same expression serves both the single and the batched case
        upstream = np.dot(g, self.weights.elements)
        return Tensor(elements=upstream, deltas=None, shape=self.input.shape)

    def calc_delta_weights(self):
        """Zero the accumulated weight and bias deltas."""
        for param in (self.weights, self.biases):
            param.deltas = np.zeros_like(param.elements)


class ActivationLayer(Layer):
    """Layer that applies an element-wise activation object.

    ``activation_fn`` must provide ``forward(x)`` and ``backward(x)``, both
    taking the raw input values.
    """

    def __init__(self, activation_fn):
        super().__init__()
        self.layer_type = "ActivationLayer"  # Fixed label for this layer kind
        self.activation_fn = activation_fn

    def forward(self, inp: Tensor) -> Tensor:
        """Run the activation on ``inp`` and remember both input and output."""
        self.input = inp
        activated = self.activation_fn.forward(inp.elements)
        result = Tensor(
            elements=activated,
            deltas=np.zeros_like(activated),
            shape=inp.shape,
        )
        self.output = result
        return result

    def backward(self, grad_out: Tensor) -> Tensor:
        """Scale the incoming gradient by the activation's derivative at the stored input."""
        upstream = grad_out.elements
        local_slope = self.activation_fn.backward(self.input.elements)
        return Tensor(elements=upstream * local_slope, deltas=None, shape=self.input.shape)

    def calc_delta_weights(self):
        """Nothing to do: this layer has no weights."""
        return None

class SigmoidActivation:
    """Element-wise logistic sigmoid and its derivative."""

    @staticmethod
    def forward(x):
        """Return ``1 / (1 + exp(-x))`` without overflowing for large ``|x|``.

        Args:
            x: Scalar or array-like of pre-activation values.

        Returns:
            The sigmoid of ``x``, element-wise.
        """
        # exp() is only taken of a non-positive number, so it cannot overflow.
        # For x >= 0 this is 1 / (1 + exp(-x)); for x < 0 it is the
        # algebraically equal exp(x) / (1 + exp(x)).
        z = np.exp(-np.abs(x))
        return np.where(np.asarray(x) >= 0, 1.0, z) / (1.0 + z)

    @staticmethod
    def backward(x):
        """Return the sigmoid derivative ``s * (1 - s)`` evaluated at ``x``."""
        s = SigmoidActivation.forward(x)
        return s * (1 - s)

class SigmoidLayer(ActivationLayer):
    """ActivationLayer preconfigured with a SigmoidActivation."""

    def __init__(self):
        sigmoid = SigmoidActivation()
        super().__init__(sigmoid)
        # Replace the label the parent constructor assigned
        self.layer_type = "SigmoidLayer"

class SoftmaxLayer(Layer):
    """Layer that turns scores into probabilities with the softmax function."""

    def __init__(self):
        super().__init__()
        self.layer_type = "SoftmaxLayer"  # Explicitly set layer type
        self.input = None
        self.output = None

    def forward(self, inp: Tensor) -> Tensor:
        """Apply softmax to a single sample, or to each row of a batch.

        Args:
            inp: Tensor of scores.

        Returns:
            Tensor of the same shape whose entries are the softmax of ``inp``.
        """
        self.input = inp

        # Subtracting the maximum first keeps exp() from overflowing
        if inp.is_batch:
            exp_elements = np.exp(inp.elements - np.max(inp.elements, axis=1, keepdims=True))
            sum_exp = np.sum(exp_elements, axis=1, keepdims=True)
        else:
            exp_elements = np.exp(inp.elements - np.max(inp.elements))
            sum_exp = np.sum(exp_elements)
        output_elements = exp_elements / sum_exp

        self.output = Tensor(
            elements=output_elements,
            deltas=np.zeros_like(output_elements),
            shape=inp.shape
        )
        return self.output

    def backward(self, grad_out) -> Tensor:
        """Propagate a gradient through the softmax of the last forward pass.

        Args:
            grad_out: Gradient w.r.t. this layer's output, either a Tensor or
                raw array-like data (which is wrapped in a Tensor first).

        Returns:
            Tensor holding the gradient w.r.t. this layer's input.
        """
        if not isinstance(grad_out, Tensor):
            grad_values = np.asarray(grad_out)
            # Describe the wrapper with the array's real shape so a batched
            # gradient is not labelled as a 1-D vector
            grad_out = Tensor(elements=grad_values, deltas=None, shape=Shape(*grad_values.shape))

        probs = self.output.elements
        # Jacobian-vector product: s * (g - <g, s>), taken per sample
        if grad_out.is_batch:
            weighted = (grad_out.elements * probs).sum(axis=1, keepdims=True)
        else:
            weighted = np.dot(grad_out.elements, probs)
        input_grads = probs * (grad_out.elements - weighted)

        return Tensor(
            elements=input_grads,
            deltas=None,
            shape=self.input.shape
        )

    def calc_delta_weights(self):
        """Softmax layer has no weights to update"""
        pass




# Network



In [None]:
  from typing import List
  import numpy as np
  import os
  import pickle

  class Network:
      """Sequential stack of layers with loss computation, SGD updates and persistence."""

      # Clipping bound shared by the cross-entropy loss and its gradient so
      # both see the same, strictly positive, probabilities
      _EPSILON = 1e-15

      def __init__(self, layers: List[Layer]=None, loss_fn=None, activation_fn=None, learning_rate=0.01):
          """Initialize network with layers, loss function, activation and learning rate

          Args:
              layers: Optional list of layers; an empty list is used when omitted.
              loss_fn: Stored on the instance; not used by the methods of this class.
              activation_fn: Stored on the instance; not used by the methods of this class.
              learning_rate: Step size applied in ``update_weights``.
          """
          self.layers = layers if layers is not None else []
          self.loss_fn = loss_fn
          self.activation_fn = activation_fn
          self.learning_rate = learning_rate

          # Add unique identifiers to each layer for saving/loading
          for i, layer in enumerate(self.layers):
              layer.layer_type = layer.__class__.__name__
              layer.num = i

      def add_layer(self, layer):
          """Add a layer to the network and assign it a unique number"""
          layer.num = len(self.layers)  # Assign the next available number
          self.layers.append(layer)

      def forward(self, x):
          """
          Perform forward pass through all layers.

          Args:
              x: Input data

          Returns:
              Tensor: Network output
          """
          current = x
          for layer in self.layers:
              current = layer.forward(current)
          return current

      def backward(self, grad):
          """
          Perform backward pass (backpropagation) through all layers.

          Args:
              grad: Gradient from the loss function, as a Tensor or as raw
                  array data (e.g. the result of ``compute_loss_gradient``)
          """
          # Layers read ``grad.elements`` / ``grad.is_batch``, so a raw array
          # must be wrapped before it reaches the last layer
          if not isinstance(grad, Tensor):
              grad = Tensor(elements=grad)

          current_grad = grad
          for layer in reversed(self.layers):
              current_grad = layer.backward(current_grad)

      def update_weights(self):
          """
          Update network weights using Stochastic Gradient Descent (SGD).
          Updates both weights and biases for all layers that have them, then
          lets every layer run its ``calc_delta_weights`` hook.
          """
          for layer in self.layers:
              if hasattr(layer, 'weights'):
                  layer.weights.elements -= self.learning_rate * layer.weights.deltas

                  # Update biases using SGD
                  layer.biases.elements -= self.learning_rate * layer.biases.deltas

              layer.calc_delta_weights()

      def compute_loss(self, output, target):
          """
          Compute the loss between network output and target.

          A target with more than one dimension selects cross-entropy (summed
          over all entries); any other target selects mean squared error.

          Args:
              output: Network output tensor
              target: Target values tensor

          Returns:
              float: Computed loss value
          """
          if isinstance(target.elements, np.ndarray) and target.elements.ndim > 1:
              # Cross entropy for classification tasks; clip to avoid log(0)
              output_clipped = np.clip(output.elements, self._EPSILON, 1 - self._EPSILON)
              return -np.sum(target.elements * np.log(output_clipped))
          else:
              # Mean Squared Error for regression tasks
              return np.mean((np.array(output.elements) - np.array(target.elements)) ** 2)

      def compute_loss_gradient(self, output, target):
          """
          Compute the gradient of the loss function with respect to the output.

          Uses the same branch selection as ``compute_loss``.

          Args:
              output: Network output tensor
              target: Target values tensor

          Returns:
              numpy.ndarray: Gradient of the loss
          """
          if isinstance(target.elements, np.ndarray) and target.elements.ndim > 1:
              # Cross entropy gradient. Clip exactly as the loss does: an
              # output that underflowed to 0 would otherwise give -inf here
              # and NaN weights after the update.
              output_clipped = np.clip(output.elements, self._EPSILON, 1 - self._EPSILON)
              return -target.elements / output_clipped
          else:
              # MSE gradient
              return 2 * (np.array(output.elements) - np.array(target.elements)) / len(output.elements)

      def save_params(self, folder_path):
          """Save the weights and biases of every weighted layer as pickle files.

          Files are named ``<layer_type>_<num>_weights.pkl`` and
          ``<layer_type>_<num>_biases.pkl`` inside ``folder_path``, which is
          created if needed.
          """
          os.makedirs(folder_path, exist_ok=True)

          for layer in self.layers:
              if hasattr(layer, 'weights'):
                  # Create unique identifier for the layer
                  layer_id = f"{layer.layer_type}_{layer.num}"

                  # Save weights
                  weights_file = os.path.join(folder_path, f"{layer_id}_weights.pkl")
                  with open(weights_file, 'wb') as f:
                      pickle.dump(layer.weights.elements, f)

                  # Save biases
                  biases_file = os.path.join(folder_path, f"{layer_id}_biases.pkl")
                  with open(biases_file, 'wb') as f:
                      pickle.dump(layer.biases.elements, f)

      def load_params(self, folder_path):
          """Load weights and biases written by ``save_params`` from ``folder_path``.

          SECURITY NOTE: ``pickle.load`` can execute arbitrary code; only load
          parameter files from a trusted source.
          """
          for layer in self.layers:
              if hasattr(layer, 'weights'):
                  # Create unique identifier for the layer
                  layer_id = f"{layer.layer_type}_{layer.num}"

                  # Load weights
                  weights_file = os.path.join(folder_path, f"{layer_id}_weights.pkl")
                  with open(weights_file, 'rb') as f:
                      layer.weights.elements = pickle.load(f)

                  # Load biases
                  biases_file = os.path.join(folder_path, f"{layer_id}_biases.pkl")
                  with open(biases_file, 'rb') as f:
                      layer.biases.elements = pickle.load(f)

      def train_step(self, x, y, load_existing=False, save_path=None):
          """
          Perform a single training step.

          Args:
              x: Input data
              y: Target values
              load_existing: Whether to load existing weights
              save_path: Path to save/load weights

          Returns:
              float: Loss value for this step, or None when ``load_existing``
              and ``save_path`` are both set (in that case the parameters are
              loaded and no training is performed)
          """
          # Load existing weights if requested; this call then does nothing else
          if load_existing and save_path:
              self.load_params(save_path)
              return None

          # Forward pass
          output = self.forward(x)

          # Compute loss and gradient
          loss = self.compute_loss(output, y)
          grad = self.compute_loss_gradient(output, y)

          # Backward pass
          self.backward(grad)

          # Update weights
          self.update_weights()

          # Save if path provided
          if save_path:
              self.save_params(save_path)

          return loss




# Training

In [None]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
import time
import os


def load_mnist():
    """Fetch MNIST, scale it, one-hot encode the labels and split it 80/20.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)``.
    """
    print("Loading MNIST dataset...")
    images, labels = fetch_openml('mnist_784', version=1, return_X_y=True, as_frame=False)

    # Scale pixel values by 1/255
    images = images / 255.0

    # One column per distinct label value
    one_hot = OneHotEncoder(sparse_output=False).fit_transform(labels.reshape(-1, 1))

    # Fixed random_state makes the split reproducible
    splits = train_test_split(images, one_hot, test_size=0.2, random_state=42)
    X_train, X_test, y_train, y_test = splits
    return X_train, X_test, y_train, y_test

def create_network():
    """Assemble the 784-256-128-10 classifier used for MNIST.

    Returns:
        A ``Network`` (learning rate 0.01) with its layers added in order.
    """
    network = Network(learning_rate=0.01)

    stack = (
        InputLayer(),                   # 784 inputs, one per pixel
        FullyConnectedLayer(784, 256),  # hidden layer 1
        SigmoidLayer(),
        FullyConnectedLayer(256, 128),  # hidden layer 2
        SigmoidLayer(),
        FullyConnectedLayer(128, 10),   # one output per digit 0-9
        SoftmaxLayer(),
    )
    for layer in stack:
        network.add_layer(layer)

    return network

def evaluate_accuracy(network, X, y, batch_size=1000):
    """Return the fraction of samples whose predicted class matches the target.

    Args:
        network: Object whose ``forward(tensor)`` returns a Tensor of
            per-class scores, one row per sample.
        X: Samples, one row per sample.
        y: One-hot targets, one row per sample.
        batch_size: Number of samples passed through the network at once.

    Returns:
        Accuracy in [0, 1]; 0.0 when ``X`` is empty.
    """
    total = len(X)
    if total == 0:
        # Nothing to evaluate; avoid dividing by zero below
        return 0.0

    correct = 0
    # Process in batches for faster evaluation
    for start in range(0, total, batch_size):
        batch_X = np.asarray(X[start:start + batch_size])
        batch_y = np.asarray(y[start:start + batch_size])

        # Take the shape from the slice itself: the last batch may be shorter
        # than batch_size, and the feature width need not be 784
        x_tensor = Tensor(elements=batch_X, shape=Shape(*batch_X.shape))
        output = network.forward(x_tensor)
        predictions = np.argmax(output.elements, axis=1)
        actuals = np.argmax(batch_y, axis=1)
        correct += np.sum(predictions == actuals)

    return correct / total

def train_mnist(epochs=10, batch_size=64):  # Reduced batch size
    """Train the MNIST network with mini-batches and record per-epoch metrics.

    Writes the final weights to ``mnist_results/weights`` and a tab-separated
    summary to ``mnist_results/training_results.txt``.

    Args:
        epochs: Number of passes over the training set.
        batch_size: Number of samples per mini-batch.

    Returns:
        List with one dict per epoch holding ``epoch``, ``runtime``, ``loss``,
        ``train_accuracy`` and ``test_accuracy``.
    """
    # Load data
    X_train, X_test, y_train, y_test = load_mnist()

    # Create network
    network = create_network()

    # Create results directory
    results_dir = "mnist_results"
    os.makedirs(results_dir, exist_ok=True)

    # Training loop
    print("\nStarting training...")
    results = []

    for epoch in range(epochs):
        epoch_start = time.time()
        epoch_loss = 0
        # Counted as the loop runs so a final partial batch is included
        num_batches = 0

        # Shuffle training data
        indices = np.random.permutation(len(X_train))
        X_train_shuffled = X_train[indices]
        y_train_shuffled = y_train[indices]

        # Mini-batch training
        for i in range(0, len(X_train), batch_size):
            batch_X = X_train_shuffled[i:i+batch_size]
            batch_y = y_train_shuffled[i:i+batch_size]

            # Convert batch to tensors; shapes come from the slices because
            # the last batch may hold fewer than batch_size samples
            x_tensor = Tensor(elements=batch_X, deltas=None, shape=Shape(*batch_X.shape))
            y_tensor = Tensor(elements=batch_y, deltas=None, shape=Shape(*batch_y.shape))

            loss = network.train_step(x_tensor, y_tensor)
            epoch_loss += loss
            num_batches += 1

        # Calculate average loss per batch (guarding against an empty epoch)
        avg_loss = epoch_loss / max(num_batches, 1)

        # Calculate training and test accuracy (on smaller subset for speed)
        eval_size = min(1000, len(X_train))
        train_accuracy = evaluate_accuracy(network, X_train[:eval_size], y_train[:eval_size])
        test_accuracy = evaluate_accuracy(network, X_test[:eval_size], y_test[:eval_size])

        # Calculate epoch runtime
        epoch_time = time.time() - epoch_start

        # Store results
        results.append({
            'epoch': epoch + 1,
            'runtime': epoch_time,
            'loss': avg_loss,
            'train_accuracy': train_accuracy,
            'test_accuracy': test_accuracy
        })

        print(f"Epoch {epoch + 1}/{epochs}")
        print(f"Runtime: {epoch_time:.2f}s")
        print(f"Average Loss: {avg_loss:.4f}")
        print(f"Training Accuracy: {train_accuracy:.4f}")
        print(f"Test Accuracy: {test_accuracy:.4f}\n")

    # Save network weights
    network.save_params(os.path.join(results_dir, "weights"))

    # Save results to file
    with open(os.path.join(results_dir, "training_results.txt"), "w") as f:
        f.write("Epoch\tRuntime(s)\tLoss\tTrain Accuracy\tTest Accuracy\n")
        for result in results:
            f.write(f"{result['epoch']}\t{result['runtime']:.2f}\t{result['loss']:.4f}\t{result['train_accuracy']:.4f}\t{result['test_accuracy']:.4f}\n")

    return results


In [None]:
# Run training with the defaults (epochs=10, batch_size=64); the returned
# per-epoch results list is what the cell displays after the printed log.
train_mnist()

Loading MNIST dataset...

Starting training...
Epoch 1/10
Runtime: 9.56s
Average Loss: 27.6570
Training Accuracy: 0.9520
Test Accuracy: 0.9400

Epoch 2/10
Runtime: 9.62s
Average Loss: 11.5308
Training Accuracy: 0.9770
Test Accuracy: 0.9580

Epoch 3/10
Runtime: 7.07s
Average Loss: 8.3621
Training Accuracy: 0.9840
Test Accuracy: 0.9650

Epoch 4/10
Runtime: 9.56s
Average Loss: 6.5684
Training Accuracy: 0.9850
Test Accuracy: 0.9710

Epoch 5/10
Runtime: 7.11s
Average Loss: 5.3839
Training Accuracy: 0.9880
Test Accuracy: 0.9690

Epoch 6/10
Runtime: 9.29s
Average Loss: 4.5206
Training Accuracy: 0.9930
Test Accuracy: 0.9640

Epoch 7/10
Runtime: 9.62s
Average Loss: 3.8445
Training Accuracy: 0.9930
Test Accuracy: 0.9670

Epoch 8/10
Runtime: 6.99s
Average Loss: 3.2080
Training Accuracy: 0.9900
Test Accuracy: 0.9770

Epoch 9/10
Runtime: 9.56s
Average Loss: 2.7062
Training Accuracy: 0.9950
Test Accuracy: 0.9750

Epoch 10/10
Runtime: 9.61s
Average Loss: 2.2863
Training Accuracy: 0.9940
Test Accuracy

[{'epoch': 1,
  'runtime': 9.56056547164917,
  'loss': np.float64(27.657029216683352),
  'train_accuracy': np.float64(0.952),
  'test_accuracy': np.float64(0.94)},
 {'epoch': 2,
  'runtime': 9.620229005813599,
  'loss': np.float64(11.530846238585422),
  'train_accuracy': np.float64(0.977),
  'test_accuracy': np.float64(0.958)},
 {'epoch': 3,
  'runtime': 7.07407283782959,
  'loss': np.float64(8.362061521596807),
  'train_accuracy': np.float64(0.984),
  'test_accuracy': np.float64(0.965)},
 {'epoch': 4,
  'runtime': 9.56212329864502,
  'loss': np.float64(6.568394656364397),
  'train_accuracy': np.float64(0.985),
  'test_accuracy': np.float64(0.971)},
 {'epoch': 5,
  'runtime': 7.1144537925720215,
  'loss': np.float64(5.383941144777574),
  'train_accuracy': np.float64(0.988),
  'test_accuracy': np.float64(0.969)},
 {'epoch': 6,
  'runtime': 9.285333633422852,
  'loss': np.float64(4.520633843740853),
  'train_accuracy': np.float64(0.993),
  'test_accuracy': np.float64(0.964)},
 {'epoch': 

# Evaluation