In [14]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from keras.datasets import fashion_mnist

In [16]:
def create_validation_set(X, Y, val_ratio=0.2, seed=None):
    if seed is not None:
        np.random.seed(seed)
    
    n_samples = X.shape[0]
    indices = np.random.permutation(n_samples)
    split_index = int(n_samples * (1 - val_ratio))
    train_indices = indices[:split_index]
    val_indices = indices[split_index:]
    
    X_train = X[train_indices]
    Y_train = Y[train_indices]
    X_val = X[val_indices]
    Y_val = Y[val_indices]
    
    return X_train, X_val, Y_train, Y_val

(X_train, y_train), (X_test, y_test) = fashion_mnist.load_data()
(X_train, X_val, y_train, y_val) = create_validation_set(X_train, y_train, val_ratio=0.1, seed=42)

class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat', 
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Create a DataFrame for the training data
X_train_flat = X_train.reshape(X_train.shape[0], -1) / 255.0
train_df = pd.DataFrame(X_train_flat)
train_df['label'] = y_train
train_df['label_name'] = [class_names[label] for label in y_train]

# Create a DataFrame for the validation data
X_val_flat = X_val.reshape(X_val.shape[0], -1) / 255.0
val_df = pd.DataFrame(X_val_flat)
val_df['label'] = y_val
val_df['label_name'] = [class_names[label] for label in y_val]

# Create a DataFrame for the test data
X_test_flat = X_test.reshape(X_test.shape[0], -1) / 255.0
test_df = pd.DataFrame(X_test_flat)
test_df['label'] = y_test
test_df['label_name'] = [class_names[label] for label in y_test]

In [7]:

# Base Optimizer class
class Optimizer:
    def __init__(self, W, B, **kwargs):
        """
        Initialize optimizer with weights and biases shapes.
        
        Parameters:
        W (list): List of weight matrices
        B (list): List of bias vectors
        **kwargs: Optimizer-specific parameters
        """
        self.L = len(W)  # Number of layers
        self.params = kwargs
    
    def update(self, W, B, dW, dB, iteration):
        """
        Update weights and biases based on gradients.
        
        Parameters:
        W (list): Current weights
        B (list): Current biases
        dW (list): Weight gradients
        dB (list): Bias gradients
        iteration (int): Current iteration number
        
        Returns:
        tuple: (new_W, new_B) updated weights and biases
        """
        raise NotImplementedError("Each optimizer must implement this method")


class SGDOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.01, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
    
    def update(self, W, B, dW, dB, iteration):
        for i in range(self.L):
            W[i] -= self.learning_rate * dW[i]
            B[i] -= self.learning_rate * dB[i]
        return W, B


class MomentumOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.01, momentum=0.9, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.momentum = momentum
        
        # Initialize velocity vectors
        self.v_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.v_B = [np.zeros_like(B[i]) for i in range(self.L)]
    
    def update(self, W, B, dW, dB, iteration):
        for i in range(self.L):
            # Update velocity
            self.v_W[i] = self.momentum * self.v_W[i] - self.learning_rate * dW[i]
            self.v_B[i] = self.momentum * self.v_B[i] - self.learning_rate * dB[i]
            
            # Update parameters
            W[i] += self.v_W[i]
            B[i] += self.v_B[i]
        
        return W, B


class NesterovOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.01, momentum=0.9, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.momentum = momentum
        
        # Initialize velocity vectors
        self.v_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.v_B = [np.zeros_like(B[i]) for i in range(self.L)]
    
    def update(self, W, B, dW, dB, iteration):
        W_lookahead = [None] * self.L
        B_lookahead = [None] * self.L
        
        # Calculate lookahead position
        for i in range(self.L):
            W_lookahead[i] = W[i] + self.momentum * self.v_W[i]
            B_lookahead[i] = B[i] + self.momentum * self.v_B[i]
        
        # Update velocity
        for i in range(self.L):
            self.v_W[i] = self.momentum * self.v_W[i] - self.learning_rate * dW[i]
            self.v_B[i] = self.momentum * self.v_B[i] - self.learning_rate * dB[i]
            
            # Update parameters
            W[i] += self.v_W[i]
            B[i] += self.v_B[i]
        
        return W, B


class RMSpropOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.001, decay_rate=0.9, epsilon=1e-8, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.decay_rate = decay_rate
        self.epsilon = epsilon
        
        # Initialize cache vectors
        self.cache_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.cache_B = [np.zeros_like(B[i]) for i in range(self.L)]
    
    def update(self, W, B, dW, dB, iteration):
        for i in range(self.L):
            # Update cache with squared gradients
            self.cache_W[i] = self.decay_rate * self.cache_W[i] + (1 - self.decay_rate) * np.square(dW[i])
            self.cache_B[i] = self.decay_rate * self.cache_B[i] + (1 - self.decay_rate) * np.square(dB[i])
            
            # Update parameters
            W[i] -= self.learning_rate * dW[i] / (np.sqrt(self.cache_W[i]) + self.epsilon)
            B[i] -= self.learning_rate * dB[i] / (np.sqrt(self.cache_B[i]) + self.epsilon)
        
        return W, B


class AdamOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        
        # Initialize moment vectors
        self.m_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.m_B = [np.zeros_like(B[i]) for i in range(self.L)]
        
        # Initialize velocity vectors
        self.v_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.v_B = [np.zeros_like(B[i]) for i in range(self.L)]
    
    def update(self, W, B, dW, dB, iteration):
        t = iteration + 1  # Timestep starts at 1
        
        for i in range(self.L):
            # Update biased first and second moment estimates
            self.m_W[i] = self.beta1 * self.m_W[i] + (1 - self.beta1) * dW[i]
            self.m_B[i] = self.beta1 * self.m_B[i] + (1 - self.beta1) * dB[i]
            
            self.v_W[i] = self.beta2 * self.v_W[i] + (1 - self.beta2) * np.square(dW[i])
            self.v_B[i] = self.beta2 * self.v_B[i] + (1 - self.beta2) * np.square(dB[i])
            
            # Compute bias-corrected first and second moment estimates
            m_W_corrected = self.m_W[i] / (1 - self.beta1**t)
            m_B_corrected = self.m_B[i] / (1 - self.beta1**t)
            
            v_W_corrected = self.v_W[i] / (1 - self.beta2**t)
            v_B_corrected = self.v_B[i] / (1 - self.beta2**t)
            
            # Update parameters
            W[i] -= self.learning_rate * m_W_corrected / (np.sqrt(v_W_corrected) + self.epsilon)
            B[i] -= self.learning_rate * m_B_corrected / (np.sqrt(v_B_corrected) + self.epsilon)
        
        return W, B


class NadamOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.epsilon = epsilon
        
        # Initialize moment vectors
        self.m_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.m_B = [np.zeros_like(B[i]) for i in range(self.L)]
        
        # Initialize velocity vectors
        self.v_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.v_B = [np.zeros_like(B[i]) for i in range(self.L)]
    
    def update(self, W, B, dW, dB, iteration):
        t = iteration + 1  # Timestep starts at 1
        
        for i in range(self.L):
            # Update biased first and second moment estimates
            self.m_W[i] = self.beta1 * self.m_W[i] + (1 - self.beta1) * dW[i]
            self.m_B[i] = self.beta1 * self.m_B[i] + (1 - self.beta1) * dB[i]
            
            self.v_W[i] = self.beta2 * self.v_W[i] + (1 - self.beta2) * np.square(dW[i])
            self.v_B[i] = self.beta2 * self.v_B[i] + (1 - self.beta2) * np.square(dB[i])
            
            # Compute bias-corrected first and second moment estimates
            m_W_corrected = self.m_W[i] / (1 - self.beta1**t)
            m_B_corrected = self.m_B[i] / (1 - self.beta1**t)
            
            v_W_corrected = self.v_W[i] / (1 - self.beta2**t)
            v_B_corrected = self.v_B[i] / (1 - self.beta2**t)
            
            # Apply Nesterov momentum to first moment estimate
            m_W_nesterov = self.beta1 * m_W_corrected + (1 - self.beta1) * dW[i] / (1 - self.beta1**t)
            m_B_nesterov = self.beta1 * m_B_corrected + (1 - self.beta1) * dB[i] / (1 - self.beta1**t)
            
            # Update parameters
            W[i] -= self.learning_rate * m_W_nesterov / (np.sqrt(v_W_corrected) + self.epsilon)
            B[i] -= self.learning_rate * m_B_nesterov / (np.sqrt(v_B_corrected) + self.epsilon)
        
        return W, B


# Example usage:
def one_hot_encode(labels, num_classes):
    """Convert numeric labels to one-hot encoded vectors"""
    m = labels.shape[0]
    one_hot = np.zeros((num_classes, m))
    for i in range(m):
        one_hot[labels[i], i] = 1
    return one_hot

