### **Connecting with Drive**

In [None]:
# Mount Google Drive so files stored there become readable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install geneticalgorithm




In [None]:
!pip install optuna



In [None]:
import numpy as np
import pandas as pd
from scipy.io import loadmat
from scipy.stats import pearsonr
from tensorflow.keras import layers, Model, Input
from tensorflow.keras.optimizers import Adam
import optuna
import tensorflow as tf
import matplotlib.pyplot as plt

# Augmentation function for EMG signals
def augment_emg_signals(data):
    """Return a randomly perturbed version of a 3-D batch of signals.

    Two perturbations are combined:

    * additive Gaussian noise (mean 0, standard deviation 0.1), drawn
      independently for every element of ``data``;
    * a multiplicative gain drawn uniformly from [0.8, 1.2), one value per
      (axis-0, axis-2) pair, broadcast along axis 1.

    Args:
        data: Array with three axes; ``data.shape[2]`` is read, so fewer axes
            raise ``IndexError``.
            (presumably (windows, timesteps, channels) — confirm with caller)

    Returns:
        ``data * gain + noise``, same shape as ``data``.
    """
    # Noise is drawn before the gain so seeded runs consume the global
    # NumPy random stream in the same order as before.
    additive_noise = np.random.normal(loc=0, scale=0.1, size=data.shape)
    gain_shape = (data.shape[0], 1, data.shape[2])
    channel_gain = np.random.uniform(low=0.8, high=1.2, size=gain_shape)
    scaled = data * channel_gain
    return scaled + additive_noise

# Define SimpleGNNLayer
class SimpleGNNLayer(layers.Layer):
    """Learnable linear mixing of the last axis of a rank-3 tensor.

    Despite the weight's name, ``adjacency_matrix`` is not derived from any
    graph structure in this file: it is a dense, randomly initialised,
    trainable matrix of shape ``(input_features, num_features)`` that is
    applied identically at every position along axis 1.

    Args:
        num_features: Size of the last axis of the output.
        **kwargs: Forwarded unchanged to ``layers.Layer``.
    """

    def __init__(self, num_features, **kwargs):
        super().__init__(**kwargs)
        self.num_features = num_features

    def build(self, input_shape):
        """Create the projection weight once the input feature size is known."""
        self.adjacency_matrix = self.add_weight(
            shape=(input_shape[-1], self.num_features),
            initializer="random_normal",
            trainable=True,
            name="adjacency_matrix"
        )

    def call(self, inputs):
        """Project ``inputs`` of shape (b, t, i) to shape (b, t, num_features)."""
        return tf.einsum('bti,ij->btj', inputs, self.adjacency_matrix)

    def get_config(self):
        """Return the layer config including ``num_features``.

        Without this, a model containing the layer cannot be re-created from
        its saved config because the required constructor argument is lost.
        """
        config = super().get_config()
        config.update({"num_features": self.num_features})
        return config

# Define Domain-Specific Normalization
class DomainSpecificNormalization(layers.Layer):
    """Per-sample standardisation followed by a learnable per-feature affine map.

    For each element of the batch, the mean and standard deviation are taken
    jointly over axes 1 and 2, the input is standardised with them, and the
    result is scaled by ``gamma`` and shifted by ``beta``. Both weights have
    shape ``(1, 1, features)`` and start as ones and zeros respectively.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create ``gamma`` and ``beta`` sized to the last input axis."""
        affine_shape = (1, 1, input_shape[-1])
        self.gamma = self.add_weight(
            name="gamma", shape=affine_shape, initializer="ones", trainable=True
        )
        self.beta = self.add_weight(
            name="beta", shape=affine_shape, initializer="zeros", trainable=True
        )

    def call(self, inputs):
        """Standardise each sample over axes (1, 2), then apply gamma/beta."""
        reduce_axes = (1, 2)
        centre = tf.reduce_mean(inputs, axis=reduce_axes, keepdims=True)
        spread = tf.math.reduce_std(inputs, axis=reduce_axes, keepdims=True)
        # Epsilon keeps the division finite when a sample is constant.
        standardised = (inputs - centre) / (spread + tf.keras.backend.epsilon())
        return self.gamma * standardised + self.beta

