<h1>Import Libraries</h1>

In [None]:
# %pip install tensorflow
# %pip install pandas
# %pip install scikit-learn
# %pip install plotly
# %pip install scipy

import tensorflow as tf
from tensorflow import keras
print('Tensorflow version:', tf.__version__)
from keras.models import Sequential
from keras import Input
from keras.layers import Bidirectional, GRU, RepeatVector, Dense, TimeDistributed, concatenate, Dot, Activation, Concatenate, Flatten # for creating layers inside the Neural Network

import pandas as pd
print('pandas: %s' % pd.__version__)
import numpy as np
print('numpy: %s' % np.__version__)

import sklearn
print('sklearn: %s' % sklearn.__version__)
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

import plotly
import plotly.express as px
import plotly.graph_objects as go
print('plotly: %s' % plotly.__version__)

import scipy.io
from scipy.interpolate import interp1d
print('scipy: %s' % scipy.__version__)

import matplotlib.pyplot as plt
import math
import os.path

<h1>Load Data</h1>

<h3>From several csv files</h3>
This section synchronizes and concatenates the EIM and kinematic data using their Unix timestamps.

In [None]:
def rolling_mean(input_signal, window=None):
    """Smooth a 1-D signal with a trailing (causal) moving average.

    The first ``window`` outputs are the cumulative mean of the samples seen so
    far, so the result has the same length as the input and contains no NaNs.

    Parameters
    ----------
    input_signal : list, numpy.ndarray or pandas.Series
        Numeric samples. A Series is read by position, so any index is accepted.
    window : int, optional
        Number of samples to average over. Defaults to ``len(input_signal) // 50``
        and is clamped to at least 1 so that short signals do not divide by zero.

    Returns
    -------
    Same container kind as the input (Series keeps its index and name), holding
    float values.
    """
    # Work on a positional float array: label-based Series indexing would raise
    # KeyError (or pick the wrong rows) when the index does not start at 0.
    values = np.asarray(input_signal, dtype=float)
    n_samples = len(values)

    if window is None:
        window = n_samples // 50
    window = max(1, int(window))

    smoothed = np.empty(n_samples, dtype=float)
    running_sum = 0.0

    for i in range(n_samples):
        running_sum += values[i]

        if i < window:
            # Warm-up phase: fewer than `window` samples are available
            smoothed[i] = running_sum / float(i + 1)
        else:
            # Slide the window: drop the sample that just fell out of it
            running_sum -= values[i - window]
            smoothed[i] = running_sum / float(window)

    if isinstance(input_signal, pd.Series):
        return pd.Series(smoothed, index=input_signal.index, name=input_signal.name)
    if isinstance(input_signal, list):
        return smoothed.tolist()
    return smoothed

In [None]:
# Set Pandas options to display more columns
pd.options.display.max_columns=150

# Loading data and adding headers 'ElbowAngles' and 'Time'
kinematic_data_dir = 'MoCap/dynamic/99_HRML_4kg_pd/elbow_angles.csv'
eim_data_dir = 'MoCap/dynamic/99_HRML_4kg_pd/processed_output_data.csv'
df_kin=pd.read_csv(kinematic_data_dir, encoding='utf-8', names=["ElbowAngles", "Time"])
df_eim=pd.read_csv(eim_data_dir, encoding='utf-8')

# Stripping the surrounding brackets (and the quotes around the time value) and converting to float
df_kin['ElbowAngles'] = df_kin['ElbowAngles'].str.strip('[]').astype(float)
df_kin['Time'] = df_kin['Time'].str.strip('[]').str.strip('\'').astype(float)

# Extracting Unix time (eim_unix is taken before the EIM data is filtered below)
kin_unix = df_kin['Time'].values
eim_unix = df_eim['Time'].values

# Extracting max and min Unix values of kin and using these to figure out where to slice the EIM data
kin_min = kin_unix.min()
kin_max = kin_unix.max()

# Keep only the EIM rows recorded inside the kinematic time range (both bounds inclusive).
# .copy() makes the result an independent frame so the column assignments below do not
# write to a slice of the original, and reset_index() gives it a 0..n-1 index so later
# positional lookups line up with the row positions.
in_kinematic_window = df_eim['Time'].between(kin_min, kin_max)
df_eim = df_eim.loc[in_kinematic_window].copy().reset_index(drop=True)
if df_eim.empty:
    raise ValueError("No EIM samples fall inside the kinematic time range; check that both files belong to the same recording.")

# Creating timestamps. EIM samples at 1000hZ, so 1 timestamp will correspond to 1ms
df_eim['Timestamp'] = np.arange(len(df_eim), dtype=float)
df_kin['Timestamp'] = np.arange(len(df_kin), dtype=float)

# Extract timestamps and kinematic data
kinematic_timestamps = df_kin['Timestamp'].values
kinematic_data = df_kin['ElbowAngles'].values
eim_timestamps = df_eim['Timestamp'].values

# Interpolation of the kinematic data to match the EIM data.
#
# The kinematic samples are placed at evenly spaced positions along the EIM sample axis
# (first sample on the first EIM timestamp, last sample on the last one) and the values in
# between are obtained by linear interpolation.
# NOTE(review): this assumes the kinematic samples are evenly spaced over the same time
# window as the time-filtered EIM samples — TODO confirm; otherwise interpolate on the
# Unix 'Time' columns instead.
kinematic_positions = np.linspace(0, len(eim_timestamps) - 1, len(kinematic_data))

# As before, kinematic_interpolated ends up as a callable interpolator over EIM timestamps.
kinematic_interpolated = interp1d(kinematic_positions, kinematic_data, kind='linear', fill_value='extrapolate')

# The interpolated data is being saved to the JointAngle column in the eim_data
df_eim['JointAngle'] = kinematic_interpolated(eim_timestamps)

# df_eim['RollingAverageMag'] = df_eim['EIMMagnitude'].rolling(100).mean()
# df_eim['RollingAveragePhase'] = df_eim['EIMPhase'].rolling(100).mean()

# Calculating rolling mean. The columns are passed as plain float arrays so that
# rolling_mean indexes them by position, independent of df_eim's index labels.
df_eim['RollingAverageMag'] = rolling_mean(df_eim['EIMMagnitude'].to_numpy(dtype=float))
df_eim['RollingAveragePhase'] = rolling_mean(df_eim['EIMPhase'].to_numpy(dtype=float))

# Rolling standard deviation over 100 samples. The first 99 entries are NaN because the
# window is not full yet; they are filled with the first available value.
rolling_std = df_eim['EIMMagnitude'].rolling(100).std()
valid_std = rolling_std.dropna()
if not valid_std.empty:
    std_value = valid_std.iloc[0]
    rolling_std = rolling_std.fillna(std_value)
# Assign the filled Series back instead of calling fillna(inplace=True) on a column
# selection, which is not guaranteed to update the DataFrame.
df_eim['RollingStdEIM'] = rolling_std

#Filling NaN values out with the first mean value in the series
# mean_value=df_eim['RollingAverageMag'][99]
# df_eim['RollingAverageMag'].fillna(value=mean_value, inplace=True) 
# mean_value=df_eim['RollingAveragePhase'][99]
# df_eim['RollingAveragePhase'].fillna(value=mean_value, inplace=True) 


# NB! THIS SECTION OF SAVING THE FILES INTO ONE CSV FILE HAS BEEN COMMENTED OUT, NOT 
# TO RISK OVERWRITING THE all_samples.csv FILE

# Saving the result to csv, where all samples are gonna be saved
# df_final = df_eim[['Sample', 'EIMMagnitude', 'EIMPhase', 'JointAngle', 'Mass', 'Time', 'RollingAverageMag', 'RollingAveragePhase']]
# df_eim_kin = 'all_samples.csv'
# 
# if(os.path.isfile(df_eim_kin)):
#     sample_number = input("Please provide the sample number and press enter:")
#     mass = input("Please provide the mass used in the current sample:")
#     df_final = df_final.assign(Sample=sample_number)
#     df_final = df_final.assign(Mass=mass)
#     # df_final.loc[:]['Sample'] = sample_number
#     # df_final.loc[:]['Mass'] = mass
#     # df_final['Sample'] = df_final['Sample'].replace(df_final['Sample'], sample_number)
#     # df_final['Mass'] = df_final['Mass'].replace(df_final['Mass'], sample_number)

