# Lab 1: EMG Signal Processing and Gesture Classification

**BioRobotics**

---

## Learning Objectives

By the end of this lab, you will be able to:

1. **Load and structure EMG data** from the Myo armband
2. **Apply signal processing techniques** including filtering, rectification, and envelope extraction
3. **Extract meaningful features** from EMG signals (time and frequency domain)
4. **Perform exploratory data analysis** to understand gesture patterns
5. **Build and evaluate classifiers** using LDA, QDA, and K-means
6. **Interpret results** using confusion matrices and cross-validation

---

## How to Use This Notebook

This notebook is organized in two phases:

**Phase 1 (Parts 1-3):** Learn the fundamentals using a **single sample recording**
- Understand data format
- Apply signal processing
- Extract features

**Phase 2 (Parts 4-6):** Apply to your **complete dataset**
- Load all trials
- Exploratory analysis
- Classification

**Code guidance:**
- **Completed code** demonstrates techniques
- **`# TODO` sections** require your implementation
- **Questions (Q1-Q10)** must be answered in the designated cells

---

## Part 0: Setup and Imports

In [None]:
# Standard libraries
import numpy as np
import pandas as pd
from pathlib import Path
import os
import glob
import warnings
warnings.filterwarnings('ignore')

# Signal processing
import scipy.signal as signal
from scipy import stats
from scipy.fft import fft, fftfreq

# Machine learning
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.cluster import KMeans
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Configure matplotlib
%matplotlib inline
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 11
plt.rcParams['axes.grid'] = True
plt.rcParams['grid.alpha'] = 0.3
sns.set_style('whitegrid')

# Set random seed for reproducibility
np.random.seed(42)

# Constants
SAMPLE_RATE = 200  # Hz (Myo EMG sample rate)
N_CHANNELS = 8     # Myo has 8 EMG channels

print("✓ All imports successful!")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")

---

# Phase 1: Single Recording Analysis

In this phase, we'll work with **one EMG recording** to understand the data format, signal processing, and feature extraction.

---

## Part 1: Loading and Understanding EMG Data

### 1.1 EMG Data File Format

The data files from the visualizer have this structure:

```
# participant_id: P01
# gesture: fist
# trial_number: 1
# stream_name: Myo_EMG
# stream_type: emg
# timestamp: 20250125_143022
# samples: 1000
# duration_sec: 5.000
#
timestamp,emg_1,emg_2,emg_3,emg_4,emg_5,emg_6,emg_7,emg_8
1234567.123456,3.0,-2.0,5.0,...
```

Key points:
- **Header lines** start with `#` and contain metadata
- **8 EMG channels** from the Myo armband pods (emg_1 through emg_8)
- **Timestamps** are LSL timestamps (seconds since system start)
- **Sample rate** is approximately 200 Hz
- **Amplitude range** is typically -128 to +127 (8-bit signed)

### 1.2 Function to Load EMG Files

In [None]:
def load_emg_file(filepath):
    """
    Load an EMG CSV file with metadata header.
    
    Calculates the EFFECTIVE sample rate from recorded timestamps to handle
    hardware variation (Myo typically runs at ~196Hz, not exactly 200Hz).
    Creates a uniform time vector that preserves correct total duration.
    
    Parameters
    ----------
    filepath : str or Path
        Path to the CSV file
    
    Returns
    -------
    data : pd.DataFrame
        EMG data with uniform 'time' column based on effective sample rate
    metadata : dict
        Includes 'effective_sample_rate' and 'nominal_sample_rate'
    """
    metadata = {}
    header_lines = 0
    
    # Read header lines starting with #
    with open(filepath, 'r') as f:
        for line in f:
            if line.startswith('#'):
                header_lines += 1
                if ':' in line:
                    key, value = line[1:].split(':', 1)
                    key = key.strip()
                    value = value.strip()
                    try:
                        value = int(value)
                    except ValueError:
                        try:
                            value = float(value)
                        except ValueError:
                            pass
                    metadata[key] = value
            else:
                break
    
    # Read the data
    data = pd.read_csv(filepath, skiprows=header_lines)
    
    # Calculate EFFECTIVE sample rate from this recording
    n_samples = len(data)
    if 'timestamp' in data.columns and n_samples > 1:
        total_duration = data['timestamp'].iloc[-1] - data['timestamp'].iloc[0]
        effective_rate = (n_samples - 1) / total_duration
        
        # Create uniform time vector at the effective rate
        # This preserves correct total duration while giving uniform sampling
        data['time'] = np.arange(n_samples) / effective_rate
        
        # Store both rates in metadata
        metadata['effective_sample_rate'] = effective_rate
        metadata['nominal_sample_rate'] = 200  # What Myo claims
        metadata['recorded_duration'] = total_duration
        
        # Keep original timestamps for reference
        data['timestamp_raw'] = data['timestamp']
    else:
        # Fallback if no timestamps
        data['time'] = np.arange(n_samples) / 200
        metadata['effective_sample_rate'] = 200
        metadata['nominal_sample_rate'] = 200
    
    return data, metadata

print("✓ load_emg_file() function defined (with effective sample rate correction)")


### 1.3 Load Sample Recording

We'll generate a synthetic EMG recording to demonstrate the concepts. Later, you'll load your own data.

In [None]:
# Path to sample recording (provided in the data folder)
SAMPLE_FILE = '../data/p1_test_trial002_emg_20260125_202827.csv'

# Load the sample recording
sample_df, sample_meta = load_emg_file(SAMPLE_FILE)

print("Sample Recording Metadata:")
print("-" * 40)
for key, value in sample_meta.items():
    print(f"  {key}: {value}")

print(f"\nData shape: {sample_df.shape}")
print(f"Columns: {list(sample_df.columns)}")
print(f"Duration: {sample_df['time'].iloc[-1]:.2f} seconds")
print(f"Sample rate: {len(sample_df) / sample_df['time'].iloc[-1]:.1f} Hz")
display(sample_df.head(10))
display(sample_df.tail(10))

