<a href="https://colab.research.google.com/github/ashhyyyy-vis/sEMG_ML/blob/main/ML_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [47]:
!unzip /content/drive/MyDrive/Synapse/Synapse_Dataset.zip

Archive:  /content/drive/MyDrive/Synapse/Synapse_Dataset.zip
replace Synapse_Dataset/README.md? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [48]:
#imports
import os
import numpy as np
import pandas as pd

from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

from scipy.stats import mode

In [49]:
#Main Features

def rms(x):
    """Return the root-mean-square amplitude of a 1-D signal window.

    The input is converted to float64 before squaring so that narrow integer
    samples (e.g. int16) cannot overflow, and so plain Python sequences are
    accepted as well as NumPy arrays.
    """
    x = np.asarray(x, dtype=np.float64)
    return np.sqrt(np.mean(x**2))

def mav(x):
    """Return the mean absolute value (MAV) of a signal window."""
    magnitudes = np.abs(x)
    return np.mean(magnitudes)

def wl(x):
    """Return the waveform length: the sum of absolute sample-to-sample changes.

    The input is converted to float64 first so that differences of unsigned or
    narrow integer samples cannot wrap around.
    """
    x = np.asarray(x, dtype=np.float64)
    return np.sum(np.abs(np.diff(x)))


In [50]:
#Function to Extract RMS,MAV,WL

def extract_features_from_csv(
    csv_path,
    window_size=256,   # 0.5 sec at 512 Hz
    step_size=128      # 50% overlap
):
    """Summarise one recording as a single trial-level feature vector.

    Every column of the CSV is treated as a channel (no column filtering is
    done here). Each channel is cut into overlapping windows; RMS, MAV and WL
    are computed per window and then averaged over all windows.

    Parameters
    ----------
    csv_path : path of the CSV file to read with ``pd.read_csv``.
    window_size : number of samples per window.
    step_size : number of samples between consecutive window starts.

    Returns
    -------
    np.ndarray of length ``3 * num_channels``, ordered channel by channel as
    [mean RMS, mean MAV, mean WL].

    Raises
    ------
    ValueError
        If ``window_size`` or ``step_size`` is not positive, or if the file
        has fewer rows than ``window_size`` (previously this silently produced
        NaN features).
    """
    if window_size <= 0 or step_size <= 0:
        raise ValueError("window_size and step_size must be positive")

    data = pd.read_csv(csv_path).values
    # data shape → (samples, channels)
    num_samples, num_channels = data.shape

    # Window start offsets are identical for every channel, so compute them once.
    window_starts = range(0, num_samples - window_size + 1, step_size)
    if len(window_starts) == 0:
        raise ValueError(
            f"{csv_path}: {num_samples} samples is fewer than "
            f"window_size={window_size}; no complete window available"
        )

    features = []

    for ch in range(num_channels):
        signal = data[:, ch]
        windows = [signal[start:start + window_size] for start in window_starts]

        features.extend([
            np.mean([rms(w) for w in windows]),
            np.mean([mav(w) for w in windows]),
            np.mean([wl(w) for w in windows]),
        ])

    return np.array(features)  # shape → (3 * num_channels,)


In [51]:
#Extracting features from Dataset
def load_dataset(root_dir):
    """Walk ``root_dir`` and build one feature vector per CSV recording.

    Directory names are parsed as follows:
      * session folder: the text "Session" is removed and the rest parsed as int;
      * subject folder: the token after the last "_" is parsed as int;
      * CSV file: the token before the first "_" has "gesture" removed and is
        parsed as int (the class label).
    Non-directories at the session/subject level and non-CSV files are skipped.

    Directory listings are sorted so the row order of the returned arrays is
    the same on every run and every filesystem (``os.listdir`` order is
    arbitrary).

    Returns
    -------
    (X, y, subjects, sessions) as NumPy arrays with one entry per CSV file.
    """
    X, y, subjects, sessions = [], [], [], []

    for session_name in sorted(os.listdir(root_dir)):
        session_path = os.path.join(root_dir, session_name)
        if not os.path.isdir(session_path):
            continue

        session_id = int(session_name.replace("Session", ""))

        for subject_name in sorted(os.listdir(session_path)):
            subject_path = os.path.join(session_path, subject_name)
            if not os.path.isdir(subject_path):
                continue

            subject_id = int(subject_name.split("_")[-1])

            for file in sorted(os.listdir(subject_path)):
                if not file.endswith(".csv"):
                    continue

                gesture_id = int(file.split("_")[0].replace("gesture", ""))

                csv_path = os.path.join(subject_path, file)
                features = extract_features_from_csv(csv_path)

                X.append(features)
                y.append(gesture_id)
                subjects.append(subject_id)
                sessions.append(session_id)

    return (
        np.array(X),
        np.array(y),
        np.array(subjects),
        np.array(sessions)
    )


