In [2]:

import numpy as np
import pickle
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from typing import List, Tuple, Dict

# ---------------------------
# Utilities
# ---------------------------
def one_hot(y, n_classes=None):
    y = np.array(y).reshape(-1, 1)
    if n_classes is None:
        n_classes = np.max(y) + 1
    enc = OneHotEncoder(categories='auto', sparse_output=False)
    enc.fit(np.arange(n_classes).reshape(-1, 1))
    return enc.transform(y)

def accuracy(y_true, y_pred):
    return np.mean(y_true == y_pred)

# ---------------------------
# Weight initialization
# ---------------------------
def init_weights(shape, method='xavier'):
    fan_in, fan_out = shape[0], shape[1]
    if method == 'xavier':
        limit = np.sqrt(6.0 / (fan_in + fan_out))
        return np.random.uniform(-limit, limit, size=shape)
    elif method == 'he':
        std = np.sqrt(2.0 / fan_in)
        return np.random.randn(*shape) * std
    else:
        return np.random.randn(*shape) * 0.01

# ---------------------------
# Layers
# ---------------------------
class Layer:
    def forward(self, X, training=True):
        raise NotImplementedError
    def backward(self, grad):
        raise NotImplementedError
    def params(self):
        return []
    def grads(self):
        return []

class Linear(Layer):
    def __init__(self, in_dim, out_dim, weight_init='xavier', bias_init=0.0, l2=0.0):
        self.W = init_weights((in_dim, out_dim), method=weight_init)
        self.b = np.zeros((1, out_dim)) + bias_init
        self._cache = None
        self.l2 = l2  # weight decay multiplier

        # gradients
        self.dW = np.zeros_like(self.W)
        self.db = np.zeros_like(self.b)

    def forward(self, X, training=True):
        self._cache = X
        return X.dot(self.W) + self.b

    def backward(self, grad):
        X = self._cache  # (N, D)
        N = X.shape[0]
        self.dW = (X.T.dot(grad)) / N + self.l2 * self.W
        self.db = np.sum(grad, axis=0, keepdims=True) / N
        dX = grad.dot(self.W.T)
        return dX

    def params(self):
        return [self.W, self.b]

    def grads(self):
        return [self.dW, self.db]

class BatchNorm(Layer):
    def __init__(self, dim, eps=1e-5, momentum=0.9):
        self.gamma = np.ones((1, dim))
        self.beta = np.zeros((1, dim))
        self.eps = eps
        self.momentum = momentum
        self.running_mean = np.zeros((1, dim))
        self.running_var = np.ones((1, dim))
        self._cache = None
        self.dgamma = np.zeros_like(self.gamma)
        self.dbeta = np.zeros_like(self.beta)

    def forward(self, X, training=True):
        if training:
            mu = np.mean(X, axis=0, keepdims=True)
            var = np.var(X, axis=0, keepdims=True)
            x_hat = (X - mu) / np.sqrt(var + self.eps)
            out = self.gamma * x_hat + self.beta
            self._cache = (X, x_hat, mu, var)
            # update running stats
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mu
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var
            return out
        else:
            x_hat = (X - self.running_mean) / np.sqrt(self.running_var + self.eps)
            return self.gamma * x_hat + self.beta

    def backward(self, dout):
        X, x_hat, mu, var = self._cache
        N = X.shape[0]
        std_inv = 1.0 / np.sqrt(var + self.eps)

        self.dgamma = np.sum(dout * x_hat, axis=0, keepdims=True)
        self.dbeta = np.sum(dout, axis=0, keepdims=True)

        dx_hat = dout * self.gamma
        dvar = np.sum(dx_hat * (X - mu) * -0.5 * std_inv**3, axis=0, keepdims=True)
        dmu = np.sum(dx_hat * -std_inv, axis=0, keepdims=True) + dvar * np.mean(-2.0 * (X - mu), axis=0, keepdims=True)

        dx = dx_hat * std_inv + dvar * 2.0 * (X - mu) / N + dmu / N
        return dx

    def params(self):
        return [self.gamma, self.beta]

    def grads(self):
        return [self.dgamma, self.dbeta]

class Dropout(Layer):
    def __init__(self, p=0.5):
        assert 0 <= p < 1
        self.p = p
        self.mask = None

    def forward(self, X, training=True):
        if not training or self.p == 0:
            return X
        self.mask = (np.random.rand(*X.shape) > self.p) / (1.0 - self.p)
        return X * self.mask

    def backward(self, grad):
        if self.mask is None:
            return grad
        return grad * self.mask

# ---------------------------
# Activations
# ---------------------------
class ReLU(Layer):
    def __init__(self):
        self._cache = None
    def forward(self, X, training=True):
        self._cache = X
        return np.maximum(0, X)
    def backward(self, grad):
        X = self._cache
        d = grad.copy()
        d[X <= 0] = 0
        return d

class LeakyReLU(Layer):
    def __init__(self, alpha=0.01):
        self.alpha = alpha
        self._cache = None
    def forward(self, X, training=True):
        self._cache = X
        return np.where(X > 0, X, self.alpha * X)
    def backward(self, grad):
        X = self._cache
        d = grad.copy()
        d[X <= 0] *= self.alpha
        return d

