In [None]:
!pip install pymoo



In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.nn.init as init
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from math import sqrt
from torch.utils.data import TensorDataset, DataLoader
import math
import copy
import warnings

# Suppress matplotlib warnings for cleaner output during optimization
warnings.filterwarnings("ignore", category=UserWarning, module="matplotlib")

# --- Pymoo Imports ---
from pymoo.core.problem import Problem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.operators.sampling.rnd import FloatRandomSampling
from pymoo.termination import get_termination
from pymoo.optimize import minimize


# Set device to GPU if available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

#region ----------------- Model Architecture -----------------
class ChebySigmoidActivation(nn.Module):
    def __init__(self, alpha=1.0, beta=1.0, n=1.0):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.n = n

    def get_n_value(self):
        return self.n

    def forward(self, x):
        sigmoid_val = torch.sigmoid(x)
        tanh_beta_val = torch.tanh(self.beta * x)
        safe_tanh_beta_val = torch.clamp(tanh_beta_val, -1 + 1e-6, 1 - 1e-6)
        out = sigmoid_val + self.alpha * torch.cos(self.n * torch.acos(safe_tanh_beta_val)) * sigmoid_val * (1 - sigmoid_val)
        return out

class ChebyTanhActivation(nn.Module):
    def __init__(self, alpha=1.0, beta=1.0, n=1.0):
        super().__init__()
        self.alpha = alpha
        self.beta = beta
        self.n = n

    def get_n_value(self):
        return self.n

    def forward(self, x):
        tanh_beta_val = torch.tanh(self.beta * x)
        tanh_val = torch.tanh(x)
        sech_val = (1 / torch.cosh(x)) ** 2
        safe_tanh_beta_val = torch.clamp(tanh_beta_val, -1 + 1e-6, 1 - 1e-6)
        out = tanh_val + self.alpha * torch.cos(self.n * torch.acos(safe_tanh_beta_val)) * sech_val
        return out

class CustomLSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size, alpha=1.0, beta=1.0, n=1.0):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.weight_ih = nn.Parameter(torch.empty(4 * hidden_size, input_size))
        self.weight_hh = nn.Parameter(torch.empty(4 * hidden_size, hidden_size))
        self.bias_ih = nn.Parameter(torch.zeros(4 * hidden_size))
        self.bias_hh = nn.Parameter(torch.zeros(4 * hidden_size))
        self.sigmoid_activation = ChebySigmoidActivation(alpha=alpha, beta=beta, n=n)
        self.tanh_activation = ChebyTanhActivation(alpha=alpha, beta=beta, n=n)

    def forward(self, input, hx):
        h_prev, c_prev = hx
        gates = F.linear(input, self.weight_ih, self.bias_ih) + F.linear(h_prev, self.weight_hh, self.bias_hh)
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, dim=1)
        forgetgate = self.sigmoid_activation(forgetgate)
        ingate = self.sigmoid_activation(ingate)
        cellgate_candidate = self.tanh_activation(cellgate)
        outgate = self.sigmoid_activation(outgate)
        c_next = forgetgate * c_prev + ingate * cellgate_candidate
        h_next = outgate * self.tanh_activation(c_next)
        return h_next, (h_next, c_next)

    def get_n_values(self):
        return {
            'sigmoid_n': self.sigmoid_activation.get_n_value(),
            'tanh_n': self.tanh_activation.get_n_value()
        }

class CustomLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, alpha=1.0, beta=1.0, n=1.0):
        super().__init__()
        self.cell = CustomLSTMCell(input_size, hidden_size, alpha=alpha, beta=beta, n=n)
        self.hidden_size = hidden_size

    def forward(self, inputs, hx=None):
        inputs = inputs.transpose(0, 1)
        if hx is None:
            batch_size = inputs.size(1)
            h_0 = torch.zeros(batch_size, self.cell.hidden_size, device=inputs.device)
            c_0 = torch.zeros(batch_size, self.cell.hidden_size, device=inputs.device)
            hx = (h_0, c_0)
        h, c = hx
        outputs = []
        for t in range(inputs.size(0)):
            h, (h, c) = self.cell(inputs[t], (h, c))
            outputs.append(h)
        outputs = torch.stack(outputs).transpose(0, 1)
        return outputs, (h, c)

    def get_n_values(self):
        return self.cell.get_n_values()

