In [None]:
# ================================================================
# 9)  FGSM ADVERSARIAL ROBUSTNESS TEST (inputs -> maximize MSE)
#     - Untargeted FGSM: x_adv = clip(x + eps*sign(dL/dx))
#     - Works for regression / sequence-to-sequence MSE
# ================================================================

def fgsm_attack_batch(model, x, y, eps, clip_min=0.0, clip_max=1.0):
    """
    Craft untargeted FGSM adversarial inputs for one batch.

    The perturbation follows the sign of the gradient of the batch MSE with
    respect to the inputs: x_adv = clip(x + eps * sign(dL/dx)).

    Args:
      model: callable invoked as model(x, training=False).
      x: inputs, cast to float32 (callers document the layout as [B,T,D]).
      y: targets, cast to float32 (callers document the layout as [B,T,1]).
      eps: step size applied along the gradient sign.
      clip_min, clip_max: bounds applied to the perturbed inputs. Clean values
        that already lie outside this range are clipped as well, so eps=0
        does not necessarily reproduce x.

    Returns:
      x_adv: adversarial inputs, same shape as x
      loss_clean: scalar MSE on the clean inputs
      loss_adv: scalar MSE on x_adv (computed after crafting x_adv)
      grad_linf: scalar (max |grad|)

    Raises:
      ValueError: if the loss has no gradient with respect to the inputs.
    """
    # Ensure inputs are float32
    x = tf.cast(x, dtype=tf.float32)
    y = tf.cast(y, dtype=tf.float32)

    # Watch the input tensor directly rather than allocating a tf.Variable on
    # every call. The loss evaluated under the tape is the clean loss that is
    # returned, so no second clean forward pass is needed.
    with tf.GradientTape() as tape:
        tape.watch(x)
        y_hat_clean = model(x, training=False)
        loss_clean = tf.reduce_mean(tf.square(y - y_hat_clean))  # MSE over all elements
    grad = tape.gradient(loss_clean, x)
    if grad is None:
        raise ValueError(
            "Loss has no gradient with respect to the inputs; cannot craft FGSM examples."
        )

    # FGSM step
    x_adv = tf.clip_by_value(x + eps * tf.sign(grad), clip_min, clip_max)

    # Loss on the adversarial inputs
    y_hat_adv = model(x_adv, training=False)
    loss_adv = tf.reduce_mean(tf.square(y - y_hat_adv))

    grad_linf = tf.reduce_max(tf.abs(grad))
    return x_adv, loss_clean, loss_adv, grad_linf


def fgsm_sweep_eval(model, X, Y, eps_list, batch_size=16, clip_min=0.0, clip_max=1.0):
    """
    Evaluate clean MSE and adversarial MSE for a sweep of eps values.

    X, Y can be tf.Tensor or numpy arrays; both are cast to float32 and
    batched along the first axis (documented layout: X [N,T,D], Y [N,T,1]).

    Per-batch mean losses are weighted by the number of samples in the batch,
    so a smaller final batch does not skew the reported dataset-level MSE.

    Returns a dict:
      "clean_mse": float, MSE of the model on the unperturbed inputs
      "per_eps": {float(eps): {
          "adv_mse": float, MSE on the FGSM inputs,
          "mse_ratio_adv_over_clean": adv_mse / (clean_mse + 1e-12),
          "mean_grad_linf": float, max |grad| averaged over batches,
      }}
    All metrics are NaN when the dataset is empty.
    """
    # Ensure inputs are float32
    X = tf.cast(X, dtype=tf.float32)
    Y = tf.cast(Y, dtype=tf.float32)

    ds = tf.data.Dataset.from_tensor_slices((X, Y)).batch(batch_size)
    nan = float("nan")

    # clean baseline once
    clean_sq_sum, n_samples = 0.0, 0
    for xb, yb in ds:
        n_in_batch = int(xb.shape[0])
        yhat = model(xb, training=False)
        clean_sq_sum += float(tf.reduce_mean(tf.square(yb - yhat))) * n_in_batch
        n_samples += n_in_batch
    clean_mse = clean_sq_sum / n_samples if n_samples else nan

    results = {"clean_mse": clean_mse, "per_eps": {}}

    for eps in eps_list:
        eps = float(eps)
        adv_sq_sum = 0.0
        grad_linf_sum = 0.0
        n_batches = 0

        for xb, yb in ds:
            _, _, loss_adv, grad_linf = fgsm_attack_batch(
                model, xb, yb, eps=eps, clip_min=clip_min, clip_max=clip_max
            )
            adv_sq_sum += float(loss_adv) * int(xb.shape[0])
            grad_linf_sum += float(grad_linf)
            n_batches += 1

        adv_mse = adv_sq_sum / n_samples if n_samples else nan
        # grad_linf is a per-batch maximum, so it is averaged over batches.
        grad_linf_mean = grad_linf_sum / n_batches if n_batches else nan

        results["per_eps"][eps] = {
            "adv_mse": adv_mse,
            "mse_ratio_adv_over_clean": adv_mse / (clean_mse + 1e-12),
            "mean_grad_linf": grad_linf_mean,
        }

    return results


def craft_fgsm(model, X, Y, epsilon=0.1, relative=True, eps_floor=1e-12, batch_size=64):
    """
    Create FGSM adversarial examples for the whole dataset.

    Args:
      model: passed through to _fgsm_batch.
      X, Y: inputs and targets; both are cast to float32 and batched along
        the first axis.
      epsilon: if relative=True, multiplied by the per-sample, per-feature
        standard deviation over axis 1 (floored at eps_floor);
        if relative=False, used as an absolute value.
      relative: selects the scaling mode described above.
      eps_floor: lower bound on the standard deviation used for scaling.
      batch_size: number of samples crafted per _fgsm_batch call.

    Returns: X_adv as a numpy array (same shape as X). An empty X is returned
    unchanged.

    NOTE(review): _fgsm_batch is not defined in the visible part of this
    file; it is assumed to be callable as _fgsm_batch(model, x, y, eps) and
    to return a tensor - confirm.
    """
    # Ensure inputs are float32
    X = tf.cast(X, dtype=tf.float32)
    Y = tf.cast(Y, dtype=tf.float32)

    if relative:
        # std over axis=1, kept as a broadcastable axis of length 1
        std_t = tf.math.reduce_std(X, axis=1, keepdims=True)
        eps_abs_full = epsilon * tf.maximum(std_t, eps_floor)
    else:
        eps_abs_full = None

    ds = tf.data.Dataset.from_tensor_slices((X, Y)).batch(batch_size)

    X_adv_chunks = []
    start = 0  # plain Python offset of the current batch within X
    for xb, yb in ds:
        stop = start + int(xb.shape[0])
        # slice matching the current batch, or the absolute epsilon
        eps_b = eps_abs_full[start:stop] if relative else epsilon
        x_adv_b = _fgsm_batch(model, xb, yb, tf.cast(eps_b, tf.float32))
        X_adv_chunks.append(x_adv_b.numpy())
        start = stop

    if not X_adv_chunks:
        # np.concatenate rejects an empty list; nothing to perturb.
        return X.numpy()
    return np.concatenate(X_adv_chunks, axis=0)