#     df_final.to_csv(df_eim_kin, mode='a', index= False, header=False)
# else:
#     sample_number = input("Please provide the sample number and press enter:")
#     mass = input("Please provide the mass used in the current sample:")
#     df_final = df_final.assign(Sample=sample_number)
#     df_final = df_final.assign(Mass=mass)
#     # df_final['Sample'] = df_final['Sample'].replace(df_final['Sample'], sample_number)
#     # df_final['Mass'] = df_final['Mass'].replace(df_final['Mass'], sample_number)

#     df_final.to_csv(df_eim_kin, mode='w', index= False)

# Visual check of the synchronised signals: one matplotlib figure per entry,
# each plotted against df_eim['Timestamp']. Entries are (column, y-label, title).
signal_plots = [
    ('JointAngle', 'ElbowAngles', 'Line Plot of ElbowAngles'),
    # ('EIMMagnitude', 'EIMMagnitude', 'Line Plot of EIMMagnitude'),
    ('RollingAverageMag', 'RollingAverageEIM', 'Line Plot of RollingAverageEIM'),
    ('RollingStdEIM', 'RollingStdEIM', 'Line Plot of RollingStdEIM'),
    ('RollingAveragePhase', 'EIMPhase', 'Line Plot of EIMPhase'),
]

for column, y_label, title in signal_plots:
    plt.plot(df_eim['Timestamp'], df_eim[column], marker='o')
    plt.xlabel('Timestamp')
    plt.ylabel(y_label)
    plt.title(title)
    plt.show()

# print(df_final.head(100))
print(df_eim.head(100))

<h3>From one csv file</h3>

Load csv of all samples. Group data by 'Sample'.

In [None]:
# Read the combined recordings and prepare a per-'Sample' grouping for the statistics below
dir_all = 'all_samples.csv'
df_all = pd.read_csv(filepath_or_buffer=dir_all, encoding='utf-8')
grouped = df_all.groupby(by='Sample')

In [None]:
# Set Pandas options to display more columns
pd.options.display.max_columns=150

# Signals for which per-sample statistics are computed
stat_columns = ['EIMMagnitude', 'EIMPhase']

# NOTE(review): despite the "Median" name this takes the median of each sample's
# [min, max] pair, i.e. the mid-range (min + max) / 2 — confirm this is the intended feature.
median_values = grouped[stat_columns].apply(lambda group: group[stat_columns].agg(['min', 'max']).median())

# Mean, standard deviation, variance and kurtosis for each group
mean_values = grouped[stat_columns].mean()
standard_deviations = grouped[stat_columns].std()
variance_values = grouped[stat_columns].var()
kurtosis_values = grouped[stat_columns].apply(pd.DataFrame.kurtosis)

# Reset the index to get the 'Sample' column back
median_values = median_values.reset_index()
mean_values = mean_values.reset_index()
standard_deviations = standard_deviations.reset_index()
variance_values = variance_values.reset_index()
kurtosis_values = kurtosis_values.reset_index()

# Rename the columns to indicate which statistic they represent
median_values.columns = ['Sample', 'MedianEIMMagnitude', 'MedianEIMPhase']
mean_values.columns = ['Sample', 'MeanEIMMagnitude', 'MeanEIMPhase']
standard_deviations.columns = ['Sample', 'StdEIMMagnitude', 'StdEIMPhase']
variance_values.columns = ['Sample', 'VarEIMMagnitude', 'VarEIMPhase']
kurtosis_values.columns = ['Sample', 'KurtEIMMagnitude', 'KurtEIMPhase']

# Merge the statistics back into the original DataFrame based on the 'Sample' column.
# Columns with the same names from an earlier run are dropped first, so re-running the
# cell replaces them instead of producing '_x'/'_y' duplicates.
for sample_stats in (median_values, mean_values, standard_deviations, variance_values, kurtosis_values):
    stale_columns = sample_stats.columns.drop('Sample')
    df_all = df_all.drop(columns=stale_columns, errors='ignore').merge(sample_stats, on='Sample', how='left')

# Calculate rate of change for each group. pct_change is applied per 'Sample' so the first
# row of a sample is not compared with the last row of the previous one.
roc_sources = {'ROCEIMMagnitude': 'RollingAverageMag', 'ROCEIMPhase': 'RollingAveragePhase'}
for roc_column, source_column in roc_sources.items():
    rate_of_change = df_all.groupby('Sample')[source_column].pct_change()
    # The first row of each sample has no predecessor (NaN); fill it with the next value of
    # the same sample. The result is assigned back rather than filled in place on a column
    # selection, which is not guaranteed to update the DataFrame.
    df_all[roc_column] = rate_of_change.groupby(df_all['Sample']).bfill()

In [None]:
# Display the DataFrame with the per-sample statistics and rate-of-change columns merged in
df_all

In [None]:

# Keep only the rows whose 'Sample' number is 89 or higher
df_last_10_samples = df_all.loc[df_all['Sample'] >= 89]

# One figure with two traces per sample: EIMMagnitude and JointAngle against the row index
fig = go.Figure()

for sample in df_last_10_samples['Sample'].unique():
    sample_data = df_last_10_samples.loc[df_last_10_samples['Sample'] == sample]
    for signal in ('EIMMagnitude', 'JointAngle'):
        trace = go.Scatter(
            x=sample_data.index,
            y=sample_data[signal],
            mode='lines+markers',
            name=f'Sample {sample} - {signal}',
        )
        fig.add_trace(trace)

fig.update_layout(
    title='EMG and Kinematic Data for the last 10 Samples',
    xaxis_title='Time Steps',
    yaxis_title='Value',
)

# Show the plot
fig.show()

<h3>Create test set</h3>
These will be extracted from dataframe after normalization.

In [None]:
df_grouped = df_all.groupby(['Sample'])

# Sample numbers that are held out for testing
specific_samples = [23, 38, 47, 50, 89, 98]

# Select the rows of each held-out sample directly from df_all (in the order listed above).
# Filtering on the column avoids looking keys up in a groupby built from a one-element
# list, whose key format depends on the pandas version, and the frames are concatenated
# once instead of growing df_test inside the loop.
held_out_frames = []
for sample_value in specific_samples:
    sample_rows = df_all[df_all['Sample'] == sample_value]
    if not sample_rows.empty:
        held_out_frames.append(sample_rows)

# Empty DataFrame when none of the requested samples is present
df_test = pd.concat(held_out_frames) if held_out_frames else pd.DataFrame()

In [None]:
# Optional export of the held-out samples to CSV (left disabled)
# df_test_dir = 'test_samples.csv'
# df_test.to_csv(df_test_dir, index= False)

# Display the held-out test rows
df_test

In [None]:
# Feature and target column names (kept for reference; the standardisation below is applied
# to every column of df_all except 'Time' and 'Sample')
features_to_normalize = [
    'EIMMagnitude', 'EIMPhase', 'RollingAverageMag', 'RollingAveragePhase',
    'MedianEIMMagnitude', 'MedianEIMPhase', 'MeanEIMMagnitude', 'MeanEIMPhase',
    'StdEIMMagnitude', 'StdEIMPhase', 'VarEIMMagnitude', 'VarEIMPhase',
    'KurtEIMMagnitude', 'KurtEIMPhase', 'ROCEIMMagnitude', 'ROCEIMPhase',
]

targets_to_normalize = ['Mass', 'JointAngle']

columns_to_use = features_to_normalize + targets_to_normalize

# Remember the 'Sample' labels so they can be re-attached, unscaled, afterwards
sample_column = df_all['Sample'].to_numpy()

# Working copy without the bookkeeping columns
df2 = df_all.drop(columns=['Time', 'Sample'])