class MultivariateLSTM(nn.Module):
    def __init__(self, input_size, hidden_size=50, output_size=1, alpha=1.0, beta=1.0, n=1.0):
        super().__init__()
        self.lstm = CustomLSTM(input_size, hidden_size, alpha=alpha, beta=beta, n=n)
        self.dropout = nn.Dropout(0.2)
        self.linear = nn.Linear(hidden_size, output_size)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            init.xavier_uniform_(module.weight)
            if module.bias is not None: init.constant_(module.bias, 0)
        elif isinstance(module, CustomLSTMCell):
            init.xavier_uniform_(module.weight_ih)
            init.xavier_uniform_(module.weight_hh)
            init.constant_(module.bias_ih, 0)
            init.constant_(module.bias_hh, 0)

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        last_output = self.dropout(lstm_out[:, -1, :])
        predictions = self.linear(last_output)
        return predictions

    def get_n_values(self):
        return self.lstm.get_n_values()
#endregion

#region ----------------- Data & Training Pipeline -----------------
def create_multivariate_sequences(data, seq_length, feature_column_idx=None, target_column_idx=None):
    xs, ys = [], []
    if feature_column_idx is not None and not isinstance(feature_column_idx, list):
        feature_column_idx = [feature_column_idx]
    for i in range(len(data) - seq_length):
        x = data[i:i+seq_length, feature_column_idx] if feature_column_idx is not None else data[i:i+seq_length]
        y = data[i+seq_length, target_column_idx] if target_column_idx is not None else data[i+seq_length]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

def prepare_multivariate_data(data, seq_length=20, train_ratio=0.8, batch_size=32, feature_column_idx=None, target_column_idx=None):
    if isinstance(data, pd.DataFrame):
        data = data.values

    n_features = data.shape[1]
    input_size = len(feature_column_idx) if isinstance(feature_column_idx, list) else (1 if feature_column_idx is not None else n_features)
    output_size = len(target_column_idx) if isinstance(target_column_idx, list) else (1 if target_column_idx is not None else n_features)

    train_size = int(len(data) * train_ratio)
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler.fit(data[:train_size])
    data_scaled = scaler.transform(data)

    X, y = create_multivariate_sequences(data_scaled, seq_length, feature_column_idx, target_column_idx)

    X_train, X_test = X[:train_size-seq_length], X[train_size-seq_length:]
    y_train, y_test = y[:train_size-seq_length], y[train_size-seq_length:]

    X_train = torch.FloatTensor(X_train).to(device)
    y_train = torch.FloatTensor(y_train).to(device)
    X_test = torch.FloatTensor(X_test).to(device)
    y_test = torch.FloatTensor(y_test).to(device)

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

    return {
        'train_loader': train_loader, 'test_loader': test_loader, 'scaler': scaler,
        'X_train': X_train, 'y_train': y_train, 'X_test': X_test, 'y_test': y_test,
        'input_size': input_size, 'output_size': output_size,
        'target_column_idx': target_column_idx
    }

def train_model(model, train_loader, test_loader, epochs=100, learning_rate=0.001, verbose=True):
    model = model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        model.train()
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        if verbose and (epoch + 1) % 10 == 0:
            model.eval()
            test_loss = 0
            with torch.no_grad():
                for X_test_batch, y_test_batch in test_loader:
                    y_pred_test = model(X_test_batch)
                    test_loss += criterion(y_pred_test, y_test_batch).item()
            test_loss /= len(test_loader)
            print(f'Epoch {epoch+1}/{epochs}, Test Loss: {test_loss:.6f}')

    # Final evaluation to get test RMSE
    model.eval()
    all_preds, all_true = [], []
    with torch.no_grad():
        for X_test_batch, y_test_batch in test_loader:
            preds = model(X_test_batch).cpu().numpy()
            true = y_test_batch.cpu().numpy()
            all_preds.append(preds)
            all_true.append(true)

    test_rmse = sqrt(mean_squared_error(np.vstack(all_true), np.vstack(all_preds)))
    return test_rmse

