# EMG Signal Processing Pipeline Demo

This notebook demonstrates a complete Electromyography (EMG) signal processing pipeline. EMG signals measure the electrical activity produced by skeletal muscles and are widely used in medical diagnostics, rehabilitation, and human-computer interaction.

The pipeline includes:
- Data loading and preprocessing (filtering, denoising, normalization)
- Feature extraction (time domain, frequency domain, and nonlinear features)
- Model training and evaluation (using classical ML approaches)
- Real-time processing demonstration

## Setup

First, we'll import the required modules. Our custom modules include:
- `preprocessing`: Signal filtering and normalization tools
- `features`: Feature extraction methods for biosignals
- `models`: Machine learning model implementations
- `acquisition`: Data acquisition and real-time processing
- `utils`: Utility functions for visualization
- `simulation`: Biosignal simulation capabilities

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import signal
import ipywidgets as widgets
from IPython.display import display, clear_output

# Import local modules
from preprocessing import SignalDenoising, SignalNormalization, SignalSegmentation
from features import TimeDomainFeatures, FrequencyDomainFeatures, NonlinearFeatures
from models import RandomForestModel, SVMModel, EnsembleModel
from acquisition import EMGAcquisition
from utils import plot_signals, plot_features
from simulation import EMGSimulator, ECGSimulator, EOGSimulator, NoiseSimulator
from sklearn.metrics import auc
from sklearn.model_selection import train_test_split

## Interactive Examples

This section provides interactive widgets to explore signal generation, processing, and analysis in real-time.

### 1. Signal Generation Explorer

Experiment with different signal types and parameters:

In [None]:
# Create widgets for signal generation
signal_type = widgets.Dropdown(
    options=['EMG', 'ECG', 'EOG'],
    value='EMG',
    description='Signal Type:'
)

# EMG parameters
emg_pattern = widgets.Dropdown(
    options=['isometric', 'dynamic', 'repetitive', 'complex'],
    value='isometric',
    description='Pattern:'
)
emg_intensity = widgets.FloatSlider(
    value=0.7,
    min=0.1,
    max=1.0,
    step=0.1,
    description='Intensity:'
)
emg_fatigue = widgets.Checkbox(
    value=False,
    description='Add Fatigue'
)

# ECG parameters
ecg_condition = widgets.Dropdown(
    options=['normal', 'pvc', 'st_elevation', 'af'],
    value='normal',
    description='Condition:'
)
heart_rate = widgets.FloatSlider(
    value=75.0,
    min=40.0,
    max=150.0,
    step=5.0,
    description='Heart Rate:'
)

# EOG parameters
eog_movement = widgets.Dropdown(
    options=['saccades', 'pursuit', 'fixation', 'blinks'],
    value='saccades',
    description='Movement:'
)
movement_amplitude = widgets.FloatSlider(
    value=10.0,
    min=1.0,
    max=30.0,
    step=1.0,
    description='Amplitude:'
)

# Noise parameters
add_noise = widgets.Checkbox(
    value=True,
    description='Add Noise'
)
noise_level = widgets.FloatSlider(
    value=0.1,
    min=0.0,
    max=0.5,
    step=0.05,
    description='Noise Level:'
)