# Z-score standardisation: subtract the column mean and divide by the column standard deviation.
# NOTE(review): the statistics are computed over all rows of df_all, including the samples
# that are held out for testing — confirm this is intended.
df_mean, df_std = df2.mean(), df2.std()
normalized_df = df2.sub(df_mean, axis='columns').div(df_std, axis='columns')

# Add the sample column again
normalized_df['Sample'] = sample_column

# Save means and std to CSV
# df_mean.to_csv('/content/drive/MyDrive/NeuralNetwork/means.csv', header=True)
# df_std.to_csv('/content/drive/MyDrive/NeuralNetwork/std_devs.csv', header=True)

# Show a snapshot of data
normalized_df

In [None]:
# Sanity check: undo the standardisation to recover the original values.
# Only the columns that were scaled (the entries of df_std) are rescaled; multiplying the
# whole frame would turn 'Sample' into NaN because it has no entry in df_std/df_mean.
df_org = normalized_df[df_std.index] * df_std + df_mean
df_org['Sample'] = normalized_df['Sample']
df_org

In [None]:
# Row count before the held-out samples are removed
print("Original DataFrame Length:", len(normalized_df))

# Remove every row belonging to one of the held-out test samples, one sample at a time
held_out_samples = [23, 38, 47, 50, 89, 98]
for held_out in held_out_samples:
    rows_to_drop = normalized_df[normalized_df['Sample'] == held_out].index
    normalized_df.drop(rows_to_drop, inplace = True)

# Row count after the removal
print("\nModified Original DataFrame Length:", len(normalized_df))

In [None]:
# Verify the removal: prints False for every held-out sample that is no longer present
specific_samples = [23, 38, 47, 50, 89, 98]

remaining_samples = normalized_df['Sample'].unique()
for sample_value in specific_samples:
    print(sample_value in remaining_samples)

In [None]:
# Display the normalized DataFrame in its current state
normalized_df

In [None]:
# Input features (one column per feature) and regression targets.
# Target order is [JointAngle, Mass], i.e. y[:, 0] is the joint angle and y[:, 1] the mass.
columns_x = [
    'EIMMagnitude', 'EIMPhase', 'RollingAverageMag', 'RollingAveragePhase',
    'MedianEIMMagnitude', 'MedianEIMPhase', 'MeanEIMMagnitude', 'MeanEIMPhase',
    'StdEIMMagnitude', 'StdEIMPhase', 'VarEIMMagnitude', 'VarEIMPhase',
    'KurtEIMMagnitude', 'KurtEIMPhase', 'ROCEIMMagnitude', 'ROCEIMPhase',
]
columns_y = ['JointAngle', 'Mass']

# Number of features per time step and number of time steps per input window
num_features = len(columns_x)  # 16
sequence_length = 10

# Windows are built per sample so that no window spans two recordings
df_grouped = normalized_df.groupby(['Sample'])

X_seq, y_seq = [], []

for group_name, group_data in df_grouped:
    group_data_x = group_data[columns_x].to_numpy()
    group_data_y = group_data[columns_y].to_numpy()

    # Sliding window with stride 1: each input is `sequence_length` consecutive rows and
    # its target is the target row at the window's last time step
    window_starts = range(len(group_data) - sequence_length + 1)
    X_seq.extend(group_data_x[start:start + sequence_length, :] for start in window_starts)
    y_seq.extend(group_data_y[start + sequence_length - 1, :] for start in window_starts)

# Convert to numpy arrays
X_seq_np = np.array(X_seq)
y_seq_np = np.array(y_seq)

# Split into training and validation data in a 80/20 ratio. Testing is done with the samples extracted from the dataset earlier
# NOTE(review): the windows overlap and are split at random, so neighbouring windows of the
# same recording can land in both sets — confirm this is acceptable for validation.
X_train, X_val, y_train, y_val = train_test_split(
    X_seq_np, y_seq_np, test_size=0.2, random_state=42
)

In [None]:
from keras.models import load_model
# Path to the saved model file
# NOTE(review): the path is absolute and machine-specific, and the file name mentions
# "30ts" while sequence_length is set to 10 in this notebook — confirm the input shapes match.
loaded_model_path = 'C:/Users/Simons Lenovo/Documents/GitHub/MUSE/Code/Implementation/second_L2_Batch_16feat_30ts_50ep.keras'

# Load the model from the saved file
loaded_model = load_model(loaded_model_path)

print("Model loaded successfully.")
# summary() prints the table itself and returns None, so it must not be wrapped in print()
loaded_model.summary()

<h1>Hyper parameter tuning</h1> 

# Introduction to Hyperparameter Sweeps

Searching through high dimensional hyperparameter spaces to find the most performant model can get unwieldy very fast. Hyperparameter sweeps provide an organized and efficient way to conduct a battle royale of models and pick the most accurate model. They enable this by automatically searching through combinations of hyperparameter values (e.g. learning rate, batch size, number of hidden layers, optimizer type) to find the most optimal values.

In this tutorial we'll see how you can run sophisticated hyperparameter sweeps in 3 easy steps using Weights and Biases.