def predict_all_data(model, data_dict, target_cols_to_predict=None):
    model.eval()
    with torch.no_grad():
        test_predictions_scaled = model(data_dict['X_test']).cpu().numpy()

    scaler = data_dict['scaler']
    all_target_idx = data_dict['target_column_idx']
    all_target_cols = [data_dict['scaler'].get_feature_names_out()[i] for i in all_target_idx]

    # Default to first column if nothing is specified
    if target_cols_to_predict is None:
        target_cols_to_predict = [all_target_cols[0]]

    # Find the indices within the output that we want to keep
    predict_indices_in_output = [all_target_cols.index(col) for col in target_cols_to_predict]

    # Create a full-size array for inverse transform
    dummy_array = np.zeros((test_predictions_scaled.shape[0], scaler.n_features_in_))
    # Place all predictions into their correct spots
    dummy_array[:, all_target_idx] = test_predictions_scaled
    # Inverse transform the whole array
    predictions_unscaled_all = scaler.inverse_transform(dummy_array)

    # Select only the columns we want to return
    final_predictions = predictions_unscaled_all[:, [all_target_idx[i] for i in predict_indices_in_output]]

    return final_predictions

def predict_and_visualize_multivariate(model, data_dict, title='Multivariate Time Series Forecasting', target_cols_to_plot=None):
    scaler = data_dict['scaler']
    all_target_idx = data_dict['target_column_idx']
    all_target_cols = [scaler.get_feature_names_out()[i] for i in all_target_idx]
    y_test_numpy = data_dict['y_test'].cpu().numpy()

    # Default to first column if nothing is specified
    if target_cols_to_plot is None:
        target_cols_to_plot = [all_target_cols[0]]

    predictions = predict_all_data(model, data_dict, target_cols_to_predict=target_cols_to_plot)

    # Get actual data for the same columns
    plot_indices_in_output = [all_target_cols.index(col) for col in target_cols_to_plot]
    dummy_array_true = np.zeros((len(y_test_numpy), scaler.n_features_in_))
    dummy_array_true[:, all_target_idx] = y_test_numpy
    actual = scaler.inverse_transform(dummy_array_true)[:, [all_target_idx[i] for i in plot_indices_in_output]]

    # --- Plotting ---
    plt.figure(figsize=(15, 7))
    colors = plt.cm.viridis(np.linspace(0, 1, len(target_cols_to_plot)))

    for i, col_name in enumerate(target_cols_to_plot):
        plt.plot(actual[:, i], label=f'Actual {col_name}', color=colors[i], alpha=0.8)
        plt.plot(predictions[:, i], label=f'Predicted {col_name}', color=colors[i], linestyle='--')

    test_rmse = sqrt(mean_squared_error(actual, predictions))
    plt.title(f'{title} - Test Set (Overall RMSE: {test_rmse:.4f})')
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True, which='both', linestyle='--', linewidth=0.5)
    plt.tight_layout()
    plt.show()

    return test_rmse

def load_multivariate_from_csv(csv_path, date_column=None):
    df = pd.read_csv(csv_path)
    if date_column and date_column in df.columns:
        df[date_column] = pd.to_datetime(df[date_column])
        df = df.sort_values(by=date_column)

    features_df = df.select_dtypes(include=np.number)
    if features_df.isnull().sum().sum() > 0:
        features_df = features_df.fillna(method='ffill').fillna(method='bfill')
        features_df = features_df.fillna(features_df.mean())

    return features_df

def train_from_csv(csv_path, feature_columns, target_column, date_column=None,
                   seq_length=20, hidden_size=50, epochs=100,
                   learning_rate=0.001, alpha=1.0, beta=1.0, n=1.0, graph=True):

    all_data = load_multivariate_from_csv(csv_path, date_column=date_column)

    feature_column_idx = [all_data.columns.get_loc(c) for c in feature_columns]
    target_column_idx = [all_data.columns.get_loc(c) for c in target_column] if isinstance(target_column, list) else all_data.columns.get_loc(target_column)

    data_dict = prepare_multivariate_data(
        all_data, seq_length=seq_length, feature_column_idx=feature_column_idx, target_column_idx=target_column_idx
    )

    model = MultivariateLSTM(
        input_size=data_dict['input_size'], hidden_size=hidden_size,
        output_size=data_dict['output_size'], alpha=alpha, beta=beta, n=n
    )

    test_rmse = train_model(model, data_dict['train_loader'], data_dict['test_loader'], epochs=epochs, learning_rate=learning_rate, verbose=graph)

    if graph:
      target_str = ", ".join(target_column) if isinstance(target_column, list) else target_column
      title = f'Forecasting {target_str} using {",".join(feature_columns)}'
      predict_and_visualize_multivariate(model, data_dict, title=title, target_cols_to_plot=target_column)

    return model, test_rmse
