# Assignment 3

This project compares three feedforward neural network training algorithms: Stochastic Gradient Descent (SGD), Scaled Conjugate Gradient (SCG), and LeapFrog. Using six datasets—three for classification and three for regression—the study evaluates convergence speed, stability, and predictive accuracy. Each network has a single hidden layer, with experiments across different hidden layer sizes and hyperparameters. Performance is measured using accuracy and F1-score for classification, and MSE, RMSE, and R² for regression, alongside training time and convergence behavior. The results highlight the strengths and weaknesses of each optimizer across problems of varying complexity.

## Setup

In [None]:
! pip3 install -r requirements.txt

In [193]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.datasets import load_iris, fetch_california_housing, fetch_openml
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
import torch
import torch.nn as nn
import torch.optim as optim
from typing import List, Tuple, Optional, Callable
from ucimlrepo import fetch_ucirepo

## Data and Preprocessing

In [None]:
def build_preprocessing_pipeline(X, classification=True):
    """Build an unfitted ``ColumnTransformer`` matching the columns of ``X``.

    Numeric columns are median-imputed and standardised. Columns of dtype
    ``object`` or ``category`` are imputed with the most frequent value and
    one-hot encoded with the first level dropped.

    Args:
        X: DataFrame whose dtypes decide which columns are treated as
            numeric and which as categorical.
        classification: Accepted so classification and regression call
            sites look alike; it does not affect the pipeline.

    Returns:
        An unfitted ``ColumnTransformer``. Columns that are neither numeric
        nor object/category are dropped by the transformer.
    """
    # 'number' covers every numeric dtype (float32, int32, ...). Listing only
    # int64/float64 silently dropped the others from the model input.
    numeric_features = X.select_dtypes(include='number').columns
    categorical_features = X.select_dtypes(include=['object', 'category']).columns

    numeric_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])

    categorical_transformer = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        # A category that was absent when the encoder was fitted is encoded
        # as all zeros instead of aborting transform().
        ('onehot', OneHotEncoder(drop='first', handle_unknown='ignore'))
    ])

    preprocessor = ColumnTransformer(
        [
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)
        ],
        # Always return a dense array: the training functions feed the result
        # to torch.tensor, which cannot consume a SciPy sparse matrix.
        sparse_threshold=0,
    )

    return preprocessor

In [196]:
def preprocess_mnist(X, y, test_size=0.2, random_state=42):
    """Scale MNIST pixels to [0, 1], flatten images, encode labels and split.

    Args:
        X: Pixel data; inputs with more than two dimensions are flattened to
            one row per sample.
        y: Digit labels, integer-encoded with ``LabelEncoder``.
        test_size: Fraction of samples held out for testing.
        random_state: Seed for the split.

    Returns:
        X_train, X_test, y_train, y_test (stratified on the encoded labels).
    """
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    # Pixel intensities are divided by 255 to land in [0, 1].
    scaled = X / 255.0

    # Only reshape when the input still carries image dimensions.
    needs_flattening = len(scaled.shape) > 2
    features = scaled.reshape(scaled.shape[0], -1) if needs_flattening else scaled

    labels = LabelEncoder().fit_transform(y)

    X_train, X_test, y_train, y_test = train_test_split(
        features,
        labels,
        test_size=test_size,
        random_state=random_state,
        stratify=labels,
    )
    return X_train, X_test, y_train, y_test

In [195]:
def preprocess_california_housing(X, y, skewed_features=('MedInc',), log_target=True, test_size=0.2, random_state=42):
    """Log-transform, split and scale the California housing data.

    The split happens before the preprocessing pipeline is fitted, so the
    imputation medians and scaling statistics come from the training rows
    only and the test rows cannot leak into them.

    Args:
        X: Feature DataFrame.
        y: Target Series or array.
        skewed_features: Column names to transform with ``log1p``.
        log_target: If True, the target is transformed with ``log1p``.
        test_size: Fraction of samples held out for testing.
        random_state: Seed for the split.

    Returns:
        X_train, X_test, y_train, y_test, where the feature matrices are the
        transformed outputs of the preprocessing pipeline.
    """
    # log1p is applied row by row, so doing it before the split leaks nothing.
    X_processed = X.copy()
    for col in skewed_features:
        X_processed[col] = np.log1p(X_processed[col])

    y_processed = np.log1p(y) if log_target else y.copy()

    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X_processed, y_processed, test_size=test_size, random_state=random_state
    )

    # Fit on the training split only, then reuse those statistics on the test split.
    preprocessor = build_preprocessing_pipeline(X_train_raw, classification=False)
    X_train = preprocessor.fit_transform(X_train_raw)
    X_test = preprocessor.transform(X_test_raw)

    return X_train, X_test, y_train, y_test

