In [None]:
# Shallower NN with trainable Chebyshev Activation Function - Deterministic Inpterpretation of NN - 39000 Parameters


import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import math
import joblib

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class ChebyshevActivation(nn.Module):
    def __init__(self, in_features, degree=15):
        super().__init__()
        self.degree = degree
        self.in_features = in_features
        
        # Learnable coefficients for each feature
        self.coeffs = nn.Parameter(torch.randn(in_features, degree + 1, dtype=torch.float32))
        
        self._initialize_parameters()

    def _initialize_parameters(self):
        """Initialize coefficients with appropriate scheme for numerical stability"""
        with torch.no_grad():
            # Use smaller std for stability, considering polynomial degree
            std = 0.01 / math.sqrt(self.degree + 1)
            nn.init.normal_(self.coeffs, mean=0.0, std=std)
            
            # Ensure first coefficient (constant term) is reasonable
            self.coeffs[:, 0].uniform_(-0.1, 0.1)

    def _chebyshev_basis(self, x):
        """Compute Chebyshev polynomial basis functions with numerical stability"""
        
        # Clamp input to prevent extreme polynomial values
        x = torch.clamp(x, -2.0, 2.0)
        
        # Compute polynomials using stable recurrence
        T_list = [torch.ones_like(x)]
        if self.degree >= 1:
            T_list.append(x)
            for k in range(1, self.degree):
                Tk = 2 * x * T_list[k] - T_list[k-1]
                # Clamp intermediate results to prevent overflow
                Tk = torch.clamp(Tk, -10.0, 10.0)
                T_list.append(Tk)
        
        return torch.stack(T_list, dim=-1)  

    def forward(self, x):
        # Compute Chebyshev basis
        basis = self._chebyshev_basis(x)  # (batch_size, in_features, degree+1)
        
        # Apply coefficients
        return torch.einsum('bfk,fk->bf', basis, self.coeffs)

