In [72]:
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.signal import resample


In [73]:
import os
os.getcwd()

'c:\\Laukik\\emg-gait\\Notebooks'

In [74]:
# ================= CONFIG =================
BASE_DIR = Path(".")

ML_LABEL_FILE = BASE_DIR / "data" / "clusters" / "features_master_labeled_gmm4.csv"

EMG_CSV_DIR = BASE_DIR / "data" / "filtered"
DL_OUTPUT_DIR = BASE_DIR / "data" / "dl_ready_trials"
DL_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

FS = 2148.1481
TARGET_LEN = 1000

USE_RMS = True
RMS_WINDOW_MS = 50
RMS_WINDOW_SAMPLES = max(1, int((RMS_WINDOW_MS / 1000.0) * FS))

CONF_COL = "performance_level_confidence"
CONF_THRESHOLD = 0.6   # set None to disable filtering


In [75]:
print("CWD:", os.getcwd())
print("EMG_CSV_DIR exists:", EMG_CSV_DIR.exists())
print("Number of CSV files:", len(list(EMG_CSV_DIR.glob("*.csv"))))
print("ML label file exists:", ML_LABEL_FILE.exists())


CWD: c:\Laukik\emg-gait\Notebooks
EMG_CSV_DIR exists: True
Number of CSV files: 210
ML label file exists: True


In [76]:
def moving_rms(x, window):
    if window <= 1:
        return np.abs(x)
    kernel = np.ones(window) / window
    return np.sqrt(np.convolve(x**2, kernel, mode='same'))


In [77]:
def preprocess_emg_csv_for_dl(csv_path):
    df = pd.read_csv(csv_path)

    # drop time columns
    time_cols = [c for c in df.columns if 'Time' in c.lower()]
    if time_cols:
        df = df.drop(columns=time_cols)

    X = df.values.astype(np.float32)  # (time, channels)

    # rectification
    X = np.abs(X)

    # RMS envelope (optional but recommended)
    if USE_RMS:
        X_env = np.zeros_like(X)
        for ch in range(X.shape[1]):
            X_env[:, ch] = moving_rms(X[:, ch], RMS_WINDOW_SAMPLES)
        X = X_env

    # per-trial, per-channel normalization (CRITICAL)
    eps = 1e-8
    mean = X.mean(axis=0, keepdims=True)
    std = X.std(axis=0, keepdims=True)
    X = (X - mean) / (std + eps)

    # transpose to (channels, time)
    X = X.T

    # resample to fixed length
    X_out = np.zeros((X.shape[0], TARGET_LEN), dtype=np.float32)
    for ch in range(X.shape[0]):
        X_out[ch] = resample(X[ch], TARGET_LEN)

    return X_out


In [78]:
df_ml = pd.read_csv(ML_LABEL_FILE)

# optional: confidence filtering
if CONF_THRESHOLD is not None:
    df_ml = df_ml[df_ml[CONF_COL] >= CONF_THRESHOLD]

df_ml = df_ml.reset_index(drop=True)

print("Trials selected for DL:", len(df_ml))
print("Players:", df_ml['player'].nunique())
print("Label distribution:")
print(df_ml['performance_level'].value_counts())


Trials selected for DL: 210
Players: 42
Label distribution:
performance_level
Poor             60
Below-Average    60
Excellent        48
Average          42
Name: count, dtype: int64


In [82]:
failed = []

for _, row in df_ml.iterrows():
    csv_name = Path(row['file']).name
    csv_path = EMG_CSV_DIR / csv_name

    if not csv_path.exists():
        print("Missing CSV:", csv_name)
        failed.append(csv_name)
        continue

    try:
        X_dl = preprocess_emg_csv_for_dl(csv_path)

        out_name = Path(csv_name).stem + ".npy"
        out_path = DL_OUTPUT_DIR / out_name

        np.save(out_path, X_dl)

    except Exception as e:
        print("Failed:", csv_name, "->", e)
        failed.append(csv_name)

print("Done.")
print("Failed files:", failed)
print(csv_path)


Done.
Failed files: []
data\filtered\dummy35_5.csv


In [83]:
sample = next(DL_OUTPUT_DIR.glob("*.npy"))
X = np.load(sample)

print("Sample tensor shape:", X.shape)
print("Mean per channel:", X.mean(axis=1))
print("Std per channel:", X.std(axis=1))


Sample tensor shape: (9, 1000)
Mean per channel: [ 1.2207032e-07  3.0517580e-08 -4.5776368e-08 -3.0517580e-08
  7.6293944e-08  0.0000000e+00 -6.1035159e-08 -3.0517580e-08
  0.0000000e+00]
Std per channel: [0.9997034  0.99999976 0.99999976 0.9999996  0.99999917 0.9999995
 0.9999996  0.9999994  0.99999654]