def update_signal(*args):
    # Clear previous output
    clear_output(wait=True)
    
    # Display all widgets
    display(signal_type)
    
    if signal_type.value == 'EMG':
        display(emg_pattern, emg_intensity, emg_fatigue)
    elif signal_type.value == 'ECG':
        display(ecg_condition, heart_rate)
    else:  # EOG
        display(eog_movement, movement_amplitude)
        
    display(add_noise, noise_level)
    
    # Generate signal
    if signal_type.value == 'EMG':
        sim = EMGSimulator(sampling_rate=1000, duration=5.0)
        if emg_pattern.value == 'isometric':
            signal = sim.generate(activation_level=emg_intensity.value, fatigue=emg_fatigue.value)
        elif emg_pattern.value == 'dynamic':
            signal = sim.simulate_dynamic_contraction(pattern='ramp', max_intensity=emg_intensity.value)
        elif emg_pattern.value == 'repetitive':
            signal = sim.simulate_repetitive_movement(frequency=1.0, intensity=emg_intensity.value)
        else:  # complex
            signal = sim.simulate_complex_pattern(
                movements=['isometric', 'dynamic', 'repetitive'],
                durations=[1.0, 2.0, 2.0],
                intensities=[emg_intensity.value]*3
            )
            
    elif signal_type.value == 'ECG':
        sim = ECGSimulator(sampling_rate=1000, duration=10.0)
        if ecg_condition.value == 'normal':
            signal = sim.simulate_normal_sinus(heart_rate=heart_rate.value)
        elif ecg_condition.value == 'pvc':
            signal = sim.simulate_arrhythmias('pvc', base_heart_rate=heart_rate.value)
        elif ecg_condition.value == 'st_elevation':
            signal = sim.simulate_ischemia('st_elevation', heart_rate=heart_rate.value)
        else:  # af
            signal = sim.simulate_arrhythmias('af', base_heart_rate=heart_rate.value)
            
    else:  # EOG
        sim = EOGSimulator(sampling_rate=1000, duration=5.0)
        if eog_movement.value == 'saccades':
            signal = sim.simulate_saccades(
                amplitudes=[movement_amplitude.value, -movement_amplitude.value],
                directions=['horizontal']*2
            )
        elif eog_movement.value == 'pursuit':
            signal = sim.simulate_smooth_pursuit(
                pattern='sinusoidal',
                amplitude=movement_amplitude.value
            )
        elif eog_movement.value == 'fixation':
            signal = sim.simulate_fixations(duration=5.0)
        else:  # blinks
            signal = sim.simulate_blinks(n_blinks=4)
    
    # Add noise if selected
    if add_noise.value:
        noise_sim = NoiseSimulator(sampling_rate=1000, duration=len(signal)/1000)
        signal += noise_sim.simulate_noise('gaussian', std=noise_level.value)
        signal += noise_sim.simulate_noise('powerline', amplitude=noise_level.value/2)
    
    # Plot signal
    plt.figure(figsize=(12, 4))
    plt.plot(np.linspace(0, len(signal)/1000, len(signal)), signal)
    plt.title(f'{signal_type.value} Signal')
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')
    plt.grid(True)
    plt.show()

# Connect widgets to update function
signal_type.observe(update_signal, 'value')
emg_pattern.observe(update_signal, 'value')
emg_intensity.observe(update_signal, 'value')
emg_fatigue.observe(update_signal, 'value')
ecg_condition.observe(update_signal, 'value')
heart_rate.observe(update_signal, 'value')
eog_movement.observe(update_signal, 'value')
movement_amplitude.observe(update_signal, 'value')
add_noise.observe(update_signal, 'value')
noise_level.observe(update_signal, 'value')

# Initial display
update_signal()

### 2. Real-time Processing Demo

Watch the signal processing pipeline in action:

In [None]:
# Create widgets for processing parameters
filter_lowcut = widgets.FloatSlider(
    value=20.0,
    min=1.0,
    max=100.0,
    step=1.0,
    description='Low Cut (Hz):'
)

filter_highcut = widgets.FloatSlider(
    value=450.0,
    min=100.0,
    max=500.0,
    step=10.0,
    description='High Cut (Hz):'
)

notch_freq = widgets.FloatSlider(
    value=50.0,
    min=0.0,
    max=60.0,
    step=10.0,
    description='Notch Freq:'
)

wavelet_level = widgets.IntSlider(
    value=3,
    min=1,
    max=5,
    description='Wavelet Level:'
)

def process_signal(*args):
    # Clear previous output
    clear_output(wait=True)
    
    # Display widgets
    display(filter_lowcut, filter_highcut, notch_freq, wavelet_level)
    
    # Generate a noisy signal
    sim = EMGSimulator(sampling_rate=1000, duration=5.0)
    signal = sim.generate(activation_level=0.7)
    
    # Add noise
    noise_sim = NoiseSimulator(sampling_rate=1000, duration=5.0)
    noisy_signal = signal + \
                   noise_sim.simulate_noise('gaussian', std=0.1) + \
                   noise_sim.simulate_noise('powerline', frequency=notch_freq.value)
    
    # Process signal
    denoiser = SignalDenoising()
    normalizer = SignalNormalization()
    
    # Apply filters
    filtered = denoiser.bandpass_filter(
        noisy_signal,
        lowcut=filter_lowcut.value,
        highcut=filter_highcut.value,
        fs=1000
    )
    
    if notch_freq.value > 0:
        filtered = denoiser.notch_filter(filtered, freq=notch_freq.value, fs=1000)
        
    # Apply wavelet denoising
    denoised = denoiser.wavelet_denoise(filtered, wavelet='db4', level=wavelet_level.value)
    
    # Normalize
    processed = normalizer.robust_scale(denoised)
    
    # Plot signals
    t = np.linspace(0, 5, 5000)
    fig, axes = plt.subplots(3, 1, figsize=(12, 8))
    
    axes[0].plot(t, noisy_signal)
    axes[0].set_title('Raw Signal')
    axes[0].grid(True)
    
    axes[1].plot(t, filtered)
    axes[1].set_title('Filtered Signal')
    axes[1].grid(True)
    
    axes[2].plot(t, processed)
    axes[2].set_title('Final Processed Signal')
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # Calculate and display signal quality metrics
    snr = 10 * np.log10(np.var(signal) / np.var(noisy_signal - signal))
    snr_processed = 10 * np.log10(np.var(signal) / np.var(processed - signal))
    
    print(f"Signal Quality Metrics:")
    print(f"SNR (Raw): {snr:.2f} dB")
    print(f"SNR (Processed): {snr_processed:.2f} dB")
    print(f"Improvement: {snr_processed - snr:.2f} dB")