class PolynomialRootsModel(nn.Module):
    def __init__(self, input_dim=6, hidden_dims=[64,128,128,64], output_dim=10, cheb_degree=10):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        
        # Create layers
        self.layers = nn.ModuleList()
        
        # Input layer
        self.layers.append(nn.Linear(input_dim, hidden_dims[0]))
        self.layers.append(ChebyshevActivation(hidden_dims[0], degree=cheb_degree))
        # self.layers.append(nn.BatchNorm1d(hidden_dims[0]))
        
        # Hidden layers
        for i in range(1, len(hidden_dims)):
            self.layers.append(nn.Linear(hidden_dims[i-1], hidden_dims[i]))
            self.layers.append(ChebyshevActivation(hidden_dims[i], degree=cheb_degree))
            # self.layers.append(nn.BatchNorm1d(hidden_dims[i]))
        
        # Output layer
        self.layers.append(nn.Linear(hidden_dims[-1], output_dim))
        
        # Initialize weights
        self._initialize_weights()
    
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
    
    def forward(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

def extract_complex_parts(complex_value):
    """Extract real and imaginary parts from complex number or string"""
    try:
        if isinstance(complex_value, str):
            complex_value = complex_value.strip()
            complex_num = complex(complex_value)
        elif isinstance(complex_value, (int, float)):
            complex_num = complex(complex_value)
        else:
            complex_num = complex(complex_value)
        
        return np.real(complex_num), np.imag(complex_num)
    except (ValueError, TypeError):
        print(f"Warning: Could not parse complex number: {complex_value}")
        return 0.0, 0.0

def load_and_preprocess_data(file_path, test_size=0.2, val_size=0.25, random_state=42):
    """Load data from CSV, preprocess it, and split into train/val/test sets"""
    # Load data
    df = pd.read_csv(file_path)
    
    print(f"Dataset shape: {df.shape}")
    print(f"Dataset columns: {df.columns.tolist()}")
    print("\nFirst few rows:")
    print(df.head())
    
    # Extract input features (polynomial coefficients)
    input_cols = ['a', 'b', 'c', 'd', 'e', 'f']
    X = df[input_cols].values
    
    # Extract output features (roots)
    y = np.zeros((len(df), 10))
    
    for i in range(5):
        root_col = f'root_{i+1}'
        if root_col in df.columns:
            for j, complex_value in enumerate(df[root_col].values):
                real_part, imag_part = extract_complex_parts(complex_value)
                y[j, i] = real_part      # Real parts (columns 0-4)
                y[j, i+5] = imag_part    # Imaginary parts (columns 5-9)
    
    print(f"Input shape: {X.shape}")
    print(f"Output shape: {y.shape}")
    
    # Check for any NaN or infinite values
    print(f"NaN values in X: {np.isnan(X).sum()}")
    print(f"NaN values in y: {np.isnan(y).sum()}")
    print(f"Infinite values in X: {np.isinf(X).sum()}")
    print(f"Infinite values in y: {np.isinf(y).sum()}")
    
    # Clean data (remove NaN or infinite values)
    valid_indices = ~(np.isnan(X).any(axis=1) | np.isnan(y).any(axis=1) | 
                      np.isinf(X).any(axis=1) | np.isinf(y).any(axis=1))
    X = X[valid_indices]
    y = y[valid_indices]
    
    print(f"After cleaning - X shape: {X.shape}, y shape: {y.shape}")
    
    # Split and scale data
    X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=test_size, random_state=random_state)
    X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=val_size, random_state=random_state)
    
    print(f"Training set: {X_train.shape[0]} samples")
    print(f"Validation set: {X_val.shape[0]} samples")
    print(f"Test set: {X_test.shape[0]} samples")
    
    # Standardize features
    scaler_X = StandardScaler()
    scaler_y = StandardScaler()
    
    X_train_scaled = scaler_X.fit_transform(X_train)
    X_val_scaled = scaler_X.transform(X_val)
    X_test_scaled = scaler_X.transform(X_test)
    
    y_train_scaled = scaler_y.fit_transform(y_train)
    y_val_scaled = scaler_y.transform(y_val)
    y_test_scaled = scaler_y.transform(y_test)
    
    print("Data preprocessing completed successfully!")
    
    # Convert to PyTorch tensors and prepare DataLoaders
    train_dataset = TensorDataset(torch.FloatTensor(X_train_scaled), torch.FloatTensor(y_train_scaled))
    val_dataset = TensorDataset(torch.FloatTensor(X_val_scaled), torch.FloatTensor(y_val_scaled))
    test_dataset = TensorDataset(torch.FloatTensor(X_test_scaled), torch.FloatTensor(y_test_scaled))
    
    train_loader = DataLoader(train_dataset, batch_size=512, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=512)
    test_loader = DataLoader(test_dataset, batch_size=512)
    
    return train_loader, val_loader, test_loader, scaler_X, scaler_y

def train_model(model, train_loader, val_loader, epochs=1000, lr=0.01, patience=5, 
               save_path='best_polynomial_roots_model.pt'):
    """Train the PyTorch model with early stopping and learning rate reduction"""
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.5, patience=7, verbose=True, min_lr=1e-7
    )
    
    best_val_loss = float('inf')
    patience_counter = 0
    
    print("Starting training...")
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        
        epoch_train_loss = running_loss / len(train_loader.dataset)
        
        # Validation phase
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = model(inputs)
                val_loss = criterion(outputs, targets)
                running_val_loss += val_loss.item() * inputs.size(0)
        
        epoch_val_loss = running_val_loss / len(val_loader.dataset)
        
        # Print progress
        print(f'Epoch {epoch+1}/{epochs} - '
              f'Train Loss: {epoch_train_loss:.6f} - '
              f'Val Loss: {epoch_val_loss:.6f}')
        
        # Learning rate scheduling
        scheduler.step(epoch_val_loss)
        
        # # Early stopping check
        # if epoch_val_loss < best_val_loss:
        #     best_val_loss = epoch_val_loss
        #     patience_counter = 0
        #     # Save the best model
        #     torch.save(model.state_dict(), save_path)
        #     print(f"Model saved at epoch {epoch+1} with validation loss: {best_val_loss:.6f}")
        # else:
        #     patience_counter += 1
        #     if patience_counter >= patience:
        #         print(f"Early stopping triggered after {epoch+1} epochs")
        #         break
    
    # Load the best model
    model.load_state_dict(torch.load(save_path, map_location=device))
    print("Training completed!")
    
    return model