In [52]:
#Loading Dataset

# Root of the extracted archive; load_dataset walks its session folders.
ROOT_DIR = '/content/Synapse_Dataset'

# One feature vector per CSV, with its gesture label, subject id and session id.
X, y, subjects, sessions = load_dataset(ROOT_DIR)

print("Total samples:", len(X))
print("Feature dimension:", X.shape[1])


Total samples: 2625
Feature dimension: 24


In [53]:
#Train-Test-Split

# Subject-wise hold-out: no subject contributes rows to both splits.
train_subjects = np.arange(1, 21)   # subjects 1-20
test_subjects = np.arange(21, 26)   # subjects 21-25

# Boolean row masks selecting each split.
train_idx = np.isin(subjects, train_subjects)
test_idx = np.isin(subjects, test_subjects)

X_train = X[train_idx]
y_train = y[train_idx]
X_test = X[test_idx]
y_test = y[test_idx]


In [54]:
#Scaling

# Learn the per-feature mean/variance on the training rows only, then apply
# that same transform to both splits.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)


In [55]:
#Random forest Training

# Forest configuration: 300 trees with no depth limit, sqrt(n_features)
# candidate features per split, class-balanced weights, fixed seed, all cores.
rf = RandomForestClassifier(
    random_state=42,
    n_jobs=-1,
    class_weight="balanced",
    n_estimators=300,
    max_features="sqrt",
    max_depth=None,
    min_samples_leaf=1,
    min_samples_split=2,
)

rf.fit(X_train, y_train)


In [56]:
#Metric calculation

# Predict the held-out subjects and summarise the errors.
y_pred = rf.predict(X_test)

cm = confusion_matrix(y_test, y_pred)
acc = accuracy_score(y_test, y_pred)

print(f"Accuracy: {acc:.4f}")
print("\nConfusion Matrix:", cm, sep="\n")
print("\nClassification Report:", classification_report(y_test, y_pred), sep="\n")


Accuracy: 0.6495

Confusion Matrix:
[[66 13  3 23  0]
 [ 1 61 28 15  0]
 [ 5 35 55 10  0]
 [18 15  5 67  0]
 [ 0  1  0 12 92]]

Classification Report:
              precision    recall  f1-score   support

           0       0.73      0.63      0.68       105
           1       0.49      0.58      0.53       105
           2       0.60      0.52      0.56       105
           3       0.53      0.64      0.58       105
           4       1.00      0.88      0.93       105

    accuracy                           0.65       525
   macro avg       0.67      0.65      0.66       525
weighted avg       0.67      0.65      0.66       525



In [57]:
#feature importance

# Names follow the layout produced by extract_features_from_csv:
# channel by channel, [RMS, MAV, WL] within each channel.
feature_names = [
    f"Ch{ch}_{stat}"
    for ch in range(1, 9)
    for stat in ("RMS", "MAV", "WL")
]

importances = rf.feature_importances_

# zip() would silently truncate (and mislabel) if `rf` had been refit on a
# different feature layout, e.g. after running later cells out of order.
assert len(feature_names) == len(importances), (
    f"rf was fitted on {len(importances)} features, "
    f"but {len(feature_names)} names were generated"
)

# Most important feature first.
for name, imp in sorted(zip(feature_names, importances), key=lambda pair: pair[1], reverse=True):
    print(f"{name}: {imp:.4f}")


Ch1_WL: 0.0696
Ch2_WL: 0.0622
Ch6_WL: 0.0547
Ch4_MAV: 0.0534
Ch6_RMS: 0.0523
Ch4_RMS: 0.0495
Ch7_RMS: 0.0481
Ch6_MAV: 0.0477
Ch7_MAV: 0.0473
Ch4_WL: 0.0468
Ch8_WL: 0.0447
Ch8_RMS: 0.0389
Ch5_MAV: 0.0385
Ch7_WL: 0.0371
Ch8_MAV: 0.0365
Ch5_RMS: 0.0348
Ch3_WL: 0.0317
Ch5_WL: 0.0315
Ch2_RMS: 0.0307
Ch3_MAV: 0.0303
Ch2_MAV: 0.0294
Ch3_RMS: 0.0287
Ch1_RMS: 0.0282
Ch1_MAV: 0.0276