In [197]:
def preprocess_data(X, y, classification=True, test_size=0.2, random_state=42, stratify=True):
    """
    General-purpose preprocessing: splits the data, then fits the pipeline.

    The split is done first and the preprocessing pipeline is fitted on the
    training rows only, so no test-set statistics leak into the scaler or
    imputers.

    Args:
        X: Features. A DataFrame, or array-like data which is wrapped in a
            DataFrame with generated column names ``x0, x1, ...``.
        y: Target (Series or array)
        classification: True for classification, False for regression
        test_size: Fraction for test split
        random_state: Seed for reproducibility
        stratify: Use stratified split for classification

    Returns:
        X_train, X_test, y_train, y_test (feature matrices are dense arrays)
    """
    # The pipeline builder relies on DataFrame.select_dtypes, so plain arrays
    # (e.g. the synthetic sine-wave data) are wrapped first.
    if not isinstance(X, pd.DataFrame):
        values = np.asarray(X)
        if values.ndim == 1:
            values = values.reshape(-1, 1)
        X = pd.DataFrame(values, columns=[f"x{i}" for i in range(values.shape[1])])

    # Encode target for classification
    if classification:
        y_processed = LabelEncoder().fit_transform(y)
    else:
        y_processed = y

    # Split before fitting anything on the features
    split_options = {'test_size': test_size, 'random_state': random_state}
    if classification and stratify:
        split_options['stratify'] = y_processed
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y_processed, **split_options
    )

    preprocessor = build_preprocessing_pipeline(X, classification=classification)

    # Fitting on the training rows alone means the test rows may contain a
    # category the encoder never saw; encode it as all zeros instead of
    # raising. The parameter is only set when the pipeline exposes it.
    unknown_param = 'cat__onehot__handle_unknown'
    if unknown_param in preprocessor.get_params():
        preprocessor.set_params(**{unknown_param: 'ignore'})

    X_train = preprocessor.fit_transform(X_train_raw)
    X_test = preprocessor.transform(X_test_raw)

    # One-hot encoding can produce a sparse matrix, which torch.tensor rejects.
    if hasattr(X_train, 'toarray'):
        X_train = X_train.toarray()
        X_test = X_test.toarray()

    return X_train, X_test, y_train, y_test

### Classification

In [None]:
# Iris Dataset
def load_iris_data():
    """Load the iris table via seaborn and split it into features and target.

    Prints a preview of the full table, the features and the target.

    Returns:
        (X_iris, y_iris): every column except ``species``, and the
        ``species`` column.
    """
    table = sns.load_dataset("iris")
    print(table.head())

    features = table.drop(columns="species")
    target = table["species"]
    print(features.head())
    print(target.head())
    return features, target

In [None]:
# Stroke Prediction Dataset
def load_stroke_data():
    """Read the stroke-prediction CSV and separate the ``stroke`` target.

    Prints the table shape and a preview of the features and the target.

    Returns:
        (X_stroke, y_stroke): every column except ``stroke``, and the
        ``stroke`` column.
    """
    # NOTE(review): every remaining column is returned as a feature; if the
    # CSV carries a row-identifier column it should probably be dropped - confirm.
    frame = pd.read_csv("../data/healthcare-dataset-stroke-data.csv")
    print(frame.shape)

    target = frame["stroke"]
    features = frame.drop(columns="stroke")
    print(features.head())
    print(target.head())
    return features, target

In [199]:
# MNIST Dataset
def load_mnist_subset(n_samples=10000, random_state=42):
    """Fetch ``mnist_784`` from OpenML and draw a random subset of rows.

    Prints the subset shape and a preview of the features and labels.

    Args:
        n_samples: Number of rows to sample.
        random_state: Seed for the row sampling.

    Returns:
        (X_mnist, y_mnist): sampled pixel DataFrame and the matching labels
        as a Series named ``digit``.
    """
    bunch = fetch_openml('mnist_784', version=1)
    pixels = pd.DataFrame(bunch.data)
    digits = pd.Series(bunch.target, name="digit")

    # Draw n_samples rows, then pick the labels that share their index.
    X_mnist = pixels.sample(n=n_samples, random_state=random_state)
    y_mnist = digits.loc[X_mnist.index]

    print(X_mnist.shape)
    print(X_mnist.head())
    print(y_mnist.head())
    return X_mnist, y_mnist

### Function Approximation

In [None]:
# Synthetic Sine Wave Dataset
def generate_sine_wave_data(
    num_samples: int = 1000,
    noise_level: float = 0.1,
    random_state: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray]:
    """Generate a noisy sine wave sampled evenly on [0, 2*pi].

    Args:
        num_samples: Number of evenly spaced sample points.
        noise_level: Scale applied to the standard-normal noise added to
            ``sin(x)``.
        random_state: Optional seed. When given, the noise comes from a
            dedicated generator and the data is reproducible. When None, the
            global NumPy random state is used, as before.

    Returns:
        (X, y), both of shape ``(num_samples, 1)``.
    """
    X = np.linspace(0, 2 * np.pi, num_samples)
    if random_state is None:
        # Keep drawing from the global state so np.random.seed(...) set by
        # the caller still controls the result.
        noise = np.random.randn(num_samples)
    else:
        noise = np.random.RandomState(random_state).randn(num_samples)
    y = np.sin(X) + noise_level * noise
    return X.reshape(-1, 1), y.reshape(-1, 1)