def predict_polynomial_roots(model, coefficients, scaler_X, scaler_y):
    """
    Predict roots for a polynomial given coefficients [a,b,c,d,e,f]
    where polynomial is: a*x^5 + b*x^4 + c*x^3 + d*x^2 + e*x + f = 0
    """
    model.eval()
    
    # Ensure input is numpy array
    coeffs = np.array(coefficients).reshape(1, -1)
    
    # Scale the input
    coeffs_scaled = scaler_X.transform(coeffs)
    
    # Convert to tensor
    coeffs_tensor = torch.FloatTensor(coeffs_scaled).to(device)
    
    # Predict
    with torch.no_grad():
        prediction_scaled = model(coeffs_tensor)
    
    # Convert back to numpy
    prediction_scaled_np = prediction_scaled.cpu().numpy()
    
    # Inverse scale the output
    prediction = scaler_y.inverse_transform(prediction_scaled_np)
    
    # Reshape to get real and imaginary parts
    real_parts = prediction[0, :5]
    imag_parts = prediction[0, 5:]
    
    # Combine into complex roots
    roots = real_parts + 1j * imag_parts
    
    return roots, real_parts, imag_parts

def batch_predict_roots(model, coefficient_list, scaler_X, scaler_y):
    """Predict roots for multiple polynomials at once"""
    model.eval()
    
    # Convert to numpy array
    coeffs_array = np.array(coefficient_list)
    
    # Scale the inputs
    coeffs_scaled = scaler_X.transform(coeffs_array)
    
    # Convert to tensor
    coeffs_tensor = torch.FloatTensor(coeffs_scaled).to(device)
    
    # Predict
    with torch.no_grad():
        predictions_scaled = model(coeffs_tensor)
    
    # Convert back to numpy
    predictions_scaled_np = predictions_scaled.cpu().numpy()
    
    # Inverse scale
    predictions = scaler_y.inverse_transform(predictions_scaled_np)
    
    results = []
    for i, prediction in enumerate(predictions):
        real_parts = prediction[:5]
        imag_parts = prediction[5:]
        roots = real_parts + 1j * imag_parts
        results.append({
            'coefficients': coefficient_list[i],
            'roots': roots,
            'real_parts': real_parts,
            'imaginary_parts': imag_parts
        })
    
    return results

def verify_polynomial_roots(coeffs, roots, tolerance=1e-6):
    """Verify that the predicted roots actually satisfy the polynomial equation"""
    a, b, c, d, e, f = coeffs
    verification_results = []
    
    for i, root in enumerate(roots):
        # Calculate polynomial value at root
        poly_value = a*root**5 + b*root**4 + c*root**3 + d*root**2 + e*root + f
        error = abs(poly_value)
        is_valid = error < tolerance
        
        verification_results.append({
            'root_index': i+1,
            'root_value': root,
            'polynomial_value': poly_value,
            'error': error,
            'is_valid_root': is_valid
        })
    
    return verification_results

def save_model_components(model, scaler_X, scaler_y, model_path='polynomial_roots_model.pt', 
                         scaler_x_path='scaler_X.pkl', scaler_y_path='scaler_y.pkl'):
    """Save the model and scalers for future use"""
    # Save model
    torch.save(model.state_dict(), model_path)
    
    # Save scalers
    joblib.dump(scaler_X, scaler_x_path)
    joblib.dump(scaler_y, scaler_y_path)
    
    print("Model and scalers saved successfully!")