# Connect widgets to update function
filter_lowcut.observe(process_signal, 'value')
filter_highcut.observe(process_signal, 'value')
notch_freq.observe(process_signal, 'value')
wavelet_level.observe(process_signal, 'value')

# Initial display
process_signal()

### 3. Live Feature Visualization

Explore how different signal characteristics affect extracted features:

In [None]:
# Create widgets for feature extraction
window_size = widgets.IntSlider(
    value=500,
    min=100,
    max=1000,
    step=100,
    description='Window Size:'
)

feature_type = widgets.Dropdown(
    options=['time', 'frequency', 'nonlinear'],
    value='time',
    description='Feature Type:'
)

def update_features(*args):
    # Clear previous output
    clear_output(wait=True)
    
    # Display widgets
    display(window_size, feature_type)
    
    # Generate and process signal
    sim = EMGSimulator(sampling_rate=1000, duration=5.0)
    signal = sim.generate(activation_level=0.7)
    
    # Add noise and process
    noise_sim = NoiseSimulator(sampling_rate=1000, duration=5.0)
    signal += noise_sim.simulate_noise('gaussian', std=0.1)
    
    denoiser = SignalDenoising()
    normalizer = SignalNormalization()
    segmenter = SignalSegmentation()
    
    # Process signal
    processed = denoiser.bandpass_filter(signal, lowcut=20, highcut=450, fs=1000)
    processed = denoiser.notch_filter(processed, freq=50, fs=1000)
    processed = normalizer.robust_scale(processed)
    
    # Segment signal
    segments = segmenter.overlap_window(processed, window_size=window_size.value, overlap=0.5)
    
    # Extract features
    time_features = TimeDomainFeatures(sampling_rate=1000)
    freq_features = FrequencyDomainFeatures(sampling_rate=1000)
    nonlinear_features = NonlinearFeatures(sampling_rate=1000)
    
    features_list = []
    for segment in segments:
        if feature_type.value == 'time':
            features = {
                'RMS': time_features.rms(segment),
                'MAV': time_features.mav(segment),
                'ZCR': time_features.zero_crossing_rate(segment),
                'SSC': time_features.slope_sign_changes(segment),
                'WL': time_features.waveform_length(segment)
            }
        elif feature_type.value == 'frequency':
            features = {
                'Mean Freq': freq_features.mean_frequency(segment),
                'Median Freq': freq_features.median_frequency(segment),
                'Spectral Entropy': freq_features.spectral_entropy(segment)
            }
            band_powers = freq_features.frequency_band_power(segment, [(20,50), (50,100), (100,200)])
            for i, power in enumerate(band_powers):
                features[f'Band Power {i+1}'] = power
        else:  # nonlinear
            features = {
                'Sample Entropy': nonlinear_features.sample_entropy(segment),
                'Approx Entropy': nonlinear_features.approximate_entropy(segment),
                'Fractal Dim': nonlinear_features.fractal_dimension(segment)
            }
        features_list.append(features)
    
    # Convert to DataFrame and plot
    features_df = pd.DataFrame(features_list)
    
    # Plot signal and features
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    
    # Plot signal
    t = np.linspace(0, 5, len(signal))
    ax1.plot(t, signal)
    ax1.set_title('Signal')
    ax1.grid(True)
    
    # Plot features
    features_df.boxplot(ax=ax2)
    ax2.set_title(f'{feature_type.value.capitalize()} Domain Features')
    ax2.tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # Display feature statistics
    print("\nFeature Statistics:")
    print(features_df.describe().round(3))

# Connect widgets to update function
window_size.observe(update_features, 'value')
feature_type.observe(update_features, 'value')

# Initial display
update_features()