In [184]:
# Wine quality dataset from UCI ML Repository
def load_wine_quality_data():
    """Fetch dataset 186 from the UCI ML Repository.

    Prints the dataset metadata, the variable table, previews of the
    features and targets, and both shapes.

    Returns:
        (X_wine, y_wine): the ``features`` and ``targets`` objects exposed by
        the fetched dataset.
    """
    dataset = fetch_ucirepo(id=186)

    # The repository client exposes features and targets separately.
    features = dataset.data.features
    targets = dataset.data.targets

    print(dataset.metadata)
    print(dataset.variables)
    print(features.head())
    print(targets.head())
    print(features.shape, targets.shape)
    return features, targets

In [200]:
# California Housing Dataset
def load_california_housing_data(n_samples=10000, random_state=42):
    """Load California housing and draw a random subset of rows.

    Prints the subset shape and a preview of the features and target.

    Args:
        n_samples: Number of rows to sample.
        random_state: Seed for the row sampling.

    Returns:
        (X_california, y_california): sampled feature DataFrame and the
        matching target Series named ``MedHouseVal``.
    """
    bunch = fetch_california_housing()
    features = pd.DataFrame(bunch.data, columns=bunch.feature_names)
    target = pd.Series(bunch.target, name="MedHouseVal")

    # Draw n_samples rows, then pick the targets that share their index.
    X_california = features.sample(n=n_samples, random_state=random_state)
    y_california = target.loc[X_california.index]

    print(X_california.shape)
    print(X_california.head())
    print(y_california.head())
    return X_california, y_california

## Model

In [50]:
class FeedforwardNN(nn.Module):
    """Feedforward network with one hidden layer and a linear output layer.

    Args:
        input_dim: Number of input features.
        hidden_dim: Number of hidden units.
        output_dim: Number of output units.
        activation_fn: Callable invoked with no arguments to build the hidden
            activation module (default ``nn.ReLU``).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, activation_fn=nn.ReLU):
        super().__init__()
        self.hidden = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.activation = activation_fn()
        self.output = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return ``output(activation(hidden(x)))``; no output activation is applied."""
        return self.output(self.activation(self.hidden(x)))

### Training Algorithms

In [143]:
def train_sgd(model, X_train, y_train, X_test, y_test, epochs=50, lr=0.01, batch_size=32, classification=True):
    """Train ``model`` with mini-batch SGD and record the loss after each epoch.

    Classification uses ``CrossEntropyLoss`` with integer class labels;
    regression uses ``MSELoss`` with float32 targets.

    Args:
        model: Network to train (updated in place).
        X_train, y_train: Training features and targets (arrays, pandas
            objects or tensors).
        X_test, y_test: Held-out features and targets, used only to report
            a loss after every epoch.
        epochs: Number of passes over the training data.
        lr: SGD learning rate.
        batch_size: Mini-batch size.
        classification: Selects the loss function and the target dtype.

    Returns:
        (train_losses, test_losses): one value per epoch. The training loss
        is the sample-weighted mean of the mini-batch losses.
    """
    criterion = nn.CrossEntropyLoss() if classification else nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    train_losses = []
    test_losses = []

    def to_tensor(values, dtype):
        # pandas objects go through NumPy first so the conversion does not
        # depend on how their index is laid out.
        if torch.is_tensor(values):
            return torch.as_tensor(values, dtype=dtype)
        return torch.tensor(np.asarray(values), dtype=dtype)

    def prepare_targets(values):
        targets = to_tensor(values, torch.long if classification else torch.float32)
        # CrossEntropyLoss expects class indices of shape (N,), not (N, 1).
        if classification and targets.ndim == 2 and targets.shape[1] == 1:
            targets = targets.reshape(-1)
        return targets

    def compute_loss(outputs, targets):
        # A (N,) target against an (N, 1) prediction would be broadcast by
        # MSELoss to an N x N difference, which is a different, wrong loss.
        if (not classification
                and targets.shape != outputs.shape
                and targets.numel() == outputs.numel()):
            targets = targets.reshape(outputs.shape)
        return criterion(outputs, targets)

    X_train_tensor = to_tensor(X_train, torch.float32)
    X_test_tensor = to_tensor(X_test, torch.float32)
    y_train_tensor = prepare_targets(y_train)
    y_test_tensor = prepare_targets(y_test)

    dataset = torch.utils.data.TensorDataset(X_train_tensor, y_train_tensor)
    loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for xb, yb in loader:
            optimizer.zero_grad()
            loss = compute_loss(model(xb), yb)
            loss.backward()
            optimizer.step()
            # Weight by batch size so a smaller final batch is not over-counted.
            running_loss += loss.item() * xb.size(0)
        train_losses.append(running_loss / len(loader.dataset))

        # Evaluate on test set
        model.eval()
        with torch.no_grad():
            test_loss = compute_loss(model(X_test_tensor), y_test_tensor)
            test_losses.append(test_loss.item())

    return train_losses, test_losses