In [58]:
#Window-wise voting

def extract_window_features_from_csv(
    csv_path,
    window_size=256,
    step_size=128
):
    """Compute RMS, MAV and WL for every sliding window of one recording.

    Only numeric columns are used; if there are more than eight, the last
    eight are taken as the channels.

    Note: this function name is defined again further down the notebook with
    two extra features per channel; once that cell has run, callers get the
    later definition.

    Returns
    -------
    np.ndarray of shape (num_windows, 24), laid out channel by channel as
    [RMS, MAV, WL]. A recording shorter than ``window_size`` yields an empty
    array of shape (0, 24).

    Raises
    ------
    ValueError
        If the file has fewer than eight numeric columns.
    """
    n_channels = 8
    feats_per_channel = 3

    df = pd.read_csv(csv_path)
    df = df.select_dtypes(include=[np.number])

    if df.shape[1] < n_channels:
        raise ValueError(
            f"{csv_path}: expected at least {n_channels} numeric columns, "
            f"found {df.shape[1]}"
        )
    if df.shape[1] > n_channels:
        df = df.iloc[:, -n_channels:]

    data = df.to_numpy(dtype=np.float64)

    window_features = []  # one feature vector per window

    for start in range(0, data.shape[0] - window_size + 1, step_size):
        segment = data[start:start + window_size]
        features = []

        for ch in range(n_channels):
            window = segment[:, ch]

            features.extend([
                np.sqrt(np.mean(window**2)),     # RMS
                np.mean(np.abs(window)),         # MAV
                np.sum(np.abs(np.diff(window)))  # WL
            ])

        window_features.append(features)

    # reshape keeps the result 2-D even when no window fits the recording
    return np.array(window_features, dtype=np.float64).reshape(
        -1, n_channels * feats_per_channel
    )
    # shape → (num_windows, 24)


In [59]:
#Window-features

def load_dataset_windows(root_dir):
    """Walk ``root_dir`` and build one feature row per sliding window.

    The tree is traversed as session folder → subject folder → CSV file.
    The subject id is the integer after the last "_" of the subject folder
    name; the gesture label is the leading token of the file name with
    "gesture" removed. Every CSV is one trial and gets a unique integer id
    that is shared by all of its windows.

    Directory listings are sorted so trial ids and row order are reproducible
    (``os.listdir`` order is arbitrary).

    Returns
    -------
    (X, y, trial_ids, subjects) as NumPy arrays with one entry per window.
    """
    X, y, trial_ids, subjects = [], [], [], []

    trial_counter = 0

    for session_name in sorted(os.listdir(root_dir)):
        session_path = os.path.join(root_dir, session_name)
        if not os.path.isdir(session_path):
            continue

        for subject_name in sorted(os.listdir(session_path)):
            subject_path = os.path.join(session_path, subject_name)
            if not os.path.isdir(subject_path):
                continue

            subject_id = int(subject_name.split("_")[-1])

            for file in sorted(os.listdir(subject_path)):
                if not file.endswith(".csv"):
                    continue

                gesture_id = int(file.split("_")[0].replace("gesture", ""))

                csv_path = os.path.join(subject_path, file)
                win_feats = extract_window_features_from_csv(csv_path)
                n_windows = len(win_feats)

                # All windows of a file share its label, trial id and subject.
                X.extend(win_feats)
                y.extend([gesture_id] * n_windows)
                trial_ids.extend([trial_counter] * n_windows)
                subjects.extend([subject_id] * n_windows)

                trial_counter += 1

    return (
        np.array(X),
        np.array(y),
        np.array(trial_ids),
        np.array(subjects)
    )


In [60]:
#Test-train-split

# Window-level dataset: one row per window, tagged with its trial and subject.
X, y, trial_ids, subjects = load_dataset_windows(ROOT_DIR)

train_subjects = np.arange(1, 21)
test_subjects = np.arange(21, 26)

train_idx = np.isin(subjects, train_subjects)
test_idx = np.isin(subjects, test_subjects)

X_train, y_train = X[train_idx], y[train_idx]
X_test, y_test = X[test_idx], y[test_idx]
# Trial id of every test window, needed later to group the per-window votes.
trial_test = trial_ids[test_idx]


In [61]:
#scaling

# Fit the scaler on training windows only; reuse its statistics for the test windows.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)


In [62]:
# Refit the existing `rf` estimator on the window-level training set; this
# replaces the model that was fitted on trial-level features.
rf.fit(X_train, y_train)