#endregion

#region ----------------- Lyapunov Exponent Calculation -----------------
def compute_lyapunov_multivariate(initial_sequence, model, steps=1000, delta=1e-5):
    seq_len, num_features = initial_sequence.shape
    model.eval()

    x0 = initial_sequence.reshape(1, seq_len, num_features).astype(np.float32)

    perturb = np.random.randn(*x0.shape)
    perturb /= np.linalg.norm(perturb)
    x1 = x0 + perturb * delta
    d0 = np.linalg.norm(x1 - x0)

    divergences = []
    for _ in range(steps):
        with torch.no_grad():
            x0_tensor = torch.tensor(x0, dtype=torch.float32, device=device)
            x1_tensor = torch.tensor(x1, dtype=torch.float32, device=device)

            y0 = model(x0_tensor).cpu().numpy().reshape(1, 1, -1) # Output shape (1, 1, output_size)
            y1 = model(x1_tensor).cpu().numpy().reshape(1, 1, -1)

        # Assuming output_size matches num_features for recurrence
        if y0.shape[2] != num_features:
            raise ValueError(f"Model output size ({y0.shape[2]}) must match number of features ({num_features}) for Lyapunov calculation.")

        d = np.linalg.norm(y1 - y0)

        if d > 1e-12:
            divergences.append(np.log(d / d0))
            # Renormalize
            x1_pred_renorm = y0 + (d0 / d) * (y1 - y0)
        else:
            # Re-perturb if trajectories collapse
            new_perturb = np.random.randn(*y0.shape)
            new_perturb /= np.linalg.norm(new_perturb)
            x1_pred_renorm = y0 + d0 * new_perturb

        # Roll window
        x0 = np.append(x0[:, 1:, :], y0, axis=1)
        x1 = np.append(x1[:, 1:, :], x1_pred_renorm, axis=1)

    divergences = np.array(divergences)
    if len(divergences) < 2: return 0.0 # Not enough data for a fit

    t_vals = np.arange(len(divergences))
    coeffs = np.polyfit(t_vals, divergences, 1)
    lyap = coeffs[0]

    return lyap * 100 # Scaling for better numerical stability/readability
#endregion

#region ----------------- Pymoo Optimization Problem -----------------

# --- CONFIGURATION ---
CSV_PATH = 'lorenz5.csv'
SEQ_LENGTH = 60
HIDDEN_SIZE = 64
EPOCHS = 1
POPULATION_SIZE = 5
GENERATIONS = 5

# Load data once to get initial sequence for Lyapunov calculation
FULL_DATA = load_multivariate_from_csv(CSV_PATH)
FEATURE_COLS = ['x', 'y', 'z']
TARGET_COL = ['x', 'y', 'z']
INITIAL_SEQUENCE = FULL_DATA[FEATURE_COLS].values[:SEQ_LENGTH]


class PymooProblem(Problem):
    def __init__(self):
        super().__init__(
            n_var=3,          # Number of variables: alpha, beta, n
            n_obj=2,          # Number of objectives: min RMSE, max Lyapunov
            n_constr=0,       # Number of constraints
            xl=np.array([-1.0, 0.1, 1]),   # Lower bounds for [alpha, beta, n]
            xu=np.array([1.0, 2.0, 5])     # Upper bounds for [alpha, beta, n]
        )

    def _evaluate(self, x, out, *args, **kwargs):
        # x is a 2D array where each row is a solution [alpha, beta, n]
        objectives = []
        for solution in x:
            alpha, beta, n_float = solution
            n_int = int(round(n_float)) # n must be an integer for the model

            print(f"Evaluating: α={alpha:.3f}, β={beta:.3f}, n={n_int}")

            # 1. Train the model
            model, test_rmse = train_from_csv(
                csv_path=CSV_PATH,
                feature_columns=FEATURE_COLS,
                target_column=TARGET_COL,
                seq_length=SEQ_LENGTH,
                hidden_size=HIDDEN_SIZE,
                epochs=EPOCHS,
                alpha=alpha,
                beta=beta,
                n=n_int,
                graph=False # No plotting during optimization
            )

            # 2. Compute Lyapunov Exponent
            lyap = compute_lyapunov_multivariate(INITIAL_SEQUENCE, model)

            # Pymoo minimizes objectives, so we negate lyapunov to maximize it
            objectives.append([test_rmse, -lyap])
            print(f"Result: RMSE={test_rmse:.4f}, Lyapunov={lyap:.4f}")


        out["F"] = np.array(objectives)