class Sigmoid(Layer):
    def __init__(self):
        self._cache = None
    def forward(self, X, training=True):
        s = 1.0 / (1.0 + np.exp(-X))
        self._cache = s
        return s
    def backward(self, grad):
        s = self._cache
        return grad * s * (1 - s)

class Tanh(Layer):
    def __init__(self):
        self._cache = None
    def forward(self, X, training=True):
        t = np.tanh(X)
        self._cache = t
        return t
    def backward(self, grad):
        t = self._cache
        return grad * (1 - t**2)

class Softmax(Layer):
    """
    Softmax layer outputs probabilities row-wise.
    Usually combined with cross-entropy loss for numerical stability.
    """
    def __init__(self):
        self._cache = None
    def forward(self, X, training=True):
        # numerically stable softmax
        exps = np.exp(X - np.max(X, axis=1, keepdims=True))
        probs = exps / np.sum(exps, axis=1, keepdims=True)
        self._cache = probs
        return probs
    def backward(self, grad):
        # grad expected to be dL/dY where Y is softmax output.
        # J = diag(p) - p p^T ; implementing full Jacobian-vector product per-sample
        p = self._cache
        dX = np.empty_like(grad)
        for i in range(p.shape[0]):
            pi = p[i].reshape(-1, 1)
            Ji = np.diagflat(pi) - pi.dot(pi.T)
            dX[i] = grad[i].dot(Ji)
        return dX

# ---------------------------
# Losses
# ---------------------------
class CrossEntropyLoss:
    """
    Combines softmax + cross-entropy in a numerically stable way.
    Assumes logits input (before softmax).
    """
    @staticmethod
    def forward(logits, y_onehot):
        # logits: (N, C), y_onehot: (N, C)
        # stable log-softmax
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        log_probs = shifted - np.log(np.sum(np.exp(shifted), axis=1, keepdims=True))
        loss = -np.sum(y_onehot * log_probs) / logits.shape[0]
        probs = np.exp(log_probs)
        return loss, probs  # also returns probs

    @staticmethod
    def backward(probs, y_onehot):
        # dL/dlogits = (probs - y) / N
        N = probs.shape[0]
        return (probs - y_onehot) / N

class MSELoss:
    @staticmethod
    def forward(pred, target):
        loss = np.mean((pred - target) ** 2)
        return loss
    @staticmethod
    def backward(pred, target):
        N = pred.shape[0]
        return 2 * (pred - target) / N