In [None]:
# # Example usage of SGD - function approximation
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_sine, y_sine, classification=False)
# model = FeedforwardNN(input_dim=1, hidden_dim=16, output_dim=1)
# train_losses, test_losses = train_sgd(model, X_train_scaled, y_train.astype(np.float32), X_test_scaled, y_test.astype(np.float32), classification=False)
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

Train Losses: [0.2641366285085678, 0.18634218513965606, 0.1823194345831871, 0.18047621548175813, 0.1780942678451538, 0.17647044152021407, 0.17483941376209258, 0.1724669712781906, 0.1709962460398674, 0.16852614551782608, 0.1671838653087616, 0.1653331932425499, 0.16404469221830367, 0.16185179799795152, 0.16005868524312972, 0.15840239375829696, 0.15699963361024857, 0.1552102318406105, 0.1534101501107216, 0.15173113882541656, 0.15004020422697067, 0.14871203601360322, 0.1467205587029457, 0.14426895141601562, 0.14394105732440948, 0.14217049181461333, 0.14105167210102082, 0.13878014475107192, 0.13741614788770676, 0.13608859330415726, 0.13445573180913925, 0.1326293858885765, 0.13131160914897919, 0.1296436882019043, 0.12849194914102555, 0.12695381611585618, 0.12564386934041977, 0.12439741939306259, 0.12267718315124512, 0.1213734084367752, 0.11990109443664551, 0.11816814631223678, 0.11703598380088806, 0.11588238775730134, 0.11457890421152114, 0.11346655637025833, 0.11196885854005814, 0.110629622

In [None]:
# # Example usage of SGD - Classification
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_iris, y_iris, classification=True)
# model = FeedforwardNN(input_dim=4, hidden_dim=16, output_dim=3)
# train_losses, test_losses = train_sgd(model, X_train_scaled, y_train, X_test_scaled, y_test)
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