# Example of how to use the framework
def example_usage():
    # Generate some synthetic data
    np.random.seed(42)
    X = np.random.randn(2, 500)  # 500 examples with 2 features
    y = (X[0, :] > 0).astype(int)  # Binary classification
    y_one_hot = one_hot_encode(y, 2)  # One-hot encode
    
    # Create a neural network with 2-5-2 architecture
    layer_sizes = [2, 5, 2]
    activation_functions = ['relu', 'softmax']
    
    nn = NeuralNetwork(layer_sizes, activation_functions)
    
    # Set optimizer and train
    nn.set_optimizer('adam', learning_rate=0.01, beta1=0.9, beta2=0.999)
    
    # Train the network
    history = nn.train(X, y_one_hot, batch_size=32, num_epochs=10, log_every=50)
    
    # Make predictions
    predictions = nn.predict(X)
    predicted_classes = np.argmax(predictions, axis=0)
    
    # Calculate accuracy
    accuracy = np.mean(predicted_classes == y)
    print(f"Accuracy: {accuracy * 100:.2f}%")
    
    return nn, history

# To add a new optimizer like Eve, simply create a new class:
class EveOptimizer(Optimizer):
    def __init__(self, W, B, learning_rate=0.001, beta1=0.9, beta2=0.999, beta3=0.999, k=0.1, K=10, **kwargs):
        super().__init__(W, B, **kwargs)
        self.learning_rate = learning_rate
        self.beta1 = beta1
        self.beta2 = beta2
        self.beta3 = beta3
        self.k = k
        self.K = K
        self.epsilon = 1e-8
        
        # Initialize Adam moment vectors
        self.m_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.m_B = [np.zeros_like(B[i]) for i in range(self.L)]
        self.v_W = [np.zeros_like(W[i]) for i in range(self.L)]
        self.v_B = [np.zeros_like(B[i]) for i in range(self.L)]
        
        # Eve-specific variables
        self.d = 1  # Initial value for d
        self.prev_loss = None  # Previous loss
        
    def update(self, W, B, dW, dB, iteration, current_loss=None):
        if current_loss is None:
            # If loss is not provided, use a placeholder (not ideal)
            current_loss = 1.0
            
        t = iteration + 1  # Timestep starts at 1
        
        # Update d based on loss change - Eve's special feature
        if self.prev_loss is not None:
            delta_loss = abs((current_loss / self.prev_loss) - 1)
            c = min(max(delta_loss, 1/self.K), self.k)
            
            if current_loss >= self.prev_loss:
                self.d *= (1 + c)
            else:
                self.d /= (1 + c)
        
        self.prev_loss = current_loss
        
        # Update parameters using Adam with the d factor
        for i in range(self.L):
            # Update biased first and second moment estimates (same as Adam)
            self.m_W[i] = self.beta1 * self.m_W[i] + (1 - self.beta1) * dW[i]
            self.m_B[i] = self.beta1 * self.m_B[i] + (1 - self.beta1) * dB[i]
            
            self.v_W[i] = self.beta2 * self.v_W[i] + (1 - self.beta2) * np.square(dW[i])
            self.v_B[i] = self.beta2 * self.v_B[i] + (1 - self.beta2) * np.square(dB[i])
            
            # Compute bias-corrected first and second moment estimates
            m_W_corrected = self.m_W[i] / (1 - self.beta1**t)
            m_B_corrected = self.m_B[i] / (1 - self.beta1**t)
            
            v_W_corrected = self.v_W[i] / (1 - self.beta2**t)
            v_B_corrected = self.v_B[i] / (1 - self.beta2**t)
            
            # Update parameters with Eve's d factor
            W[i] -= (self.learning_rate / self.d) * m_W_corrected / (np.sqrt(v_W_corrected) + self.epsilon)
            B[i] -= (self.learning_rate / self.d) * m_B_corrected / (np.sqrt(v_B_corrected) + self.epsilon)
        
        return W, B

# If needed, update the NeuralNetwork class to accept loss in the optimizer update
# by modifying the train method to pass current_loss to the optimizer

