In [1]:
from __future__ import annotations
import numpy as np
from typing import List, Tuple, Optional, Dict


# ---------- Activations (stable) ----------

def sigmoid(x: np.ndarray) -> np.ndarray:
    x = np.clip(x, -500, 500)
    return 1.0 / (1.0 + np.exp(-x))


def sigmoid_derivative(z: np.ndarray) -> np.ndarray:
    s = sigmoid(z)
    return s * (1.0 - s)


def relu(x: np.ndarray) -> np.ndarray:
    return np.maximum(0.0, x)


def relu_derivative(z: np.ndarray) -> np.ndarray:
    return (z > 0.0).astype(z.dtype)


def softmax(z: np.ndarray, axis: int = -1) -> np.ndarray:
    # Works for 1D or 2D inputs; subtract max for numerical stability
    if z.ndim == 1:
        z_max = np.max(z)
        exps = np.exp(z - z_max)
        return exps / np.sum(exps)
    z_max = np.max(z, axis=axis, keepdims=True)
    exps = np.exp(z - z_max)
    return exps / np.sum(exps, axis=axis, keepdims=True)


ACTIVATION_FUNCTIONS: Dict[str, callable] = {
    "sigmoid": sigmoid,
    "relu": relu,
    "softmax": softmax,
}

ACTIVATION_DERIVATIVES: Dict[str, callable] = {
    "sigmoid": sigmoid_derivative,
    "relu": relu_derivative,
    # Softmax derivative is not used explicitly with cross-entropy; handled in backward
}


# ---------- Losses (stable) ----------

