In [3]:
import os
import glob
import numpy as np
import scipy.io as sio
from scipy.signal import butter, filtfilt, iirnotch

# ================================================================
# 1. CONFIGURACIÓN
# ================================================================
ROOT = "DATA"      # Folder that contains DB1, DB2, DB3
FS = 2000            # EMG sampling rate (samples per second)
# NOTE(review): this single rate is applied to DB1, DB2 and DB3 alike --
# confirm that every database is really sampled at 2000 Hz.
WINDOW_MS = 200      # Window length in ms
OVERLAP = 0.5        # 50% overlap
N_CHANNELS = 10      # Only 10 channels are used

# Unified mapping from gesture ids to classes 0-3.
# NOTE(review): key 18 appears in both groups below. A dict literal keeps the
# last value given for a repeated key, so GESTURE_MAP[18] == 0 and the DB1
# entry 18 -> 2 is silently discarded. The merged dict also cannot tell which
# database an id came from.
GESTURE_MAP = {
    # DB1
    1: 0, 9: 1, 18: 2, 21: 3,
    # DB2/DB3
    18: 0, 26: 1, 35: 2, 38: 3
}

# Exercise file (without the ".mat" extension) to use for each database
EXERCISE_MAP = {
    "DB1": "E3",
    "DB2": "E2",
    "DB3": "E2"
}

# ================================================================
# 2. FUNCIONES DE FILTRADO Y NORMALIZACIÓN
# ================================================================
def bandpass_filter(signal, fs=FS, low=20, high=450, order=4):
    """Band-pass filter ``signal`` along axis 0 with a Butterworth design.

    The filter is run forwards and backwards (``filtfilt``).

    Args:
        signal: Array-like to filter; filtering runs along the first axis.
        fs: Sampling rate, in the same unit as ``low`` and ``high``.
        low: Lower cut-off frequency.
        high: Upper cut-off frequency.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The filtered signal, as returned by ``scipy.signal.filtfilt``.
    """
    nyquist = fs / 2
    # butter() is given the cut-offs as fractions of the Nyquist frequency.
    band_edges = [low / nyquist, high / nyquist]
    numerator, denominator = butter(order, band_edges, btype="band")
    return filtfilt(numerator, denominator, signal, axis=0)

def notch_filter(signal, fs=FS, freq=50, Q=30):
    """Remove a narrow band around ``freq`` from ``signal`` along axis 0.

    The notch is designed with ``scipy.signal.iirnotch`` and run forwards and
    backwards (``filtfilt``).

    Args:
        signal: Array-like to filter; filtering runs along the first axis.
        fs: Sampling rate, in the same unit as ``freq``.
        freq: Centre frequency of the notch.
        Q: Quality factor passed to ``iirnotch``.

    Returns:
        The filtered signal, as returned by ``scipy.signal.filtfilt``.
    """
    nyquist = fs / 2
    # iirnotch() is given the centre frequency as a fraction of Nyquist.
    normalized_freq = freq / nyquist
    numerator, denominator = iirnotch(normalized_freq, Q)
    return filtfilt(numerator, denominator, signal, axis=0)

def normalize(signal):
    """Standardise ``signal`` along axis 0 (zero mean, unit deviation).

    Args:
        signal: Array-like whose first axis is reduced for the statistics.

    Returns:
        ``(signal - mean) / (std + 1e-8)`` with mean and std taken over
        axis 0.
    """
    axis_mean = np.mean(signal, axis=0)
    # The small epsilon keeps constant inputs from dividing by zero.
    axis_scale = np.std(signal, axis=0) + 1e-8
    centered = signal - axis_mean
    return centered / axis_scale