def load_model_components(model_path='polynomial_roots_model.pt',
                         scaler_x_path='scaler_X.pkl', scaler_y_path='scaler_y.pkl'):
    """Load the trained model and scalers for inference"""
    # Initialize model architecture
    model = PolynomialRootsModel().to(device)
    
    # Load model weights
    model.load_state_dict(torch.load(model_path, map_location=device))
    
    # Load scalers
    scaler_X = joblib.load(scaler_x_path)
    scaler_y = joblib.load(scaler_y_path)
    
    return model, scaler_X, scaler_y

# Main execution block
if __name__ == "__main__":
    # Load and preprocess data
    file_path = r"C:\Users\Akshay Patil\Desktop\Roots\R.csv"  # Update this path as needed
    train_loader, val_loader, test_loader, scaler_X, scaler_y = load_and_preprocess_data(file_path)
    
    # Initialize model
    model = PolynomialRootsModel(
        input_dim=6, 
        hidden_dims=[64,128,128,64], 
        output_dim=10, 
        cheb_degree=10
    ).to(device)
    
    # Display model architecture
    print("\nModel Architecture:")
    print(model)
    
    # Count parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"\nTotal parameters: {total_params:,}")
    print(f"Trainable parameters: {trainable_params:,}")
    
    # Train the model
    trained_model = train_model(
        model, 
        train_loader, 
        val_loader, 
        epochs=300, 
        lr=0.0001, 
        patience=1
    )
    
    # Evaluate on test set
    trained_model.eval()
    test_loss = 0.0
    test_mae = 0.0
    criterion = nn.MSELoss()
    mae_criterion = nn.L1Loss()
    
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = trained_model(inputs)
            test_loss += criterion(outputs, targets).item() * inputs.size(0)
            test_mae += mae_criterion(outputs, targets).item() * inputs.size(0)
    
    test_loss /= len(test_loader.dataset)
    test_mae /= len(test_loader.dataset)
    
    print(f"\nTest Results:")
    print(f"Test Loss (MSE): {test_loss:.6f}")
    print(f"Test MAE: {test_mae:.6f}")
    
    # Save model components
    save_model_components(trained_model, scaler_X, scaler_y)
    
    # Test prediction function
    sample_coeffs = [1, -5, 8, -4, 10, 11]
    predicted_roots, real_parts, imag_parts = predict_polynomial_roots(
        trained_model, sample_coeffs, scaler_X, scaler_y
    )
    
    print(f"\nSample Prediction:")
    print(f"Input polynomial coefficients: {sample_coeffs}")
    print(f"Predicted roots:")
    for i, root in enumerate(predicted_roots):
        print(f"  Root {i+1}: {root:.4f}")
    
    # Verify the roots
    verification = verify_polynomial_roots(sample_coeffs, predicted_roots)
    print("\nRoot verification:")
    for v in verification:
        status = "✓" if v['is_valid_root'] else "✗"
        print(f"  {status} Root {v['root_index']}: Error = {v['error']:.2e}")
    
    # Test batch prediction
    sample_batch = [
        [1, -5, 8, -4, 0, 0],
        [2, 1, -3, 0, 1, -2],
        [1, 0, 0, 0, 0, -1]
    ]
    
    batch_results = batch_predict_roots(trained_model, sample_batch, scaler_X, scaler_y)
    print("\nBatch prediction results:")
    for i, result in enumerate(batch_results):
        print(f"Polynomial {i+1}: {result['coefficients']}")
        print(f"Roots: {result['roots']}")
        
        # Verify the roots
        verification = verify_polynomial_roots(result['coefficients'], result['roots'])
        print("Root verification:")
        for v in verification:
            status = "✓" if v['is_valid_root'] else "✗"
            print(f"  {status} Root {v['root_index']}: Error = {v['error']:.2e}")
        print()
    
    print("Code execution completed successfully!")
    print("\nTo use the trained model in a new session:")
    print("1. model, scaler_X, scaler_y = load_model_components()")
    print("2. roots, _, _ = predict_polynomial_roots(model, [a,b,c,d,e,f], scaler_X, scaler_y)")