Train Losses: [1.1347683350245157, 1.1181134939193726, 1.1024710416793824, 1.087291669845581, 1.072637923558553, 1.0583802620569864, 1.0443703571955363, 1.0308705568313599, 1.0174931327501933, 1.004461932182312, 0.9920138756434123, 0.9796104272206624, 0.9673454403877259, 0.9554144382476807, 0.9436902324358623, 0.9318963646888733, 0.9203847408294678, 0.9090999245643616, 0.8978511293729147, 0.8869513034820556, 0.8754758238792419, 0.8644565780957539, 0.8537913044293721, 0.8428460081418355, 0.832503100236257, 0.8221273938814799, 0.812004538377126, 0.801518440246582, 0.7915170073509217, 0.7815437197685242, 0.7717524011929829, 0.7620238184928894, 0.752192231019338, 0.7431997259457906, 0.7337142109870911, 0.7247867425282796, 0.7159688472747803, 0.7070787469546, 0.6985208590825399, 0.6902126948038737, 0.6821887731552124, 0.6737266699473063, 0.6658616582552592, 0.658047862847646, 0.6503567099571228, 0.6428790926933289, 0.6355992595354716, 0.628311284383138, 0.6214141488075257, 0.614430097738901

In [112]:
# SCG Training Function
def train_scg(
    model: nn.Module,
    X_train: torch.Tensor,
    y_train: torch.Tensor,
    X_test: torch.Tensor,
    y_test: torch.Tensor,
    max_epochs: int = 1000,
    tolerance: float = 1e-6,
    sigma: float = 5e-5,
    lambda_init: float = 5e-7,
    verbose: bool = True,
    eval_freq: int = 10
) -> Tuple[List[float], List[float]]:
    """
    Train a PyTorch neural network with the Scaled Conjugate Gradient algorithm.

    Follows Moller's SCG scheme: the curvature along the search direction is
    estimated with a finite difference of gradients, damped by ``lambda_k``,
    and the damping is raised or lowered according to how well the quadratic
    model predicted the actual loss change. Full-batch gradients are used.

    The loss is chosen from the target dtype: floating-point targets use
    ``MSELoss`` (regression); any other dtype is treated as class indices
    and uses ``CrossEntropyLoss``.

    Args:
        model: PyTorch neural network model (updated in place; on return it
            holds the last accepted weights)
        X_train: Training input data
        y_train: Training target data
        X_test: Test input data
        y_test: Test target data
        max_epochs: Maximum number of SCG iterations
        tolerance: Convergence tolerance for gradient norm
        sigma: Finite-difference step used for the curvature estimate
        lambda_init: Initial damping (regularization) parameter
        verbose: Whether to print progress
        eval_freq: Losses are recorded every ``eval_freq`` iterations

    Returns:
        Tuple of (train_losses, test_losses). Each list starts with the loss
        before training and ends with the loss at the final weights.
    """
    import math

    params = list(model.parameters())
    device = params[0].device
    X_train = X_train.to(device)
    y_train = y_train.to(device)
    X_test = X_test.to(device)
    y_test = y_test.to(device)

    # Determine the task from the target dtype.
    classification = not torch.is_floating_point(y_train)
    if classification:
        criterion = nn.CrossEntropyLoss()
        y_train = y_train.long()
        y_test = y_test.long()
        # CrossEntropyLoss expects class indices of shape (N,), not (N, 1).
        if y_train.ndim == 2 and y_train.shape[1] == 1:
            y_train = y_train.reshape(-1)
        if y_test.ndim == 2 and y_test.shape[1] == 1:
            y_test = y_test.reshape(-1)
    else:
        criterion = nn.MSELoss()

    # Restart interval for the conjugate directions
    n_params = sum(p.numel() for p in params)

    def compute_loss(inputs, targets):
        """Forward pass and loss, aligning regression targets to the output shape"""
        outputs = model(inputs)
        # A (N,) target against an (N, 1) output would be broadcast by MSELoss.
        if (not classification
                and targets.shape != outputs.shape
                and targets.numel() == outputs.numel()):
            targets = targets.reshape(outputs.shape)
        return criterion(outputs, targets)

    def get_weights():
        """Copy all model parameters into a single detached vector"""
        return torch.cat([p.detach().reshape(-1) for p in params])

    def set_weights(weights):
        """Copy a flat vector into the model parameters"""
        offset = 0
        with torch.no_grad():
            for p in params:
                count = p.numel()
                # copy_ keeps each parameter's own storage instead of making
                # it a view into ``weights``.
                p.copy_(weights[offset:offset + count].view_as(p))
                offset += count

    def compute_loss_and_gradient(weights):
        """Compute the full-batch loss and gradient at the given weights"""
        set_weights(weights)
        model.zero_grad()
        loss = compute_loss(X_train, y_train)
        loss.backward()
        grad = torch.cat([
            (p.grad if p.grad is not None else torch.zeros_like(p)).reshape(-1)
            for p in params
        ])
        return loss.item(), grad

    def evaluate_model(weights):
        """Evaluate train and test loss at the given weights"""
        set_weights(weights)
        model.eval()
        with torch.no_grad():
            train_loss = compute_loss(X_train, y_train).item()
            test_loss = compute_loss(X_test, y_test).item()
        model.train()
        return train_loss, test_loss

    # Initialize SCG variables: r is the negative gradient, p the direction.
    w_k = get_weights()
    f_k, g_k = compute_loss_and_gradient(w_k)
    r_k = -g_k
    p_k = r_k.clone()

    lambda_k = lambda_init
    lambda_bar = 0.0
    delta_k = 0.0
    success = True

    train_losses = []
    test_losses = []

    # Initial evaluation
    train_loss, test_loss = evaluate_model(w_k)
    train_losses.append(train_loss)
    test_losses.append(test_loss)

    if verbose:
        print(f"Initial - Train Loss: {train_loss:.6f}, Test Loss: {test_loss:.6f}")

    # True while the accepted weights have changed since the last recorded loss
    pending_eval = False

    # Main SCG loop
    for epoch in range(max_epochs):
        p_norm_sq = torch.dot(p_k, p_k).item()
        # A zero (or non-finite) direction means there is nothing left to do.
        if not (0.0 < p_norm_sq < math.inf):
            break

        # Curvature along p_k from a finite difference of gradients. After a
        # rejected step the direction is unchanged, so the estimate is reused.
        if success:
            sigma_k = sigma / math.sqrt(p_norm_sq)
            _, g_probe = compute_loss_and_gradient(w_k + sigma_k * p_k)
            delta_k = torch.dot(p_k, g_probe - g_k).item() / sigma_k

        # Apply the damping; after a rejection only the increase is added.
        delta_k += (lambda_k - lambda_bar) * p_norm_sq

        # Force positive curvature by raising the damping if necessary.
        if delta_k <= 0:
            lambda_bar = 2.0 * (lambda_k - delta_k / p_norm_sq)
            delta_k = -delta_k + lambda_k * p_norm_sq
            lambda_k = lambda_bar

        if not math.isfinite(delta_k) or not math.isfinite(lambda_k):
            break

        # Step length from the damped quadratic model
        mu_k = torch.dot(p_k, r_k).item()
        if mu_k <= 0:
            # Not a descent direction: fall back to steepest descent.
            p_k = r_k.clone()
            lambda_bar = 0.0
            success = True
            continue
        alpha_k = mu_k / delta_k

        # Comparison parameter: actual decrease relative to the decrease the
        # quadratic model predicted.
        w_new = w_k + alpha_k * p_k
        f_new, g_new = compute_loss_and_gradient(w_new)
        comparison = 2.0 * delta_k * (f_k - f_new) / (mu_k * mu_k)
        if not math.isfinite(comparison):
            # A NaN/inf trial loss counts as a rejected step.
            comparison = -1.0

        if comparison >= 0:
            # Accept the step; its gradient was already computed above.
            r_new = -g_new
            if (epoch + 1) % n_params == 0:
                p_new = r_new.clone()  # Periodic restart with steepest descent
            else:
                beta_k = (torch.dot(r_new, r_new) - torch.dot(r_new, r_k)).item() / mu_k
                p_new = r_new + beta_k * p_k
            w_k, f_k, g_k, r_k, p_k = w_new, f_new, g_new, r_new, p_new
            lambda_bar = 0.0
            success = True
            pending_eval = True

            # The quadratic model was accurate: relax the damping.
            if comparison >= 0.75:
                lambda_k *= 0.25
        else:
            # Reject the step: keep w_k and remember the damping that failed.
            lambda_bar = lambda_k
            success = False

        # The quadratic model was poor: increase the damping. After a
        # rejection this is what makes the retried step shorter.
        if comparison < 0.25:
            lambda_k += delta_k * (1.0 - comparison) / p_norm_sq

        # Check convergence
        grad_norm = torch.norm(r_k).item()
        if grad_norm < tolerance:
            if verbose:
                print(f"Converged at epoch {epoch}, gradient norm: {grad_norm:.2e}")
            break

        # Evaluate and store losses at the accepted weights (the model may
        # currently hold a rejected trial point).
        if epoch % eval_freq == 0 or epoch == max_epochs - 1:
            train_loss, test_loss = evaluate_model(w_k)
            train_losses.append(train_loss)
            test_losses.append(test_loss)
            pending_eval = False

            if verbose:
                print(f"Epoch {epoch:4d} - Train Loss: {train_loss:.6f}, "
                      f"Test Loss: {test_loss:.6f}, Grad Norm: {grad_norm:.2e}")

    # Final evaluation if the weights moved since the last recorded loss
    if pending_eval:
        train_loss, test_loss = evaluate_model(w_k)
        train_losses.append(train_loss)
        test_losses.append(test_loss)

    # Leave the model at the last accepted weights, never at a trial point.
    set_weights(w_k)

    if verbose:
        print(f"Training completed. Final - Train Loss: {train_losses[-1]:.6f}, "
              f"Test Loss: {test_losses[-1]:.6f}")

    return train_losses, test_losses

**NOTE:**

*For classification tasks, the output layer is linear and we use `CrossEntropyLoss`, which applies the required softmax internally. For regression tasks, the output layer is also linear, and we use `MSELoss`.*

In [None]:
# # Example usage of SCG - classification
# model = FeedforwardNN(input_dim=4, hidden_dim=16, output_dim=3)
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X, y, classification=True)