In [184]:
class NeuralNetwork:
    def __init__(self, layer_sizes=[784, 17, 19, 10], activation_functions=['sigmoid', 'sigmoid', 'softmax']):
        assert len(layer_sizes) - 1 == len(activation_functions), "Number of layers (excluding input layer) and activations must match"
        self.layer_sizes = layer_sizes
        self.L = len(layer_sizes) - 1
        self.activation_functions = activation_functions
        
        self.W = [np.random.uniform(-0.5, 0.5, (self.layer_sizes[i], self.layer_sizes[i - 1])) for i in range(1, len(self.layer_sizes))]
        self.B = [np.zeros(self.layer_sizes[i]).reshape(1,-1) for i in range(1, len(self.layer_sizes))]
        
        self.optimizer = None
    
    def set_optimizer(self, optimizer_name, **kwargs):
        optimizer_map = {
            'sgd': SGDOptimizer,
            'momentum': MomentumOptimizer,
            'nesterov': NesterovOptimizer,
            'rmsprop': RMSpropOptimizer,
            'adam': AdamOptimizer,
            'nadam': NadamOptimizer
        }
        
        if optimizer_name not in optimizer_map:
            raise ValueError(f"Unsupported optimizer: {optimizer_name}")
        
        self.optimizer = optimizer_map[optimizer_name](self.W, self.B, **kwargs)
    
    def activate(self, A, activation):
        if activation == 'sigmoid':
            return 1 / (1 + np.exp(-A))
        elif activation == 'relu':
            return np.maximum(0, A)
        elif activation == 'tanh':
            return np.tanh(A)
        elif activation == 'softmax':
            # For numerical stability, subtract max value
            exps = np.exp(A - np.max(A, axis=-1, keepdims=True))
            return exps / np.sum(exps, axis=-1, keepdims=True)
        else:
            raise ValueError(f"Unsupported activation function: {activation}")
    
    def _activate_derivative(self, Z, A, activation):
        """Calculate derivative of activation function"""
        if activation == 'sigmoid':
            return A * (1 - A)
        elif activation == 'relu':
            return (Z > 0).astype(float)
        elif activation == 'tanh':
            return 1 - A**2
        elif activation == 'softmax':
            # This is already handled in backpropagation for softmax+cross entropy
            return 1
        else:
            raise ValueError(f"Unsupported activation function: {activation}")
    
    def forward_propagation(self, X):
        H = [X]
        A = []
        
        for i in range(self.L):
            A.append(np.dot(H[i], self.W[i].T) + self.B[i])
            H.append(self.activate(A[i], self.activation_functions[i]))
        
        return H, A
    
    def compute_loss(self, H_final, y, loss_type='cross_entropy'):
        if y.ndim == 1:
            y = self.one_hot(y)
        m = y.shape[0]  # Number of examples
        
        if loss_type == 'cross_entropy':
            # Add small epsilon to avoid log(0)
            epsilon = 1e-15
            loss = -np.sum(y * np.log(H_final + epsilon)) / m
        elif loss_type == 'mse':
            loss = np.sum((H_final - y)**2) / (2 * m)
        else:
            raise ValueError(f"Unsupported loss function: {loss_type}")
            
        return loss
    
    def _loss_derivative(self, A_final, y, loss_type):
        if loss_type == 'cross_entropy':
            epsilon = 1e-15
            return -y / (A_final + epsilon)
        elif loss_type == 'mse':
            return A_final - y
        else:
            raise ValueError(f"Unsupported loss function: {loss_type}")
    
    def one_hot(self, y):
        one_hot_y = np.zeros((y.size, self.layer_sizes[-1]))
        one_hot_y[np.arange(y.size), y] = 1
        return one_hot_y
    
    def back_propagation(self, X, y, H, A, loss_type='cross_entropy'):
        assert len(H) == self.L + 1 and len(A) == self.L
        N = X.shape[0]
        assert N==y.size and self.L==len(A) and self.L + 1==len(H)
        
        dW, dB = [None] * self.L, [None] * self.L
        
        if loss_type == 'cross_entropy' and self.activation_functions[-1] == 'softmax':
            # Gradient simplifies when using softmax + cross entropy
            dA = H[-1] - self.one_hot(y)
        else:
            dH = self._loss_derivative(H[-1], y, loss_type)
            dA = dH * self._activate_derivative(A[-1], H[-1], self.activation_functions[-1])
            
        dW[-1] = np.dot(dA.T, H[-2]) / N
        dB[-1] = np.sum(dA, axis=0, keepdims=True) / N
        
        for k in range(self.L-2, -1, -1):
            
            dA = np.dot(dA, self.W[k+1]) * self._activate_derivative(A[k], H[k+1], self.activation_functions[k])
            
            dWk = (np.dot(dA.T, H[k])) / N
            dBk = np.sum(dA, axis=0, keepdims=True) / N
            dW[k] = dWk
            dB[k] = dBk
               
        return dW, dB
    
    def predict(self, X):
        H, _ = self.forward_propagation(X)
        return H[-1]
    
    def compute_accuracy(self, X, y):
        y_pred = np.argmax(self.predict(X), axis=1)
        return np.mean(y_pred == y)
    
    def train(self, X_train, y_train, X_val=None, y_val=None, 
              batch_size=32, num_epochs=100, loss_type='cross_entropy', 
              log_every=100):
        
        if self.optimizer is None:
            self.set_optimizer('sgd', learning_rate=0.01)
        
        num_datapoints = X_train.shape[0]
        num_batches = int(np.ceil(num_datapoints / batch_size))
        
        spacer_1 = int(np.log10(num_epochs)+1)
        spacer_2 = int(np.log10(num_batches)+1)
        
        history = {
            'train_loss' : [],
            'val_loss' : [] if X_val is not None else None
        }
        
        iteration = 0
        
        for epoch in range(num_epochs):
            
            permutation = np.random.permutation(num_datapoints)
            X_train = X_train[permutation]
            y_train = y_train[permutation]
            
            for batch in range(num_batches):
                start_idx = batch * batch_size
                end_idx = min((batch + 1) * batch_size, num_datapoints)
                X_batch = X_train[start_idx:end_idx]
                y_batch = y_train[start_idx:end_idx]
                
                H, A = self.forward_propagation(X_batch)
                dW, dB = self.back_propagation(X_batch, y_batch, H, A, loss_type)
                
                self.W, self.B = self.optimizer.update(self.W, self.B, dW, dB, iteration)
                
                if iteration % log_every == 0:
                    train_loss = self.compute_loss(H[-1], y_batch, loss_type)
                    history['train_loss'].append(train_loss)
                    
                    if X_val is not None and y_val is not None:
                        val_loss = self.compute_loss(self.predict(X_val), y_val, loss_type)
                        history['val_loss'].append(val_loss)
                        print(f"Epoch {epoch+1 :>{spacer_1}}/{num_epochs}, Iteration {iteration%num_batches :>{spacer_2}}/{num_batches} --> Train Loss: {train_loss:.5f}, Val Loss: {val_loss:.5f}")
                    else:
                        print(f"Epoch {epoch+1 :>{spacer_1}}/{num_epochs}, Iteration {iteration%num_batches :>{spacer_2}}/{num_batches} --> Train Loss: {train_loss:.5f}")
                
                iteration += 1
                
        return history