# --- ALGORITHM SETUP ---
problem = PymooProblem()

algorithm = NSGA2(
    pop_size=POPULATION_SIZE,
    n_offsprings=10,
    sampling=FloatRandomSampling(),
    crossover=SBX(prob=0.9, eta=15),
    mutation=PM(eta=20),
    eliminate_duplicates=True
)

termination = get_termination("n_gen", GENERATIONS)

# --- RUN OPTIMIZATION ---
print("\n--- Starting Pymoo NSGA-II Optimization ---")
res = minimize(
    problem,
    algorithm,
    termination,
    seed=1,
    save_history=True,
    verbose=True
)
print("--- Optimization Finished ---")

# --- PROCESS RESULTS ---
# F is the objective space (results), X is the design space (parameters)
F = res.F
X = res.X

# Find the best solution: the one with the lowest RMSE (first objective)
best_solution_index = np.argmin(F[:, 0])
best_params = X[best_solution_index]
best_results = F[best_solution_index]

best_alpha, best_beta, best_n_float = best_params
best_n = int(round(best_n_float))
best_rmse, neg_best_lyap = best_results

print("\n--- Best Parameters Found ---")
print(f"Alpha: {best_alpha:.4f}")
print(f"Beta: {best_beta:.4f}")
print(f"N: {best_n}")
print(f"Corresponding Test RMSE: {best_rmse:.4f}")
print(f"Corresponding Lyapunov Exponent: {-neg_best_lyap:.4f}")
print("----------------------------\n")


# --- RETRAIN AND VISUALIZE THE BEST MODEL ---
print("--- Training final model with best parameters and visualizing... ---")
train_from_csv(
    csv_path=CSV_PATH,
    feature_columns=FEATURE_COLS,
    target_column=TARGET_COL,
    seq_length=SEQ_LENGTH,
    hidden_size=HIDDEN_SIZE,
    epochs=EPOCHS,  # Train for longer on the best model
    learning_rate=0.0005,
    alpha=best_alpha,
    beta=best_beta,
    n=best_n,
    graph=True # Enable graphing for the final run
)
#endregion

Using device: cpu
PyTorch version: 2.8.0+cu126
CUDA available: False

--- Starting Pymoo NSGA-II Optimization ---
Evaluating: α=-0.166, β=1.469, n=1
Result: RMSE=0.0291, Lyapunov=0.0121
Evaluating: α=-0.395, β=0.379, n=1
Result: RMSE=0.0118, Lyapunov=-0.0004
Evaluating: α=-0.627, β=0.757, n=3
Result: RMSE=0.0408, Lyapunov=0.0086
Evaluating: α=0.078, β=0.896, n=4
Result: RMSE=0.0171, Lyapunov=0.0090
Evaluating: α=-0.591, β=1.768, n=1
Result: RMSE=0.3574, Lyapunov=-0.0404
n_gen  |  n_eval  | n_nds  |      eps      |   indicator  
     1 |        5 |      3 |             - |             -
Evaluating: α=-0.166, β=1.346, n=1
Result: RMSE=0.0127, Lyapunov=0.0150
Evaluating: α=-0.395, β=0.407, n=1
Result: RMSE=0.0137, Lyapunov=0.0046
Evaluating: α=-0.172, β=0.455, n=1
Result: RMSE=0.0200, Lyapunov=0.0067
Evaluating: α=0.182, β=1.469, n=1
Result: RMSE=0.0262, Lyapunov=0.0033
Evaluating: α=-0.404, β=1.060, n=1
Result: RMSE=0.0341, Lyapunov=0.0019
Evaluating: α=-0.395, β=0.379, n=1
Result: RMSE=

AttributeError: 'numpy.ndarray' object has no attribute 'linear'