# X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.long)
# X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
# y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# train_losses, test_losses = train_scg(
#     model=model,
#     X_train=X_train_tensor,
#     y_train=y_train_tensor,
#     X_test=X_test_tensor,
#     y_test=y_test_tensor,
#     max_epochs=500,
#     tolerance=1e-5,
#     verbose=True,
#     eval_freq=1
# )
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

In [None]:
# # Example usage of SCG - function approximation
# model = FeedforwardNN(input_dim=1, hidden_dim=16, output_dim=1)
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_sine, y_sine, classification=False)
# X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
# X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
# y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
# train_losses, test_losses = train_scg(
#     model=model,
#     X_train=X_train_tensor,
#     y_train=y_train_tensor,
#     X_test=X_test_tensor,
#     y_test=y_test_tensor,
#     max_epochs=500,
#     tolerance=1e-5,
#     verbose=True,
#     eval_freq=1
# )
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

In [132]:
def train_lfrog(
    model: nn.Module,
    X_train: torch.Tensor,
    y_train: torch.Tensor,
    X_test: torch.Tensor,
    y_test: torch.Tensor,
    loss_fn: Optional[Callable] = None,
    epochs: int = 1000,
    dt: float = 0.5,
    max_step: float = 1.0,
    convergence_tol: float = 1e-5,
    max_consecutive_decreases: int = 2,
    time_step_reduction_threshold: int = 3,
    time_step_increase_factor: float = 0.001,
    batch_size: Optional[int] = None,
    device: str = 'cpu',
    print_every: int = 100,
    early_stopping_patience: int = 50,
    min_improvement: float = 1e-6
) -> Tuple[List[float], List[float]]:
    """
    Train a neural network using Snyman's LeapFrog dynamic optimization algorithm.

    The parameters are treated as a particle with a position and velocity
    that is accelerated by the negative gradient. The time step and velocity
    are adjusted from the kinetic-energy and gradient-direction checks below.

    Args:
        model: PyTorch neural network model (moved to ``device``, updated in place)
        X_train, y_train: Training data and labels
        X_test, y_test: Test data and labels
        loss_fn: Loss function. When None it is chosen from the targets:
            non-float targets -> CrossEntropyLoss (class indices);
            float single-column targets containing only 0/1 -> BCEWithLogitsLoss;
            other float single-column targets -> MSELoss;
            float multi-column targets -> CrossEntropyLoss.
        epochs: Maximum number of iterations
        dt: Initial time step
        max_step: Maximum allowable step size (δ in the paper)
        convergence_tol: Convergence tolerance for gradient norm
        max_consecutive_decreases: j parameter - max consecutive velocity decreases before reset
        time_step_reduction_threshold: m parameter - consecutive steps before time step reduction
        time_step_increase_factor: δ₁ parameter for time step increase
        batch_size: Batch size (None for full batch)
        device: Device to run on
        print_every: Print progress every N steps (0 disables progress lines)
        early_stopping_patience: Stop after this many consecutive iterations
            without a test-loss improvement. Note that this uses the test
            split for model selection.
        min_improvement: Minimum test-loss decrease that counts as an improvement

    Returns:
        Tuple of (train_losses, test_losses) lists, one entry per iteration
    """

    # Move model and data to device
    model = model.to(device)
    X_train, y_train = X_train.to(device), y_train.to(device)
    X_test, y_test = X_test.to(device), y_test.to(device)

    # Auto-detect loss function if not provided
    if loss_fn is None:
        single_column = len(y_train.shape) == 1 or y_train.shape[1] == 1
        if not torch.is_floating_point(y_train):
            # Integer targets are class labels. MSELoss/BCEWithLogitsLoss do
            # not accept them, and the other training functions in this file
            # use CrossEntropyLoss for classification.
            loss_fn = nn.CrossEntropyLoss()
            if single_column:
                y_train = y_train.long().reshape(-1)
                y_test = y_test.long().reshape(-1)
            else:
                # Multi-column integer targets are read as one-hot rows.
                y_train = y_train.float()
                y_test = y_test.float()
        elif single_column:
            if torch.all((y_train == 0) | (y_train == 1)):
                loss_fn = nn.BCEWithLogitsLoss()
            else:
                loss_fn = nn.MSELoss()
        else:
            loss_fn = nn.CrossEntropyLoss()

    # Initialize tracking variables
    train_losses = []
    test_losses = []

    # Early stopping variables
    best_test_loss = float('inf')
    patience_counter = 0

    # Flat, detached copy of the parameters. Detaching keeps the position
    # updates out of autograd, which would otherwise grow a graph every step.
    params = list(model.parameters())
    x_k = torch.cat([p.detach().reshape(-1) for p in params])

    def compute_loss(inputs, targets):
        """Forward pass and loss, aligning float targets to the output shape"""
        outputs = model(inputs)
        # A (N,) float target against an (N, 1) output would be broadcast
        # (MSELoss) or rejected (BCEWithLogitsLoss).
        if (torch.is_floating_point(targets)
                and targets.shape != outputs.shape
                and targets.numel() == outputs.numel()):
            targets = targets.reshape(outputs.shape)
        return loss_fn(outputs, targets)

    def compute_loss_and_grad():
        """Loss and flat gradient at the model's current parameters"""
        model.zero_grad()
        if batch_size is None:
            loss = compute_loss(X_train, y_train)
        else:
            # Mini-batch gradient
            idx = torch.randperm(len(X_train))[:batch_size]
            loss = compute_loss(X_train[idx], y_train[idx])

        loss.backward()

        # Extract gradients as flat vector
        grad = torch.cat([
            (p.grad if p.grad is not None else torch.zeros_like(p)).reshape(-1)
            for p in params
        ])

        return loss.item(), grad

    def update_model_params(x):
        """Copy a flat parameter vector into the model parameters"""
        idx = 0
        with torch.no_grad():
            for p in params:
                param_size = p.numel()
                p.copy_(x[idx:idx + param_size].view_as(p))
                idx += param_size

    def evaluate_test():
        """Evaluate model on test set"""
        model.eval()
        with torch.no_grad():
            test_loss = compute_loss(X_test, y_test)
        model.train()
        return test_loss.item()

    # Initialize algorithm variables
    train_loss, grad_k = compute_loss_and_grad()
    v_k = -0.5 * grad_k * dt  # Initial velocity

    # Algorithm state variables
    consecutive_decreases = 0
    consecutive_negative_dot_products = 0
    successful_steps = 0
    current_dt = dt

    print(f"Initial loss: {train_loss:.6f}, Gradient norm: {torch.norm(grad_k):.6f}")

    for epoch in range(epochs):
        # Store current state
        x_k_old = x_k.clone()
        v_k_old = v_k.clone()
        grad_k_old = grad_k.clone()
        v_k_norm_old = torch.norm(v_k)

        # Step A: Compute step size and limit if necessary
        step_size = torch.norm(v_k) * current_dt
        if step_size > max_step:
            v_k = max_step * v_k / step_size
            step_size = max_step

        # Step B: Leap-frog integration
        # Update position
        x_k = x_k + v_k * current_dt
        update_model_params(x_k)

        # Compute new gradient and update velocity
        train_loss, grad_k = compute_loss_and_grad()
        a_k = -grad_k  # acceleration (negative gradient)
        v_k = v_k + a_k * current_dt

        # Record losses
        test_loss = evaluate_test()
        train_losses.append(train_loss)
        test_losses.append(test_loss)

        # Early stopping logic
        if test_loss < best_test_loss - min_improvement:
            best_test_loss = test_loss
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= early_stopping_patience:
                print(f"Early stopping at epoch {epoch}: No improvement in test loss for {early_stopping_patience} epochs")
                print(f"Best test loss: {best_test_loss:.6f}")
                break

        # Step C: Check convergence
        grad_norm = torch.norm(grad_k)
        if grad_norm < convergence_tol:
            print(f"Converged at epoch {epoch}: gradient norm {grad_norm:.2e}")
            break

        # Time step control - check for gradient direction changes AND velocity-gradient alignment
        if epoch > 0:
            dot_product = torch.dot(grad_k, grad_k_old)
            velocity_gradient_dot = torch.dot(v_k, grad_k)

            if dot_product <= 0:
                consecutive_negative_dot_products += 1
            else:
                consecutive_negative_dot_products = 0

            # Additional safeguard: if velocity is pointing uphill, this is concerning
            if velocity_gradient_dot > 0:
                consecutive_negative_dot_products += 1  # Treat as problematic

        # Time step reduction
        if consecutive_negative_dot_products >= time_step_reduction_threshold:
            current_dt = current_dt / 2
            current_dt = max(current_dt, 1e-6)
            x_k = (x_k + x_k_old) / 2
            v_k = (v_k + v_k_old) / 4
            update_model_params(x_k)
            consecutive_negative_dot_products = 0
            successful_steps = 0
            if print_every > 0 and epoch % print_every == 0:
                print(f"Epoch {epoch}: Reduced time step to {current_dt:.6f}")

        # Step D: Energy monitoring (kinetic energy check)
        v_k_norm = torch.norm(v_k)
        if v_k_norm > v_k_norm_old:
            # Kinetic energy increased - successful step
            consecutive_decreases = 0
            if step_size < max_step:
                successful_steps += 1
                # Time step increase - cap growth for stability
                growth_factor = min(1.01, 1 + successful_steps * time_step_increase_factor)
                current_dt = growth_factor * current_dt
                current_dt = max(current_dt, 1e-6)

        else:
            # Kinetic energy decreased - intervene
            consecutive_decreases += 1
            successful_steps = 0

            # Restart from midpoint
            x_k = (x_k + x_k_old) / 2
            update_model_params(x_k)

            if consecutive_decreases <= max_consecutive_decreases:
                # Reduce velocity
                v_k = (v_k + v_k_old) / 4
            else:
                # Reset velocity to zero
                v_k = torch.zeros_like(v_k)
                consecutive_decreases = 0

        # Print progress
        if print_every > 0 and epoch % print_every == 0:
            print(f"Epoch {epoch}: Train Loss = {train_loss:.6f}, "
                  f"Test Loss = {test_loss:.6f}, Grad Norm = {grad_norm:.6f}, "
                  f"dt = {current_dt:.6f}")

    return train_losses, test_losses