# ---------------------------
# Optimizers
# ---------------------------
class SGD_Momentum:
    def __init__(self, params: List[np.ndarray], lr=1e-3, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.v = [np.zeros_like(p) for p in params]
        self.params = params

    def step(self, grads: List[np.ndarray]):
        for i, (p, g) in enumerate(zip(self.params, grads)):
            self.v[i] = self.momentum * self.v[i] - self.lr * g
            p += self.v[i]  # in-place update

    def set_lr(self, lr):
        self.lr = lr

class Adam:
    def __init__(self, params: List[np.ndarray], lr=1e-3, b1=0.9, b2=0.999, eps=1e-8):
        self.lr = lr
        self.b1 = b1
        self.b2 = b2
        self.eps = eps
        self.m = [np.zeros_like(p) for p in params]
        self.v = [np.zeros_like(p) for p in params]
        self.t = 0
        self.params = params

    def step(self, grads: List[np.ndarray]):
        self.t += 1
        for i, (p, g) in enumerate(zip(self.params, grads)):
            self.m[i] = self.b1 * self.m[i] + (1 - self.b1) * g
            self.v[i] = self.b2 * self.v[i] + (1 - self.b2) * (g * g)
            m_hat = self.m[i] / (1 - self.b1 ** self.t)
            v_hat = self.v[i] / (1 - self.b2 ** self.t)
            p -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

    def set_lr(self, lr):
        self.lr = lr

# ---------------------------
# Model
# ---------------------------
class NeuralNet:
    def __init__(self, layers: List[Layer]):
        self.layers = layers

    def forward(self, X, training=True):
        out = X
        for layer in self.layers:
            out = layer.forward(out, training=training)
        return out

    def backward(self, grad):
        dout = grad
        for layer in reversed(self.layers):
            dout = layer.backward(dout)
        return dout

    def params_and_grads(self) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        ps, gs = [], []
        for layer in self.layers:
            for p, g in zip(layer.params(), layer.grads()):
                ps.append(p)
                gs.append(g)
        return ps, gs

    def predict(self, X):
        logits = self.forward(X, training=False)
        preds = np.argmax(logits, axis=1)
        return preds

    def save(self, path):
        # Save parameters
        params = [p for layer in self.layers for p in layer.params()]
        with open(path, 'wb') as f:
            pickle.dump(params, f)

    def load(self, path):
        with open(path, 'rb') as f:
            params = pickle.load(f)
        i = 0
        for layer in self.layers:
            layer_params = layer.params()
            for j in range(len(layer_params)):
                layer_params[j][...] = params[i]
                i += 1

# ---------------------------
# Training loop
# ---------------------------
def train(model: NeuralNet, X_train, y_train_onehot, X_val, y_val, epochs=100, batch_size=64,
          optimizer_type='adam', lr=1e-3, weight_decay=0.0, lr_decay_step=50, lr_decay_gamma=0.5,
          verbose=1):
    # collect parameters and grads
    params, grads = model.params_and_grads()
    if optimizer_type.lower() == 'adam':
        opt = Adam(params, lr=lr)
    else:
        opt = SGD_Momentum(params, lr=lr, momentum=0.9)

    n = X_train.shape[0]
    history = {'train_loss': [], 'val_acc': []}

    for epoch in range(1, epochs + 1):
        # shuffle
        perm = np.random.permutation(n)
        X_shuf = X_train[perm]
        y_shuf = y_train_onehot[perm]

        epoch_loss = 0.0
        for i in range(0, n, batch_size):
            xb = X_shuf[i:i+batch_size]
            yb = y_shuf[i:i+batch_size]

            logits = model.forward(xb, training=True)
            loss, probs = CrossEntropyLoss.forward(logits, yb)
            epoch_loss += loss * xb.shape[0]

            # backward
            dlogits = CrossEntropyLoss.backward(probs, yb)
            model.backward(dlogits)

            # collect grads and params (they reference actual layer arrays)
            _, grads = model.params_and_grads()

            # step optimizer
            opt.step(grads)

        epoch_loss /= n

        # LR scheduler step decay
        if epoch % lr_decay_step == 0:
            new_lr = opt.lr * lr_decay_gamma
            opt.set_lr(new_lr)

        # validation accuracy
        val_preds = model.predict(X_val)
        val_acc = accuracy(y_val, val_preds)
        history['train_loss'].append(epoch_loss)
        history['val_acc'].append(val_acc)

        if verbose and (epoch % max(1, epochs // 10) == 0 or epoch == 1 or epoch == epochs):
            print(f"Epoch {epoch}/{epochs} - loss: {epoch_loss:.4f} - val_acc: {val_acc:.4f} - lr: {opt.lr:.6f}")

    return history

# ---------------------------
# Example usage
# ---------------------------
if __name__ == "__main__":
    # Create a synthetic multiclass dataset
    X, y = make_classification(n_samples=1500, n_features=20, n_informative=15,
                               n_redundant=2, n_classes=3, random_state=42)
    X = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-9)  # simple normalization

    X_train, X_tmp, y_train, y_tmp = train_test_split(X, y, test_size=0.3, random_state=1)
    X_val, X_test, y_val, y_test = train_test_split(X_tmp, y_tmp, test_size=0.5, random_state=1)

    y_train_oh = one_hot(y_train, n_classes=3)
    y_val_oh = one_hot(y_val, n_classes=3)

    # Build model: Input(20) -> Linear(128) -> BatchNorm -> ReLU -> Dropout -> Linear(64) -> ReLU -> Linear(3)
    layers = [
        Linear(20, 128, weight_init='he', l2=1e-4),
        BatchNorm(128),
        ReLU(),
        Dropout(p=0.2),
        Linear(128, 64, weight_init='he', l2=1e-4),
        ReLU(),
        Linear(64, 3, weight_init='xavier')
    ]
    net = NeuralNet(layers)

    # Train
    history = train(net, X_train, y_train_oh, X_val, y_val,
                    epochs=80, batch_size=64, optimizer_type='adam', lr=1e-3,
                    lr_decay_step=30, lr_decay_gamma=0.5, verbose=1)

    # Final test accuracy
    preds_test = net.predict(X_test)
    print("Test accuracy:", accuracy(y_test, preds_test))

    # Save model
    net.save("model_params.pkl")
    print("Saved model to model_params.pkl")


Epoch 1/80 - loss: 1.0692 - val_acc: 0.6222 - lr: 0.001000
Epoch 8/80 - loss: 0.5520 - val_acc: 0.7867 - lr: 0.001000
Epoch 16/80 - loss: 0.3793 - val_acc: 0.8400 - lr: 0.001000
Epoch 24/80 - loss: 0.3033 - val_acc: 0.8667 - lr: 0.001000
Epoch 32/80 - loss: 0.2488 - val_acc: 0.9022 - lr: 0.000500
Epoch 40/80 - loss: 0.2123 - val_acc: 0.8978 - lr: 0.000500
Epoch 48/80 - loss: 0.1946 - val_acc: 0.9067 - lr: 0.000500
Epoch 56/80 - loss: 0.1851 - val_acc: 0.9067 - lr: 0.000500
Epoch 64/80 - loss: 0.1790 - val_acc: 0.9067 - lr: 0.000250
Epoch 72/80 - loss: 0.1709 - val_acc: 0.9156 - lr: 0.000250
Epoch 80/80 - loss: 0.1652 - val_acc: 0.9289 - lr: 0.000250
Test accuracy: 0.9111111111111111
Saved model to model_params.pkl