In [63]:
# One predicted gesture label per individual test window; these per-window
# predictions are aggregated per trial by majority vote in the voting cell.
y_pred_windows = rf.predict(X_test)


In [64]:
#Voting

# Collapse per-window predictions to one label per trial by majority vote.
y_true_trials, y_pred_trials = [], []

for trial in np.unique(trial_test):
    in_trial = trial_test == trial

    # The most frequent value among the trial's windows is used for both the
    # true label and the prediction.
    y_true_trials.append(mode(y_test[in_trial], keepdims=False).mode)
    y_pred_trials.append(mode(y_pred_windows[in_trial], keepdims=False).mode)


In [65]:
# Score the majority-vote predictions: one entry per held-out trial.
cm = confusion_matrix(y_true_trials, y_pred_trials)
acc = accuracy_score(y_true_trials, y_pred_trials)

print("Trial-level accuracy:", acc)
print("Confusion Matrix:\n", cm)


Trial-level accuracy: 0.6685714285714286
Confusion Matrix:
 [[70 12  3 20  0]
 [ 0 72 23 10  0]
 [ 2 38 54 11  0]
 [21 15  5 64  0]
 [ 1  0  0 13 91]]


In [66]:
#New features

def zero_crossing(x, threshold=0.01):
    """Count sign changes between consecutive samples.

    A pair of neighbouring samples counts only if their product is negative
    and they differ by at least ``threshold`` in absolute value.
    """
    earlier, later = x[:-1], x[1:]
    sign_flipped = earlier * later < 0
    large_enough = np.abs(earlier - later) >= threshold
    return np.sum(sign_flipped & large_enough)


def slope_sign_change(x, threshold=0.01):
    """Count interior samples that are local peaks or troughs.

    A sample counts when it lies strictly above or strictly below both of its
    neighbours and differs from at least one of them by ``threshold`` or more.
    """
    left, mid, right = x[:-2], x[1:-1], x[2:]

    step_from_left = mid - left
    step_from_right = mid - right

    is_turning_point = step_from_left * step_from_right > 0
    exceeds_threshold = (
        (np.abs(step_from_left) >= threshold)
        | (np.abs(step_from_right) >= threshold)
    )
    return np.sum(is_turning_point & exceeds_threshold)


In [67]:
#Extracting all features

def extract_window_features_from_csv(
    csv_path,
    window_size=256,
    step_size=128
):
    """Compute RMS, MAV, WL, ZC and SSC for every sliding window of a recording.

    This replaces the earlier three-feature definition of the same name.
    Only numeric columns are used; if there are more than eight, the last
    eight are taken as the channels.

    Returns
    -------
    np.ndarray of shape (num_windows, 40), laid out channel by channel as
    [RMS, MAV, WL, ZC, SSC]. A recording shorter than ``window_size`` yields
    an empty array of shape (0, 40).

    Raises
    ------
    ValueError
        If the file has fewer than eight numeric columns.
    """
    n_channels = 8
    feats_per_channel = 5

    df = pd.read_csv(csv_path)
    df = df.select_dtypes(include=[np.number])

    if df.shape[1] < n_channels:
        raise ValueError(
            f"{csv_path}: expected at least {n_channels} numeric columns, "
            f"found {df.shape[1]}"
        )
    if df.shape[1] > n_channels:
        df = df.iloc[:, -n_channels:]

    data = df.to_numpy(dtype=np.float64)

    window_features = []

    for start in range(0, data.shape[0] - window_size + 1, step_size):
        segment = data[start:start + window_size]
        features = []

        for ch in range(n_channels):
            window = segment[:, ch]

            features.extend([
                np.sqrt(np.mean(window**2)),          # RMS
                np.mean(np.abs(window)),              # MAV
                np.sum(np.abs(np.diff(window))),      # WL
                zero_crossing(window),                # ZC
                slope_sign_change(window)             # SSC
            ])

        window_features.append(features)

    # reshape keeps the result 2-D even when no window fits the recording
    return np.array(window_features, dtype=np.float64).reshape(
        -1, n_channels * feats_per_channel
    )
    # shape → (num_windows, 40)


In [68]:
# Reload the window-level dataset so it uses the current
# extract_window_features_from_csv definition.
X, y, trial_ids, subjects = load_dataset_windows(ROOT_DIR)

train_subjects = np.arange(1, 21)
test_subjects = np.arange(21, 26)

train_idx = np.isin(subjects, train_subjects)
test_idx = np.isin(subjects, test_subjects)

