In [1]:
%matplotlib widget
import numpy as np
import matplotlib.pyplot as plt
import math
import os
from matplotlib import rcParams
from ForceMapping import forceMapping as fm
from sklearn.preprocessing import MinMaxScaler
import optuna
from sklearn.metrics import mean_squared_error
np.set_printoptions(suppress=True)
from itertools import product
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
import pickle

### Load the data

In [7]:
# Number of time steps per period; used below to trim RH_ab and to window the ESN output.
period = 140

In [8]:
# Load the normal recording (CSV) and one abnormal recording per limb (.npy).
# (presumably RF/LF/LH/RH = right-front, left-front, left-hind, right-hind — confirm)
data_normal = np.loadtxt('data_for_train.csv', delimiter=',')
RF_ab = np.load('RF_limb.npy')
LF_ab = np.load('LF_limb.npy')
LH_ab = np.load('LH_limb.npy')
# Drop the first 3 periods of the RH recording so that all limbs keep the same
# number of abnormal samples (per the original author's note; the lengths of
# the other arrays are not shown here — verify).
RH_ab = np.load('RH_limb.npy')[period*3:,:]


In [12]:
# Row count of RH_ab after dropping the first three periods.
RH_ab.shape[0]

4200

In [13]:
# Stack all recordings row-wise: normal data first, then the RF, LF, LH and RH abnormal data.
data_for_predict = np.vstack((data_normal, RF_ab, LF_ab, LH_ab, RH_ab))

### Load the model

#### ESN model

In [15]:
# Unpickle the four per-limb ESN models from ./Model/esn_<limb>.pkl
# (load_esn_models and the ESN class are defined in the Utility section).
esn_rf, esn_lf, esn_lh, esn_rh = load_esn_models()

#### MLP model

In [16]:
# One MLP per tag; each tag is loaded from ./Model/MLP_<tag>.pth onto the CPU
# and stored in the `models` dict under that tag.
model_tags = ["RF_Z", "RF_Y", "LF_Z", "LF_Y", "LH_Z", "LH_Y", "RH_Z", "RH_Y"]
models = load_named_models(MLP, model_tags, base_path="./Model/", device='cpu')

### Prediction

#### ESN prediction

In [30]:
# ESN prediction: each limb's model is fed its own 4-column slice of data_for_predict
# (the meaning of columns 16-31 is not visible here — presumably per-limb channels; confirm).
# NOTE: ESN.predict keeps updating the reservoir state, so re-running this cell
# continues from the state left by the previous run instead of starting fresh.
rf_esn_prediction = esn_rf.predict(data_for_predict[:, 16:20])
lf_esn_prediction = esn_lf.predict(data_for_predict[:, 20:24])
lh_esn_prediction = esn_lh.predict(data_for_predict[:, 24:28])
rh_esn_prediction = esn_rh.predict(data_for_predict[:, 28:32])
# retrieve leading signal for MLP prediction: for every complete period, keep the
# first 70 steps ("leading") and the last 70 steps ("behind"); 70 == period // 2
# for the period of 140 set above.
rf_leading, rf_behind = split_by_half_period(rf_esn_prediction,period,70,70)
lf_leading, lf_behind = split_by_half_period(lf_esn_prediction,period,70,70)
lh_leading, lh_behind = split_by_half_period(lh_esn_prediction,period,70,70)
rh_leading, rh_behind = split_by_half_period(rh_esn_prediction,period,70,70)

In [34]:
# Re-split the leading signals into individual samples: one row of 70 steps per
# sample (the default sample_length of reshape_to_samples), separately for
# output column 0 (labelled Z below) and output column 1 (labelled Y below).
'''Z'''
# rf
rf_in_z = reshape_to_samples(rf_leading[:,0])
# lf
lf_in_z = reshape_to_samples(lf_leading[:,0])
# lh
lh_in_z = reshape_to_samples(lh_leading[:,0])
# rh
rh_in_z = reshape_to_samples(rh_leading[:,0])

'''Y'''
# rf
rf_in_y = reshape_to_samples(rf_leading[:,1])
# lf
lf_in_y = reshape_to_samples(lf_leading[:,1])
# lh
lh_in_y = reshape_to_samples(lh_leading[:,1])
# rh
rh_in_y = reshape_to_samples(rh_leading[:,1])

#### MLP prediction

In [35]:
# Run every leading-half sample through the matching MLP, one sample at a time,
# and collect the outputs in the *_latter_half arrays.
# NOTE(review): the LH and RH inputs and models are prepared above but are not
# used in this cell — confirm whether that is intentional.
# rf signals
RF_Z_latter_half = np.array([predict(x, models["RF_Z"]) for x in rf_in_z])
RF_Y_latter_half = np.array([predict(x, models["RF_Y"]) for x in rf_in_y])
# lf signals
LF_Z_latter_half = np.array([predict(x, models["LF_Z"]) for x in lf_in_z])
LF_Y_latter_half = np.array([predict(x, models["LF_Y"]) for x in lf_in_y])



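#### Utility

Helper definitions used by the cells above (`MLP`, `load_named_models`, `predict`, `ESN`, `load_esn_models`, `split_by_half_period`, `reshape_to_samples`). On a fresh kernel, run the cells in this section before the loading and prediction cells.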

In [2]:
class MLP(nn.Module):
    """
    Fully connected network: Linear -> ReLU -> Linear -> ReLU -> Linear.

    The layers live in ``self.model`` (an ``nn.Sequential``). The attribute
    name and the layer order define the ``state_dict`` keys
    (``model.0.*``, ``model.2.*``, ``model.4.*``), so saved checkpoints
    depend on both.

    Parameters:
    - input_size: number of input features
    - hidden_size: width of both hidden layers
    - output_size: number of output features
    """

    def __init__(self, input_size=70, hidden_size=32, output_size=70):
        super().__init__()
        layers = [
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Pass ``x`` through the layer stack and return the result."""
        return self.model(x)
def load_named_models(model_class, model_tags, base_path="./", device='cpu'):
    """
    Load multiple named models (e.g., MLP_RF_Z.pth) into a dictionary.

    Parameters:
    - model_class: class definition of the model (e.g., MLP); it is
      instantiated with no arguments, so its defaults must match the checkpoint
    - model_tags: list of tags like ["RF_Z", "RF_Y", "LF_Z", "LF_Y"]
    - base_path: folder where model files are located; a trailing path
      separator is optional
    - device: 'cpu' or 'cuda'

    Returns:
    - Dictionary of loaded models, e.g., models["RF_Z"] -> model.
      Every model is moved to `device` and put in eval mode.
    """
    target_device = torch.device(device)
    models = {}
    for tag in model_tags:
        model = model_class()
        # Join path components instead of concatenating strings so that
        # base_path="./Model" and base_path="./Model/" resolve to the same file.
        path = os.path.join(base_path, f"MLP_{tag}.pth")
        state_dict = torch.load(path, map_location=target_device)
        model.load_state_dict(state_dict)
        model.to(target_device)
        model.eval()
        models[tag] = model
    return models
def predict(input_array, model, scaler_X=None, scaler_Y=None):
    """
    Run one sample through `model` and return the output as a NumPy array.

    Parameters:
    - input_array: one sample (NumPy array or any array-like such as a list);
      it is flattened to shape (1, n_features) before the forward pass
    - model: torch.nn.Module; it is switched to eval mode as a side effect
    - scaler_X, scaler_Y: accepted for interface compatibility but currently
      NOT applied — inputs and outputs are used as-is

    Returns:
    - np.ndarray with the model output, length-1 dimensions squeezed out
    """
    model.eval()
    # Accept plain sequences too; objects that already provide .reshape
    # (ndarrays, tensors) are passed through untouched.
    x = input_array if hasattr(input_array, "reshape") else np.asarray(input_array)
    x = x.reshape(1, -1)
    # Build the input on the same device as the model so that models loaded
    # onto 'cuda' by load_named_models do not fail with a device mismatch.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')
    x_tensor = torch.tensor(x, dtype=torch.float32, device=device)
    with torch.no_grad():
        y_tensor = model(x_tensor)
    y = y_tensor.cpu().numpy().squeeze()

    return y

In [3]:
class ESN:
    """
    Echo State Network: a fixed random leaky-tanh reservoir with a linear
    readout trained by ridge regression.

    The reservoir state is kept on the instance and is NOT reset between
    calls, so `fit` and `predict` continue from wherever the previous call
    left the state.

    Parameters:
    - input_size: number of input features per time step
    - reservoir_size: number of reservoir units
    - output_size: number of output features (stored only; the readout shape
      is determined by the targets passed to `fit`)
    - spectral_radius: the reservoir matrix is rescaled to this spectral radius
    - sparsity: probability that a reservoir connection is kept (non-zero)
    - leak_rate: blend factor between the previous state and the new activation
    - seed: if not None, seeds NumPy's *global* RNG before drawing the weights
    """

    def __init__(self, input_size, reservoir_size, output_size,
                 spectral_radius=0.95, sparsity=0.1, leak_rate=0.9, seed=42):
        if seed is not None:
            # NOTE: this reseeds the global NumPy RNG, which also affects any
            # later np.random calls made elsewhere in the notebook.
            np.random.seed(seed)

        self.input_size = input_size
        self.reservoir_size = reservoir_size
        self.output_size = output_size
        self.leak_rate = leak_rate

        # Input weights
        self.Win = np.random.uniform(-1, 1, (reservoir_size, input_size))

        # Reservoir weights
        W = np.random.rand(reservoir_size, reservoir_size) - 0.5
        mask = np.random.rand(*W.shape) < sparsity
        W *= mask  # sparsify
        current_radius = np.max(np.abs(np.linalg.eigvals(W)))
        if not current_radius > 0:
            # Dividing by a zero radius would silently fill Wres with NaN.
            raise ValueError(
                "Reservoir matrix has zero spectral radius and cannot be rescaled; "
                "increase reservoir_size or sparsity."
            )
        self.Wres = W * (spectral_radius / current_radius)

        # Output weights (trained later)
        self.Wout = None

        self.state = np.zeros((reservoir_size,))

    def _update_state(self, u):
        """Advance the reservoir by one input vector `u` and return the new state."""
        pre_activation = np.dot(self.Win, u) + np.dot(self.Wres, self.state)
        new_state = np.tanh(pre_activation)
        self.state = (1 - self.leak_rate) * self.state + self.leak_rate * new_state
        return self.state

    def fit(self, inputs, targets, washout=50, ridge_lambda=1e-6):
        """
        Drive the reservoir with `inputs` and train the linear readout.

        Parameters:
        - inputs: sequence of input vectors, one per time step
        - targets: target values aligned with `inputs`
        - washout: number of initial time steps discarded before training
        - ridge_lambda: L2 regularisation strength applied to all readout
          weights (including the bias); 0 gives the plain least-squares /
          pseudo-inverse solution

        Raises:
        - ValueError: if ridge_lambda is negative or no samples remain after
          the washout
        """
        if ridge_lambda < 0:
            raise ValueError(f"ridge_lambda must be non-negative, got {ridge_lambda}")

        states = np.array([self._update_state(u) for u in inputs])
        states_washed = states[washout:]
        targets_washed = np.asarray(targets)[washout:]
        if states_washed.shape[0] == 0:
            raise ValueError(
                f"No samples left after discarding washout={washout} steps; "
                "provide more data or lower the washout."
            )

        # Add bias term
        extended_states = np.hstack([states_washed, np.ones((states_washed.shape[0], 1))])

        if ridge_lambda > 0:
            # Ridge regression solved as an augmented least-squares problem:
            # min ||X W - Y||^2 + lambda ||W||^2  ==  lstsq([X; sqrt(lambda) I], [Y; 0]).
            n_features = extended_states.shape[1]
            design = np.vstack([extended_states, np.sqrt(ridge_lambda) * np.eye(n_features)])
            padding = np.zeros((n_features,) + targets_washed.shape[1:])
            rhs = np.concatenate([targets_washed, padding], axis=0)
            self.Wout = np.linalg.lstsq(design, rhs, rcond=None)[0]
        else:
            # No regularisation requested: minimum-norm least-squares solution.
            self.Wout = np.dot(np.linalg.pinv(extended_states), targets_washed)

    def predict(self, inputs):
        """
        Drive the reservoir with `inputs` and return the readout for every step.

        Parameters:
        - inputs: sequence of input vectors, one per time step

        Returns:
        - np.ndarray with one readout row per input step

        Raises:
        - RuntimeError: if the readout weights have not been set (no `fit` yet)
        """
        if self.Wout is None:
            raise RuntimeError("ESN has not been fitted; call fit() before predict().")

        outputs = []
        for u in inputs:
            state = self._update_state(u)
            extended_state = np.concatenate([state, [1]])  # Add bias
            outputs.append(np.dot(extended_state, self.Wout))
        return np.array(outputs)
        
def load_esn_models(model_dir='./Model'):
    """
    Unpickle the four per-limb ESN models stored in `model_dir`.

    Parameters:
    - model_dir: folder containing esn_RF.pkl, esn_LF.pkl, esn_LH.pkl and esn_RH.pkl

    Returns:
    - Tuple of the loaded objects in the order (RF, LF, LH, RH)
    """
    def _unpickle(limb):
        # pickle executes code from the file while loading: only use trusted files.
        with open(f'{model_dir}/esn_{limb}.pkl', 'rb') as handle:
            return pickle.load(handle)

    return tuple(_unpickle(limb) for limb in ('RF', 'LF', 'LH', 'RH'))

In [27]:
def split_sequential_data(data, period=140):
    """
    Splits sequential (n, c) time-varying data into (n//period, period, c) samples.

    Any dimensions after the first are flattened into the channel axis, so an
    (n, 2) input gives (n//period, period, 2) and a 1-D input of length n is
    treated as a single channel, giving (n//period, period, 1).

    Parameters:
    - data: array-like whose first axis is time
    - period: int, number of time steps per sample (must be positive)

    Returns:
    - np.ndarray of shape (n//period, period, c)

    Raises:
    - ValueError: if period is not positive or n is not divisible by period
    """
    if period <= 0:
        raise ValueError(f"period must be a positive integer, got {period}")

    data = np.asarray(data)
    n = data.shape[0]
    if n % period != 0:
        raise ValueError(f"Length of data ({n}) is not divisible by period ({period})")

    # Channel count taken from the data instead of being hard-coded to 2;
    # computed explicitly (not via -1) so that empty inputs still reshape.
    channels = int(np.prod(data.shape[1:]))
    return data.reshape(n // period, period, channels)

def split_signal_halves(data_in):
    """
    Cuts a 3D array in two along its second (time) axis.

    The cut is made at time_steps // 2, so for an odd number of time steps
    the second part is one step longer than the first.

    Parameters:
    - data_in: np.ndarray of shape (n_samples, time_steps, channels)

    Returns:
    - data_leading: the first time_steps // 2 steps of every sample
    - data_behind: the remaining steps of every sample
    """
    cut = data_in.shape[1] // 2
    front, back = slice(None, cut), slice(cut, None)
    return data_in[:, front, :], data_in[:, back, :]

def split_by_half_period(data, period, front_half_period, back_half_period):
    """
    From every complete period of `data`, keep the first `front_half_period`
    rows and the last `back_half_period` rows.

    Trailing rows that do not fill a complete period are ignored.

    Parameters:
    - data: array-like whose first axis is time
    - period: int, number of rows per period (must be positive)
    - front_half_period: int, rows taken from the start of each period
    - back_half_period: int, rows taken from the end of each period

    Returns:
    - (leading, behind): two arrays with the selected rows of all periods
      concatenated along the first axis; both are empty (zero rows) when
      `data` holds less than one complete period

    Raises:
    - ValueError: if period is not positive, or either half length lies
      outside the range [0, period]
    """
    if period <= 0:
        raise ValueError(f"period must be a positive integer, got {period}")
    for label, length in (("front_half_period", front_half_period),
                          ("back_half_period", back_half_period)):
        # A length above `period` would read into the neighbouring period
        # (or wrap around via a negative index) and silently return wrong rows.
        if not 0 <= length <= period:
            raise ValueError(f"{label} must be between 0 and period ({period}), got {length}")

    data = np.asarray(data)
    n_periods = data.shape[0] // period
    if n_periods == 0:
        # Nothing to stack: return empty arrays that keep the trailing shape
        # instead of letting the concatenation fail on an empty list.
        return data[:0].copy(), data[:0].copy()

    starts = range(0, n_periods * period, period)
    leading = [data[s : s + front_half_period] for s in starts]
    behind = [data[s + period - back_half_period : s + period] for s in starts]
    return np.concatenate(leading, axis=0), np.concatenate(behind, axis=0)
def reshape_to_samples(data, sample_length=70):
    """
    Cut a 1-D signal into consecutive rows of `sample_length` values.

    Values left over after the last complete row are dropped.

    Parameters:
    - data: np.ndarray, the signal to cut
    - sample_length: int, number of values per row

    Returns:
    - np.ndarray of shape (len(data) // sample_length, sample_length)
    """
    full_rows = len(data) // sample_length
    usable = data[: full_rows * sample_length]
    return usable.reshape((full_rows, sample_length))