def plot_fgsm_example(model, X, Y, sample_idx=0, eps=0.02, clip_min=0.0, clip_max=1.0):
    """
    Plot one sample: clean prediction vs adversarial prediction, plus input perturbation magnitude.

    Args:
      model: model to attack; called with training=False.
      X, Y: indexable inputs/targets; the slice [sample_idx:sample_idx+1] is
        cast to float32 and attacked as a batch of one.
      sample_idx: index of the sample to visualise.
      eps: FGSM step size passed to fgsm_attack_batch.
      clip_min, clip_max: clipping bounds passed to fgsm_attack_batch.

    Shows two figures: target vs clean/adversarial predictions, and the
    per-time-step L-infinity norm (over features) of the input perturbation.
    """
    # Ensure inputs are float32
    x = tf.cast(X[sample_idx:sample_idx+1], dtype=tf.float32)  # batch of one
    y = tf.cast(Y[sample_idx:sample_idx+1], dtype=tf.float32)

    x_adv, loss_clean, loss_adv, grad_linf = fgsm_attack_batch(
        model, x, y, eps=float(eps), clip_min=clip_min, clip_max=clip_max
    )

    yhat_clean = model(x, training=False).numpy().squeeze()
    yhat_adv   = model(x_adv, training=False).numpy().squeeze()
    ytrue      = y.numpy().squeeze()

    # Drop only the batch axis. squeeze() would also remove a length-1 time or
    # feature axis, and the max below would then reduce over the wrong axis.
    dx = (x_adv - x).numpy()[0]
    dx_linf_t = np.max(np.abs(dx), axis=-1)  # one value per time step

    plt.figure(figsize=(10,4))
    plt.title(f"FGSM example | idx={sample_idx} eps={eps} | clean_mse={float(loss_clean):.3e} adv_mse={float(loss_adv):.3e}")
    plt.plot(ytrue, label="target", linewidth=2)
    plt.plot(yhat_clean, ":", label="pred (clean)", linewidth=2)
    plt.plot(yhat_adv, "--", label="pred (FGSM)", linewidth=2)
    plt.grid(True); plt.legend(); plt.show()

    plt.figure(figsize=(10,3))
    plt.title(f"Input perturbation per time-step (Linf over features) | mean={dx_linf_t.mean():.3e}, max={dx_linf_t.max():.3e}, grad_linf={float(grad_linf):.3e}")
    plt.plot(dx_linf_t, linewidth=1.5)
    plt.grid(True); plt.show()


# ----------------- FGSM sweep over the TEST split -----------------
# Step sizes are expressed in the normalized input space; the list starts at
# zero and grows so the degradation can be read off row by row.
eps_list = [0.0, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2]

fgsm_results = fgsm_sweep_eval(
    model, Xte_tf, Yte_tf,
    eps_list=eps_list, batch_size=16, clip_min=0.0, clip_max=1.0,
)

# Summary table: one row per eps
print("\n=== FGSM ROBUSTNESS (TEST) ===")
print(f"Clean MSE: {fgsm_results['clean_mse']:.4e}")
print("{:>8s} {:>12s} {:>12s} {:>12s}".format("eps", "adv_mse", "ratio", "mean|grad|inf"))
print("-"*54)
for eps in eps_list:
    r = fgsm_results["per_eps"][float(eps)]
    row_values = (eps, r["adv_mse"], r["mse_ratio_adv_over_clean"], r["mean_grad_linf"])
    print("{:8.3g} {:12.4e} {:12.4f} {:12.4e}".format(*row_values))

# Visualise the attack on the first test sample
plot_fgsm_example(
    model, Xte_tf.numpy(), Yte_tf.numpy(),
    sample_idx=0, eps=0.02, clip_min=0.0, clip_max=1.0,
)


In [None]:
# ================================================================
# BLOCK 1: IMPORTS & SETUP
# ================================================================
import os
import gc
import time
import csv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy.interpolate import InterpolatedUnivariateSpline
from tensorflow import keras
from tensorflow.keras import layers

# Seed both NumPy and TensorFlow random generators for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print(f"TensorFlow Version: {tf.__version__}")


In [None]:

# ================================================================
# BLOCK 2: DATA PROCESSING HELPERS
# ================================================================
def read_split(path):
    """Read every data file in `path`; columns are t, in1, in2, out.

    Each file is parsed with np.loadtxt, skipping the first line as a header.
    The first data row of every file is then dropped. A file is skipped when
    it fails to parse, has fewer than four columns, has fewer than two data
    rows (nothing would remain after dropping the first), or contains NaN in
    in1, in2 or out.

    Returns:
        X: list of (L, 2) arrays holding [in1, in2]
        Y: list of (L, 1) arrays holding out
        T: list of (L,) arrays holding t
        mins, maxs: dicts keyed 'in1', 'in2', 'out' with one min/max per file
        If `path` does not exist a message is printed and
        ([], [], [], {}, {}) is returned.
    """
    if not os.path.exists(path):
        print(f"Path not found: {path}")
        return [], [], [], {}, {}

    channel_names = ('in1', 'in2', 'out')
    X, Y, T = [], [], []
    mins = {name: [] for name in channel_names}
    maxs = {name: [] for name in channel_names}

    for fn in sorted(os.listdir(path)):
        fp = os.path.join(path, fn)
        if not os.path.isfile(fp):
            continue

        try:
            # skiprows=1 for header. ndmin=2 keeps a (columns, rows) layout
            # even for a single-row file, which would otherwise come back 1-D
            # and break the row slicing below.
            data = np.loadtxt(fp, unpack=True, skiprows=1, ndmin=2)
        except Exception as e:
            # Best effort: report the unreadable file and keep going.
            print(f"Error reading {fn}: {e}")
            continue

        # Need four columns and at least one row left after dropping the first.
        if data.shape[0] < 4 or data.shape[1] < 2:
            continue

        t, in1, in2, out = data[0], data[1], data[2], data[3]

        # NaN check covers the full columns, including the row dropped below.
        if np.any(np.isnan(in1)) or np.any(np.isnan(in2)) or np.any(np.isnan(out)):
            continue

        # Remove the first row of data (first time step)
        t, in1, in2, out = t[1:], in1[1:], in2[1:], out[1:]

        X.append(np.stack([in1, in2], axis=-1))   # (L,2)
        Y.append(out.reshape(-1, 1))              # (L,1)
        T.append(t)

        for name, values in zip(channel_names, (in1, in2, out)):
            mins[name].append(values.min())
            maxs[name].append(values.max())

    return X, Y, T, mins, maxs

def normalize(X, Y, gmins, gmaxs):
    """Scale sequences with global min/max values.

    Args:
        X: iterable of input sequences whose last axis is [in1, in2].
        Y: iterable of output sequences.
        gmins, gmaxs: dicts with keys 'in1', 'in2', 'out' holding the global
            minimum / maximum of each channel.

    Returns:
        (Xn, Yn): lists of float32 arrays computed as (v - min) / (max - min).
        A channel whose max equals its min is divided by 1 instead, so it
        maps to v - min. Values outside [min, max] fall outside [0, 1].
    """
    imin = np.array([gmins['in1'], gmins['in2']], dtype=np.float32)
    imax = np.array([gmaxs['in1'], gmaxs['in2']], dtype=np.float32)
    omin = np.float32(gmins['out'])
    omax = np.float32(gmaxs['out'])

    # The denominators depend only on the global statistics, so they are
    # computed once instead of once per sequence.
    denom_in = imax - imin
    denom_in[denom_in == 0] = 1.0  # avoid division by zero for constant channels
    denom_out = omax - omin
    if denom_out == 0:
        denom_out = 1.0

    Xn, Yn = [], []
    for x, y in zip(X, Y):
        x = np.asarray(x, dtype=np.float32)
        y = np.asarray(y, dtype=np.float32)
        Xn.append((x - imin) / denom_in)
        Yn.append((y - omin) / denom_out)
    return Xn, Yn

def resample_split(X, Y, T, L):
    """Resample sequences to fixed length L using splines.

    Every sequence is treated as sampled on the index grid 0..len-1 and
    re-evaluated on L evenly spaced points of that grid with a degree-2
    interpolating spline. The number of input features is taken from each
    sequence rather than being fixed at two.

    Args:
        X: iterable of 2-D arrays (steps, features).
        Y: iterable of 2-D arrays whose first column is resampled.
        T: iterable of 1-D time arrays.
        L: target number of steps.

    Returns:
        (Xr, Yr, Tr) as float32 arrays stacked over sequences.
    """
    Xr, Yr, Tr = [], [], []
    for x, y, t in zip(X, Y, T):
        src_grid = np.arange(len(x))
        dst_grid = np.linspace(0, len(x) - 1, L)

        def _resample(values):
            # Degree-2 interpolating spline through one channel.
            return InterpolatedUnivariateSpline(src_grid, values, k=2)(dst_grid)

        Xr.append(np.stack([_resample(x[:, c]) for c in range(x.shape[1])], axis=-1))
        Yr.append(_resample(y[:, 0]).reshape(-1, 1))
        Tr.append(_resample(t))
    return np.array(Xr, dtype=np.float32), np.array(Yr, dtype=np.float32), np.array(Tr, dtype=np.float32)