X_train, y_train = X[train_idx], y[train_idx]
X_test, y_test = X[test_idx], y[test_idx]
# Trial id of every test window, used to group votes per trial.
trial_test = trial_ids[test_idx]


In [69]:
# Standardise with statistics learned from the training windows only.
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)


In [70]:

# Refit the shared `rf` estimator on the current training windows and predict
# every test window (fit() returns the estimator itself, so the calls chain).
y_pred_windows = rf.fit(X_train, y_train).predict(X_test)


In [71]:
# Majority vote over the windows of each held-out trial.
y_true_trials, y_pred_trials = [], []

for trial in np.unique(trial_test):
    in_trial = trial_test == trial

    y_true_trials.append(mode(y_test[in_trial], keepdims=False).mode)
    y_pred_trials.append(mode(y_pred_windows[in_trial], keepdims=False).mode)


In [72]:
# Trial-level metrics for the current feature set.
cm = confusion_matrix(y_true_trials, y_pred_trials)
acc = accuracy_score(y_true_trials, y_pred_trials)

print("Trial-level accuracy:", acc)
print("Confusion Matrix:\n", cm)


Trial-level accuracy: 0.6723809523809524
Confusion Matrix:
 [[73 13  1 18  0]
 [ 0 69 27  9  0]
 [ 1 38 55 11  0]
 [23 17  6 59  0]
 [ 0  0  0  8 97]]


In [73]:
def subject_wise_normalization(X, subjects):
    """Z-score every feature column separately within each subject.

    Parameters
    ----------
    X : array-like of shape (n_rows, n_features).
    subjects : array-like of length n_rows giving the subject of each row.

    Returns
    -------
    A new float64 array with the same shape as ``X``; the input is not
    modified. Columns that are constant within a subject are only centred
    (their standard deviation is replaced by 1 to avoid division by zero).

    Note
    ----
    Statistics are computed from every row passed in. If rows of held-out
    subjects are included, those subjects are normalised with their own
    (unlabelled) feature statistics.
    """
    # Work in float64: an integer-typed X would otherwise truncate the z-scores
    # when they are written into the output array.
    X = np.asarray(X, dtype=np.float64)
    subjects = np.asarray(subjects)
    X_norm = np.zeros_like(X)

    for subj in np.unique(subjects):
        idx = subjects == subj
        rows = X[idx]

        mu = rows.mean(axis=0)
        std = rows.std(axis=0)

        # avoid divide-by-zero
        std[std == 0] = 1.0

        X_norm[idx] = (rows - mu) / std

    return X_norm


In [74]:
# Normalise the window features per subject. `X` is the array returned by the
# most recent load_dataset_windows call (the scaling cells only reassigned
# X_train / X_test), so it holds both training and held-out subjects; each
# subject is standardised with its own statistics.
X_norm = subject_wise_normalization(X, subjects)


In [75]:
# Same subject-wise hold-out as before, now taken from the per-subject
# normalised features.
train_subjects = np.arange(1, 21)
test_subjects = np.arange(21, 26)

train_idx = np.isin(subjects, train_subjects)
test_idx = np.isin(subjects, test_subjects)

X_train, y_train = X_norm[train_idx], y[train_idx]
X_test, y_test = X_norm[test_idx], y[test_idx]
# Trial id of every test window, used to group votes per trial.
trial_test = trial_ids[test_idx]


In [76]:
# Refit the shared `rf` estimator on the subject-normalised training windows
# and predict every test window (fit() returns the estimator, so calls chain).
y_pred_windows = rf.fit(X_train, y_train).predict(X_test)


In [77]:
# Majority vote over the windows of each held-out trial.
y_true_trials, y_pred_trials = [], []

for trial in np.unique(trial_test):
    in_trial = trial_test == trial

    y_true_trials.append(mode(y_test[in_trial], keepdims=False).mode)
    y_pred_trials.append(mode(y_pred_windows[in_trial], keepdims=False).mode)


In [78]:

# Trial-level metrics after per-subject normalisation.
cm = confusion_matrix(y_true_trials, y_pred_trials)
acc = accuracy_score(y_true_trials, y_pred_trials)

print("Trial-level accuracy:", acc)
print("Confusion Matrix:\n", cm)


Trial-level accuracy: 0.7561904761904762
Confusion Matrix:
 [[ 68   1   6  28   2]
 [  1  70  26   7   1]
 [  6  18  74   7   0]
 [ 13   5   5  81   1]
 [  0   0   0   1 104]]
