## Parts 1–4 — Batch processing for up to 27 subjects

These cells reproduce Parts 1–4 of the main notebook but loop over up to 27 subject `.mat` files found in the `s2` directory. They use the exact same functions and variable names from your notebook, so you can run the notebook cells in order.

In [None]:
# Part 1: Imports and setup (same as your notebook)
import os
import numpy as np
from scipy.io import loadmat
from scipy.ndimage import convolve1d
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, ConfusionMatrixDisplay

from sklearn.feature_selection import mutual_info_classif, SelectKBest
from sklearn.decomposition import PCA
import pandas as pd

# Seed NumPy's global random generator with a fixed value.
np.random.seed(42)

# Folder that holds the per-subject .mat files.
subject_folder_name = 's2'

# Keep entries whose name ends in ".mat" (case-insensitive), order them by
# file name, and retain at most the first 27.
subject_files = sorted(
    entry
    for entry in os.listdir(subject_folder_name)
    if entry.lower().endswith('.mat')
)[:27]

# List the selected files with 1-based numbering.
print(f'Found {len(subject_files)} .mat files (using up to 27):')
for position, mat_name in enumerate(subject_files, start=1):
    print(position, mat_name)

In [None]:
# Part 1 (continued): Visualize & preprocess each subject (same code as notebook)
# Moving-average kernel: `mov_mean_length` equal weights that sum to one.
mov_mean_length = 25
mov_mean_weights = np.ones(mov_mean_length) / mov_mean_length

# Per-subject containers, keyed by the .mat file name.
subjects = {}

for fname in subject_files:
    path = os.path.join(subject_folder_name, fname)
    data = loadmat(path)

    # List the variables stored in the file, skipping the "__"-prefixed
    # entries that loadmat adds.
    print('\nSubject file:', fname)
    print('Dataset variables:')
    for key in data:
        if key.startswith('__'):
            continue
        print(' ', key)

    # Same variable names as the main notebook.
    emg = data['emg']                  # expected (T, n_channels) per the original note
    stimulus = data['restimulus']      # corrected stimulus labels
    repetition = data['rerepetition']  # corrected repetition indices

    print(f"EMG data dimension : {emg.shape}")
    print(f"EMG data type : {type(emg)}")

    # One unique value is discounted from each count (presumably the
    # rest/zero label, as in the main notebook -- confirm against the data).
    n_stimuli = np.unique(stimulus).size - 1
    n_repetitions = np.unique(repetition).size - 1

    # Nested [stimulus][repetition] grids, filled in below.
    emg_windows = [[None] * n_repetitions for _ in range(n_stimuli)]
    emg_envelopes = [[None] * n_repetitions for _ in range(n_stimuli)]

    for stimuli_idx in range(n_stimuli):
        for repetition_idx in range(n_repetitions):
            # Rows recorded during this (stimulus, repetition) pair; labels are 1-based.
            mask = ((stimulus == stimuli_idx + 1) & (repetition == repetition_idx + 1)).ravel()
            window = emg[mask, :]
            emg_windows[stimuli_idx][repetition_idx] = window
            # Moving average along the time axis.
            emg_envelopes[stimuli_idx][repetition_idx] = convolve1d(window, mov_mean_weights, axis=0)

    # Keep everything later cells need for this subject.
    subjects[fname] = dict(
        emg=emg,
        stimulus=stimulus,
        repetition=repetition,
        emg_windows=emg_windows,
        emg_envelopes=emg_envelopes,
        n_stimuli=n_stimuli,
        n_repetitions=n_repetitions,
    )

    # Plot a single raw channel, only for the first subject stored.
    if len(subjects) == 1:
        plt.close('all')
        fig, ax = plt.subplots(figsize=(8, 2))
        # Use channel index 5 when the recording has more than five channels,
        # otherwise fall back to channel 0.
        if emg.shape[1] > 5:
            EMG_channel = 5
        else:
            EMG_channel = 0
        ax.plot(emg[:, EMG_channel])
        ax.set_title(f"EMG signal channel {EMG_channel} ({fname})")
        ax.set_xlabel('Data points')
        ax.set_ylabel('Amplitude')
        plt.show()

In [None]:
# Part 2: build_dataset_from_ninapro function (exactly as in notebook)
def build_dataset_from_ninapro(emg, stimulus, repetition, features=None):
    """Build a (samples x features) matrix with one row per (stimulus, repetition).

    Each feature function is applied to the EMG rows recorded during one
    (stimulus, repetition) pair; the per-channel results of all features are
    concatenated to form that pair's row.

    Args:
        emg: 2-D array whose second axis indexes channels
            (``emg.shape[1]`` is used as the channel count).
        stimulus: Per-time-step stimulus labels, either ``(T,)`` or ``(T, 1)``.
            One unique value is discounted as the rest condition and the
            remaining labels are matched as ``1..n_stimuli``.
        repetition: Per-time-step repetition indices, same layout and
            conventions as ``stimulus``.
        features: Iterable of callables mapping a ``(t, n_channels)`` array to
            ``n_channels`` values. When ``None``, mean absolute value and
            standard deviation (both along axis 0) are used.

    Returns:
        Tuple ``(dataset, labels)``: ``dataset`` has shape
        ``(n_stimuli * n_repetitions, n_channels * n_features)`` and
        ``labels`` holds the 1-based stimulus label of each row as floats.
    """
    if features is None:
        # The signature advertises None as the default, so make it usable
        # instead of failing with "'NoneType' object is not iterable".
        features = [
            lambda x: np.mean(np.abs(x), axis=0),
            lambda x: np.std(x, axis=0),
        ]
    else:
        # Materialise once: the sequence is sized and then iterated per sample.
        features = list(features)

    # Flatten the label vectors so (T, 1) and (T,) inputs can be mixed without
    # broadcasting to a (T, T) mask, and so a single-sample recording still
    # yields a 1-D mask.
    stimulus = np.asarray(stimulus).ravel()
    repetition = np.asarray(repetition).ravel()

    # Discount one unique value (the rest condition); clamp so empty label
    # arrays give zero samples rather than negative counts.
    n_stimuli = max(np.unique(stimulus).size - 1, 0)
    n_repetitions = max(np.unique(repetition).size - 1, 0)
    n_samples = n_stimuli * n_repetitions

    n_channels = emg.shape[1]
    # Every feature contributes one value per channel.
    n_features = n_channels * len(features)

    dataset = np.zeros((n_samples, n_features))
    labels = np.zeros(n_samples)

    for i in range(n_stimuli):
        for j in range(n_repetitions):
            # Row index is fully determined by (i, j); no running counter needed.
            row = i * n_repetitions + j
            labels[row] = i + 1

            # Time steps belonging to this stimulus/repetition (labels are 1-based).
            selected_tsteps = np.logical_and(stimulus == i + 1, repetition == j + 1)
            segment = emg[selected_tsteps, :]
            # NOTE(review): a pair with no time steps passes an empty segment
            # to each feature; the result then depends on the feature itself.

            for k, feature in enumerate(features):
                start = k * n_channels
                dataset[row, start:start + n_channels] = feature(segment)

    return dataset, labels

## Part 3: Feature extraction & visualization (for each subject)

We define the same features as in the notebook: `mav`, `std`, `maxav`, `rms`, `wl`, and `ssc`. Only `[mav, std]` are used by default; edit the `features` list to change the selection.

In [None]:
# Define the features (same computations as the notebook, written as named
# functions so each has a real name and a docstring). Every feature reduces
# its input along axis 0 and returns one value per column.
def mav(x):
    """Return the mean absolute value of each column of ``x``."""
    return np.mean(np.abs(x), axis=0)


def std(x):
    """Return the standard deviation of each column of ``x``."""
    return np.std(x, axis=0)


def maxav(x):
    """Return the maximum absolute value of each column of ``x``."""
    return np.max(np.abs(x), axis=0)


def rms(x):
    """Return the root mean square of each column of ``x``."""
    return np.sqrt(np.mean(x**2, axis=0))


def wl(x):
    """Return the waveform length: summed absolute first differences per column."""
    return np.sum(np.abs(np.diff(x, axis=0)), axis=0)


def ssc(x):
    """Return the slope-sign-change count of each column of ``x``.

    A change is counted wherever two consecutive first differences have a
    negative product.
    """
    # Compute the first differences once and reuse them for both operands.
    slopes = np.diff(x, axis=0)
    return np.sum((slopes[:-1, :] * slopes[1:, :]) < 0, axis=0)

# Feature functions applied to every subject (the notebook's two defaults).
features = [mav, std]

# Feature matrix and label vector for each subject, keyed by file name.
subject_datasets = {}
for fname, sub in subjects.items():
    print('Building dataset for', fname)
    # Same builder as the single-subject notebook, fed this subject's arrays.
    dataset, labels = build_dataset_from_ninapro(
        sub['emg'],
        sub['stimulus'],
        sub['repetition'],
        features=features,
    )
    print(f"  dataset shape: {dataset.shape}, labels shape: {labels.shape}")
    subject_datasets[fname] = dict(dataset=dataset, labels=labels)

## Part 4: Gradient boosting classification (per subject)

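For each subject we run the same pipeline as your notebook: a stratified 60/20/20 train/validation/test split, standardization fitted on the training set only, a baseline `GradientBoostingClassifier` fit, and evaluation with accuracy and macro-F1 on the validation and test sets. The test-set results are collected into a pandas DataFrame, one row per processed subject, and saved to `results_27_subjects_part1to4.csv`.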

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd

# One summary row per subject, appended in processing order.
results = []

for fname, data_pair in subject_datasets.items():
    X, y = data_pair['dataset'], data_pair['labels']
    print('\nTraining subject:', fname)

    # Hold out 40 % of the samples (stratified by label), then split that
    # hold-out evenly into validation and test sets.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.4, stratify=y, random_state=0,
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=0,
    )

    # Fit the scaler on the training split only and reuse it for the others.
    scaler = StandardScaler().fit(X_train)
    X_train_z = scaler.transform(X_train)
    X_val_z = scaler.transform(X_val)
    X_test_z = scaler.transform(X_test)

    # Baseline model with library-default hyperparameters and a fixed seed.
    gb = GradientBoostingClassifier(random_state=0).fit(X_train_z, y_train)

    # Validation metrics are printed only.
    y_val_pred = gb.predict(X_val_z)
    val_acc = accuracy_score(y_val, y_val_pred)
    val_f1 = f1_score(y_val, y_val_pred, average='macro')
    print('  Baseline val accuracy:', val_acc)
    print('  Baseline val macro-F1:', val_f1)

    # Test metrics are printed and recorded in the summary.
    y_test_pred = gb.predict(X_test_z)
    baseline_acc = accuracy_score(y_test, y_test_pred)
    baseline_f1 = f1_score(y_test, y_test_pred, average='macro')
    print('  Test accuracy:', baseline_acc)
    print('  Test macro-F1:', baseline_f1)

    results.append(dict(
        subject_file=fname,
        baseline_acc=float(baseline_acc),
        baseline_f1=float(baseline_f1),
    ))

# Pin the column set so an empty `results` list (no subject was processed)
# still produces a frame with a 'subject_file' column; without it,
# sort_values raises KeyError on the column-less frame.
result_columns = ['subject_file', 'baseline_acc', 'baseline_f1']
results_df = (
    pd.DataFrame(results, columns=result_columns)
    .sort_values('subject_file')
    .reset_index(drop=True)
)
print('\nSummary for all processed subjects:')
print(results_df)
# Persist the per-subject summary next to the notebook (no index column).
results_df.to_csv('results_27_subjects_part1to4.csv', index=False)
print('\nSaved results to results_27_subjects_part1to4.csv')