In [None]:
# ================================================================
# BLOCK 3: CUSTOM VANILLA GRU CELL (FIXED SERIALIZATION)
# ================================================================
# Use tf.keras.utils for compatibility with older/newer TF versions
@tf.keras.utils.register_keras_serializable(package="MyLayers")
class VanillaGRUCell(layers.Layer):
    """
    Hand-written GRU cell.

    Trainable weights are exposed as self.kernel (input projection),
    self.recurrent_kernel (hidden-state projection) and self.bias. Each one
    packs three equally sized slices in the order z (update), r (reset),
    h (candidate).
    """
    def __init__(self, units, init_min=-0.5, init_max=0.5, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.state_size = units
        # Bounds of the uniform distribution used for both kernels in build()
        self.init_min = init_min
        self.init_max = init_max

    def build(self, input_shape):
        packed_width = self.units * 3  # z, r and h slices side by side

        # Both kernels are drawn uniformly from [init_min, init_max]
        uniform_init = tf.random_uniform_initializer(minval=self.init_min, maxval=self.init_max)

        # Input -> gates/candidate
        self.kernel = self.add_weight(
            shape=(input_shape[-1], packed_width),
            initializer=uniform_init,
            name='kernel'
        )

        # Hidden state -> gates/candidate
        self.recurrent_kernel = self.add_weight(
            shape=(self.units, packed_width),
            initializer=uniform_init,
            name='recurrent_kernel'
        )

        self.bias = self.add_weight(
            shape=(packed_width,),
            initializer='zeros',
            name='bias'
        )
        self.built = True

    def call(self, inputs, states):
        prev_h = states[0]

        # Project the input and the previous state, then unpack the z/r/h slices
        in_z, in_r, in_h = tf.split(tf.matmul(inputs, self.kernel), 3, axis=1)
        rec_z, rec_r, rec_h = tf.split(tf.matmul(prev_h, self.recurrent_kernel), 3, axis=1)
        bias_z, bias_r, bias_h = tf.split(self.bias, 3, axis=0)

        update_gate = tf.nn.sigmoid(in_z + rec_z + bias_z)
        reset_gate = tf.nn.sigmoid(in_r + rec_r + bias_r)

        # The reset gate scales the already-projected previous state
        # (prev_h @ recurrent_kernel), i.e. it is applied after the matmul.
        candidate = tf.nn.tanh(in_h + (reset_gate * rec_h) + bias_h)

        # Blend: update_gate weights the candidate, its complement the old state
        new_h = (1 - update_gate) * prev_h + update_gate * candidate

        return new_h, [new_h]

    def get_config(self):
        config = super().get_config()
        config["units"] = self.units
        config["init_min"] = self.init_min
        config["init_max"] = self.init_max
        return config



In [None]:
# ================================================================
# BLOCK 4: PARAMETER TRACKER CALLBACK
# ================================================================
class ParameterTracker(keras.callbacks.Callback):
    """Callback that appends ten weight values to a CSV file after every epoch.

    The values are the first ten entries of the flattened first weight of a
    Bidirectional layer: model.layers[1] when that is Bidirectional, otherwise
    the first Bidirectional layer found. If none is found, model.layers[1] is
    used as-is.
    """
    def __init__(self, filepath='param_convergence.csv'):
        super().__init__()
        self.filepath = filepath

    def on_train_begin(self, logs=None):
        # Start a fresh file with the header: epoch + 10 parameter columns
        header_row = ['epoch'] + [f'p{i}' for i in range(10)]
        with open(self.filepath, 'w', newline='') as f:
            csv.writer(f).writerow(header_row)

    def on_epoch_end(self, epoch, logs=None):
        try:
            tracked_layer = self.model.layers[1]
            if not isinstance(tracked_layer, layers.Bidirectional):
                # Fall back to the first Bidirectional layer, if there is one
                tracked_layer = next(
                    (lyr for lyr in self.model.layers if isinstance(lyr, layers.Bidirectional)),
                    tracked_layer,
                )

            layer_weights = tracked_layer.weights
            if len(layer_weights) == 0:
                return  # nothing to record for a weightless layer

            first_ten = tf.reshape(layer_weights[0], [-1]).numpy()[:10]
            with open(self.filepath, 'a', newline='') as f:
                csv.writer(f).writerow([epoch, *first_ten.tolist()])
        except Exception as e:
            # Tracking is best effort and must never interrupt training
            print(f"Tracking warning: {e}")



In [None]:
# ================================================================
# BLOCK 5: MODEL BUILDER (FIXED)
# ================================================================
def build_custom_bi_gru(seq_len, n_inputs, hidden_units=16):
    """Build and compile a two-layer bidirectional model of VanillaGRUCell RNNs.

    Args:
        seq_len: number of time steps in each input sequence.
        n_inputs: number of input features per time step.
        hidden_units: units of every GRU cell (each direction, each layer).

    Returns:
        A compiled keras.Model with a Dense(1) head applied to the sequence
        output, MSE loss, Adam (lr=1e-3, clipnorm=1.0) and mae/mse metrics.
    """
    def _bidirectional_block(tensor):
        # Separate cell instances per direction so no weights are shared
        forward_cell = VanillaGRUCell(hidden_units, init_min=-0.5, init_max=0.5)
        backward_cell = VanillaGRUCell(hidden_units, init_min=-0.5, init_max=0.5)
        forward_rnn = layers.RNN(forward_cell, return_sequences=True)
        # The backward layer must be told explicitly to run in reverse
        backward_rnn = layers.RNN(backward_cell, return_sequences=True, go_backwards=True)
        return layers.Bidirectional(forward_rnn, backward_layer=backward_rnn)(tensor)

    inp = keras.Input(shape=(seq_len, n_inputs))

    hidden = inp
    for _ in range(2):  # two stacked bidirectional layers
        hidden = _bidirectional_block(hidden)

    out = layers.Dense(1)(hidden)

    model = keras.Model(inp, out)
    optimizer = keras.optimizers.Adam(learning_rate=1e-3, clipnorm=1.0)
    model.compile(loss='mse', optimizer=optimizer, metrics=['mae', 'mse'])
    return model



In [None]:
# ================================================================
# BLOCK 6: MAIN EXECUTION & EVALUATION
# ================================================================
# Define paths
train_dir = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1/Train'
val_dir   = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1/Validation'
test_dir  = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1/Test'

print("1. Loading and Preprocessing Data...")
Xtr_raw, Ytr_raw, Ttr_raw, mins_tr, maxs_tr = read_split(train_dir)
Xva_raw, Yva_raw, Tva_raw, _, _             = read_split(val_dir)
Xte_raw, Yte_raw, Tte_raw, _, _             = read_split(test_dir)

# Fail fast: without training sequences the global min/max below cannot be
# computed and the failure would surface later as an obscure error.
if len(Xtr_raw) == 0:
    raise ValueError("No training data found!")

# Normalize: global statistics come from the TRAIN split only and are applied
# to every split.
gmins = {k: np.min(mins_tr[k]) for k in mins_tr}
gmaxs = {k: np.max(maxs_tr[k]) for k in maxs_tr}
Xtr_n, Ytr_n = normalize(Xtr_raw, Ytr_raw, gmins, gmaxs)
Xva_n, Yva_n = normalize(Xva_raw, Yva_raw, gmins, gmaxs)
Xte_n, Yte_n = normalize(Xte_raw, Yte_raw, gmins, gmaxs)

# Resample every sequence to a common length
SEQ_LEN = 400
Xtr, Ytr, Ttr = resample_split(Xtr_n, Ytr_n, Ttr_raw, SEQ_LEN)
Xva, Yva, Tva = resample_split(Xva_n, Yva_n, Tva_raw, SEQ_LEN)
Xte, Yte, Tte = resample_split(Xte_n, Yte_n, Tte_raw, SEQ_LEN)

print(f"   Train shape: {Xtr.shape}")
print(f"   Test shape:  {Xte.shape}")

# Build
print("\n2. Building Model...")
model = build_custom_bi_gru(SEQ_LEN, 2, hidden_units=16)
model.summary()



In [None]:
# Callbacks
csv_file = 'param_convergence.csv'
csv_logger = ParameterTracker(csv_file)
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# Train
print("\n3. Starting Training...")
start_time = time.time()
history = model.fit(
    Xtr, Ytr,
    validation_data=(Xva, Yva),
    epochs=20,
    batch_size=16,
    callbacks=[csv_logger, early_stop],
    verbose=1
)
total_train_time = time.time() - start_time

# Evaluation
print("\n=== FINAL EVALUATION REPORT ===")

# Metrics (evaluate returns [loss, mae, mse] for the metrics compiled above)
train_loss, train_mae, train_mse = model.evaluate(Xtr, Ytr, verbose=0)
test_loss, test_mae, test_mse = model.evaluate(Xte, Yte, verbose=0)

# Inference Time
# Warm-up call first: the initial predict() pays one-time setup costs, which
# would otherwise dominate the measurement of a single-sequence inference.
_ = model.predict(Xte[:1], verbose=0)
t0 = time.perf_counter()
_ = model.predict(Xte[:1], verbose=0)
inf_time_ms = (time.perf_counter() - t0) * 1000

print(f"{'Metric':<20} | {'Train':<12} | {'Test':<12}")
print("-" * 50)
print(f"{'MSE':<20} | {train_mse:.5e}  | {test_mse:.5e}")
print(f"{'MAE':<20} | {train_mae:.5e}  | {test_mae:.5e}")
print("-" * 50)
print(f"Total Train Time:       {total_train_time:.2f} s")
print(f"Inference Time (1 seq): {inf_time_ms:.2f} ms")



In [None]:
# Plot 1: Parameter Convergence
# Draws the first two tracked parameters against the epoch index, provided the
# tracker CSV exists; any failure is reported instead of raised.
if os.path.exists(csv_file):
    try:
        df_params = pd.read_csv(csv_file)
        plt.figure(figsize=(10, 5))
        for column, legend_text, marker_style in (('p0', 'Weight[0]', 'o'), ('p1', 'Weight[1]', 'x')):
            plt.plot(df_params['epoch'], df_params[column], label=legend_text, marker=marker_style)
        plt.title('Parameter Convergence (Vanilla BiGRU)')
        plt.xlabel('Epoch')
        plt.ylabel('Weight Value')
        plt.legend()
        plt.grid(True)
        plt.show()
    except Exception as e:
        print(f"Plot error: {e}")


In [None]:

# Plot 2: Prediction vs True
def plot_sample(X, Y, T, idx=0):
    """Plot the model's prediction for sequence `idx` against its target.

    Uses the module-level `model`. X[idx:idx+1] is fed to model.predict and
    both curves are drawn over T[idx]. Does nothing when X is empty.
    """
    if not len(X):
        return
    prediction = model.predict(X[idx:idx+1], verbose=0)[0]
    time_axis = T[idx]

    plt.figure(figsize=(10, 4))
    plt.plot(time_axis, Y[idx], label='True', linewidth=2)
    plt.plot(time_axis, prediction, label='Pred', linestyle='--', linewidth=2)
    plt.title(f'Test Sample {idx} Prediction')
    plt.legend()
    plt.grid(True)
    plt.show()

print("\nVisualizing Test Sample...")
plot_sample(Xte, Yte, Tte, idx=0)

In [None]:
# ================================================================
# BLOCK 6: EXECUTION & PHASE-PLANE PLOTTING
# ================================================================
# ... (Previous loading/training code remains the same) ...

# 1. Load Data & Normalize (Run previous blocks if needed)
# ...

# 2. Build & Train
# Rebuilds `model` with 32 hidden units and trains it with only the parameter
# tracker attached (no early stopping in this run).
print("\nStarting Training...")
model = build_custom_bi_gru(SEQ_LEN, 2, 32)
csv_file = 'param_convergence.csv'
csv_logger = ParameterTracker(csv_file)

history = model.fit(
    Xtr, Ytr,
    validation_data=(Xva, Yva),
    epochs=20,
    batch_size=16,
    callbacks=[csv_logger],
    verbose=1
)

# 3. PHASE-PLANE PLOT (Param 0 vs Param 1)
try:
    if not os.path.exists(csv_file):
        print("CSV file not found.")
    else:
        df = pd.read_csv(csv_file)

        if df.empty or 'p0' not in df.columns or 'p1' not in df.columns:
            print("CSV is empty or missing columns.")
        else:
            p0_series, p1_series = df['p0'], df['p1']

            plt.figure(figsize=(8, 8))

            # One point per epoch, coloured by epoch index
            sc = plt.scatter(p0_series, p1_series, c=df['epoch'], cmap='viridis', s=50, zorder=2)

            # Dashed line joining consecutive epochs, drawn beneath the points
            plt.plot(p0_series, p1_series, c='gray', alpha=0.5, linestyle='--', zorder=1)

            # First and last logged epochs
            plt.plot(p0_series.iloc[0], p1_series.iloc[0], 'rx', markersize=12, label='Start', zorder=3)
            plt.plot(p0_series.iloc[-1], p1_series.iloc[-1], 'g*', markersize=15, label='End', zorder=3)

            plt.title('Parameter Convergence Trajectory\n(Weight 0 vs Weight 1)')
            plt.xlabel('Parameter 0 value')
            plt.ylabel('Parameter 1 value')
            plt.colorbar(sc, label='Epoch')
            plt.legend()
            plt.grid(True)
            plt.show()

            print("Plot generated: The line shows the path the weights took during optimization.")
except Exception as e:
    print(f"Plotting error: {e}")

In [None]:
# ================================================================
# 1. IMPORTS & SETUP
# ================================================================
import os
import gc
import csv
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy.interpolate import InterpolatedUnivariateSpline
from tensorflow import keras
from tensorflow.keras import layers

# Global seeds fix the overall *sequence* of experiments;
# each individual experiment re-seeds itself with its own value.
tf.random.set_seed(42)
np.random.seed(42)

print(f"TensorFlow Version: {tf.__version__}")

# ================================================================
# 2. DATA PROCESSING (Robust)
# ================================================================
def read_and_process_data(base_path):
    """Read, clean, normalize and resample the Train and Test splits.

    Files are read from `<base_path>/Train` and `<base_path>/Test`. Each file
    is parsed with np.loadtxt (first line skipped as a header) and needs at
    least 4 columns (t, in1, in2, out) and 10 data rows; the first data row is
    dropped and files with non-finite values are skipped. Both splits are
    normalized with the Train split's global min/max and resampled to 400
    steps with degree-2 splines. Remaining NaN/inf values are replaced via
    np.nan_to_num.

    Returns:
        (Xtr, Ytr, Xte, Yte, SEQ_LEN) with float32 arrays and SEQ_LEN == 400.

    Raises:
        ValueError: if no usable training file is found.
    """
    train_dir = os.path.join(base_path, 'Train')
    test_dir = os.path.join(base_path, 'Test')
    SEQ_LEN = 400
    channel_names = ('in1', 'in2', 'out')

    # 1. Read Raw
    def _read(path):
        """Collect per-file sequences and per-file min/max of each channel."""
        X, Y, T = [], [], []
        mins = {name: [] for name in channel_names}
        maxs = {name: [] for name in channel_names}
        if not os.path.exists(path):
            return [], [], [], {}, {}

        for fn in sorted(os.listdir(path)):
            fp = os.path.join(path, fn)
            if not os.path.isfile(fp):
                continue
            try:
                data = np.loadtxt(fp, unpack=True, skiprows=1)
                if data.shape[0] < 4 or data.shape[1] < 10:
                    continue

                # Skip first row
                t, in1, in2, out = data[0][1:], data[1][1:], data[2][1:], data[3][1:]

                # Reject files containing NaN or inf
                if not (np.all(np.isfinite(in1)) and np.all(np.isfinite(in2)) and np.all(np.isfinite(out))):
                    continue

                X.append(np.stack([in1, in2], axis=-1))
                Y.append(out.reshape(-1, 1))
                T.append(t)

                for name, values in zip(channel_names, (in1, in2, out)):
                    mins[name].append(values.min())
                    maxs[name].append(values.max())
            except Exception:
                # Best effort: skip unreadable or malformed files, but let
                # KeyboardInterrupt / SystemExit propagate.
                continue
        return X, Y, T, mins, maxs

    print("Reading Data...")
    Xtr, Ytr, Ttr, mins_tr, maxs_tr = _read(train_dir)
    Xte, Yte, Tte, _, _ = _read(test_dir)

    if len(Xtr) == 0:
        raise ValueError("No training data found!")

    # 2. Normalize (statistics from the Train split only)
    gmins = {k: np.min(mins_tr[k]) for k in mins_tr}
    gmaxs = {k: np.max(maxs_tr[k]) for k in maxs_tr}

    epsilon = 1e-8
    imin = np.array([gmins['in1'], gmins['in2']], dtype=np.float32)
    imax = np.array([gmaxs['in1'], gmaxs['in2']], dtype=np.float32)
    omin, omax = np.float32(gmins['out']), np.float32(gmaxs['out'])

    # Denominators depend only on the global statistics: compute them once.
    d_in = imax - imin
    d_in[d_in < epsilon] = 1.0  # guard against (near-)constant channels
    d_out = omax - omin
    if d_out < epsilon:
        d_out = 1.0

    def _norm(X, Y):
        """Apply the Train-split min/max scaling to every sequence."""
        Xn, Yn = [], []
        for x, y in zip(X, Y):
            Xn.append((x - imin) / d_in)
            Yn.append((y - omin) / d_out)
        return Xn, Yn

    Xtr_n, Ytr_n = _norm(Xtr, Ytr)
    Xte_n, Yte_n = _norm(Xte, Yte)

    # 3. Resample
    def _resample(X, Y, T):
        """Spline-resample each sequence to SEQ_LEN steps; skip failures."""
        Xr, Yr = [], []
        for x, y, t in zip(X, Y, T):
            try:
                xo, xn = np.arange(len(x)), np.linspace(0, len(x) - 1, SEQ_LEN)
                rx = np.stack([InterpolatedUnivariateSpline(xo, x[:, c], k=2)(xn) for c in range(2)], axis=-1)
                ry = InterpolatedUnivariateSpline(xo, y[:, 0], k=2)(xn).reshape(-1, 1)
            except Exception:
                # Sequences the spline cannot fit are dropped (both X and Y).
                continue
            Xr.append(rx)
            Yr.append(ry)
        return np.array(Xr, dtype=np.float32), np.array(Yr, dtype=np.float32)

    Xtr_f, Ytr_f = _resample(Xtr_n, Ytr_n, Ttr)
    Xte_f, Yte_f = _resample(Xte_n, Yte_n, Tte)

    # Final Safety Clean
    Xtr_f = np.nan_to_num(Xtr_f); Ytr_f = np.nan_to_num(Ytr_f)
    Xte_f = np.nan_to_num(Xte_f); Yte_f = np.nan_to_num(Yte_f)

    return Xtr_f, Ytr_f, Xte_f, Yte_f, SEQ_LEN


In [None]:

# ================================================================
# 3. CUSTOM MODEL COMPONENTS
# ================================================================
@tf.keras.utils.register_keras_serializable(package="MyLayers")
class VanillaGRUCell(layers.Layer):
    """GRU cell written from scratch with Glorot-uniform kernels.

    kernel, recurrent_kernel and bias each pack three equally sized slices
    in the order z (update), r (reset), h (candidate).
    """
    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.state_size = units

    def build(self, input_shape):
        # Both kernels use the string initializer 'glorot_uniform' and the
        # bias starts at zero; no explicit seed is passed here.
        packed_width = self.units * 3

        self.kernel = self.add_weight(
            shape=(input_shape[-1], packed_width),
            initializer='glorot_uniform',
            name='kernel',
        )
        self.recurrent_kernel = self.add_weight(
            shape=(self.units, packed_width),
            initializer='glorot_uniform',
            name='recurrent_kernel',
        )
        self.bias = self.add_weight(shape=(packed_width,), initializer='zeros', name='bias')
        self.built = True

    def call(self, inputs, states):
        prev_h = states[0]

        # Projections of the input and previous state, unpacked into z/r/h
        in_z, in_r, in_h = tf.split(tf.matmul(inputs, self.kernel), 3, axis=1)
        rec_z, rec_r, rec_h = tf.split(tf.matmul(prev_h, self.recurrent_kernel), 3, axis=1)
        bias_z, bias_r, bias_h = tf.split(self.bias, 3, axis=0)

        update_gate = tf.nn.sigmoid(in_z + rec_z + bias_z)
        reset_gate = tf.nn.sigmoid(in_r + rec_r + bias_r)
        # Reset gate multiplies the projected previous state
        candidate = tf.nn.tanh(in_h + (reset_gate * rec_h) + bias_h)
        new_h = (1 - update_gate) * prev_h + update_gate * candidate
        return new_h, [new_h]

    def get_config(self):
        return {**super().get_config(), "units": self.units}

class ParamLogger(keras.callbacks.Callback):
    """Records two kernel entries per epoch into a shared in-memory dict.

    store_dict[run_id] is (re)initialised to {'epoch': [], 'p0': [], 'p1': []}
    on construction and appended to at the end of every epoch.
    """
    def __init__(self, run_id, store_dict):
        super().__init__()
        self.run_id = run_id
        self.store = store_dict
        self.store[run_id] = {'epoch':[], 'p0':[], 'p1':[]}

    def on_epoch_end(self, epoch, logs=None):
        try:
            # model.layers[1] is expected to be the Bidirectional layer; the
            # values come from its forward RNN's cell kernel, flattened.
            forward_cell = self.model.layers[1].forward_layer.cell
            flat_kernel = forward_cell.kernel.numpy().flatten()

            trajectory = self.store[self.run_id]
            trajectory['epoch'].append(epoch)
            trajectory['p0'].append(flat_kernel[0])
            trajectory['p1'].append(flat_kernel[1])
        except Exception as e:
            # Logging is best effort and must not stop training
            print(f"Log Error: {e}")

# ================================================================
# 4. EXPERIMENT LOOP
# ================================================================
def run_experiments(X, Y, n_runs=20, epochs=15):
    """Train n_runs freshly initialised models and record their trajectories.

    Every run clears the Keras session, re-seeds TensorFlow and NumPy with
    42 + run index, builds a single-layer bidirectional VanillaGRUCell(32)
    model with a Dense(1) head, and trains it on (X, Y) for `epochs` epochs
    with batch size 32 while a ParamLogger records two kernel entries.

    Returns:
        dict mapping "run_<i>" to {'epoch': [...], 'p0': [...], 'p1': [...]}.
    """
    results = {}  # trajectories, filled in by ParamLogger

    for i in range(n_runs):
        print(f"--- Starting Run {i+1}/{n_runs} ---")

        # Drop the previous run's graph/state and reclaim memory
        keras.backend.clear_session()
        gc.collect()

        # A distinct seed per run gives each model a different initialisation
        run_seed = 42 + i
        tf.random.set_seed(run_seed)
        np.random.seed(run_seed)

        # Model: Input -> Bidirectional(VanillaGRU) -> Dense(1)
        inp = keras.Input(shape=(X.shape[1], X.shape[2]))
        forward_rnn = layers.RNN(VanillaGRUCell(32), return_sequences=True)
        backward_rnn = layers.RNN(VanillaGRUCell(32), return_sequences=True, go_backwards=True)
        l1 = layers.Bidirectional(forward_rnn, backward_layer=backward_rnn)(inp)
        out = layers.Dense(1)(l1)
        model = keras.Model(inp, out)

        # clipnorm guards against NaNs from unlucky initialisations
        optimizer = keras.optimizers.Adam(1e-3, clipnorm=1.0)
        model.compile(loss='mse', optimizer=optimizer)

        # Train with the in-memory logger attached
        logger = ParamLogger(run_id=f"run_{i}", store_dict=results)
        model.fit(X, Y, epochs=epochs, batch_size=32, verbose=0, callbacks=[logger])

        # Report the final loss so a diverged run is easy to spot
        final_loss = model.evaluate(X, Y, verbose=0)
        print(f"Run {i+1} Final MSE: {final_loss:.5f}")

    return results

# ================================================================
# 5. EXECUTION & PLOTTING
# ================================================================

# A. Load Data
base_path = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1'
try:
    Xtr, Ytr, Xte, Yte, SEQ_LEN = read_and_process_data(base_path)
    print(f"Data Loaded: {Xtr.shape}")

    # B. Run Loop
    experiment_data = run_experiments(Xtr, Ytr, n_runs=20, epochs=15)

    # C. Plot
    plt.figure(figsize=(10, 8))

    # One colour per recorded run. The count is derived from the results so
    # the colour map and title stay correct if n_runs above is changed.
    n_trajectories = len(experiment_data)
    colors = plt.cm.jet(np.linspace(0, 1, n_trajectories))

    for idx, (run_id, data) in enumerate(experiment_data.items()):
        p0 = data['p0']
        p1 = data['p1']

        if len(p0) == 0:
            continue  # the logger recorded nothing for this run

        # Plot the trajectory line
        plt.plot(p0, p1, color=colors[idx], alpha=0.6, linewidth=1.5)

        # First logged epoch (square) and last logged epoch (circle).
        # No labels here: the legend below is built from explicit handles.
        plt.scatter(p0[0], p1[0], color=colors[idx], marker='s', s=40)
        plt.scatter(p0[-1], p1[-1], color=colors[idx], marker='o', s=60)

    plt.title(f'Convergence Trajectories of {n_trajectories} Random Initializations\n(Parameter 0 vs Parameter 1)')
    plt.xlabel('Parameter 0 Value')
    plt.ylabel('Parameter 1 Value')
    plt.grid(True, linestyle='--', alpha=0.5)

    # Custom legend
    from matplotlib.lines import Line2D
    legend_elements = [Line2D([0], [0], marker='s', color='k', label='Initialization (Start)', markerfacecolor='k', markersize=8, linestyle='None'),
                       Line2D([0], [0], marker='o', color='k', label='Converged (End)', markerfacecolor='k', markersize=10, linestyle='None')]
    plt.legend(handles=legend_elements, loc='best')

    plt.show()

except Exception as e:
    print(f"Execution Failed: {e}")

In [None]:
# ================================================================
# 1. SETUP
# ================================================================
import os
import gc
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy.interpolate import InterpolatedUnivariateSpline
from tensorflow import keras
from tensorflow.keras import layers

# Pin the global TensorFlow and NumPy random generators to one fixed value so
# the *experiment set* as a whole is reproducible between executions.
tf.random.set_seed(42)
np.random.seed(42)

# ================================================================
# 2. DATA LOADING (Robust)
# ================================================================
def get_clean_data(base_path, seq_len=600):
    """Load every readable trace in ``<base_path>/Train`` and resample it.

    Each file is parsed with ``np.loadtxt`` (first line skipped, columns
    unpacked). Columns 1 and 2 are used as the two input channels and column 3
    as the target; the first sample of each column is dropped. Every channel
    is then resampled to ``seq_len`` points with a quadratic interpolating
    spline over the sample index.

    Args:
        base_path: Directory that contains the ``Train`` sub-directory.
        seq_len: Number of time steps each sequence is resampled to
            (defaults to 600, the original hard-coded value).

    Returns:
        ``(X, Y)`` float32 arrays of shape ``(N, seq_len, 2)`` and
        ``(N, seq_len, 1)`` with NaN/inf values replaced by finite numbers.
        ``N`` is 0 (with the same trailing dimensions) when no file could be
        used.

    Raises:
        ValueError: If ``<base_path>/Train`` does not exist.
    """
    train_dir = os.path.join(base_path, 'Train')
    if not os.path.exists(train_dir):
        raise ValueError("Path not found!")

    X, Y = [], []
    for fn in sorted(os.listdir(train_dir)):
        fp = os.path.join(train_dir, fn)
        try:
            data = np.loadtxt(fp, unpack=True, skiprows=1)
            # With unpack=True axis 1 is the sample axis: ignore short traces.
            if data.shape[1] < 10:
                continue
            in1, in2, out = data[1][1:], data[2][1:], data[3][1:]

            # Quadratic spline over the sample index, evaluated on seq_len
            # evenly spaced positions spanning the original range.
            xo = np.arange(len(in1))
            xn = np.linspace(0, len(in1) - 1, seq_len)
            r1 = InterpolatedUnivariateSpline(xo, in1, k=2)(xn)
            r2 = InterpolatedUnivariateSpline(xo, in2, k=2)(xn)
            ro = InterpolatedUnivariateSpline(xo, out, k=2)(xn)

            features = np.stack([r1, r2], axis=-1)
            target = ro.reshape(-1, 1)
        except Exception:
            # Best-effort loader: unreadable or malformed files are skipped.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            continue
        # Append both halves together so X and Y can never get out of step.
        X.append(features)
        Y.append(target)

    if not X:
        # Keep the rank/dtype callers expect even when nothing was loaded.
        return (np.empty((0, seq_len, 2), dtype=np.float32),
                np.empty((0, seq_len, 1), dtype=np.float32))

    X, Y = np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32)
    # Replace NaN/inf left over from parsing or interpolation.
    X = np.nan_to_num(X)
    Y = np.nan_to_num(Y)
    return X, Y