In [None]:
# # Example usage of LFROG - classification
# model = FeedforwardNN(input_dim=4, hidden_dim=16, output_dim=3)
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X, y, classification=True)

# X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.long)
# X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
# y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# train_losses, test_losses = train_lfrog(
#     model=model,
#     X_train=X_train_tensor,
#     y_train=y_train_tensor,
#     X_test=X_test_tensor,
#     y_test=y_test_tensor,
#     epochs=500,
#     dt=0.005,
#     max_step=0.05,
#     early_stopping_patience=50,  # Stop if no improvement for 50 epochs
#     min_improvement=1e-6,        # Minimum improvement threshold
#     print_every=25,
#     loss_fn=nn.CrossEntropyLoss()
# )
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

In [None]:
# # Example usage of LFROG - function approximation
# model = FeedforwardNN(input_dim=1, hidden_dim=16, output_dim=1)
# X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(X_sine, y_sine, classification=False)
# X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
# X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)    
# y_test_tensor = torch.tensor(y_test, dtype=torch.float32)
# train_losses, test_losses = train_lfrog(
#     model=model,
#     X_train=X_train_tensor,
#     y_train=y_train_tensor,
#     X_test=X_test_tensor,
#     y_test=y_test_tensor,   
#     epochs=500,
#     dt=0.005,
#     max_step=0.05,
#     early_stopping_patience=50,  # Stop if no improvement for 50 epochs
#     min_improvement=1e-6,        # Minimum improvement threshold
#     print_every=25,
#     loss_fn=nn.MSELoss()
# )
# print("Train Losses:", train_losses)
# print("Test Losses:", test_losses)

### Visualisation and Results