In [185]:
nn = NeuralNetwork(layer_sizes=[784, 19, 37, 10], activation_functions=['sigmoid', 'sigmoid', 'softmax'])

In [186]:
H, A = nn.forward_propagation(X_train_flat)
loss = nn.compute_loss(H[-1], y_train)
nn.compute_accuracy(X_val_flat, y_val)

0.09766666666666667

In [187]:
num_trial_datapoints = 10000

nn.train(X_train_flat[:num_trial_datapoints], 
         y_train[:num_trial_datapoints], 
         X_val_flat, y_val, 
         batch_size=64, 
         num_epochs=1000, 
         loss_type='cross_entropy', 
         log_every=5000)
print('--'*20,'DONE','--'*20)
print(nn.compute_accuracy(X_test_flat, y_test))

Epoch    1/1000, Iteration   0/157 --> Train Loss: 2.65683, Val Loss: 2.65174
Epoch   32/1000, Iteration 133/157 --> Train Loss: 1.01875, Val Loss: 1.08887
Epoch   64/1000, Iteration 109/157 --> Train Loss: 0.96116, Val Loss: 0.83183
Epoch   96/1000, Iteration  85/157 --> Train Loss: 1.02493, Val Loss: 0.72283
Epoch  128/1000, Iteration  61/157 --> Train Loss: 0.64649, Val Loss: 0.65294
Epoch  160/1000, Iteration  37/157 --> Train Loss: 0.66520, Val Loss: 0.60245
Epoch  192/1000, Iteration  13/157 --> Train Loss: 0.46197, Val Loss: 0.56630
Epoch  223/1000, Iteration 146/157 --> Train Loss: 0.42672, Val Loss: 0.54031
Epoch  255/1000, Iteration 122/157 --> Train Loss: 0.45493, Val Loss: 0.52193
Epoch  287/1000, Iteration  98/157 --> Train Loss: 0.51880, Val Loss: 0.50789
Epoch  319/1000, Iteration  74/157 --> Train Loss: 0.44559, Val Loss: 0.49679
Epoch  351/1000, Iteration  50/157 --> Train Loss: 0.50553, Val Loss: 0.48795
Epoch  383/1000, Iteration  26/157 --> Train Loss: 0.27686, Val 

In [188]:
nn.compute_accuracy(X_test_flat, y_test)

0.8375

In [189]:
t = 5
a = 129386128746312
# print a with t spaces
print(f'{a :>{t}}')

129386128746312