def mse_loss(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    return float(np.mean((y_true - y_pred) ** 2))


def binary_cross_entropy(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-7) -> float:
    y_pred_c = np.clip(y_pred, eps, 1.0 - eps)
    return float(-np.mean(y_true * np.log(y_pred_c) + (1.0 - y_true) * np.log(1.0 - y_pred_c)))


def categorical_cross_entropy(y_true: np.ndarray, y_pred: np.ndarray, eps: float = 1e-7) -> float:
    # y_true expected one-hot
    y_pred_c = np.clip(y_pred, eps, 1.0 - eps)
    return float(-np.mean(np.sum(y_true * np.log(y_pred_c), axis=1)))


# ---------- Neural Network ----------

class NeuralNetwork:
    """
    A clean NumPy MLP with mini-batch training, L2 regularization, and multiple losses/activations.
    """

    def __init__(
        self,
        layer_sizes: List[int],
        activations: List[str],
        learning_rate: float = 0.01,
        l2: float = 0.0,
        init: str = "auto",  # 'auto', 'xavier', 'he'
        seed: Optional[int] = None,
    ):
        """
        Parameters:
            layer_sizes: sizes per layer including input and output (e.g., [n_in, h1, h2, n_out])
            activations: activation name for each non-input layer (len == len(layer_sizes)-1)
            learning_rate: step size for gradient descent
            l2: L2 regularization coefficient
            init: 'auto' selects Xavier for sigmoid/softmax and He for ReLU; or force 'xavier'/'he'
            seed: random seed for reproducibility
        """
        if seed is not None:
            np.random.seed(seed)

        assert len(activations) == len(layer_sizes) - 1, "activations must match number of layers-1"

        self.layer_sizes = layer_sizes
        self.activation_names = activations
        self.activations = [ACTIVATION_FUNCTIONS[a] for a in activations]
        self.activation_derivatives = [ACTIVATION_DERIVATIVES.get(a, None) for a in activations]
        self.num_layers = len(layer_sizes) - 1
        self.learning_rate = learning_rate
        self.l2 = l2

        self.weights: List[np.ndarray] = []
        self.biases: List[np.ndarray] = []

        for i in range(self.num_layers):
            fan_in = layer_sizes[i]
            fan_out = layer_sizes[i + 1]
            act_name = activations[i]

            chosen_init = init
            if init == "auto":
                if act_name in ("relu",):
                    chosen_init = "he"
                else:
                    chosen_init = "xavier"

            if chosen_init == "he":
                scale = np.sqrt(2.0 / fan_in)
            elif chosen_init == "xavier":
                # Glorot normal (variance 1/fan_in) is a common simplification
                scale = np.sqrt(1.0 / fan_in)
            else:
                raise ValueError("init must be 'auto', 'xavier', or 'he'")

            W = np.random.randn(fan_in, fan_out) * scale
            b = np.zeros((fan_out,), dtype=W.dtype)
            self.weights.append(W)
            self.biases.append(b)

    # ----- Forward -----

    def forward(
        self,
        X: np.ndarray,
        weights: Optional[List[np.ndarray]] = None,
        biases: Optional[List[np.ndarray]] = None,
    ) -> Tuple[np.ndarray, List[np.ndarray], List[np.ndarray]]:
        W = self.weights if weights is None else weights
        b = self.biases if biases is None else biases

        activations: List[np.ndarray] = [X]
        pre_activations: List[np.ndarray] = []

        a = X
        for l in range(self.num_layers):
            z = a @ W[l] + b[l]
            pre_activations.append(z)
            a = self.activations[l](z)
            activations.append(a)

        return a, activations, pre_activations

    # ----- Backward -----

    def _output_delta(
        self,
        y_true: np.ndarray,
        a_L: np.ndarray,
        z_L: np.ndarray,
        loss: str,
        last_act_name: str,
    ) -> np.ndarray:
        # BCE + sigmoid and CCE + softmax use simplified gradient: a_L - y_true
        if loss in ("bce", "binary_cross_entropy") and last_act_name == "sigmoid":
            return a_L - y_true
        if loss in ("cce", "categorical_cross_entropy") and last_act_name == "softmax":
            return a_L - y_true

        # Otherwise include activation derivative (e.g., MSE with sigmoid, etc.)
        deriv = self.activation_derivatives[-1]
        if deriv is None:
            raise ValueError(f"No derivative defined for output activation '{last_act_name}' with '{loss}'")
        return (a_L - y_true) * deriv(z_L)

    def backward(
        self,
        y_true: np.ndarray,
        activations: List[np.ndarray],
        pre_activations: List[np.ndarray],
        loss: str,
    ) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        m = y_true.shape[0]
        dW: List[np.ndarray] = [None] * self.num_layers  # type: ignore
        db: List[np.ndarray] = [None] * self.num_layers  # type: ignore

        a_L = activations[-1]
        z_L = pre_activations[-1]
        last_act_name = self.activation_names[-1]

        delta = self._output_delta(y_true, a_L, z_L, loss, last_act_name)

        for l in reversed(range(self.num_layers)):
            a_prev = activations[l]
            dW[l] = (a_prev.T @ delta) / m + (self.l2 * self.weights[l])
            db[l] = np.sum(delta, axis=0) / m

            if l > 0:
                deriv = self.activation_derivatives[l - 1]
                if deriv is None:
                    raise ValueError(f"No derivative for activation '{self.activation_names[l-1]}'")
                delta = (delta @ self.weights[l].T) * deriv(pre_activations[l - 1])

        return dW, db

    # ----- Parameters update -----

    def update_parameters(self, dW: List[np.ndarray], db: List[np.ndarray]) -> None:
        for l in range(self.num_layers):
            self.weights[l] -= self.learning_rate * dW[l]
            self.biases[l] -= self.learning_rate * db[l]

    # ----- Loss wrapper -----

    def compute_loss(self, y_true: np.ndarray, y_pred: np.ndarray, loss: str) -> float:
        if loss in ("mse",):
            base = mse_loss(y_true, y_pred)
        elif loss in ("bce", "binary_cross_entropy"):
            base = binary_cross_entropy(y_true, y_pred)
        elif loss in ("cce", "categorical_cross_entropy"):
            base = categorical_cross_entropy(y_true, y_pred)
        else:
            raise ValueError(f"Unknown loss '{loss}'")

        # Add L2 penalty
        if self.l2 > 0.0:
            l2_sum = sum(np.sum(W * W) for W in self.weights)
            base += 0.5 * self.l2 * l2_sum
        return base

    # ----- Training -----

    def train(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        epochs: int = 1000,
        batch_size: Optional[int] = None,
        loss: str = "mse",
        verbose: bool = True,
        validation_data: Optional[Tuple[np.ndarray, np.ndarray]] = None,
    ) -> Dict[str, List[float]]:
        X_train = np.asarray(X_train, dtype=np.float64)
        y_train = np.asarray(y_train, dtype=np.float64)

        # Ensure 2D y for consistent math
        if y_train.ndim == 1:
            y_train = y_train.reshape(-1, 1)

        n = X_train.shape[0]
        if batch_size is None:
            batch_size = n

        history = {"train_loss": [], "val_loss": []}

        for epoch in range(epochs):
            idx = np.random.permutation(n)
            X_shuf = X_train[idx]
            y_shuf = y_train[idx]

            epoch_loss = 0.0
            batches = 0

            for i in range(0, n, batch_size):
                X_batch = X_shuf[i : i + batch_size]
                y_batch = y_shuf[i : i + batch_size]

                y_pred, acts, pre_acts = self.forward(X_batch)
                batch_loss = self.compute_loss(y_batch, y_pred, loss)
                dW, db = self.backward(y_batch, acts, pre_acts, loss)
                self.update_parameters(dW, db)

                epoch_loss += batch_loss
                batches += 1

            avg_loss = epoch_loss / max(1, batches)
            history["train_loss"].append(avg_loss)

            if validation_data is not None:
                X_val, y_val = validation_data
                X_val = np.asarray(X_val, dtype=np.float64)
                y_val = np.asarray(y_val, dtype=np.float64)
                if y_val.ndim == 1:
                    y_val = y_val.reshape(-1, 1)

                val_pred, _, _ = self.forward(X_val)
                val_loss = self.compute_loss(y_val, val_pred, loss)
                history["val_loss"].append(val_loss)

            if verbose and (epoch % max(1, epochs // 10) == 0 or epoch == epochs - 1):
                if validation_data is not None:
                    print(f"Epoch {epoch+1}/{epochs} - loss: {avg_loss:.6f} - val_loss: {val_loss:.6f}")
                else:
                    print(f"Epoch {epoch+1}/{epochs} - loss: {avg_loss:.6f}")

        return history

    # ----- Inference -----

    def predict(self, X: np.ndarray) -> np.ndarray:
        out, _, _ = self.forward(np.asarray(X, dtype=np.float64))
        return out

    def predict_classes(self, X: np.ndarray) -> np.ndarray:
        out = self.predict(X)
        last_act = self.activation_names[-1]
        if last_act == "sigmoid":
            return (out >= 0.5).astype(int)
        if last_act == "softmax":
            return np.argmax(out, axis=1)
        # Fallback for linear/MSE use-cases
        return out

    def evaluate(self, X: np.ndarray, y: np.ndarray, loss: str = "mse") -> float:
        y = np.asarray(y, dtype=np.float64)
        if y.ndim == 1:
            y = y.reshape(-1, 1)
        y_pred = self.predict(X)
        return self.compute_loss(y, y_pred, loss)