![](https://i.imgur.com/WVKkMWw.png)

## Sweeps: An Overview

Running a hyperparameter sweep with Weights & Biases is very easy. There are just 3 simple steps:

1. **Define the sweep:** we do this by creating a dictionary or a [YAML file](https://docs.wandb.com/library/sweeps/configuration) that specifies the parameters to search through, the search strategy, the optimization metric, etc.

2. **Initialize the sweep:** with one line of code we initialize the sweep and pass in the dictionary of sweep configurations:
`sweep_id = wandb.sweep(sweep_config)`

3. **Run the sweep agent:** also accomplished with one line of code, we call wandb.agent() and pass the sweep_id to run, along with a function that defines your model architecture and trains it:
`wandb.agent(sweep_id, function=train)`

And voila! That's all there is to running a hyperparameter sweep! In the notebook below, we'll walk through these 3 steps in more detail.


We highly encourage you to fork this notebook, tweak the parameters, or try the model with your own dataset!

## Resources
- [Sweeps docs →](https://docs.wandb.com/library/sweeps)
- [Launching from the command line →](https://www.wandb.com/articles/hyperparameter-tuning-as-easy-as-1-2-3)

# Setup
Start out by installing the experiment tracking library and setting up your free W&B account:


*   **pip install wandb** – Install the W&B library
*   **import wandb** – Import the wandb library

In [None]:
# WandB – Install the W&B library
# %pip install wandb -q
import wandb
from wandb.keras import WandbCallback

In [None]:
# %pip install np_utils

In [None]:
# %pip install wandb -qq

In [None]:
from keras.datasets import fashion_mnist
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Dense, Flatten, GRU, BatchNormalization
from tensorflow.keras.utils import to_categorical
from keras.optimizers import SGD
from keras.optimizers import RMSprop, SGD, Adam, Nadam
from keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, Callback, EarlyStopping
from keras.initializers import he_normal
from keras.regularizers import l2

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

import wandb
from wandb.keras import WandbCallback

## For first tries: Further split the data for testing

In [None]:
# Keep 30% of the current training windows as the new X_train/y_train; X_val/y_val are
# rebound to the other 70%, replacing the earlier validation split.
# NOTE: the cell rebinds its own inputs, so every re-run shrinks X_train further.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.7, random_state=42)

In [None]:
# Shape of the reduced training array
X_train.shape

In [None]:
# Split the reduced training data 80/20 into the training and validation sets used by the sweep.
# NOTE: this also rebinds its own inputs, so re-running the cell shrinks X_train again.
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

In [None]:
# Shapes of the final splits, printed in the order X_train, y_train, X_val, y_val
for split_array in (X_train, y_train, X_val, y_val):
    print(split_array.shape)

## 1. Define the Sweep

Weights & Biases sweeps give you powerful levers to configure your sweeps exactly how you want them, with just a few lines of code. The sweeps config can be defined as a dictionary or a [YAML file](https://docs.wandb.com/library/sweeps).

Let's walk through some of them together:
*   **Metric** – This is the metric the sweeps are attempting to optimize. Metrics can take a `name` (this metric should be logged by your training script) and a `goal` (maximize or minimize).
*   **Search Strategy** – Specified using the 'method' variable. We support several different search strategies with sweeps.
  *   **Grid Search** – Iterates over every combination of hyperparameter values.
  *   **Random Search** – Iterates over randomly chosen combinations of hyperparameter values.
  *   **Bayesian Search** – Creates a probabilistic model that maps hyperparameters to probability of a metric score, and chooses parameters with high probability of improving the metric. The objective of Bayesian optimization is to spend more time in picking the hyperparameter values, but in doing so trying out fewer hyperparameter values.
*   **Stopping Criteria** – The strategy for determining when to kill off poorly performing runs, and try more combinations faster. We offer several custom scheduling algorithms like [HyperBand](https://arxiv.org/pdf/1603.06560.pdf) and Envelope.
*   **Parameters** – A dictionary containing the hyperparameter names, and discrete values, max and min values or distributions from which to pull their values to sweep over.

You can find a list of all configuration options [here](https://docs.wandb.com/library/sweeps/configuration).

In [None]:
# Search strategy for the sweep. Choices: grid, random, bayes.
# Prefer random over grid when there are many hyperparameters to tune, since grid is computationally heavy.
sweep_config = dict(method='random')

In [None]:
# To tell the sweep which metric to optimize and in which direction. Only necessary for bayes, but a good idea regardless.
# The model is a regressor trained with mean squared error, so the sweep minimises the
# validation loss; classification accuracy is not meaningful for continuous targets.
metric = {
    'name': 'val_loss',
    'goal': 'minimize'
    }

sweep_config['metric'] = metric

In [None]:
# Hyperparameters the sweep varies. Each entry is either a list of candidate 'values'
# or a 'distribution' with its bounds.
parameters_dict = {
    'optimizer': {
        'values': ['adam', 'nadam', 'sgd', 'rmsprop']
        },
    'learning_rate': {
        # a flat distribution between 0 and 0.1
        # NOTE(review): a uniform range starting at 0 rarely samples small learning rates;
        # a log-scaled distribution with a positive minimum may be a better fit — confirm.
        'distribution': 'uniform',
        'min': 0,
        'max': 0.1
        },
    'batch_size': {
        # integers between 32 and 256
        # with evenly-distributed logarithms
        'distribution': 'q_log_uniform_values',
        'q': 8,
        'min': 32,
        'max': 256,
        },
    # Disabled search dimensions (train() falls back to its config_defaults for these):
    # 'fc_layer_size1': {
    #     'values': [32, 64, 128, 256]
    #     },
    # 'fc_layer_size2': {
    #     'values': [32, 64, 128, 256]
    #     },
    # 'dropout': {
    #     'values': [0.2, 0.3, 0.4, 0.5]
    #     },
    # 'reccurent_dropout': {
    #     'values': [0.2, 0.3, 0.4, 0.5]
    #     },
    # 'kernel_regularizer': {
    #     # a flat distribution between 0.01 and 0.4
    #     'distribution': 'uniform',
    #     'min': 0.01,
    #     'max': 0.4
    # }


    # 'fc_layer_size': {
    #     'values': [128, 256, 512]
    #     },
    # 'dropout': {
    #       'values': [0.3, 0.4, 0.5]
    #     },
    # 'activation': {
    #     'values': ['relu', 'elu', 'selu', 'softmax']
    #     },
    # 'weight_decay': {
    #     'values': [0.0005, 0.005, 0.05]
    #     }
    }

# Attach the search space to the sweep configuration (the same dict object is referenced)
sweep_config['parameters'] = parameters_dict

In [None]:
# Configure the sweep – specify the parameters to search through, the search strategy, the optimization metric et all.
# sweep_config = {
#     'method': 'random', #grid, random
#     'metric': {
#       'name': 'accuracy',
#       'goal': 'maximize'
#     },
#     'parameters': {
#         'epochs': {
#             'values': [2, 5, 10]
#         },
#         'batch_size': {
#             'values': [256, 128, 64, 32]
#         },
#         'dropout': {
#             'values': [0.3, 0.4, 0.5]
#         },
#         'fc_layer_size': {
#             'values': [16, 32, 64]
#         },
#         'weight_decay': {
#             'values': [0.0005, 0.005, 0.05]
#         },
#         'learning_rate': {
#             'values': [1e-2, 1e-3, 1e-4, 3e-4, 3e-5, 1e-5]
#         },
#         'optimizer': {
#             'values': ['adam', 'nadam', 'sgd', 'rmsprop']
#         },
#         'activation': {
#             'values': ['relu', 'elu', 'selu', 'softmax']
#         }
#     }
# }

In [None]:
# Parameters that stay constant across all runs are given as a single 'value'.
# sweep_config['parameters'] refers to this same dict, so the entry shows up there as well.
parameters_dict['epochs'] = {'value': 10}

In [None]:
import pprint

# Print the assembled sweep configuration for a final check before initializing the sweep
pprint.pprint(sweep_config)

## 2. Initialize the Sweep

In [None]:
# Initialize a new sweep
# Arguments:
#     – sweep_config: the sweep config dictionary defined above
#     – entity: Set the username for the sweep
#     – project: Set the project name for the sweep
# The WANDB_NOTEBOOK_NAME environment variable is set to this notebook's file path first.
# NOTE(review): the notebook path and the entity are hard-coded for one machine/account;
# adjust them before running elsewhere.
os.environ['WANDB_NOTEBOOK_NAME'] = 'C:/Users/Simons Lenovo/Documents/GitHub/MUSE/Code/GRU_network/network.ipynb'
sweep_id = wandb.sweep(sweep_config, entity="simoncv", project="sweeps-test")

### Define Your Neural Network
Before we can run the sweep, let's define a function that creates and trains our neural network.

In the function below, we define a two-layer GRU regression model in Keras, and add the following lines of code to log model metrics, visualize performance and output, and track our experiments easily:
*   **wandb.init()** – Initialize a new W&B run. Each run is single execution of the training script.
*   **wandb.config** – Save all your hyperparameters in a config object. This lets you use our app to sort and compare your runs by hyperparameter values.
*   **callbacks=[WandbCallback()]** – Fetch all layer dimensions, model parameters and log them automatically to your W&B dashboard.
*   **wandb.log()** – Logs custom objects – these can be images, videos, audio files, HTML, plots, point clouds etc. It is not called explicitly in the function below; the training and validation metrics are logged by `WandbCallback`.

In [None]:
# The sweep calls this function with each set of hyperparameters
def train():
    """Build, compile and fit the two-layer GRU regressor for one W&B run.

    Hyperparameters are read from ``wandb.config``; any value the sweep does not
    supply falls back to ``config_defaults``. The function reads the notebook
    globals ``X_train``, ``y_train``, ``X_val``, ``y_val``, ``sequence_length``
    and ``num_features`` and returns nothing; metrics are logged through
    ``WandbCallback``.

    Raises
    ------
    ValueError
        If ``config.optimizer`` is not one of 'sgd', 'rmsprop', 'adam', 'nadam'.
    """
    # Default values for hyper-parameters we're going to sweep over
    config_defaults = {
        'epochs': 10,
        'batch_size': 128,
        'weight_decay': 0.0005,
        'learning_rate': 1e-3,
        'activation': 'relu',
        'optimizer': 'adam',
        'fc_layer_size1': 32,
        'fc_layer_size2': 64,
        'dropout': 0.2,
        'reccurent_dropout': 0.4,
        'kernel_regularizer': 0.01,
        'momentum': 0.9,
        'seed': 42
    }

    # Initialize a new wandb run
    wandb.init(config=config_defaults)

    # Config is a variable that holds and saves hyperparameters and inputs
    config = wandb.config

    # Set random seed for reproducibility (taken from the config so that the logged
    # 'seed' value is the one actually used; it defaults to 42)
    seed_value = config.seed
    np.random.seed(seed_value)
    tf.random.set_seed(seed_value)

    # Build the GRU model
    model = Sequential()

    # First GRU layer returns the full sequence so that the second GRU layer can consume it
    model.add(GRU(units=config.fc_layer_size1, activation='relu', return_sequences=True,
                  kernel_initializer=he_normal(seed=seed_value),
                  kernel_regularizer=l2(config.kernel_regularizer),
                  input_shape=(sequence_length, num_features), name='Input-Layer'))
    model.add(BatchNormalization())
    model.add(Dropout(config.dropout, seed=seed_value))  # Dropout after the first GRU layer

    # Second GRU layer returns only its last output
    model.add(GRU(units = config.fc_layer_size2, activation='relu',
                  kernel_initializer=he_normal(seed=seed_value),
                  kernel_regularizer=l2(config.kernel_regularizer),
                  recurrent_dropout=config.reccurent_dropout, dropout=config.dropout))
    model.add(BatchNormalization())
    model.add(Dropout(config.dropout, seed=seed_value))  # Dropout after the second GRU layer

    model.add(Dense(units=2, activation='linear'))  # Output layer: two regression targets

    # Define the optimizer. Factories are used so that only the selected optimizer is built.
    # NOTE(review): config.weight_decay and config.activation are not used below; the
    # weight decay is fixed at 1e-5 and the activations at 'relu' — confirm this is intended.
    optimizer_factories = {
        'sgd': lambda: SGD(learning_rate=config.learning_rate, weight_decay=1e-5,
                           momentum=config.momentum, nesterov=True),
        'rmsprop': lambda: RMSprop(learning_rate=config.learning_rate, weight_decay=1e-5),
        'adam': lambda: Adam(learning_rate=config.learning_rate, beta_1=0.9, beta_2=0.999,
                             clipnorm=1.0),
        'nadam': lambda: Nadam(learning_rate=config.learning_rate, beta_1=0.9, beta_2=0.999,
                               clipnorm=1.0),
    }
    if config.optimizer not in optimizer_factories:
        # Fail with a clear message instead of an unbound `optimizer` variable further down
        raise ValueError(
            f"Unsupported optimizer {config.optimizer!r}; expected one of {sorted(optimizer_factories)}"
        )
    optimizer = optimizer_factories[config.optimizer]()

    # 'accuracy' is kept so that it is still logged under that name; 'mae' is added as an
    # error measure that is meaningful for the continuous targets.
    model.compile(loss = "mean_squared_error", optimizer = optimizer, metrics=['accuracy', 'mae'])

    # NOTE(review): with the default of 10 epochs, a patience of 10 leaves no room for
    # early stopping to trigger — confirm the intended values.
    model.fit(X_train, y_train, batch_size=config.batch_size,
              epochs=config.epochs,
              validation_data=(X_val, y_val),
              callbacks=[WandbCallback(data_type="graph", validation_data=(X_val, y_val)),
                          EarlyStopping(patience=10, restore_best_weights=True)])

## 3. Run the sweep agent

In [None]:
# Run the sweep agent
# Arguments:
#     – sweep_id: the sweep_id to run - this was returned above by wandb.sweep()
#     – function: function that defines your model architecture and trains it
# NOTE(review): no `count` is passed; presumably the agent keeps starting runs until it is
# stopped manually — confirm, and consider limiting the number of runs.
wandb.agent(sweep_id, train)

<h1>Feature engineering</h1> 
Calculating median, mean, standard deviation, variance and kurtosis for each group.

In [None]:
# Set Pandas options to display more columns
pd.options.display.max_columns=150

# Signals for which per-sample statistics are computed.
# NOTE(review): 'JointAngle' is also a prediction target; statistics derived from it must
# not be fed to the model as input features — confirm how these columns are used.
stat_columns = ['EIMMagnitude', 'EIMPhase', 'JointAngle']

# NOTE(review): despite the "Median" name this takes the median of each sample's
# [min, max] pair, i.e. the mid-range (min + max) / 2 — confirm this is the intended feature.
median_values = grouped[stat_columns].apply(lambda group: group[stat_columns].agg(['min', 'max']).median())

# Mean, standard deviation, variance and kurtosis for each group
mean_values = grouped[stat_columns].mean()
standard_deviations = grouped[stat_columns].std()
variance_values = grouped[stat_columns].var()
kurtosis_values = grouped[stat_columns].apply(pd.DataFrame.kurtosis)

# Reset the index to get the 'Sample' column back
median_values = median_values.reset_index()
mean_values = mean_values.reset_index()
standard_deviations = standard_deviations.reset_index()
variance_values = variance_values.reset_index()
kurtosis_values = kurtosis_values.reset_index()

# Rename the columns to indicate which statistic they represent
median_values.columns = ['Sample', 'MedianEIMMagnitude', 'MedianEIMPhase', 'MedianJointAngle']
mean_values.columns = ['Sample', 'MeanEIMMagnitude', 'MeanEIMPhase', 'MeanJointAngle']
standard_deviations.columns = ['Sample', 'StdEIMMagnitude', 'StdEIMPhase', 'StdJointAngle']
variance_values.columns = ['Sample', 'VarEIMMagnitude', 'VarEIMPhase', 'VarJointAngle']
kurtosis_values.columns = ['Sample', 'KurtEIMMagnitude', 'KurtEIMPhase', 'KurtJointAngle']

# Merge the statistics back into the original DataFrame based on the 'Sample' column.
# Columns with the same names that already exist in df_all are dropped first, so the merge
# replaces them instead of producing '_x'/'_y' duplicates.
for sample_stats in (median_values, mean_values, standard_deviations, variance_values, kurtosis_values):
    stale_columns = sample_stats.columns.drop('Sample')
    df_all = df_all.drop(columns=stale_columns, errors='ignore').merge(sample_stats, on='Sample', how='left')

# Calculate rate of change for each group. pct_change is applied per 'Sample' so the first
# row of a sample is not compared with the last row of the previous one.
roc_sources = {
    'ROCEIMMagnitude': 'RollingAverageMag',
    'ROCEIMPhase': 'RollingAveragePhase',
    'ROCJointAngle': 'JointAngle',
}
for roc_column, source_column in roc_sources.items():
    rate_of_change = df_all.groupby('Sample')[source_column].pct_change()
    # The first row of each sample has no predecessor (NaN); fill it with the next value of
    # the same sample. The result is assigned back rather than filled in place on a column
    # selection, which is not guaranteed to update the DataFrame.
    df_all[roc_column] = rate_of_change.groupby(df_all['Sample']).bfill()

In [None]:
# Preview the first 150 rows of df_all to inspect the newly added feature columns.
df_all.head(150)

<h5>Visualization</h5>

In [None]:

# Plot the raw EIM magnitude and joint angle of every sample with id >= 89
# (two traces per sample). The x-axis is the row index of df_all.
df_last_10_samples = df_all[df_all['Sample'] >= 89]

fig = go.Figure()

for sample in df_last_10_samples['Sample'].unique():
    sample_data = df_last_10_samples[df_last_10_samples['Sample'] == sample]
    for column in ('EIMMagnitude', 'JointAngle'):
        fig.add_trace(go.Scatter(x=sample_data.index, y=sample_data[column],
                                 mode='lines+markers', name=f'Sample {sample} - {column}'))

# The plotted signal is EIM (see the 'EIMMagnitude' column), not EMG.
fig.update_layout(title='EIM and Kinematic Data for the last 10 Samples',
                  xaxis_title='Time Steps', yaxis_title='Value')

# Show the plot
fig.show()


<h1>Data preprocessing</h1>

Normalizing. Range of the scaler should be the same for x and y axis

In [None]:
# Columns that are rescaled; every other column of df_all is carried over unchanged.
columns_to_normalize = [
    'EIMMagnitude', 'EIMPhase', 'JointAngle',
    'RollingAverageMag', 'RollingAveragePhase',
    'MedianEIMMagnitude', 'MedianEIMPhase', 'MedianJointAngle',
    'MeanEIMMagnitude', 'MeanEIMPhase', 'MeanJointAngle',
    'StdEIMMagnitude', 'StdEIMPhase', 'StdJointAngle',
    'VarEIMMagnitude', 'VarEIMPhase', 'VarJointAngle',
    'KurtEIMMagnitude', 'KurtEIMPhase', 'KurtJointAngle',
    'ROCEIMMagnitude', 'ROCEIMPhase', 'ROCJointAngle',
]

# Each column is mapped independently onto the range [0, 1].
scaler = MinMaxScaler(feature_range=(0, 1))

# Work on a copy so df_all keeps the unscaled values.
df2 = df_all.copy()
normalized_values = scaler.fit_transform(df_all[columns_to_normalize])
df2[columns_to_normalize] = normalized_values

# Display the normalized frame
df2

In [None]:
# Sample ids that are moved out of df2 into a separate hold-out frame (df_test).
specific_samples = [23, 38, 47, 50, 89, 98]

# Kept so the name stays bound exactly as before; it is no longer needed for the extraction.
df_grouped = df2.groupby(['Sample'])

# Display the original DataFrame length
print("Original DataFrame Length:", len(df2))

# Collect the rows of each requested sample, in the order given in specific_samples.
# Samples that do not occur in df2 are skipped. Building the list first and concatenating
# once avoids repeatedly growing a frame and avoids concatenating onto an empty DataFrame.
held_out_parts = [df2[df2['Sample'] == sample_value] for sample_value in specific_samples]
held_out_parts = [part for part in held_out_parts if not part.empty]
df_test = pd.concat(held_out_parts) if held_out_parts else pd.DataFrame()

# Remove the held-out samples from df2. The mask is driven by specific_samples, so the
# list above is the single place to edit when the hold-out selection changes.
df2 = df2[~df2['Sample'].isin(specific_samples)].copy()

# Display the modified original DataFrame length
print("\nModified Original DataFrame Length:", len(df2))

In [None]:
# Sanity check: prints False for every held-out sample id that is no longer present in df2.
remaining_samples = df2['Sample'].unique()
for sample_value in specific_samples:
    still_present = sample_value in remaining_samples
    print(still_present)

In [None]:
# Plot the normalized EIM magnitude and joint angle of every sample with id >= 89 that is
# still present in df2 (two traces per sample). The x-axis is the row index of df2.
df_last_10_samples = df2[df2['Sample'] >= 89]

fig = go.Figure()

for sample in df_last_10_samples['Sample'].unique():
    sample_data = df_last_10_samples[df_last_10_samples['Sample'] == sample]
    for column in ('EIMMagnitude', 'JointAngle'):
        fig.add_trace(go.Scatter(x=sample_data.index, y=sample_data[column],
                                 mode='lines+markers', name=f'Sample {sample} - {column}'))

# The filter selects the highest sample ids (>= 89), not the first ten samples, and the
# signal is EIM rather than EMG; the title states what is actually shown.
fig.update_layout(title='Normalized EIM and Kinematic Data for Samples >= 89',
                  xaxis_title='Time Steps', yaxis_title='Value')

# Show the plot
fig.show()

<h1>Prepare sequences for GRU</h1>

This is important for synchronizing the data streams with each other through a common time variable. The "time-variable" from the dataset is therefore, strictly speaking, not needed for training the network.

<h3>Training and evaluating GRU model.</h3>

In [None]:
def reshape_for_gru(datain, timestep, scaler, columns_to_use):
    """Build (input window, following window) pairs for a sequence-to-sequence GRU.

    For every distinct value of the 'Sample' column and every column in
    ``columns_to_use``, the column is scaled with ``scaler.transform`` and cut into
    overlapping windows. Window ``i`` of X holds the values at positions
    ``[i, i + timestep)`` and window ``i`` of Y holds the next ``timestep`` values,
    ``[i + timestep, i + 2 * timestep)``, so X and Y always have the same number of rows.

    Parameters
    ----------
    datain : pandas.DataFrame
        Must contain a 'Sample' column and every column named in ``columns_to_use``.
    timestep : int
        Length of each input window and of each target window.
    scaler : object
        Already-fitted scaler exposing ``transform`` for a single-feature ``(n, 1)`` array.
    columns_to_use : iterable of str
        Columns that are turned into windows; each column is processed independently.

    Returns
    -------
    tuple
        ``(X_out, Y_out)``, two arrays of identical shape ``(n_windows, timestep, 1)``.
        ``(None, None)`` is returned when no sample has at least ``2 * timestep`` rows
        (including when ``datain`` is empty).
    """
    x_parts, y_parts = [], []
    offsets = np.arange(timestep)

    for sample in datain['Sample'].unique():
        datatmp = datain[datain['Sample'] == sample]

        # A sample needs one full input window plus one full target window.
        n_windows = len(datatmp) - 2 * timestep + 1
        if n_windows <= 0:
            continue

        # x_index[i, j] = i + j  -> positions of input window i;
        # the matching target window is shifted by one full window length.
        x_index = np.arange(n_windows)[:, None] + offsets[None, :]
        y_index = x_index + timestep

        for col in columns_to_use:
            arr = datatmp[col].to_numpy()

            # Scale using transform only (the scaler was fitted beforehand).
            arr_scaled = scaler.transform(arr.reshape(-1, 1)).flatten()

            # Fancy indexing yields independent copies, unlike as_strided, which
            # returns views that can read out of bounds when shape/strides are wrong.
            x_parts.append(arr_scaled[x_index].reshape(-1, timestep, 1))
            y_parts.append(arr_scaled[y_index].reshape(-1, timestep, 1))

    if not x_parts:
        return None, None

    # Concatenate once at the end instead of growing the arrays inside the loop.
    return np.concatenate(x_parts, axis=0), np.concatenate(y_parts, axis=0)

Splitting the data and reshaping it for GRU

In [None]:
# test_path = 'EIM_kin_data.csv'
# df_test=pd.read_csv(test_path, encoding='utf-8')

# NOTE: despite its name, this slice holds the first 4000 rows of df_all.
thousand_datapoints = df_all.iloc[0:4000]

scaler = MinMaxScaler(feature_range=(0,1))
timestep = 10

# Split data into train, eval, and test dataframes
train_ratio = 0.7
eval_ratio = 0.2
test_ratio = 0.1

# Split chronologically (shuffle=False): the first 70% of the rows train the model, the
# next 20% evaluate it and the last 10% test it. A shuffled row-wise split would scatter
# the rows of each sample across the three frames, so the sliding windows built by
# reshape_for_gru would be made of non-consecutive time steps.
# Note that this rebinds df_test; any frame previously stored under that name is replaced.
df_train, df_temp = train_test_split(thousand_datapoints, test_size=1 - train_ratio, shuffle=False)
df_eval, df_test = train_test_split(df_temp, test_size=test_ratio / (test_ratio + eval_ratio), shuffle=False)

# Full list of available feature columns, for reference:
# 'EIMMagnitude', 'EIMPhase', 'JointAngle', 'Mass', 'RollingAverageMag', 'RollingAveragePhase',
# 'MedianEIMMagnitude', 'MedianEIMPhase', 'MedianJointAngle', 'MeanEIMMagnitude',
# 'MeanEIMPhase', 'MeanJointAngle', 'StdEIMMagnitude', 'StdEIMPhase', 'StdJointAngle',
# 'VarEIMMagnitude', 'VarEIMPhase', 'VarJointAngle', 'KurtEIMMagnitude', 'KurtEIMPhase',
# 'KurtJointAngle', 'ROCEIMMagnitude', 'ROCEIMPhase', 'ROCJointAngle'

columns_to_train_on = ['EIMMagnitude', 'EIMPhase', 'JointAngle', 'Mass', 'RollingAverageMag', 'RollingAveragePhase',
                       'MedianEIMMagnitude', 'MedianEIMPhase', 'MedianJointAngle', 'MeanEIMMagnitude']

# Fit the scaler on the training data only; the values of all selected columns are
# flattened into a single feature, so one common min/max is used for every column.
# The actual scaling is done inside the reshaping function.
scaler.fit(df_train[columns_to_train_on].values.reshape(-1, 1))

# Use the reshaping function to reshape the data for GRU
columns_to_use = columns_to_train_on

X_train, Y_train = reshape_for_gru(df_train, timestep, scaler, columns_to_use)
X_eval, Y_eval = reshape_for_gru(df_eval, timestep, scaler, columns_to_use)
X_test, Y_test = reshape_for_gru(df_test, timestep, scaler, columns_to_use)

In [None]:
# Report the array shapes produced for each split, inputs first and targets second.
splits = (
    ('train', X_train, Y_train),
    ('eval', X_eval, Y_eval),
    ('test', X_test, Y_test),
)
for split_name, x_array, y_array in splits:
    print(f'X_{split_name}_shape: ', x_array.shape)
    print(f'Y_{split_name}_shape: ', y_array.shape)

# First target window of the test split
print(Y_test[0])

<h3>Specify the structure of a Neural Network</h3>

In [None]:
# Encoder/decoder GRU: a bidirectional GRU summarises the input window into one vector,
# the vector is repeated once per time step, a second bidirectional GRU expands it back
# into a sequence, and a per-time-step Dense layer emits one value per step.
n_timesteps, n_features = X_train.shape[1], X_train.shape[2]

gru_layers = [
    Input(shape=(n_timesteps, n_features), name='Input-Layer'),
    Bidirectional(
        GRU(units=32, activation='tanh', recurrent_activation='sigmoid', stateful=False),
        name='Hidden-GRU-Encoder-Layer',
    ),
    RepeatVector(n_timesteps, name='Repeat-Vector-Layer'),
    Bidirectional(
        GRU(units=32, activation='tanh', recurrent_activation='sigmoid', stateful=False,
            return_sequences=True),
        name='Hidden-GRU-Decoder-Layer',
    ),
    TimeDistributed(Dense(units=1, activation='linear'), name='Output-Layer'),
]

model = Sequential(name="GRU-Model")
for layer in gru_layers:
    model.add(layer)

<h3>Compile the model</h3>

In [None]:
# Training configuration: Adam optimizer and mean-squared-error loss, with MSE and MAE
# reported as metrics. The remaining options are passed explicitly as None.
compile_options = {
    'optimizer': 'adam',
    'loss': 'mean_squared_error',
    'metrics': ['MeanSquaredError', 'MeanAbsoluteError'],
    'loss_weights': None,
    'weighted_metrics': None,
    'run_eagerly': None,
    'steps_per_execution': None,
}
model.compile(**compile_options)

<h3>Fit the model on the dataset</h3>

In [None]:
# Train for 50 epochs on the training windows. Validation on (X_eval, Y_eval) runs only
# every 10th epoch (validation_freq=10), so the 'val_*' entries of history.history are
# shorter than the training entries.
#
# max_queue_size / workers / use_multiprocessing are intentionally not passed: they only
# affect generator or keras.utils.Sequence inputs (the data here are in-memory arrays) and
# they no longer exist in the Keras 3 signature of Model.fit, where passing them raises
# a TypeError.
history = model.fit(X_train,
                    Y_train,
                    batch_size=32,
                    epochs=50,
                    verbose=1,
                    callbacks=None,
                    validation_data=(X_eval, Y_eval),
                    shuffle=True,
                    class_weight=None,
                    sample_weight=None,
                    initial_epoch=0,
                    steps_per_epoch=None,
                    validation_steps=None,
                    validation_batch_size=None,
                    validation_freq=10,
                   )

<h3>Use model to make predictions</h3>

In [None]:
# Predictions on the training windows are currently disabled; uncomment to compute them.
#pred_train = model.predict(X_train)
# Predict on the test windows; the result is stored in pred_test.
pred_test = model.predict(X_test)

<h3>Print Performance Summary</h3>

In [None]:
# --- Architecture -------------------------------------------------------------------
print()
print('-------------------- Model Summary --------------------')
model.summary()
print()

# --- Weights ------------------------------------------------------------------------
# The weights are not printed by default. To dump them, iterate over model.layers and
# print layer.name followed by every array returned by layer.get_weights().
print('-------------------- Weights and Biases --------------------')
print("Too many parameters to print but you can use the code provided if needed")
print()

# --- Metrics recorded during fit ----------------------------------------------------
# For every metric stored in the history, show the value of its last recorded entry.
print('-------------------- Evaluation on Training Data --------------------')
for metric_name, metric_values in history.history.items():
    print("Final", metric_name, ":", metric_values[-1])
print()

# --- Metrics on the test split --------------------------------------------------------
print('-------------------- Evaluation on Test Data --------------------')
results = model.evaluate(X_test, Y_test)
print()

In [None]:
pred_test = model.predict(X_test)


def _to_original_scale(window):
    """Undo the scaling of one window and return it as a flat 1-D array."""
    return scaler.inverse_transform(window.reshape(-1, 1)).flatten()


# Create a plot
fig = go.Figure()

# Add one trace per actual (ground-truth) target window.
# range() needs an integer count; passing the array itself (range(X_test)) raises TypeError.
for i in range(len(Y_test)):
    actual_values = _to_original_scale(Y_test[i])
    fig.add_trace(go.Scatter(x=np.arange(len(actual_values)), y=actual_values,
                             mode='lines',
                             name=f'Sample {i + 1} - Actual',
                             opacity=0.8,
                             line=dict(width=1)
                            ))

# Add one trace per predicted window.
for i in range(len(pred_test)):
    predicted_values = _to_original_scale(pred_test[i])
    fig.add_trace(go.Scatter(x=np.arange(len(predicted_values)), y=predicted_values,
                             mode='lines',
                             name=f'Sample {i + 1} - Predicted',
                             opacity=1,
                             line=dict(width=2, dash='dot')
                            ))

# Customize the plot appearance: white background, light-grey grid, black axis lines.
fig.update_layout(
    plot_bgcolor='white',
    xaxis=dict(
        showgrid=True, gridwidth=1, gridcolor='lightgrey',
        zeroline=True, zerolinewidth=1, zerolinecolor='lightgrey',
        showline=True, linewidth=1, linecolor='black',
        title='Time Steps'
    ),
    yaxis=dict(
        showgrid=True, gridwidth=1, gridcolor='lightgrey',
        zeroline=True, zerolinewidth=1, zerolinecolor='lightgrey',
        showline=True, linewidth=1, linecolor='black',
        title='Values'
    ),
    title=dict(text="Actual vs. Predicted Values", font=dict(color='black'))
)

# Show the plot
fig.show()

<h1>Splitting the data</h1>

Splits the kinematic and EMG data into a 70/30 training/validation distribution. The random_state parameter is the seed for the random split, which makes the split reproducible.

In [None]:
# Number of leading feature channels kept as model input from each normalized array.
num_kinematic_features = 18
num_emg_features = 7

# Settings shared by both splits: 30% of the sequences are held out for validation and the
# seed is fixed so the split is repeatable.
split_settings = {'test_size': 0.3, 'random_state': 42}

# Inputs: the first N feature channels of every time step.
# Targets: all channels of the last time step of each sequence.
# NOTE(review): both arrays are indexed as 3-D (sequence, time step, feature) — confirm
# against the cell that creates kinematic_data_normalized / emg_data_normalized.
X_kinematic = kinematic_data_normalized[:, :, :num_kinematic_features]
y_kinematic = kinematic_data_normalized[:, -1, :]

X_emg = emg_data_normalized[:, :, :num_emg_features]
y_emg = emg_data_normalized[:, -1, :]

# Split the data
kinematic_split = train_test_split(X_kinematic, y_kinematic, **split_settings)
X_kinematic_train, X_kinematic_val, y_kinematic_train, y_kinematic_val = kinematic_split

emg_split = train_test_split(X_emg, y_emg, **split_settings)
X_emg_train, X_emg_val, y_emg_train, y_emg_val = emg_split

# Optional shape check (uncomment to use):
# print("Kinematic train:", X_kinematic_train.shape, y_kinematic_train.shape)
# print("Kinematic validation:", X_kinematic_val.shape, y_kinematic_val.shape)
# print("EMG train:", X_emg_train.shape, y_emg_train.shape)
# print("EMG validation:", X_emg_val.shape, y_emg_val.shape)

<h1>Model architecture</h1>

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Masking, RepeatVector, Layer, Reshape
import tensorflow.keras.backend as K

# Feature counts of the two input streams (set here explicitly)
num_emg_features = 7
num_kinematic_features = 18

# Total number of features (not referenced again in this cell)
num_input_features = num_emg_features + num_kinematic_features

# Shape of a combined variable-length input sequence: (time steps, features).
# Not referenced again in this cell; each branch below declares its own Input shape.
input_seq_shape = (None, num_input_features)

# Number of units used by every GRU layer in this cell
gru_units = 32

# EMG data branch: variable-length sequence in, one GRU output per time step out
encoder_inputs_emg = Input(shape=(None, num_emg_features), name='Input-Layer-EMG')
encoder_emg = GRU(gru_units, return_sequences=True, name='Hidden-GRU-Encoder-Layer-EMG')(encoder_inputs_emg)

# Kinematic data branch: same structure as the EMG branch
encoder_inputs_kinematic = Input(shape=(None, num_kinematic_features), name='Input-Layer-Kinematic')
encoder_kinematic = GRU(gru_units, return_sequences=True, name='Hidden-GRU-Encoder-Layer-Kinematic')(encoder_inputs_kinematic)

# Concatenate EMG and kinematic encodings along the last (feature) axis
encoder_combined = concatenate([encoder_emg, encoder_kinematic], axis=-1)

# Attention scores: dot product of the combined encoding with itself, contracting axis 1
# of both tensors, followed by a softmax.
# NOTE(review): axis 1 is the time axis of the encoder outputs, so this presumably yields
# feature-by-feature rather than step-by-step scores — confirm this is intended.
attention = Dot(axes=[1, 1], name='Attention-Layer')([encoder_combined, encoder_combined])
attention = Activation('softmax', name='Attention-Activation')(attention)

# Context: dot product of the attention tensor (axis 1) with the encoder outputs (axis 2)
context = Dot(axes=[1, 2], name='Context-Layer')([attention, encoder_combined])

# Decoder: a separate GRU over its own kinematic-shaped input sequence
decoder_inputs = Input(shape=(None, num_kinematic_features), name='Decoder-Input-Layer')
decoder_gru = GRU(gru_units, return_sequences=True, name='Hidden-GRU-Decoder-Layer')(decoder_inputs)

# Apply a Masking layer (default arguments) to the context, then flatten it
# NOTE(review): the context may still contain a variable-length axis at this point —
# confirm that Flatten behaves as expected here.
context_flattened = Flatten()(Masking()(context))

# Custom layer that inserts a length-1 axis right after the batch axis of the context
class RepeatContextLayer(Layer):
    """Insert a singleton axis at position 1 of the input tensor.

    Despite its name, the layer does not tile or repeat anything: the input is
    only expanded with one extra axis of length 1 at position 1.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, inputs, **kwargs):
        expanded = K.expand_dims(inputs, axis=1)
        return expanded

    def compute_output_shape(self, input_shape):
        # Reported output shape: (input_shape[0], 1, input_shape[1])
        batch_size, feature_size = input_shape[0], input_shape[1]
        return (batch_size, 1, feature_size)

# Add a length-1 axis at position 1 of the flattened context (see RepeatContextLayer.call)
context_expanded = RepeatContextLayer(name='Repeat-Context')(context_flattened)

# Concatenate expanded context and decoder GRU output along the last axis
# NOTE(review): the context has length 1 on axis 1 while the decoder output has a
# variable-length axis 1 — confirm the two are compatible for concatenation.
decoder_combined = Concatenate(axis=-1, name='Concatenate-Layer')([context_expanded, decoder_gru])

# Flatten decoder_combined; Flatten collapses every non-batch axis into one
decoder_flattened = Flatten(name='Flatten-Layer')(decoder_combined)

print('Decoder shape before reshaping: ', decoder_flattened.shape)
# Reshape so that the last axis has size 1
decoder_output = Reshape((-1, 1), name='Reshape-Layer')(decoder_flattened)
# Note: the label says 'Flatten-Layer' but the shape printed is that of the Reshape output
print('Flatten-Layer shape: ', decoder_output.shape)

# Output layer: one linear unit applied independently to every step of decoder_output
outputs = TimeDistributed(Dense(1, activation='linear'), name='Output-Layer')(decoder_output)

# Create the three-input model and compile it with Adam and mean-squared-error loss.
# This rebinds the name `model`.
model = Model(inputs=[encoder_inputs_emg, encoder_inputs_kinematic, decoder_inputs], outputs=outputs)
model.compile(optimizer='adam', loss='mean_squared_error')

# Print model summary for review
model.summary()


TEST OF CODE IN COLAB

In [None]:
from tensorflow.keras.models import load_model

# Path of the saved Keras model file to reload
loaded_model_path = 'short_test_5_epoch.keras'

# Load the model.
# NOTE(review): no custom_objects are passed. If the saved model contains custom layers,
# they must be supplied through load_model(..., custom_objects={...}).
loaded_model = load_model(loaded_model_path)

print("Model loaded successfully.")
# summary() prints the table itself and returns None, so wrapping it in print()
# would append a stray "None" line to the output.
loaded_model.summary()

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Evaluate the reloaded model on the samples with id >= 89.
df_last_10_samples = df_all[df_all['Sample'] >= 89]
test_eim = np.array(df_last_10_samples['EIMMagnitude'])
print(test_eim)

# Model inputs (23 columns) and targets (2 columns).
feature_columns = ['JointAngle', 'MedianJointAngle', 'MeanJointAngle', 'StdJointAngle',
                   'ROCJointAngle', 'VarJointAngle', 'KurtJointAngle',
                   'EIMMagnitude', 'EIMPhase', 'RollingAverageMag', 'RollingAveragePhase',
                   'MedianEIMMagnitude', 'MedianEIMPhase', 'MeanEIMMagnitude',
                   'MeanEIMPhase', 'StdEIMMagnitude', 'StdEIMPhase', 'VarEIMMagnitude',
                   'VarEIMPhase', 'KurtEIMMagnitude', 'KurtEIMPhase', 'ROCEIMMagnitude', 'ROCEIMPhase']
target_columns = ['JointAngle', 'Mass']

df_grouped = df_last_10_samples.groupby(['Sample'])

# Length of each input window (adjust as needed)
sequence_length = 10
timestep = sequence_length

# One fitted scaler pair per group, keyed by the group's position in df_grouped.
# Each group is scaled to [0, 1] with its own MinMaxScaler instances, so fitting one
# group never overwrites the parameters of another, and the scaler used by earlier
# cells is left untouched. (StandardScaler is not imported in this notebook and was
# never actually used for the scaling.)
scalers_X = {}
scalers_y = {}

# Windows, their targets, and the group position each window came from
X_seq, y_seq, window_group = [], [], []

for group_position, (group_name, group_data) in enumerate(df_grouped):
    n_windows = len(group_data) - sequence_length + 1
    if n_windows <= 0:
        # Group too short to produce a single window
        continue

    scaler_X = MinMaxScaler(feature_range=(0, 1))
    scaler_y = MinMaxScaler(feature_range=(0, 1))

    # Scale features and target variables of this group
    group_data_scaled = scaler_X.fit_transform(group_data[feature_columns])
    group_data_scaled_y = scaler_y.fit_transform(group_data[target_columns])
    scalers_X[group_position] = scaler_X
    scalers_y[group_position] = scaler_y

    # Each window of sequence_length rows is paired with the target of its last row
    for i in range(n_windows):
        X_seq.append(group_data_scaled[i:i+sequence_length, :])
        y_seq.append(group_data_scaled_y[i+sequence_length-1, :])
        window_group.append(group_position)

# Convert to numpy arrays
X_seq = np.array(X_seq)
y_seq = np.array(y_seq)
window_group = np.array(window_group)

predictions_scaled = loaded_model.predict(X_seq)

# Undo the target scaling group by group: every prediction is mapped back with the scaler
# fitted on the group its input window came from, not with whichever scaler was fitted last.
predictions = np.empty(predictions_scaled.shape, dtype=float)
for group_position, scaler_y in scalers_y.items():
    rows = window_group == group_position
    predictions[rows] = scaler_y.inverse_transform(predictions_scaled[rows])