# Model architecture
def build_model(input_features=24, timesteps=15, numResponses=22):
    """Assemble the (uncompiled) Conv -> GNN -> BiLSTM -> attention regressor.

    Args:
        input_features: Size of the last axis of each input window.
        timesteps: Number of steps in each input window.
        numResponses: Number of linear outputs.

    Returns:
        A Keras ``Model`` mapping ``(batch, timesteps, input_features)`` to
        ``(batch, numResponses)``.
    """

    def conv_stage(tensor, filters):
        # Conv -> BatchNorm -> ReLU -> MaxPool; stride 1 with 'same' padding
        # throughout, so the length of the time axis is unchanged.
        tensor = layers.Conv1D(
            filters, kernel_size=3, strides=1, padding='same', use_bias=False
        )(tensor)
        tensor = layers.BatchNormalization()(tensor)
        tensor = layers.Activation('relu')(tensor)
        return layers.MaxPooling1D(pool_size=2, strides=1, padding='same')(tensor)

    inputs = Input(shape=(timesteps, input_features))

    features = DomainSpecificNormalization()(inputs)
    for filters in (32, 64):
        features = conv_stage(features, filters)
    features = SimpleGNNLayer(num_features=64)(features)

    # Recurrent encoder over the full sequence.
    recurrent = layers.LSTM(500, return_sequences=True)
    sequence = layers.Bidirectional(recurrent)(features)
    sequence = layers.Dropout(0.2)(sequence)

    # Self-attention with a residual connection and layer normalisation.
    attended = layers.MultiHeadAttention(num_heads=4, key_dim=64)(sequence, sequence)
    sequence = layers.Add()([sequence, attended])
    sequence = layers.LayerNormalization()(sequence)

    # Per-timestep dense head, averaged over time before the linear output.
    hidden = layers.Dense(400, activation='relu')(sequence)
    hidden = layers.Dense(200, activation='relu')(hidden)
    pooled = layers.GlobalAveragePooling1D()(hidden)
    outputs = layers.Dense(numResponses, activation='linear')(pooled)

    return Model(inputs, outputs)

# Data utilities
def load_subject_data(subjects_to_load, base_path):
    """Load the ``"Data"`` variable from each subject's ``S<id>_E1.mat`` file.

    Args:
        subjects_to_load: Iterable of subject identifiers used to build the
            file name ``S{id}_E1.mat``.
        base_path: Directory containing the ``.mat`` files.

    Returns:
        List with one loaded array per requested subject, in request order.
    """
    return [
        loadmat(f"{base_path}/S{subject_id}_E1.mat")["Data"]
        for subject_id in subjects_to_load
    ]

def split_data(data, sequence_length=15, n_features=24):
    """Cut a 2-D recording into overlapping windows and per-window targets.

    Columns 58..81 of ``data`` are the model inputs and columns 36..57 the
    targets. A window of ``sequence_length`` consecutive rows slides one row
    at a time; each window is paired with the target row aligned with the
    window's last row.

    Args:
        data: 2-D array with at least 82 columns for the defaults to apply.
        sequence_length: Number of rows per window.
        n_features: Width of the preallocated input windows; it has to be
            compatible with the 24 sliced input columns.

    Returns:
        ``(inputs, outputs)`` with shapes
        ``(n_windows, sequence_length, n_features)`` and
        ``(n_windows, n_target_columns)``.
    """
    targets = data[:, 36:58]
    signals = data[:, 58:82]

    window_count = len(signals) - sequence_length + 1
    inputs = np.zeros((window_count, sequence_length, n_features))
    outputs = np.zeros((window_count, targets.shape[1]))

    for start in range(window_count):
        stop = start + sequence_length
        inputs[start] = signals[start:stop]
        # Target is the row aligned with the final row of the window.
        outputs[start] = targets[stop - 1]

    return inputs, outputs