### 1.4 Visualize the Raw EMG Recording

In [None]:
def plot_emg_recording(data, title="EMG Recording", channels=None):
    """
    Plot all EMG channels from a single recording.
    """
    if channels is None:
        channels = [f'emg_{i+1}' for i in range(N_CHANNELS)]
    
    fig, axes = plt.subplots(len(channels), 1, figsize=(14, 2*len(channels)), sharex=True)
    if len(channels) == 1:
        axes = [axes]
    
    colors = plt.cm.Set2(np.linspace(0, 1, N_CHANNELS))
    
    for i, (ax, ch) in enumerate(zip(axes, channels)):
        ax.plot(data['time'], data[ch], linewidth=0.5, color=colors[i])
        ax.set_ylabel(ch.replace('_', ' ').upper(), fontsize=10)
        ax.set_ylim([-100, 100])
        ax.axhline(y=0, color='gray', linestyle='-', linewidth=0.5, alpha=0.5)
    
    axes[-1].set_xlabel('Time (s)', fontsize=12)
    fig.suptitle(title, fontsize=14, fontweight='bold')
    plt.tight_layout()
    plt.show()

# Plot the sample recording
gesture_name = sample_meta.get('gesture', 'unknown').upper()
plot_emg_recording(sample_df, title=f"Raw EMG - {gesture_name} Gesture (Trial {sample_meta.get('trial_number', '?')})")

### 1.5 Examine a Single Channel

Let's look closely at one channel to understand the signal characteristics.

In [None]:
# Select channel 1 for detailed analysis
channel = 'emg_1'
raw_signal = sample_df[channel].values
time = sample_df['time'].values

# Basic statistics
print(f"Channel: {channel}")
print(f"  Samples: {len(raw_signal)}")
print(f"  Duration: {time[-1]:.2f} seconds")
print(f"  Sample rate: {len(raw_signal) / time[-1]:.1f} Hz")
print(f"  Min: {raw_signal.min():.2f}")
print(f"  Max: {raw_signal.max():.2f}")
print(f"  Mean: {raw_signal.mean():.2f}")
print(f"  Std Dev: {raw_signal.std():.2f}")

In [None]:
# Plot time domain and frequency domain for single channel
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Time domain
axes[0].plot(time, raw_signal, linewidth=0.5)
axes[0].set_xlabel('Time (s)')
axes[0].set_ylabel('Amplitude')
axes[0].set_title(f'{channel.upper()} - Time Domain (Raw Signal)', fontsize=12, fontweight='bold')
axes[0].set_ylim([-80, 80])

# Frequency domain (Power Spectral Density)
freqs, psd = signal.welch(raw_signal, fs=SAMPLE_RATE, nperseg=256)
axes[1].semilogy(freqs, psd, linewidth=1)
axes[1].axvline(x=60, color='r', linestyle='--', alpha=0.7, label='60 Hz powerline')
axes[1].axvspan(20, 95, alpha=0.15, color='green', label='EMG band (20-95 Hz)')
axes[1].set_xlabel('Frequency (Hz)')
axes[1].set_ylabel('Power Spectral Density')
axes[1].set_title(f'{channel.upper()} - Frequency Domain (Power Spectrum)', fontsize=12, fontweight='bold')
axes[1].set_xlim([0, 100])
axes[1].legend(loc='upper right')

plt.tight_layout()
plt.show()

**Observations:**
- The 60 Hz peak in the frequency spectrum is powerline interference
- EMG activity is spread across 20-95 Hz
- The signal oscillates around zero (bipolar)

---

# 1.6 Understanding Sample Rate and Timing (Important!)


In [None]:
print("=" * 70)
print("SAMPLE RATE ANALYSIS")
print("=" * 70)

# The Myo armband claims 200 Hz, but what's the actual rate?
n_samples = len(sample_df)
recorded_duration = sample_df['timestamp_raw'].iloc[-1] - sample_df['timestamp_raw'].iloc[0]
effective_rate = sample_meta['effective_sample_rate']
nominal_rate = sample_meta['nominal_sample_rate']

print(f"\nRecording statistics:")
print(f"  Total samples: {n_samples}")
print(f"  Recorded duration: {recorded_duration:.3f} sec")
print(f"  Nominal sample rate (Myo spec): {nominal_rate} Hz")
print(f"  Effective sample rate (actual): {effective_rate:.2f} Hz")
print(f"  Difference: {effective_rate - nominal_rate:.2f} Hz ({(effective_rate/nominal_rate - 1)*100:.2f}%)")

In [None]:
# Visualize the timing problem
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# 1. Raw timestamp differences (shows BLE jitter)
raw_ts = sample_df['timestamp_raw'].values
ts_diffs = np.diff(raw_ts) * 1000  # Convert to ms

axes[0, 0].hist(ts_diffs, bins=50, edgecolor='black', alpha=0.7)
axes[0, 0].axvline(x=5.0, color='r', linestyle='--', label='Expected (5ms @ 200Hz)')
axes[0, 0].axvline(x=1000/effective_rate, color='g', linestyle='--', label=f'Effective ({1000/effective_rate:.2f}ms)')
axes[0, 0].set_xlabel('Time between samples (ms)')
axes[0, 0].set_ylabel('Count')
axes[0, 0].set_title('A) Recorded Timestamp Differences\n(Shows BLE timing jitter)')
axes[0, 0].legend()

# 2. Cumulative timing drift comparison
sample_indices = np.arange(n_samples)
time_assumed_200 = sample_indices / 200  # If we assumed 200 Hz
time_effective = sample_indices / effective_rate  # Using effective rate
time_raw = raw_ts - raw_ts[0]  # Actual recorded timestamps