# ================================================================
# 3. VENTANEO (sliding windows)
# ================================================================
def sliding_window(X, y, window_ms=WINDOW_MS, overlap=OVERLAP, fs=FS):
    """Cut a signal into fixed-length, overlapping windows along axis 0.

    Each window is labelled with the most frequent value of ``y`` inside it;
    ties resolve to the smallest label, as ``np.bincount(...).argmax()`` does.

    Args:
        X: Array indexed by sample along axis 0.
        y: 1-D array of non-negative integer labels, one per row of ``X``.
        window_ms: Window length in milliseconds.
        overlap: Fraction of a window shared with the next one; must be < 1.
        fs: Sampling rate in samples per second.

    Returns:
        ``(X_windows, y_windows)``. ``X_windows`` has shape
        ``(n_windows, window_size, *X.shape[1:])`` and ``y_windows`` has shape
        ``(n_windows,)``. When the input is shorter than one window both
        arrays are empty but keep those shapes.

    Raises:
        ValueError: If the window is shorter than one sample, if ``overlap``
            is 1 or more, or if ``X`` and ``y`` differ in length.
    """
    X = np.asarray(X)
    y = np.asarray(y)

    window_size = int(fs * window_ms / 1000)
    if window_size < 1:
        raise ValueError(
            f"window of {window_ms} ms at fs={fs} is shorter than one sample"
        )
    if overlap >= 1:
        raise ValueError(f"overlap must be below 1, got {overlap}")
    if X.shape[0] != y.shape[0]:
        raise ValueError(
            f"X has {X.shape[0]} samples but y has {y.shape[0]} labels"
        )

    # int() truncation can produce 0 for high overlaps (e.g. 4 * 0.1 -> 0),
    # which range()/arange() cannot step by; always advance at least one
    # sample.
    step = max(1, int(window_size * (1 - overlap)))

    starts = np.arange(0, X.shape[0] - window_size + 1, step)
    if starts.size == 0:
        # Keep the trailing dimensions so callers can stack or inspect shapes.
        empty_X = np.empty((0, window_size) + X.shape[1:], dtype=X.dtype)
        return empty_X, np.empty((0,), dtype=np.intp)

    # One row of sample indices per window; indexing with it gathers every
    # window in a single operation.
    sample_index = starts[:, None] + np.arange(window_size)
    X_windows = X[sample_index]

    # Final label = mode of the labels inside the window.
    y_windows = np.array(
        [np.bincount(window_labels).argmax() for window_labels in y[sample_index]]
    )

    return X_windows, y_windows

# ================================================================
# 4. CARGA DE UN SUJETO
# ================================================================
# Gesture id -> unified class (0-3), kept separately for each database.
# The ids overlap between databases (18 is class 2 in DB1 but class 0 in
# DB2/DB3), so a single merged dict cannot represent both.
_GESTURE_MAP_BY_DB = {
    "DB1": {1: 0, 9: 1, 18: 2, 21: 3},
    "DB2": {18: 0, 26: 1, 35: 2, 38: 3},
    "DB3": {18: 0, 26: 1, 35: 2, 38: 3},
}


def load_subject(subject_path, db_name):
    """Load the selected gestures of one subject from its exercise file.

    Reads ``EXERCISE_MAP[db_name] + ".mat"`` inside ``subject_path``, keeps
    the first ``N_CHANNELS`` EMG columns and only the samples whose label is
    one of the gestures configured for ``db_name``, and remaps those labels
    to the unified classes.

    Args:
        subject_path: Directory that holds the subject's ``.mat`` files.
        db_name: Database key (``"DB1"``, ``"DB2"`` or ``"DB3"``); selects
            both the exercise file and the gesture mapping.

    Returns:
        ``(emg, labels)`` with one label per EMG row, or ``(None, None)`` when
        the exercise file is missing or contains none of the wanted gestures.

    Raises:
        KeyError: If ``db_name`` is not in ``EXERCISE_MAP``, or the file has
            no ``"emg"`` variable or neither ``"restimulus"`` nor
            ``"stimulus"``.
    """
    exercise_file = os.path.join(subject_path, EXERCISE_MAP[db_name] + ".mat")

    if not os.path.exists(exercise_file):
        print(f"⚠ Advertencia: {exercise_file} no existe")
        return None, None

    # Database names without a dedicated table keep using the merged map.
    gesture_map = _GESTURE_MAP_BY_DB.get(db_name, GESTURE_MAP)

    mat = sio.loadmat(exercise_file)
    emg = mat["emg"][:, :N_CHANNELS]

    # Prefer "restimulus"; fall back to "stimulus" when it is absent.
    labels = mat.get("restimulus")
    if labels is None:
        labels = mat["stimulus"]
    labels = labels.flatten()

    # Keep only the samples that belong to the wanted gestures.
    mask = np.isin(labels, list(gesture_map))
    if not mask.any():
        return None, None

    selected = labels[mask]
    # Every selected sample matches exactly one key, so each slot is filled.
    classes = np.empty(selected.shape, dtype=int)
    for gesture_id, class_id in gesture_map.items():
        classes[selected == gesture_id] = class_id

    return emg[mask], classes


  
