In [15]:
import os
import numpy as np
!pip install -q tensorflow
import tensorflow as tf
from sklearn.svm import SVC
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
import seaborn as sns
import matplotlib.pyplot as plt

UTILITIES

In [16]:
"""Method for adding +4 dead channels to the wrist channels, 
   so they match the forearm shape.  (1229,12) → (1229,16)
"""
def pad_to_16ch(windows):
    if windows.shape[-1] == 16:
        return windows
    padded = np.zeros((windows.shape[0], windows.shape[1], 16), dtype=np.float32)
    padded[:, :, :windows.shape[-1]] = windows
    return padded

def normalize_batch(batch):
    # Z-score normalize per batch to avoid global mean memory issues
    mean = np.mean(batch, axis=(0, 1), keepdims=True)
    std = np.std(batch, axis=(0, 1), keepdims=True) + 1e-8
    return (batch - mean) / std

In [19]:
import random

""" Data Generator (loads npz files in small chunks) """
class EMGDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, files, batch_size=64, steps_per_epoch=1000):
        self.files = files
        self.batch_size = batch_size
        self.steps_per_epoch = steps_per_epoch

    def __len__(self):
        return self.steps_per_epoch

    def __getitem__(self, idx):
        X_batch, y_batch = [], []
        while len(X_batch) < self.batch_size:
            f = random.choice(self.files)
            d = np.load(f)
            # Randomly pick forearm or wrist from this file
            if random.random() < 0.5 and len(d['forearm_windows']) > 0:
                x = d['forearm_windows']
                y = d['forearm_labels'] - 1
            else:
                x = pad_to_16ch(d['wrist_windows'])
                y = d['wrist_labels'] - 1
            if len(x) == 0: 
                continue
            i = np.random.randint(0, len(x))  # random window from that file
            X_batch.append(x[i])
            y_batch.append(y[i])
        X_batch = np.array(X_batch, dtype=np.float32)
        y_batch = np.array(y_batch, dtype=np.int32)
        # normalize batch
        X_batch = normalize_batch(X_batch)
        return X_batch, y_batch

In [20]:
# ---------- CNN feature extractor ----------

cnn = tf.keras.Sequential([
    tf.keras.layers.Conv1D(64, 5, dilation_rate=1, activation='relu', input_shape=(1229, 16)),
    tf.keras.layers.Conv1D(128, 5, dilation_rate=2, activation='relu'),
    tf.keras.layers.Conv1D(256, 3, dilation_rate=4, activation='relu'),
    tf.keras.layers.GlobalMaxPooling1D()  # -> (batch, 256)
])

# Add classification head temporarily for training
model = tf.keras.Sequential([
    cnn,
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(17, activation='softmax')  # 16 gestures + 1 rest
])

model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# ---------- Collect file paths & train CNN ----------

data_dir = r".\Processed_data"
all_files = [os.path.join(dp, f)
             for dp, dn, fn in os.walk(data_dir)
             for f in fn if f.endswith(".npz")]

# Split file list into train/test (80/20)
train_files, test_files = train_test_split(all_files, test_size=0.2, random_state=42)
train_gen = EMGDataGenerator(train_files, batch_size=64, steps_per_epoch=1000)
test_gen = EMGDataGenerator(test_files, batch_size=64, steps_per_epoch=200)

# Train the CNN
model.fit(train_gen, validation_data=test_gen, epochs=10)