axes[0, 1].plot(sample_indices, time_assumed_200, 'b-', label='Assumed 200 Hz', alpha=0.7)
axes[0, 1].plot(sample_indices, time_effective, 'g-', label=f'Effective {effective_rate:.1f} Hz', alpha=0.7)
axes[0, 1].plot(sample_indices, time_raw, 'r.', markersize=1, label='Raw timestamps', alpha=0.3)
axes[0, 1].set_xlabel('Sample index')
axes[0, 1].set_ylabel('Time (s)')
axes[0, 1].set_title('B) Time Assignment Methods')
axes[0, 1].legend()

# 3. Error accumulation over time
error_200 = time_assumed_200 - time_raw
error_effective = time_effective - time_raw

axes[1, 0].plot(time_raw, error_200 * 1000, 'b-', label='Assumed 200 Hz', linewidth=2)
axes[1, 0].plot(time_raw, error_effective * 1000, 'g-', label='Effective rate', linewidth=2)
axes[1, 0].axhline(y=0, color='k', linestyle='-', alpha=0.3)
axes[1, 0].set_xlabel('Recording time (s)')
axes[1, 0].set_ylabel('Timing error (ms)')
axes[1, 0].set_title('C) Cumulative Timing Error')
axes[1, 0].legend()

# 4. Extrapolated error for longer recordings
durations = np.array([5, 30, 60, 120, 300])  # seconds
error_per_sec = (1/200 - 1/effective_rate) * effective_rate  # seconds of error per second
extrapolated_error = durations * error_per_sec

axes[1, 1].bar(range(len(durations)), extrapolated_error, color=['green', 'yellow', 'orange', 'red', 'darkred'])
axes[1, 1].set_xticks(range(len(durations)))
axes[1, 1].set_xticklabels(['5s', '30s', '1min', '2min', '5min'])
axes[1, 1].set_xlabel('Recording duration')
axes[1, 1].set_ylabel('Timing error (seconds)')
axes[1, 1].set_title('D) Extrapolated Error if Assuming 200 Hz')
for i, (d, e) in enumerate(zip(durations, extrapolated_error)):
    axes[1, 1].text(i, e + 0.1, f'{e:.2f}s', ha='center', fontsize=10)