# ================================================================
# 5. CARGA DE TODAS LAS BASES
# ================================================================
def load_all(ROOT):
    """Load every subject of DB1, DB2 and DB3 and stack the results.

    Databases whose folder is missing are skipped with a message, entries
    that are not directories are ignored, and subjects for which
    ``load_subject`` returns no data are left out. Subjects are visited in
    sorted name order.

    Args:
        ROOT: Directory that contains the ``DB1``/``DB2``/``DB3`` folders.

    Returns:
        ``(X, y)`` with all loaded subjects concatenated along axis 0, or
        ``(None, None)`` when nothing was loaded.
    """
    emg_parts = []
    label_parts = []

    for db_name in ("DB1", "DB2", "DB3"):
        db_path = os.path.join(ROOT, db_name)
        if not os.path.isdir(db_path):
            print(f"⚠ {db_path} no encontrado, saltando")
            continue

        for subject in sorted(os.listdir(db_path)):
            subject_path = os.path.join(db_path, subject)
            if os.path.isdir(subject_path):
                X, y = load_subject(subject_path, db_name)
                if X is not None:
                    emg_parts.append(X)
                    label_parts.append(y)
                    print(f"✓ Cargado: {db_name}/{subject} ({X.shape[0]} muestras)")

    if emg_parts:
        return np.vstack(emg_parts), np.concatenate(label_parts)
    return None, None

# ================================================================
# 6. PIPELINE COMPLETO: UNIFICACIÓN + PREPROCESAMIENTO + VENTANEO
# ================================================================
def build_final_dataset(ROOT):
    """Load all databases, preprocess the EMG and save the windowed dataset.

    The pooled signal goes through the notch filter, the band-pass filter and
    normalisation (in that order), is cut into windows, and the result is
    written to ``X.npy`` and ``y.npy`` in the current working directory.

    Args:
        ROOT: Directory handed to ``load_all``.

    Returns:
        ``(Xw, yw)`` as produced by ``sliding_window``, or ``(None, None)``
        when ``load_all`` yields no data.
    """
    print("Cargando y unificando dataset...")
    signal, labels = load_all(ROOT)
    if signal is None:
        print("No se cargaron datos")
        return None, None

    print("Aplicando preprocesamiento (notch + bandpass + normalización)...")
    # Stages run in this exact order on the whole pooled signal.
    for stage in (notch_filter, bandpass_filter, normalize):
        signal = stage(signal)

    print("Aplicando ventaneo...")
    windows, window_labels = sliding_window(signal, labels)

    print("Dataset final listo:")
    print("Xw shape:", windows.shape)
    print("yw shape:", window_labels.shape)
    print("Clases:", np.unique(window_labels))

    # Persist both arrays next to each other.
    for filename, array in (("X.npy", windows), ("y.npy", window_labels)):
        np.save(filename, array)
    print("Archivos guardados: X.npy, y.npy")

    return windows, window_labels

# ================================================================
# 7. EJECUCIÓN
# ================================================================
if __name__ == "__main__":
    # Build the dataset from ROOT; the resulting arrays stay available as
    # module-level names Xw and yw after the run.
    Xw, yw = build_final_dataset(ROOT)


Cargando y unificando dataset...
✓ Cargado: DB1/s1 (15682 muestras)
✓ Cargado: DB1/s10 (15074 muestras)
✓ Cargado: DB1/s11 (14670 muestras)
✓ Cargado: DB1/s12 (15061 muestras)
✓ Cargado: DB1/s13 (13065 muestras)
✓ Cargado: DB1/s14 (16874 muestras)
✓ Cargado: DB1/s15 (14892 muestras)
✓ Cargado: DB1/s16 (13637 muestras)
✓ Cargado: DB1/s17 (14973 muestras)
✓ Cargado: DB1/s18 (16618 muestras)
✓ Cargado: DB1/s19 (16334 muestras)
✓ Cargado: DB1/s2 (15462 muestras)
✓ Cargado: DB1/s20 (15336 muestras)
✓ Cargado: DB1/s21 (15130 muestras)
✓ Cargado: DB1/s22 (15008 muestras)
✓ Cargado: DB1/s23 (15288 muestras)
✓ Cargado: DB1/s24 (13462 muestras)
✓ Cargado: DB1/s25 (14970 muestras)
✓ Cargado: DB1/s26 (13581 muestras)
✓ Cargado: DB1/s27 (15869 muestras)
⚠ Advertencia: DATA\DB1\s3\E3.mat no existe
✓ Cargado: DB1/s4 (17666 muestras)
✓ Cargado: DB1/s5 (16529 muestras)
✓ Cargado: DB1/s6 (16034 muestras)
✓ Cargado: DB1/s7 (15161 muestras)
✓ Cargado: DB1/s8 (15682 muestras)
✓ Cargado: DB1/s9 (13693 muest