# ================================================================
# 3. MODEL DEFINITION
# ================================================================
@tf.keras.utils.register_keras_serializable(package="MyLayers")
class VanillaGRUCell(layers.Layer):
    """Hand-written GRU cell meant to be wrapped by ``layers.RNN``.

    The input kernel, recurrent kernel and bias each pack three equally sized
    blocks in the order update gate (z), reset gate (r), candidate (h).
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        # state_size is the attribute an RNN wrapper reads to size the state.
        self.state_size = units
        self.units = units

    def build(self, input_shape):
        """Create kernel, recurrent kernel and bias (in that order)."""
        packed_width = 3 * self.units
        # Deliberately wide uniform range [-1.5, 1.5] so that different seeds
        # give clearly distinct starting points in the trajectory plot.
        wide_uniform = tf.random_uniform_initializer(minval=-1.5, maxval=1.5)

        self.kernel = self.add_weight(
            name='kernel',
            shape=(input_shape[-1], packed_width),
            initializer=wide_uniform,
        )
        self.recurrent_kernel = self.add_weight(
            name='recurrent_kernel',
            shape=(self.units, packed_width),
            initializer=wide_uniform,
        )
        self.bias = self.add_weight(
            name='bias',
            shape=(packed_width,),
            initializer='zeros',
        )
        self.built = True

    def call(self, inputs, states):
        """Advance one time step; returns ``(output, [new_state])``."""
        prev_h = states[0]

        # Project the input and the previous state onto all three blocks.
        from_input = tf.matmul(inputs, self.kernel)
        from_state = tf.matmul(prev_h, self.recurrent_kernel)

        in_z, in_r, in_h = tf.split(from_input, 3, axis=1)
        st_z, st_r, st_h = tf.split(from_state, 3, axis=1)
        bias_z, bias_r, bias_h = tf.split(self.bias, 3, axis=0)

        update_gate = tf.nn.sigmoid(in_z + st_z + bias_z)
        reset_gate = tf.nn.sigmoid(in_r + st_r + bias_r)
        # The reset gate scales the already-projected previous state.
        candidate = tf.nn.tanh(in_h + (reset_gate * st_h) + bias_h)
        # update_gate == 1 takes the candidate, update_gate == 0 keeps prev_h.
        new_h = (1 - update_gate) * prev_h + update_gate * candidate
        return new_h, [new_h]

    def get_config(self):
        """Return the base layer config extended with ``units``."""
        config = super().get_config()
        config.update({"units": self.units})
        return config


In [None]:

# ================================================================
# 4. TRAINING LOOP WITH TRACKING
# ================================================================
class TrajectoryTracker(keras.callbacks.Callback):
    """Keras callback that records two kernel weights after every epoch.

    The pair ``(w[0], w[1])`` of the flattened forward-cell kernel of
    ``model.layers[1]`` is appended to a caller-owned list, so the caller sees
    the trajectory grow in place while training runs.

    Args:
        history_list: List that receives one ``(w0, w1)`` tuple per epoch.
    """

    def __init__(self, history_list):
        # Initialise the base Callback state before adding our own attribute.
        super().__init__()
        self.history = history_list  # Reference to external list

    def on_epoch_end(self, epoch, logs=None):
        """Append the two tracked weights for the epoch that just finished."""
        # The same two indices are tracked across all runs.
        try:
            # Path: BiDir -> ForwardRNN -> Cell -> Kernel
            w = self.model.layers[1].forward_layer.cell.kernel.numpy().flatten()
            self.history.append((w[0], w[1]))
        except Exception as err:
            # Tracking is best-effort and must not abort training, but a
            # missing point is reported instead of being dropped silently.
            print(f"TrajectoryTracker: skipped epoch {epoch}: {err}")

def run_20_inits(X, Y, n_runs=20, epochs=15):
    """Train the bidirectional GRU from several seeds and track two weights.

    For every run a fresh model is built with TensorFlow seed ``42 + i``,
    the initial pair of tracked kernel weights is recorded, and a
    ``TrajectoryTracker`` appends one more pair after each training epoch.

    Args:
        X: Input sequences; the model input shape is taken from ``X.shape[1:]``.
        Y: Target sequences passed to ``model.fit``.
        n_runs: Number of independent initialisations (default 20).
        epochs: Training epochs per run (default 15).

    Returns:
        A list with one entry per run; each entry is a list of ``(w0, w1)``
        tuples whose first element is the pre-training value.
    """
    all_trajectories = []  # One inner list [(w0, w1), ...] per run
    # Derive the per-sample shape from the data instead of hard-coding it.
    input_shape = tuple(X.shape[1:])

    print(f"Starting {n_runs} training sessions...")

    for i in range(n_runs):
        # 1. Clear memory and switch seed so each run gets a new start point
        keras.backend.clear_session()
        tf.random.set_seed(42 + i)

        # 2. Build Model
        inp = keras.Input(shape=input_shape)
        l1 = layers.Bidirectional(
            layers.RNN(VanillaGRUCell(32), return_sequences=True),
            backward_layer=layers.RNN(VanillaGRUCell(32), return_sequences=True, go_backwards=True)
        )(inp)
        out = layers.Dense(1)(l1)
        model = keras.Model(inp, out)

        # Gradient-norm clipping accompanies the wide weight initialisation
        # (the original author notes it as essential).
        model.compile(loss='mse', optimizer=keras.optimizers.Adam(2e-3, clipnorm=1.0))

        # 3. Track weights: (w0, w1) pairs for this specific run
        run_path = []

        # Record the INITIAL state (epoch 0) after a single-sample forward pass.
        model.predict(X[:1], verbose=0)
        w_init = model.layers[1].forward_layer.cell.kernel.numpy().flatten()
        run_path.append((w_init[0], w_init[1]))

        # Train; the callback appends to run_path in place.
        model.fit(X, Y, epochs=epochs, batch_size=32, verbose=0,
                  callbacks=[TrajectoryTracker(run_path)])

        all_trajectories.append(run_path)
        print(f"Run {i+1}/{n_runs} complete.")

    return all_trajectories

# ================================================================
# 5. EXECUTION & VISUALIZATION
# ================================================================
# Location of the dataset (expects a 'Train' sub-directory underneath).
base_path = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1'

try:
    # Step 1: read and resample the training traces.
    X, Y = get_clean_data(base_path)
    print(f"Data Shape: {X.shape}")

    # Step 2: one weight trajectory per random initialisation.
    trajectories = run_20_inits(X, Y)

    # Step 3: draw every trajectory in a single black-on-white figure.
    from matplotlib.lines import Line2D

    fig, ax = plt.subplots(figsize=(10, 8))

    for run_data in trajectories:
        # Transpose [(x, y), (x, y), ...] into parallel coordinate tuples.
        xs, ys = zip(*run_data)

        # Path followed by the two tracked weights during training.
        ax.plot(xs, ys, color='black', linewidth=1.2, alpha=0.8)

        # Final recorded position, drawn as a filled black dot.
        ax.plot(xs[-1], ys[-1], 'ko', markersize=6)

    # Labels and title.
    ax.set_title('Training Trajectories of 20 Random Initializations', fontsize=14)
    ax.set_xlabel('Parameter X1', fontsize=12)
    ax.set_ylabel('Parameter X2', fontsize=12)

    # Proxy artists so the legend shows one line entry and one dot entry.
    legend_elements = [
        Line2D([0], [0], color='black', lw=1.5, label='Trajectory'),
        Line2D([0], [0], marker='o', color='w', label='Equilibrium point',
               markerfacecolor='black', markersize=8),
    ]
    ax.legend(handles=legend_elements, loc='best')

    ax.grid(True, linestyle='--', alpha=0.5)
    fig.tight_layout()
    plt.show()

except Exception as e:
    print(f"Error: {e}")

In [None]:
# ================================================================
# 1. IMPORTS & SETUP
# ================================================================
import os
import gc
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from scipy.interpolate import InterpolatedUnivariateSpline
from tensorflow import keras
from tensorflow.keras import layers

# Give TensorFlow and NumPy the same fixed global seed so the *experiment set*
# can be reproduced from one execution to the next.
tf.random.set_seed(42)
np.random.seed(42)

# ================================================================
# 2. DATA LOADING (Robust)
# ================================================================
def get_clean_data(base_path, seq_len=600):
    """Load every readable trace in ``<base_path>/Train`` and resample it.

    Each file is parsed with ``np.loadtxt`` (first line skipped, columns
    unpacked). Columns 1 and 2 are used as the two input channels and column 3
    as the target; the first sample of each column is dropped. Every channel
    is then resampled to ``seq_len`` points with a quadratic interpolating
    spline over the sample index.

    Args:
        base_path: Directory that contains the ``Train`` sub-directory.
        seq_len: Number of time steps each sequence is resampled to
            (defaults to 600, the original hard-coded value).

    Returns:
        ``(X, Y)`` float32 arrays of shape ``(N, seq_len, 2)`` and
        ``(N, seq_len, 1)`` with NaN/inf values replaced by finite numbers.
        If the ``Train`` directory is missing, a warning is printed and
        all-zero placeholder arrays with ``N == 10`` are returned. If the
        directory exists but no file is usable, ``N`` is 0.
    """
    train_dir = os.path.join(base_path, 'Train')
    if not os.path.exists(train_dir):
        # Placeholder fallback instead of raising. Uses float32 so the dtype
        # matches what the normal loading path returns.
        print(f"Warning: Path {train_dir} not found.")
        return (np.zeros((10, seq_len, 2), dtype=np.float32),
                np.zeros((10, seq_len, 1), dtype=np.float32))

    X, Y = [], []
    for fn in sorted(os.listdir(train_dir)):
        fp = os.path.join(train_dir, fn)
        try:
            data = np.loadtxt(fp, unpack=True, skiprows=1)
            # With unpack=True axis 1 is the sample axis: ignore short traces.
            if data.shape[1] < 10:
                continue
            in1, in2, out = data[1][1:], data[2][1:], data[3][1:]

            # Quadratic spline over the sample index, evaluated on seq_len
            # evenly spaced positions spanning the original range.
            xo = np.arange(len(in1))
            xn = np.linspace(0, len(in1) - 1, seq_len)
            r1 = InterpolatedUnivariateSpline(xo, in1, k=2)(xn)
            r2 = InterpolatedUnivariateSpline(xo, in2, k=2)(xn)
            ro = InterpolatedUnivariateSpline(xo, out, k=2)(xn)

            features = np.stack([r1, r2], axis=-1)
            target = ro.reshape(-1, 1)
        except Exception:
            # Best-effort loader: unreadable or malformed files are skipped.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            continue
        # Append both halves together so X and Y can never get out of step.
        X.append(features)
        Y.append(target)

    if not X:
        # Keep the rank/dtype callers expect even when nothing was loaded.
        return (np.empty((0, seq_len, 2), dtype=np.float32),
                np.empty((0, seq_len, 1), dtype=np.float32))

    X, Y = np.array(X, dtype=np.float32), np.array(Y, dtype=np.float32)
    return np.nan_to_num(X), np.nan_to_num(Y)

# ================================================================
# 3. MODEL DEFINITION
# ================================================================
@tf.keras.utils.register_keras_serializable(package="MyLayers")
class VanillaGRUCell(layers.Layer):
    """Single-step GRU cell with explicitly written gate equations.

    All three weight tensors store their columns as three consecutive blocks:
    update gate (z), reset gate (r) and candidate state (h).
    """

    def __init__(self, units, **kwargs):
        super().__init__(**kwargs)
        # state_size is the attribute an RNN wrapper reads to size the state.
        self.state_size = units
        self.units = units

    def build(self, input_shape):
        """Allocate the input kernel, recurrent kernel and bias, in that order."""
        n_cols = 3 * self.units
        # Wide uniform range [-1.5, 1.5] to separate the start points visually.
        spread_init = tf.random_uniform_initializer(minval=-1.5, maxval=1.5)

        self.kernel = self.add_weight(
            name='kernel',
            initializer=spread_init,
            shape=(input_shape[-1], n_cols),
        )
        self.recurrent_kernel = self.add_weight(
            name='recurrent_kernel',
            initializer=spread_init,
            shape=(self.units, n_cols),
        )
        self.bias = self.add_weight(
            name='bias',
            initializer='zeros',
            shape=(n_cols,),
        )
        self.built = True

    def call(self, inputs, states):
        """Compute the next hidden state; returns ``(h, [h])``."""
        h_prev = states[0]

        input_proj = tf.matmul(inputs, self.kernel)
        state_proj = tf.matmul(h_prev, self.recurrent_kernel)

        # Slice each projection (and the bias) into its z / r / h block.
        xz, xr, xh = tf.split(input_proj, 3, axis=1)
        hz, hr, hh = tf.split(state_proj, 3, axis=1)
        bz, br, bh = tf.split(self.bias, 3, axis=0)

        gate_z = tf.nn.sigmoid(xz + hz + bz)
        gate_r = tf.nn.sigmoid(xr + hr + br)
        # Reset gate multiplies the projected previous state, not h_prev itself.
        h_cand = tf.nn.tanh(xh + (gate_r * hh) + bh)
        # Blend: gate_z weights the candidate, (1 - gate_z) the old state.
        h_next = (1 - gate_z) * h_prev + gate_z * h_cand
        return h_next, [h_next]

    def get_config(self):
        """Serialise the layer: base config plus the ``units`` argument."""
        base_config = super().get_config()
        base_config.update({"units": self.units})
        return base_config

# ================================================================
# 4. ROBUST TRAJECTORY TRACKER
# ================================================================
def get_first_kernel_weights(model):
    """Return a flattened copy-like view of the weights used for tracking.

    Lookup order:
      1. the ``cell.kernel`` of the first layer that exposes a
         ``forward_layer`` which in turn exposes a ``cell``;
      2. otherwise the first entry of ``model.trainable_weights``;
      3. otherwise a two-element zero array.
    """
    absent = object()  # sentinel: distinguishes "no attribute" from None

    for candidate in model.layers:
        forward = getattr(candidate, 'forward_layer', absent)
        if forward is absent:
            continue
        cell = getattr(forward, 'cell', absent)
        if cell is absent:
            continue
        return cell.kernel.numpy().flatten()

    # No wrapped recurrent cell found: use whatever trains first.
    weights = model.trainable_weights
    if len(weights) > 0:
        return weights[0].numpy().flatten()

    return np.array([0.0, 0.0])

class TrajectoryTracker(keras.callbacks.Callback):
    """Keras callback that records two tracked weights after every epoch.

    After each epoch the first two values returned by
    ``get_first_kernel_weights(self.model)`` are appended as a tuple to a
    caller-owned list, so the caller sees the trajectory grow in place.

    Args:
        history_list: List that receives one ``(w0, w1)`` tuple per epoch.
    """

    def __init__(self, history_list):
        # Initialise the base Callback state before adding our own attribute.
        super().__init__()
        self.history = history_list

    def on_epoch_end(self, epoch, logs=None):
        """Append the tracked weight pair for the epoch that just finished."""
        w = get_first_kernel_weights(self.model)
        self.history.append((w[0], w[1]))

def run_20_inits(X, Y, n_runs=20, epochs=15):
    """Train the bidirectional GRU from several seeds and track two weights.

    For every run a fresh model is built with TensorFlow seed ``42 + i``,
    the initial pair of tracked weights is recorded via
    ``get_first_kernel_weights``, and a ``TrajectoryTracker`` appends one more
    pair after each training epoch.

    Args:
        X: Input sequences; the model input shape is taken from ``X.shape[1:]``.
        Y: Target sequences passed to ``model.fit``.
        n_runs: Number of independent initialisations (default 20).
        epochs: Training epochs per run (default 15).

    Returns:
        A list with one entry per run; each entry is a list of ``(w0, w1)``
        tuples whose first element is the pre-training value.
    """
    all_trajectories = []
    # Derive the per-sample shape from the data instead of hard-coding it.
    input_shape = tuple(X.shape[1:])

    print(f"Starting {n_runs} training sessions...")

    for i in range(n_runs):
        keras.backend.clear_session()
        tf.random.set_seed(42 + i)  # Distinct seed for initialization

        # Build Model
        inp = keras.Input(shape=input_shape)
        l1 = layers.Bidirectional(
            layers.RNN(VanillaGRUCell(32), return_sequences=True),
            backward_layer=layers.RNN(VanillaGRUCell(32), return_sequences=True, go_backwards=True)
        )(inp)
        out = layers.Dense(1)(l1)
        model = keras.Model(inp, out)

        # Adam with gradient-norm clipping.
        model.compile(loss='mse', optimizer=keras.optimizers.Adam(5e-3, clipnorm=1.0))

        # Track weights
        run_path = []

        # 1. Capture INITIAL weights (epoch 0) after a single-sample forward pass.
        model.predict(X[:1], verbose=0)
        w_init = get_first_kernel_weights(model)
        run_path.append((w_init[0], w_init[1]))

        # 2. Train; the callback appends one point per epoch to run_path.
        model.fit(X, Y, epochs=epochs, batch_size=32, verbose=0,
                  callbacks=[TrajectoryTracker(run_path)])

        if len(run_path) < 2:
            # Use the same 1-based run number as the progress line below.
            print(f"Warning: Run {i+1} captured too few points.")

        all_trajectories.append(run_path)
        print(f"Run {i+1}/{n_runs} complete. Points captured: {len(run_path)}")

    return all_trajectories

# ================================================================
# 5. EXECUTION & PLOTTING
# ================================================================
base_path = '/kaggle/input/bi-directional-gru-ltspice-example/Data !fig24_31_1'

# Step 1: read and resample the training traces.
X, Y = get_clean_data(base_path)

if len(X) == 0:
    print("No data found to run experiments.")
else:
    # Step 2: one weight trajectory per random initialisation.
    trajectories = run_20_inits(X, Y)

    # Step 3: draw all trajectories on one set of axes.
    from matplotlib.lines import Line2D

    fig, ax = plt.subplots(figsize=(10, 8))

    for i, run_data in enumerate(trajectories):
        # A run needs at least a start and one further point to form a line.
        if len(run_data) >= 2:
            # Transpose [(x, y), ...] into parallel coordinate tuples.
            xs, ys = zip(*run_data)

            # Path followed by the two tracked weights.
            ax.plot(xs, ys, color='black', linewidth=1.5, alpha=0.7)

            # Last recorded position, drawn above the lines.
            ax.plot(xs[-1], ys[-1], 'ko', markersize=6, zorder=3)

    # Title, axis labels and grid.
    ax.set_title('Training Trajectories of 20 Random Initializations', fontsize=14)
    ax.set_xlabel('Parameter X1 (Weight 0)', fontsize=12)
    ax.set_ylabel('Parameter X2 (Weight 1)', fontsize=12)
    ax.grid(True, linestyle='--', alpha=0.5)

    # Proxy artists so the legend shows one line entry and one dot entry.
    legend_elements = [
        Line2D([0], [0], color='black', lw=1.5, label='Trajectory'),
        Line2D([0], [0], marker='o', color='w', label='Equilibrium point',
               markerfacecolor='black', markersize=8),
    ]
    ax.legend(handles=legend_elements, loc='upper right')

    fig.tight_layout()
    plt.show()

Starting 20 training sessions...
Run 1/20 complete. Points captured: 16
Run 2/20 complete. Points captured: 16
Run 3/20 complete. Points captured: 16
Run 4/20 complete. Points captured: 16
Run 5/20 complete. Points captured: 16
Run 6/20 complete. Points captured: 16