plt.suptitle('Why Effective Sample Rate Matters', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

print(f"""
KEY TAKEAWAYS:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

1. BLE TIMING JITTER (Plot A):
   - Samples arrive in bursts due to Bluetooth connection intervals
   - Raw timestamps are NOT suitable for signal processing
   
2. SYSTEMATIC RATE DIFFERENCE (Plots B, C):
   - Myo runs at ~{effective_rate:.1f} Hz, not exactly 200 Hz
   - Small difference, but accumulates over time!
   
3. PRACTICAL IMPACT (Plot D):
   - After 5 minutes: {extrapolated_error[-1]:.2f} seconds of drift
   - Critical for multi-sensor synchronization
   
4. OUR SOLUTION:
   - Calculate effective rate from total samples / total duration
   - Create uniform time vector at effective rate
   - Preserves correct duration AND uniform sampling for signal processing
""")

## Part 2: Signal Processing

Raw EMG contains noise that must be removed before analysis.

### 2.1 Filtering Theory

**Why filter?**
- **High-pass (>20 Hz)**: Remove DC offset and slow motion artifacts
- **Low-pass (<95 Hz)**: Remove high-frequency noise (Myo already low-passes at ~95 Hz)
- **Notch (60 Hz)**: Remove powerline interference

**Filter types:**
- **Butterworth**: Maximally flat passband (no ripples)
- **filtfilt**: Zero-phase filtering (no time delay)

### 2.2 Implement Filters (Student Exercise)

Complete the bandpass and notch filter functions.

In [None]:
def bandpass_filter(data, lowcut, highcut, sample_rate, order=4):
    """
    Apply a Butterworth bandpass filter.
    
    Parameters
    ----------
    data : np.ndarray
        Input signal
    lowcut : float
        Low cutoff frequency (Hz)
    highcut : float
        High cutoff frequency (Hz)
    sample_rate : float
        Sampling frequency (Hz)
    order : int
        Filter order (default: 4)
    
    Returns
    -------
    filtered : np.ndarray
        Filtered signal
    """
    # TODO: Calculate the Nyquist frequency (half of sample rate)
    
    
    # TODO: Normalize cutoff frequencies to Nyquist (0 to 1 range)
    
    
    # TODO: Design the Butterworth bandpass filter
    # Use: signal.butter(order, [low, high], btype='band')
 
    
    # TODO: Apply zero-phase filtering
    # Use: signal.filtfilt(b, a, data)

    
    return filtered

print("✓ bandpass_filter() defined")

In [None]:
def notch_filter(data, notch_freq, sample_rate, Q=30):
    """
    Apply a notch filter to remove a specific frequency.
    
    Parameters
    ----------
    data : np.ndarray
        Input signal
    notch_freq : float
        Frequency to remove (Hz)
    sample_rate : float
        Sampling frequency (Hz)
    Q : float
        Quality factor - higher = narrower notch (default: 30)
    
    Returns
    -------
    filtered : np.ndarray
        Filtered signal
    """
    # TODO: Design the notch filter
    # Use: signal.iirnotch(notch_freq, Q, sample_rate)
    
    
    # TODO: Apply zero-phase filtering

    return filtered

print("✓ notch_filter() defined")

### 2.3 Apply Filters to the Sample Recording

In [None]:
# Apply filters to channel 1
# Step 1: Bandpass filter (20-95 Hz)
bandpassed = bandpass_filter(raw_signal, lowcut=20, highcut=95, sample_rate=SAMPLE_RATE)

# Step 2: Notch filter (remove 60 Hz)
filtered = notch_filter(bandpassed, notch_freq=60, sample_rate=SAMPLE_RATE)

print(f"Raw signal stats:      mean={raw_signal.mean():.2f}, std={raw_signal.std():.2f}")
print(f"Filtered signal stats: mean={filtered.mean():.2f}, std={filtered.std():.2f}")

In [None]:
# Compare raw vs filtered signals
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

axes[0].plot(time, raw_signal, linewidth=0.5)
axes[0].set_ylabel('Amplitude')
axes[0].set_title('1. Raw Signal', fontsize=12, fontweight='bold')
axes[0].set_ylim([-80, 80])

axes[1].plot(time, bandpassed, linewidth=0.5, color='orange')
axes[1].set_ylabel('Amplitude')
axes[1].set_title('2. After Bandpass Filter (20-95 Hz)', fontsize=12, fontweight='bold')
axes[1].set_ylim([-80, 80])

axes[2].plot(time, filtered, linewidth=0.5, color='green')
axes[2].set_ylabel('Amplitude')
axes[2].set_title('3. After Notch Filter (60 Hz removed)', fontsize=12, fontweight='bold')
axes[2].set_xlabel('Time (s)')
axes[2].set_ylim([-80, 80])

plt.suptitle(f'{channel.upper()} - Signal Processing Pipeline', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# Compare frequency spectra
freqs_raw, psd_raw = signal.welch(raw_signal, fs=SAMPLE_RATE, nperseg=256)
freqs_filt, psd_filt = signal.welch(filtered, fs=SAMPLE_RATE, nperseg=256)

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Full spectrum comparison
axes[0].semilogy(freqs_raw, psd_raw, 'b', alpha=0.7, label='Raw', linewidth=1.5)
axes[0].semilogy(freqs_filt, psd_filt, 'g', alpha=0.9, label='Filtered', linewidth=1.5)
axes[0].axvline(x=60, color='r', linestyle='--', alpha=0.5, label='60 Hz')
axes[0].axvspan(20, 95, alpha=0.1, color='green')
axes[0].set_xlabel('Frequency (Hz)')
axes[0].set_ylabel('Power Spectral Density (log)')
axes[0].set_title('Power Spectrum Comparison', fontsize=12, fontweight='bold')
axes[0].legend()
axes[0].set_xlim([0, 100])

# Zoomed around 60 Hz (linear scale)
axes[1].plot(freqs_raw, psd_raw, 'b', alpha=0.7, label='Raw', linewidth=1.5)
axes[1].plot(freqs_filt, psd_filt, 'g', alpha=0.9, label='Filtered', linewidth=1.5)
axes[1].axvline(x=60, color='r', linestyle='--', alpha=0.5)
axes[1].set_xlabel('Frequency (Hz)')
axes[1].set_ylabel('Power Spectral Density (linear)')
axes[1].set_title('Zoomed: 60 Hz Notch Effect', fontsize=12, fontweight='bold')
axes[1].legend()
axes[1].set_xlim([50, 70])

plt.tight_layout()
plt.show()

### 2.4 Rectification and Envelope Extraction

EMG is **bipolar** (oscillates around zero). To measure muscle activation:

1. **Rectify**: Take absolute value → all positive
2. **Smooth**: Low-pass filter → envelope showing activation level

In [None]:
def compute_envelope(data, sample_rate, cutoff_freq=5):
    """
    Compute EMG envelope using rectification and low-pass filtering.
    
    Parameters
    ----------
    data : np.ndarray
        Filtered EMG signal
    sample_rate : float
        Sampling frequency
    cutoff_freq : float
        Low-pass cutoff for smoothing (default: 5 Hz)
    
    Returns
    -------
    envelope : np.ndarray
        Smoothed envelope
    """
    # Step 1: Rectify (absolute value)
    rectified = np.abs(data)
    
    # Step 2: Low-pass filter to smooth
    nyquist = sample_rate / 2
    normalized_cutoff = cutoff_freq / nyquist
    b, a = signal.butter(4, normalized_cutoff, btype='low')
    envelope = signal.filtfilt(b, a, rectified)
    
    return envelope

# Compute envelope for channel 1
rectified = np.abs(filtered)
envelope = compute_envelope(filtered, SAMPLE_RATE, cutoff_freq=5)

print("✓ Envelope computed")

In [None]:
# Visualize the envelope extraction process
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

axes[0].plot(time, filtered, linewidth=0.5)
axes[0].set_ylabel('Amplitude')
axes[0].set_title('1. Filtered EMG Signal', fontsize=12, fontweight='bold')
axes[0].set_ylim([-60, 60])

axes[1].plot(time, rectified, linewidth=0.5, color='orange')
axes[1].set_ylabel('Amplitude')
axes[1].set_title('2. Rectified (Absolute Value)', fontsize=12, fontweight='bold')
axes[1].set_ylim([0, 60])

axes[2].fill_between(time, 0, rectified, alpha=0.3, color='orange', label='Rectified')
axes[2].plot(time, envelope, linewidth=2.5, color='red', label='Envelope')
axes[2].set_ylabel('Amplitude')
axes[2].set_title('3. EMG Envelope (Muscle Activation Level)', fontsize=12, fontweight='bold')
axes[2].set_xlabel('Time (s)')
axes[2].set_ylim([0, 40])
axes[2].legend(loc='upper right')

plt.suptitle(f'{channel.upper()} - Envelope Extraction', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

### 2.5 Process All Channels

In [None]:
def process_emg_channel(data, sample_rate):
    """
    Complete processing pipeline for one EMG channel.
    """
    filtered = bandpass_filter(data, 20, 95, sample_rate)
    filtered = notch_filter(filtered, 60, sample_rate)
    envelope = compute_envelope(filtered, sample_rate)
    return filtered, envelope

# Process all 8 channels
processed_df = sample_df[['time']].copy()
envelope_df = sample_df[['time']].copy()

for ch in range(1, N_CHANNELS + 1):
    col = f'emg_{ch}'
    raw = sample_df[col].values
    filt, env = process_emg_channel(raw, SAMPLE_RATE)
    processed_df[col] = filt
    envelope_df[f'env_{ch}'] = env

print("✓ All channels processed")

In [None]:
# Plot all channel envelopes
fig, ax = plt.subplots(figsize=(14, 6))

colors = plt.cm.Set2(np.linspace(0, 1, N_CHANNELS))
for ch in range(1, N_CHANNELS + 1):
    ax.plot(time, envelope_df[f'env_{ch}'], linewidth=2, 
            color=colors[ch-1], label=f'Ch {ch}', alpha=0.8)

ax.set_xlabel('Time (s)', fontsize=12)
ax.set_ylabel('Envelope Amplitude', fontsize=12)
ax.set_title(f'EMG Envelopes - All Channels ({sample_meta["gesture"].upper()})', 
             fontsize=14, fontweight='bold')
ax.legend(loc='upper right', ncol=4)
ax.set_xlim([0, time[-1]])
plt.tight_layout()
plt.show()

### Question 1

**Which EMG channels show the strongest activation when you make a fist?**

*Your answer here:*

> 

### Question 2

**What is the approximate amplitude range of the EMG signal at rest vs. during a strong contraction?**

*Your answer here:*

> 

### Question 3

**Looking at the PSD plots, what happened to the 60 Hz component after filtering? Why is the log scale (semilogy) useful for viewing PSD?**

*Your answer here:*

> 

---

## Part 3: Feature Extraction

Features are **numerical summaries** of the signal that can be used for classification.

### 3.1 Time-Domain Features (Student Exercise)

| Feature | Formula | Description |
|---------|---------|-------------|
| **MAV** | $\frac{1}{N}\sum_{i=1}^{N}|x_i|$ | Mean Absolute Value |
| **RMS** | $\sqrt{\frac{1}{N}\sum_{i=1}^{N}x_i^2}$ | Root Mean Square (signal power) |
| **WL** | $\sum_{i=1}^{N-1}|x_{i+1} - x_i|$ | Waveform Length |
| **ZC** | Count of sign changes | Zero Crossings |
| **SSC** | Count of slope sign changes | Slope Sign Changes |

**Your task:** Implement MAV, RMS, and WL. We provide ZC and SSC.

In [None]:
def compute_mav(data):
    """
    Mean Absolute Value: average of |x|
    
    MAV = (1/N) * sum(|x_i|)
    """
    # TODO: Implement using np.abs() and np.mean()
    mav = None  # Replace with your implementation
    return mav


def compute_rms(data):
    """
    Root Mean Square: square root of average squared value
    
    RMS = sqrt((1/N) * sum(x_i^2))
    """
    # TODO: Implement using np.sqrt() and np.mean()
    rms = None  # Replace with your implementation
    return rms


def compute_wl(data):
    """
    Waveform Length: sum of absolute differences
    
    WL = sum(|x_{i+1} - x_i|)
    """
    # TODO: Implement using np.diff(), np.abs(), np.sum()
    wl = None  # Replace with your implementation
    return wl


# Test on filtered signal
print("Testing your implementations on Channel 1:")
print(f"  MAV = {compute_mav(filtered)}")
print(f"  RMS = {compute_rms(filtered)}")
print(f"  WL  = {compute_wl(filtered)}")

In [None]:
# We provide Zero Crossings and Slope Sign Changes

def compute_zc(data, threshold=0):
    """
    Zero Crossings: count of times signal crosses zero
    """
    signs = np.sign(data)
    sign_changes = np.diff(signs)
    crossings = np.abs(sign_changes) == 2
    
    if threshold > 0:
        amplitude_check = np.abs(np.diff(data)) >= threshold
        crossings = crossings & amplitude_check
    
    return np.sum(crossings)


def compute_ssc(data, threshold=0):
    """
    Slope Sign Changes: count of local peaks and valleys
    """
    diff1 = data[1:-1] - data[:-2]
    diff2 = data[1:-1] - data[2:]
    ssc = np.sum((diff1 * diff2) > threshold)
    return ssc


print(f"  ZC  = {compute_zc(filtered)}")
print(f"  SSC = {compute_ssc(filtered)}")

### 3.2 Frequency-Domain Features

In [None]:
def compute_frequency_features(data, sample_rate):
    """
    Extract frequency-domain features from EMG signal.
    
    Returns: mean_freq, median_freq, total_power
    """
    # Compute power spectral density
    freqs, psd = signal.welch(data, fs=sample_rate, nperseg=min(256, len(data)))
    
    # Total power
    total_power = np.sum(psd)
    
    # Mean frequency (centroid)
    mean_freq = np.sum(freqs * psd) / total_power if total_power > 0 else 0
    
    # Median frequency (divides power in half)
    cumulative = np.cumsum(psd)
    median_idx = np.searchsorted(cumulative, total_power / 2)
    median_freq = freqs[min(median_idx, len(freqs)-1)]
    
    return {
        'mean_freq': mean_freq,
        'median_freq': median_freq,
        'total_power': total_power
    }

freq_feats = compute_frequency_features(filtered, SAMPLE_RATE)
print("Frequency features:")
for name, value in freq_feats.items():
    print(f"  {name}: {value:.4f}")

### 3.3 Complete Feature Extraction Function

In [None]:
def extract_channel_features(data, sample_rate):
    """
    Extract all features from a single EMG channel.
    
    Applies filtering first, then extracts features.
    """
    # Filter the signal
    filtered = bandpass_filter(data, 20, 95, sample_rate)
    filtered = notch_filter(filtered, 60, sample_rate)
    
    # Time-domain features
    features = {
        'mav': compute_mav(filtered),
        'rms': compute_rms(filtered),
        'wl': compute_wl(filtered),
        'zc': compute_zc(filtered),
        'ssc': compute_ssc(filtered)
    }
    
    # Frequency-domain features
    freq_feats = compute_frequency_features(filtered, sample_rate)
    features.update(freq_feats)
    
    return features


def extract_all_features(data_df, sample_rate):
    """
    Extract features from all 8 EMG channels.
    
    Returns dict with keys like 'ch1_mav', 'ch2_rms', etc.
    """
    all_features = {}
    
    for ch in range(1, N_CHANNELS + 1):
        col = f'emg_{ch}'
        if col in data_df.columns:
            ch_features = extract_channel_features(data_df[col].values, sample_rate)
            for feat_name, feat_value in ch_features.items():
                all_features[f'ch{ch}_{feat_name}'] = feat_value
    
    return all_features


# Extract features from sample recording
sample_features = extract_all_features(sample_df, SAMPLE_RATE)

print(f"Extracted {len(sample_features)} features from sample recording:")
print("-" * 50)
for i, (name, value) in enumerate(sample_features.items()):
    if value is not None:
        print(f"  {name}: {value:.4f}")
    else:
        print(f"  {name}: None (implement the function above)")
    if i >= 15:
        print(f"  ... and {len(sample_features) - 16} more features")
        break

---

# Phase 2: Full Dataset Analysis

Now that you understand the processing pipeline, let's apply it to your complete dataset.

---

## Part 4: Load and Process Complete Dataset

### 4.1 Load All Trial Files (Student Exercise)

In [None]:
# TODO: Set this to your data folder
DATA_DIR = '../recordings/'  # Change this path!

def load_all_trials(data_dir):
    """
    Load all EMG trial files from a directory.
    
    Returns: dict with gesture names as keys, list of (data, metadata) as values
    """
    all_data = {}
    
    # Find all EMG CSV files
    pattern = os.path.join(data_dir, '*_emg_*.csv')
    files = glob.glob(pattern)
    
    print(f"Found {len(files)} EMG files in {data_dir}")
    
    for filepath in sorted(files):
        # TODO: Load each file using load_emg_file()
        # data, metadata = load_emg_file(filepath)
        
        # TODO: Get gesture name from metadata
        # gesture = metadata['gesture']
        
        # TODO: Add to dictionary
        # if gesture not in all_data:
        #     all_data[gesture] = []
        # all_data[gesture].append((data, metadata))
        
        pass  # Remove when implemented
    
    return all_data


my_data = load_all_trials(DATA_DIR)
for gesture, trials in my_data.items():
    print(f"  {gesture}: {len(trials)} trials")

''' Below is synthetic data if you absolutely need it '''

# def generate_synthetic_recording(gesture='fist', duration_sec=3.0, sample_rate=200):
#     """
#     Generate synthetic EMG recording for demonstration.
#     Used when students don't have their own collected data.
#     """
#     n_samples = int(duration_sec * sample_rate)
#     t = np.arange(n_samples) / sample_rate
    
#     # Gesture-specific channel activation patterns
#     gesture_patterns = {
#         'rest':             [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
#         'fist':             [0.8, 0.9, 0.7, 0.3, 0.4, 0.6, 0.8, 0.7],
#         'open':             [0.3, 0.4, 0.8, 0.9, 0.7, 0.3, 0.4, 0.5],
#         'wrist_flexion':    [0.9, 0.7, 0.4, 0.2, 0.3, 0.5, 0.6, 0.8],
#         'wrist_extension':  [0.3, 0.4, 0.6, 0.8, 0.9, 0.7, 0.4, 0.3],
#         'pronation':        [0.5, 0.7, 0.8, 0.6, 0.3, 0.4, 0.7, 0.6],
#         'supination':       [0.6, 0.4, 0.3, 0.5, 0.7, 0.8, 0.6, 0.5],
#     }
#     pattern = gesture_patterns.get(gesture, gesture_patterns['rest'])
    
#     # Create activation envelope (rest -> active -> rest)
#     envelope = np.ones(n_samples)
#     hold_start = int(0.5 * sample_rate)
#     hold_end = int((duration_sec - 0.5) * sample_rate)
#     envelope[:hold_start] = np.linspace(0, 1, hold_start)
#     envelope[hold_end:] = np.linspace(1, 0, n_samples - hold_end)
    
#     # Generate EMG for each channel
#     data = {'timestamp': t}
#     for ch in range(N_CHANNELS):
#         noise = np.random.randn(n_samples) * 3
#         powerline = 2.5 * np.sin(2 * np.pi * 60 * t)
#         activation = pattern[ch] * envelope * 35
#         emg_burst = np.random.randn(n_samples) * (activation + 1)
#         emg = np.clip(noise + powerline + emg_burst, -127, 127)
#         data[f'emg_{ch+1}'] = emg
    
#     df = pd.DataFrame(data)
#     df['time'] = t
    
#     metadata = {
#         'participant_id': 'synthetic',
#         'gesture': gesture,
#         'trial_number': 1,
#         'samples': n_samples,
#         'duration_sec': duration_sec
#     }
    
#     return df, metadata

### 4.2 Generate Full Dataset (For Demo)

For demonstration, we'll generate synthetic data for all gestures and trials.

In [None]:
# =============================================================================
# 4.2 Load Student Data (or Generate Synthetic Data for Demo)
# =============================================================================

# Path to your collected data
DATA_DIR = '../data'  # Adjust this path to where your CSV files are located

# Gesture list (should match what you collected)
GESTURES = ['rest', 'fist', 'open', 'wrist_flexion', 'wrist_extension', 'pronation', 'supination']

def load_all_trials(data_dir, gestures=None):
    """
    Load all EMG trial files from a directory.
    
    Expects filenames in format: {participant}_{gesture}_trial{NNN}_emg_{timestamp}.csv
    
    Parameters
    ----------
    data_dir : str
        Path to directory containing CSV files
    gestures : list, optional
        List of gesture names to load. If None, loads all found gestures.
    
    Returns
    -------
    all_trials : dict
        Dictionary mapping gesture names to lists of (df, metadata) tuples
    """
    all_trials = {}
    
    # Find all EMG CSV files
    pattern = os.path.join(data_dir, '*_emg_*.csv')
    files = glob.glob(pattern)
    
    if not files:
        print(f"⚠ No EMG files found in {data_dir}")
        print(f"  Looking for pattern: {pattern}")
        return all_trials
    
    print(f"Found {len(files)} EMG files in {data_dir}")
    
    for filepath in sorted(files):
        filename = os.path.basename(filepath)
        
        # Parse filename: {participant}_{gesture}_trial{NNN}_emg_{timestamp}.csv
        parts = filename.replace('.csv', '').split('_')
        
        # Find gesture name (may be one or two parts, e.g., 'fist' or 'wrist_flexion')
        gesture = None
        for g in GESTURES:
            if g in filename:
                gesture = g
                break
        
        if gesture is None:
            print(f"  Skipping (unknown gesture): {filename}")
            continue
        
        # Filter by requested gestures
        if gestures is not None and gesture not in gestures:
            continue
        
        # Load the file
        try:
            df, meta = load_emg_file(filepath)
            
            # Initialize gesture list if needed
            if gesture not in all_trials:
                all_trials[gesture] = []
            
            all_trials[gesture].append((df, meta))
            
        except Exception as e:
            print(f"  Error loading {filename}: {e}")
    
    return all_trials

# Try to load student data first
all_trials = load_all_trials(DATA_DIR, GESTURES)


# Summarize loaded data
print("\n" + "="*60)
print("LOADED STUDENT DATA")
print("="*60)
total_trials = 0
for gesture in GESTURES:
    n_trials = len(all_trials.get(gesture, []))
    total_trials += n_trials
    status = "✓" if n_trials > 0 else "✗"
    print(f"  {status} {gesture}: {n_trials} trials")
    
print(f"\n✓ Loaded {total_trials} total recordings across {len([g for g in all_trials if all_trials[g]])} gestures")

### 4.3 Extract Features from All Trials

In [None]:
# Extract features from all trials
feature_rows = []

print("Extracting features...")
for gesture, trials in all_trials.items():
    for data_df, metadata in trials:
        # Extract features
        features = extract_all_features(data_df, SAMPLE_RATE)
        
        # Add metadata
        features['gesture'] = gesture
        features['participant'] = metadata.get('participant_id', 'unknown')
        features['trial'] = metadata.get('trial_number', 0)
        
        feature_rows.append(features)

# Create DataFrame
features_df = pd.DataFrame(feature_rows)

print(f"\n✓ Feature matrix shape: {features_df.shape}")
print(f"  Samples: {len(features_df)}")
print(f"  Features per sample: {len([c for c in features_df.columns if c not in ['gesture', 'participant', 'trial']])}")

# Show first few rows
features_df.head()

---

## Part 5: Exploratory Data Analysis

### 5.1 Feature Distributions by Gesture

In [None]:
# Box plot of MAV for each channel by gesture
mav_cols = [f'ch{i}_mav' for i in range(1, 9) if f'ch{i}_mav' in features_df.columns]

fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

for i, col in enumerate(mav_cols):
    if features_df[col].notna().any():
        sns.boxplot(data=features_df, x='gesture', y=col, ax=axes[i], palette='Set2')
        axes[i].set_title(f'Channel {i+1} MAV')
        axes[i].set_xlabel('')
        axes[i].tick_params(axis='x', rotation=45)
    else:
        axes[i].text(0.5, 0.5, 'Implement MAV', ha='center', va='center', fontsize=12)
        axes[i].set_title(f'Channel {i+1} MAV')

plt.suptitle('Mean Absolute Value by Gesture and Channel', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# Heatmap: Average MAV by gesture and channel
mav_cols = [col for col in features_df.columns if 'mav' in col and features_df[col].notna().any()]

if mav_cols:
    gesture_means = features_df.groupby('gesture')[mav_cols].mean()
    
    plt.figure(figsize=(12, 6))
    sns.heatmap(gesture_means, annot=True, fmt='.1f', cmap='YlOrRd',
                xticklabels=[f'Ch{i}' for i in range(1, len(mav_cols)+1)])
    plt.title('Average MAV by Gesture and Channel', fontsize=14, fontweight='bold')
    plt.xlabel('Channel')
    plt.ylabel('Gesture')
    plt.tight_layout()
    plt.show()
else:
    print("Implement compute_mav() to see this visualization")

### 5.2 Feature Space Visualization

In [None]:
# Scatter plot: two features colored by gesture
feat_x = 'ch1_mav' if 'ch1_mav' in features_df.columns and features_df['ch1_mav'].notna().any() else 'ch1_zc'
feat_y = 'ch3_mav' if 'ch3_mav' in features_df.columns and features_df['ch3_mav'].notna().any() else 'ch3_zc'

if features_df[feat_x].notna().any() and features_df[feat_y].notna().any():
    plt.figure(figsize=(10, 8))
    
    colors = plt.cm.Set2(np.linspace(0, 1, len(GESTURES)))
    for i, gesture in enumerate(GESTURES):
        mask = features_df['gesture'] == gesture
        plt.scatter(features_df.loc[mask, feat_x], 
                   features_df.loc[mask, feat_y],
                   label=gesture, alpha=0.7, s=100, color=colors[i])
    
    plt.xlabel(feat_x.replace('_', ' ').upper())
    plt.ylabel(feat_y.replace('_', ' ').upper())
    plt.title('Feature Space: Gesture Separation', fontsize=14, fontweight='bold')
    plt.legend(loc='upper right')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
else:
    print("Implement feature functions to see this visualization")

### Question 4

**How consistent are your EMG patterns across trials of the same gesture? What factors might cause variability?**

*Your answer here:*

> 

### Question 5

**Do different group members show similar or different EMG patterns for the same gesture? Why might this be?**

*Your answer here:*

> 

---

## Part 6: Gesture Classification

### 6.1 Prepare Data

In [None]:
# Select feature columns (exclude metadata)
feature_cols = [col for col in features_df.columns 
                if col not in ['gesture', 'participant', 'trial']
                and features_df[col].notna().all()]

print(f"Using {len(feature_cols)} features for classification")

# Create feature matrix and labels
X = features_df[feature_cols].values
y = features_df['gesture'].values

# Encode labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)

print(f"X shape: {X.shape}")
print(f"Classes: {label_encoder.classes_}")

In [None]:
# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y_encoded, test_size=0.3, random_state=42, stratify=y_encoded
)

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"Training: {len(X_train)} samples")
print(f"Testing: {len(X_test)} samples")

### 6.2 Linear Discriminant Analysis (LDA)

LDA finds linear boundaries between classes by maximizing between-class variance and minimizing within-class variance.

In [None]:
# Train LDA
lda = LinearDiscriminantAnalysis()
lda.fit(X_train_scaled, y_train)

# Predict
y_pred_lda = lda.predict(X_test_scaled)

# Evaluate
accuracy_lda = accuracy_score(y_test, y_pred_lda)
print(f"LDA Accuracy: {accuracy_lda:.2%}")
print("\nClassification Report:")
print(classification_report(y_test, y_pred_lda, target_names=label_encoder.classes_))

In [None]:
# Confusion matrix
cm_lda = confusion_matrix(y_test, y_pred_lda)

plt.figure(figsize=(10, 8))
sns.heatmap(cm_lda, annot=True, fmt='d', cmap='Blues',
            xticklabels=label_encoder.classes_,
            yticklabels=label_encoder.classes_)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title(f'LDA Confusion Matrix (Accuracy: {accuracy_lda:.2%})', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# LDA projection visualization
X_lda = lda.transform(X_train_scaled)

plt.figure(figsize=(10, 8))
colors = plt.cm.Set2(np.linspace(0, 1, len(label_encoder.classes_)))

for i, gesture in enumerate(label_encoder.classes_):
    mask = y_train == i
    plt.scatter(X_lda[mask, 0], X_lda[mask, 1], 
               label=gesture, alpha=0.7, s=100, color=colors[i])

plt.xlabel('LDA Component 1')
plt.ylabel('LDA Component 2')
plt.title('LDA Projection of Training Data', fontsize=14, fontweight='bold')
plt.legend(loc='best')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

### 6.3 Quadratic Discriminant Analysis (QDA) - Student Exercise

QDA allows each class to have its own covariance matrix (quadratic boundaries).

**Your task:** Follow the LDA pattern above to implement QDA.

In [None]:
# TODO: Implement QDA classification

# Step 1: Create and train QDA


# Step 2: Predict on test set


# Step 3: Evaluate


# Step 4: Confusion matrix


print("TODO: Implement QDA classification")

### 6.4 K-Means Clustering - Student Exercise

K-Means is **unsupervised** - it groups data without using labels.

**Your task:** Cluster the data and compare to true labels.

In [None]:
# TODO: Implement K-Means clustering

# Step 1: Create K-Means with k = number of gestures


# Step 2: Fit to all scaled data


# Step 3: Compare clusters to true labels


print("TODO: Implement K-Means clustering")

### 6.5 Cross-Validation

In [None]:
# 5-fold cross-validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
X_scaled = scaler.fit_transform(X)

# LDA CV
lda_cv = cross_val_score(LinearDiscriminantAnalysis(), X_scaled, y_encoded, cv=cv)
print(f"LDA Cross-Validation: {lda_cv.mean():.2%} ± {lda_cv.std():.2%}")
print(f"  Folds: {[f'{s:.2%}' for s in lda_cv]}")

# TODO: Add QDA cross-validation
# qda_cv = cross_val_score(QuadraticDiscriminantAnalysis(), X_scaled, y_encoded, cv=cv)
# print(f"\nQDA Cross-Validation: {qda_cv.mean():.2%} ± {qda_cv.std():.2%}")

### Question 6

**What is the approximate delay between your muscle contraction and the visual feedback in the proportional control demo? What factors contribute to this delay?**

*Your answer here:*

> 

### Question 7

**How might proportional EMG control be used in a prosthetic hand or robotic interface?**

*Your answer here:*

> 

### Question 8

**Which features are most useful for distinguishing between gestures?**

*Your answer here:*

> 

### Question 9

**Compare the classification accuracy of LDA, QDA, and K-means. Which performs best and why?**

*Your answer here:*

> 

### Question 10

**If you were designing a gesture recognition system for a real application, what gestures would you choose and why?**

*Your answer here:*

> 

---

## Summary

### What We Learned

1. **EMG signals** encode muscle activation across multiple channels
2. **Signal processing** (bandpass, notch, envelope) cleans the signal
3. **Feature extraction** converts signals into discriminative numbers
4. **Classification** automatically recognizes gestures:
   - **LDA**: Linear boundaries, fast, works well with limited data
   - **QDA**: Quadratic boundaries, more flexible
   - **K-Means**: Unsupervised clustering

### Applications

- Prosthetic control
- Exoskeletons
- Human-robot interaction
- Rehabilitation
- Gaming/VR interfaces

---

## Submission Checklist

- [ ] Implemented `bandpass_filter()` and `notch_filter()`
- [ ] Implemented `compute_mav()`, `compute_rms()`, `compute_wl()`
- [ ] Implemented QDA classification (Section 6.3)
- [ ] Implemented K-Means clustering (Section 6.4)
- [ ] Answered all 10 questions
- [ ] Run all cells
- [ ] Exported to PDF