Epoch 1/10
[1m   1/1000[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m14:20:06[0m 52s/step - accuracy: 0.0156 - loss: 2.9465

In [None]:
# ---------- Freeze CNN and extract features ----------

for layer in cnn.layers:
    layer.trainable = False

def extract_features(files):
    X_feats, y_all = [], []
    for f in files:
        d = np.load(f)
        fw, fl = d['forearm_windows'], d['forearm_labels']
        ww, wl = d['wrist_windows'], d['wrist_labels']
        if len(fw) > 0:
            fw = normalize_batch(fw)
            feats = cnn.predict(fw, batch_size=64, verbose=0)
            X_feats.append(feats)
            y_all.append(fl)
        if len(ww) > 0:
            ww = normalize_batch(pad_to_16ch(ww))
            feats = cnn.predict(ww, batch_size=64, verbose=0)
            X_feats.append(feats)
            y_all.append(wl)
    return np.concatenate(X_feats, axis=0), np.concatenate(y_all, axis=0)

train_feats, y_train = extract_features(train_files)
test_feats, y_test = extract_features(test_files)

print("Feature shapes:", train_feats.shape, test_feats.shape)

In [None]:
# ---------- Train and evaluate SVM ----------

scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_feats)
test_scaled = scaler.transform(test_feats)

svm = SVC(kernel='rbf', C=10, gamma='scale')
svm.fit(train_scaled, y_train)

In [None]:
y_pred = svm.predict(test_scaled)

acc = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred, average='macro')
rec = recall_score(y_test, y_pred, average='macro')
f1 = f1_score(y_test, y_pred, average='macro')

print("Accuracy :", acc)
print("Precision:", prec)
print("Recall   :", rec)
print("F1-score :", f1)
print("\nFull classification report:\n")
print(classification_report(y_test, y_pred))

cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

In [None]:
#========================================================================================================================

LOAD ALL FOREARM + WRIST DATA

In [22]:
"""Method for adding 4 aditional dead channels to the wrist channels, 
   so they match the forearm shape.  (1229,12) → (1229,16)
"""
def pad_to_16ch(windows):
    if windows.shape[-1] == 16:
        return windows
    padded = np.zeros((windows.shape[0], windows.shape[1], 16), dtype=np.float32)
    padded[:, :, :windows.shape[-1]] = windows
    return padded

In [23]:
data_dir = r".\Processed_data"
X_list, y_list = [], []

for session_folder in os.listdir(data_dir):
    session_path = os.path.join(data_dir, session_folder)
    for file in os.listdir(session_path):
        if not file.endswith(".npz"):
            continue
        d = np.load(os.path.join(session_path, file))

        # Forearm
        fw = d['forearm_windows']
        fl = d['forearm_labels'] - 1
        if len(fw) > 0:
            X_list.append(fw)
            y_list.append(fl)

        # Wrist
        ww = d['wrist_windows']
        wl = d['wrist_labels'] - 1
        if len(ww) > 0:
            ww_padded = pad_to_16ch(ww)
            X_list.append(ww_padded)
            y_list.append(wl)

X = np.concatenate(X_list, axis=0)   # (N, 1229, 16)
y = np.concatenate(y_list, axis=0)   # (N,)
print("Loaded data:", X.shape, y.shape)

MemoryError: Unable to allocate 33.7 GiB for an array with shape (460530, 1228, 16) and data type float32

In [None]:
# Optional normalization (z-score per channel)
X = (X - np.mean(X, axis=(0,1))) / (np.std(X, axis=(0,1)) + 1e-8)

# Do a train/test split (80/20)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

In [None]:
# Define 1D CNN feature extractor
cnn = tf.keras.Sequential([
    tf.keras.layers.Conv1D(filters=64, kernel_size=5, dilation_rate=1, activation='relu', input_shape=(1229,16)),
    tf.keras.layers.Conv1D(filters=128, kernel_size=5, dilation_rate=2, activation='relu'),
    tf.keras.layers.Conv1D(filters=256, kernel_size=3, dilation_rate=4, activation='relu'),
    tf.keras.layers.GlobalMaxPooling1D()  # -> (batch, 256)
])

# Build a temp classification head for CNN training only
model = tf.keras.Sequential([
    cnn,
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(len(np.unique(y)), activation='softmax')
])

model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

In [None]:
# Train CNN feature extractor
model.fit(X_train, y_train, validation_split=0.1, epochs=10, batch_size=64, verbose=1)

In [None]:
# Freeze CNN and extract features
for layer in cnn.layers:
    layer.trainable = False

In [None]:
train_features = cnn.predict(X_train, batch_size=64)
test_features = cnn.predict(X_test, batch_size=64)
print("Feature shape:", train_features.shape)  # (N_train, 256)

In [None]:
# Train SVM on extracted features
scaler = StandardScaler()
train_scaled = scaler.fit_transform(train_features)
test_scaled = scaler.transform(test_features)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

svm = SVC(kernel='rbf', C=10, gamma='scale')
svm.fit(train_scaled, y_train)

# Evaluate the SVM model
y_pred = svm.predict(test_scaled)

# metrics
acc = accuracy_score(y_test, y_pred)
prec = precision_score(y_test, y_pred, average='macro')
rec = recall_score(y_test, y_pred, average='macro')
f1 = f1_score(y_test, y_pred, average='macro')

print("Accuracy :", acc)
print("Precision:", prec)
print("Recall   :", rec)
print("F1-score :", f1)
print("\nFull classification report:\n")
print(classification_report(y_test, y_pred))

# confusion matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()