In [1]:
import numpy as np
import pandas as pd
from scipy.signal import butter, lfilter
from sklearn.preprocessing import StandardScaler
from typing import Tuple
import os

# --- 1. CONFIGURATION CONSTANTS ---
# Use the same constants derived from the previous step (assuming Fs=1000Hz)
FS = 1000             # Sampling frequency in Hz
LOW_CUTOFF = 20       # Bandpass low cutoff in Hz
HIGH_CUTOFF = 450     # Bandpass high cutoff in Hz
FILTER_ORDER = 4      # Order of the Butterworth filter

WINDOW_SIZE_MS = 200  # Time window for segmentation (e.g., 200 ms)
OVERLAP_MS = 100      # Overlap between consecutive windows (e.g., 100 ms)

# 🆕 NEW: Set the maximum number of samples (rows) you want per gesture class.
# This will drastically reduce the overall dataset size while keeping class balance.
MAX_SAMPLES_PER_CLASS = 50000 # Example: Limit to 50,000 samples per class (50,000 * 8 classes = 400,000 total samples)

# --- 2. CORE UTILITY FUNCTIONS (Retained from previous step) ---

def butter_bandpass(lowcut: float, highcut: float, fs: float, order: int=4) -> Tuple[np.ndarray, np.ndarray]:
    """Generates the Butterworth filter coefficients (b, a)."""
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def apply_bandpass_filter(data: np.ndarray, lowcut: float, highcut: float, fs: float, order: int=4) -> np.ndarray:
    """Applies the Bandpass filter to all channels of the EMG data."""
    # ... implementation (same as before) ...
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    filtered_data = np.zeros_like(data, dtype=np.float64)
    for i in range(data.shape[1]):
        filtered_data[:, i] = lfilter(b, a, data[:, i])
    return filtered_data

def normalize_data(data: np.ndarray) -> np.ndarray:
    """Applies Z-score (StandardScaler) normalization to the data."""
    scaler = StandardScaler()
    return scaler.fit_transform(data)

def segment_data(X: np.ndarray, y: np.ndarray, fs: float, window_size_ms: int, overlap_ms: int) -> Tuple[np.ndarray, np.ndarray]:
    """Segments the time-series data into overlapping windows."""
    # ... implementation (same as before) ...
    window_samples = int(fs * (window_size_ms / 1000.0))
    overlap_samples = int(fs * (overlap_ms / 1000.0))
    stride = window_samples - overlap_samples

    X_segments, y_segments = [], []

    if window_samples <= 0 or stride <= 0 or window_samples > len(X):
         raise ValueError("Invalid window/overlap configuration.")

    for i in range(0, len(X) - window_samples + 1, stride):
        segment_X = X[i : i + window_samples, :]
        X_segments.append(segment_X)

        segment_y = y[i : i + window_samples]
        unique, counts = np.unique(segment_y, return_counts=True)
        y_segments.append(unique[np.argmax(counts)])

    X_final = np.array(X_segments, dtype=np.float32)
    y_final = np.array(y_segments, dtype=np.int32)
    return X_final, y_final

# --- 3. DATA LOADING AND SAMPLING FUNCTIONS ---

def load_emg_data(file_path: str) -> pd.DataFrame:
    """Loads the entire dataset into a DataFrame."""
    print(f"Loading data from: {file_path}")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Error: File not found at {file_path}. Please check the path.")
    
    df = pd.read_csv(file_path)
    return df

def limit_data_by_class_count(df: pd.DataFrame, max_samples_per_class: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Limits the number of samples for each class to max_samples_per_class
    to create a smaller, class-balanced dataset.
    """
    label_column = 'class'
    emg_columns = [f'channel{i}' for i in range(1, 9)]
    
    print(f"Original total samples: {len(df)}")
    print(f"Limiting to {max_samples_per_class} samples per class...")
    
    # Group by the 'class' label and take a sample of up to max_samples_per_class
    # .sample() with replace=False (default) will take all samples if count is larger than group size.
    df_limited = df.groupby(label_column).apply(
        lambda x: x.sample(min(len(x), max_samples_per_class), random_state=42)
    ).reset_index(drop=True)
    
    X_raw = df_limited[emg_columns].values
    y_raw = df_limited[label_column].values 

    print(f"New total samples after limiting: {len(df_limited)}")
    print(f"Class distribution in new dataset:\n{df_limited[label_column].value_counts().sort_index()}")
    
    return X_raw, y_raw

# --- 4. MAIN EXECUTION ---

if __name__ == '__main__':
    KAGGLE_FILE_PATH = 'EMG-data.csv'

    try:
        # 1. Load Data (Load the full, large DataFrame first)
        df_full = load_emg_data(KAGGLE_FILE_PATH)

        # 2. Limit and Balance Data (Crucial step for small model training) 🤏
        X_raw, y_raw = limit_data_by_class_count(df_full, MAX_SAMPLES_PER_CLASS)
        del df_full # Free up memory

        # --- Data Preprocessing Pipeline (Applied to the smaller subset) ---

        # 3. Bandpass Filter (20-450 Hz)
        print("\nApplying Bandpass Filter...")
        X_filtered = apply_bandpass_filter(X_raw, LOW_CUTOFF, HIGH_CUTOFF, FS, FILTER_ORDER)

        # 4. Normalize Data
        print("Normalizing Data (Z-score)...")
        X_normalized = normalize_data(X_filtered)

        # 5. Segment Data into Time Windows
        print(f"Segmenting data into {WINDOW_SIZE_MS}ms windows with {OVERLAP_MS}ms overlap...")
        X_final, y_final = segment_data(X_normalized, y_raw, FS, WINDOW_SIZE_MS, OVERLAP_MS)

        print("\n--- Final Preprocessing Results ---")
        print(f"Final Time Windows (X.npy) shape: {X_final.shape}")
        print(f"Final Labels (y.npy) shape: {y_final.shape}")

        # 6. Deliverables: Save Final Arrays
        print("Saving X.npy and y.npy to disk...")
        np.save('X.npy', X_final)
        np.save('y.npy', y_final)
        print("✅ Preprocessing complete. Small, class-balanced dataset saved.")

    except Exception as e:
        print(f"Data pipeline failed. Error: {e}")

Loading data from: EMG-data.csv
Original total samples: 4237907
Limiting to 50000 samples per class...


  df_limited = df.groupby(label_column).apply(


New total samples after limiting: 363696
Class distribution in new dataset:
class
0    50000
1    50000
2    50000
3    50000
4    50000
5    50000
6    50000
7    13696
Name: count, dtype: int64

Applying Bandpass Filter...
Normalizing Data (Z-score)...
Segmenting data into 200ms windows with 100ms overlap...

--- Final Preprocessing Results ---
Final Time Windows (X.npy) shape: (3635, 200, 8)
Final Labels (y.npy) shape: (3635,)
Saving X.npy and y.npy to disk...
✅ Preprocessing complete. Small, class-balanced dataset saved.