# Fitness function
def fitness_function(subject_selection, all_subjects_data, test_subject_data):
    """Score a subject subset by training a model and validating on the held-out subject.

    Args:
        subject_selection: Sequence of 0/1 flags, one per entry of
            ``all_subjects_data``; entries equal to 1 are used for training.
        all_subjects_data: List of per-subject 2-D arrays.
        test_subject_data: 2-D array of the subject used for validation.

    Returns:
        The negative mean Pearson correlation across output columns (lower is
        better), or a large positive penalty (``1e6``) when fewer than five
        subjects are selected or when any step raises.
    """
    # bayesian_optimization minimises this value and valid scores lie in
    # [-1, 1], so the penalty must be large and POSITIVE. A negative penalty
    # would make invalid selections and crashed trials the "best" trials.
    penalty = 1e6

    try:
        # Ensure at least 5 subjects are selected
        selected_indices = [i for i, val in enumerate(subject_selection) if val == 1]
        if len(selected_indices) < 5:
            return penalty

        # Combine data from selected subjects
        # NOTE(review): windows are cut from the concatenated array, so the
        # few windows around each subject boundary mix rows from two
        # subjects — confirm this is acceptable.
        combined_data = np.concatenate(
            [all_subjects_data[i] for i in selected_indices], axis=0
        )
        in_train, out_train = split_data(combined_data)
        in_val, out_val = split_data(test_subject_data)

        # A fresh model is built on every trial; reset Keras' global state so
        # graphs from earlier trials do not accumulate in memory.
        tf.keras.backend.clear_session()

        # Train the model
        model = build_model()
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
        model.fit(in_train, out_train, validation_data=(in_val, out_val), epochs=10, verbose=1)

        # Predict on validation data
        predictions = model.predict(in_val)

        # Pearson correlation coefficient for each output column (DOF)
        correlations = [
            pearsonr(out_val[:, dof], predictions[:, dof])[0]
            for dof in range(predictions.shape[1])
        ]

        # Return negative mean CC (maximize CC by minimizing negative CC)
        return -np.mean(correlations)

    except Exception as e:
        # Deliberate best-effort: one failing configuration must not abort
        # the whole search, so report it and hand back the penalty.
        print(f"Error in fitness function: {str(e)}")
        return penalty

# Bayesian Optimization
def bayesian_optimization(all_subjects_data, test_subject_data, num_trials=50):
    """Search for the training-subject subset that minimises ``fitness_function``.

    Every subject gets one binary Optuna parameter named ``subject_<index>``;
    each trial evaluates ``fitness_function`` on the sampled 0/1 vector.

    Args:
        all_subjects_data: List of candidate training subjects' arrays.
        test_subject_data: Array forwarded to ``fitness_function`` for
            validation.
        num_trials: Number of Optuna trials to run.

    Returns:
        ``(best_selection, best_cc)``: the 0/1 list of the best trial and the
        sign-flipped best objective value.
    """
    subject_count = len(all_subjects_data)
    param_names = [f"subject_{idx}" for idx in range(subject_count)]

    def objective(trial):
        # One 0/1 decision per subject, in subject order.
        mask = [trial.suggest_int(name, 0, 1) for name in param_names]
        return fitness_function(mask, all_subjects_data, test_subject_data)

    # The objective is a negated correlation, hence minimisation.
    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=num_trials)

    winning_params = study.best_params
    best_selection = [winning_params[name] for name in param_names]
    best_cc = -study.best_value  # flip the sign back to a positive CC
    return best_selection, best_cc

def train_and_test_model(best_selection, all_subjects_data, test_subject_data):
    """Train on the selected subjects, evaluate on the test subject, and plot each DOF.

    Args:
        best_selection: Sequence of 0/1 flags, one per entry of
            ``all_subjects_data``; entries equal to 1 are used for training.
        all_subjects_data: List of per-subject 2-D arrays.
        test_subject_data: 2-D array of the held-out subject.

    Returns:
        ``(correlations, mean_correlation)``: the Pearson correlation between
        actual and predicted values for each output column, and their mean.

    Raises:
        ValueError: If ``best_selection`` selects no subject.
    """
    selected_indices = [i for i, selected in enumerate(best_selection) if selected == 1]
    if not selected_indices:
        # Fail early with a clear message instead of the opaque
        # "need at least one array to concatenate" from NumPy.
        raise ValueError(
            "No subjects selected: best_selection must contain at least one 1."
        )

    # NOTE(review): windows are cut from the concatenated array, so the few
    # windows around each subject boundary mix rows from two subjects —
    # confirm this is acceptable.
    selected_data = [all_subjects_data[i] for i in selected_indices]
    combined_data = np.concatenate(selected_data, axis=0)
    print(combined_data.shape)
    in_train, out_train = split_data(combined_data)
    print(in_train.shape)
    print(out_train.shape)
    in_test, out_test = split_data(test_subject_data)
    print(in_test.shape)
    print(out_test.shape)

    # Build and train the model
    model = build_model()
    model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
    model.fit(in_train, out_train, validation_split=0.2, epochs=150, batch_size=32, verbose=1)

    # Make predictions
    predictions = model.predict(in_test)

    # Compute Pearson correlation coefficients for each DOF
    n_dofs = predictions.shape[1]
    correlations = [pearsonr(out_test[:, i], predictions[:, i])[0] for i in range(n_dofs)]
    mean_cc = np.mean(correlations)

    # The x axis and the mean are identical for every plot; compute them once.
    x = np.arange(out_test.shape[0])

    # Plot predicted vs actual for each DOF
    for dof in range(n_dofs):
        fig = plt.figure(figsize=(12, 6))
        plt.plot(x, out_test[:, dof], label='Actual', color='blue')
        plt.plot(x, predictions[:, dof], label='Predicted', color='red')

        # Title with this DOF's correlation and the mean over all DOFs
        cc_current = correlations[dof]
        plt.title(f"DOF {dof + 1} - CC Current: {cc_current:.2f}, CC Mean: {mean_cc:.2f}")
        plt.xlabel('Index')
        plt.ylabel('Values')
        plt.legend()
        plt.show()
        # Release the figure so one-figure-per-DOF does not pile up in memory.
        plt.close(fig)

    # Return correlations and their mean
    return correlations, mean_cc




In [21]:
# Main program
if __name__ == "__main__":
    # Leave-one-subject-out run: subject 1 is held out and the remaining 39
    # are candidates for the training subset.
    # NOTE(review): test_subject_data is used both to score subsets during
    # the search and for the final evaluation, so the reported test CC may be
    # optimistic — confirm this is intended.
    base_path = '/content/drive/My Drive/Colab Notebooks/processed withstim'
    test_subject = 1
    all_subjects = list(range(1, 41))  # subject IDs 1..40
    train_subjects = [subject for subject in all_subjects if subject != test_subject]

    # Load every candidate training subject plus the held-out subject.
    all_subjects_data = load_subject_data(train_subjects, base_path)
    (test_subject_data,) = load_subject_data([test_subject], base_path)

    # Search for the best training subset.
    print("Starting Bayesian Optimization...")
    bo_selection, best_cc = bayesian_optimization(
        all_subjects_data, test_subject_data, num_trials=70
    )
    print(f"Best selection: {bo_selection}")
    print(f"Best average CC: {best_cc}")

    # Retrain on the chosen subset and evaluate on the held-out subject.
    correlations, avg_correlation = train_and_test_model(
        bo_selection, all_subjects_data, test_subject_data
    )
    print(f"Correlations per degree of freedom: {correlations}")
    print(f"Average Pearson correlation coefficient: {avg_correlation}")

Output hidden; open in https://colab.research.google.com to view.