# EMG Gesture Recognition Model

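This notebook builds a hand-gesture classifier from EMG (electromyography) signals: it cuts the raw recordings into overlapping windows, extracts time- and frequency-domain features from each window, compares several machine learning models, and saves the best one.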

## Dataset Overview
- **Channels**: 3 EMG sensors (ch1, ch2, ch3)
- **Gestures**: 12 different hand gestures
- **Features**: Time and frequency domain features extracted from sliding windows


## 1. Import Libraries and Setup

In [None]:
import json
import warnings

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import signal
from scipy.stats import skew, kurtosis
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.svm import SVC
# Silence every warning category so the cell outputs stay readable
warnings.simplefilter('ignore')

# Plot appearance: matplotlib's default style with seaborn's "husl" colour cycle
plt.style.use('default')
sns.set_palette("husl")

print("Libraries imported successfully!")

## 2. Load and Explore Data

In [None]:
# Read the combined EMG recording; later cells use the columns
# ch1, ch2, ch3 (signal channels) and gesture (label)
data = pd.read_csv('data/combined_emg_data (1).csv')

# Quick structural overview before looking at the content
print(f"Dataset shape: {data.shape}")
print(f"\nColumns: {data.columns.tolist()}")
print("\nFirst few rows:")
data.head()

In [None]:
# Number of raw samples recorded for each gesture label
gesture_counts = data['gesture'].value_counts()
print("Gesture distribution:")
print(gesture_counts)

# Bar chart of the same counts (most frequent gesture first)
fig, ax = plt.subplots(figsize=(12, 6))
gesture_counts.plot(kind='bar', ax=ax)
ax.set_title('Distribution of Gestures in Dataset')
ax.set_xlabel('Gesture')
ax.set_ylabel('Number of Samples')
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
plt.show()

In [None]:
# Show the first 500 samples of four example gestures, one panel per gesture
gestures_to_plot = ['0-OPEN', '1-CLOSE', '2-PINCH', '3-POINT']
channel_labels = {'ch1': 'Channel 1', 'ch2': 'Channel 2', 'ch3': 'Channel 3'}

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# axes.flat walks the 2x2 grid row by row, i.e. in the order of gestures_to_plot
for ax, gesture in zip(axes.flat, gestures_to_plot):
    gesture_data = data.loc[data['gesture'] == gesture].head(500)

    for column, label in channel_labels.items():
        ax.plot(gesture_data[column], label=label, alpha=0.7)

    ax.set_title(f'EMG Signals for {gesture}')
    ax.set_xlabel('Sample')
    ax.set_ylabel('EMG Amplitude')
    ax.legend()
    ax.grid(True, alpha=0.3)

fig.tight_layout()
plt.show()

## 3. Feature Extraction Functions

In [None]:
def extract_time_domain_features(signal_window):
    """Extract time domain features from one channel of an EMG window.

    Parameters
    ----------
    signal_window : array-like
        Samples of a single channel (NumPy array, list or pandas Series).

    Returns
    -------
    dict
        Feature name -> value, always in this insertion order: mean, std,
        var, rms, max, min, range, skewness, kurtosis, mad, q25, q75, iqr,
        zcr, ssc, wl, aac. Callers rely on the order to build feature columns.

    Raises
    ------
    ValueError
        If the window contains no samples.
    """
    # Work in float64: narrow integer sample types (e.g. int16) would wrap
    # around in x**2 and corrupt the RMS, and plain lists do not support **.
    x = np.asarray(signal_window, dtype=np.float64)
    if x.size == 0:
        raise ValueError("signal_window must contain at least one sample")

    features = {}

    # Basic statistical features
    mean_value = np.mean(x)
    features['mean'] = mean_value
    features['std'] = np.std(x)
    features['var'] = np.var(x)
    features['rms'] = np.sqrt(np.mean(x ** 2))
    features['max'] = np.max(x)
    features['min'] = np.min(x)
    features['range'] = features['max'] - features['min']

    # Advanced statistical features
    centered = x - mean_value
    features['skewness'] = skew(x)
    features['kurtosis'] = kurtosis(x)
    features['mad'] = np.mean(np.abs(centered))

    # Percentiles
    features['q25'] = np.percentile(x, 25)
    features['q75'] = np.percentile(x, 75)
    features['iqr'] = features['q75'] - features['q25']

    # Zero crossing rate: sign changes of the mean-removed signal per sample
    features['zcr'] = np.count_nonzero(np.diff(np.signbit(centered))) / len(x)

    # The first difference is shared by the three remaining features
    first_diff = np.diff(x)
    abs_diff = np.abs(first_diff)

    # Slope sign changes: sign changes of the first difference per sample
    features['ssc'] = np.count_nonzero(np.diff(np.signbit(first_diff))) / len(x)

    # Waveform length: cumulative absolute sample-to-sample change
    features['wl'] = np.sum(abs_diff)

    # Average amplitude change; a one-sample window has no differences, so
    # report 0.0 instead of the NaN that the mean of an empty array gives
    features['aac'] = np.mean(abs_diff) if abs_diff.size else 0.0

    return features

def extract_frequency_domain_features(signal_window, fs=1000):
    """Extract frequency domain features from one channel of an EMG window.

    The power spectral density is estimated with Welch's method, using
    segments of ``min(256, len(window) // 4)`` samples.

    Parameters
    ----------
    signal_window : array-like
        Samples of a single channel.
    fs : float, default 1000
        Sampling frequency passed to ``scipy.signal.welch``. The band limits
        below are expressed in the same unit as the returned frequencies.

    Returns
    -------
    dict
        Keys, in order: mean_freq, median_freq, low_band_power,
        mid_band_power, high_band_power, peak_freq. The band powers are
        fractions of the total power. All values are 0.0 when the window is
        too short for a spectral estimate (fewer than 4 samples) or carries
        no power.
    """
    feature_keys = ('mean_freq', 'median_freq', 'low_band_power',
                    'mid_band_power', 'high_band_power', 'peak_freq')

    x = np.asarray(signal_window, dtype=np.float64)

    # Fewer than 4 samples would make nperseg 0, which welch rejects
    nperseg = min(256, x.size // 4)
    if nperseg < 1:
        return dict.fromkeys(feature_keys, 0.0)

    # Power Spectral Density
    freqs, psd = signal.welch(x, fs=fs, nperseg=nperseg)

    total_power = np.sum(psd)
    # "not > 0" also routes a NaN total to the all-zero result
    if not total_power > 0:
        return dict.fromkeys(feature_keys, 0.0)

    features = {}

    # Mean frequency: power-weighted average of the frequency bins
    features['mean_freq'] = np.sum(freqs * psd) / total_power

    # Median frequency: first bin at which the cumulative power reaches half
    median_idx = np.where(np.cumsum(psd) >= total_power / 2)[0]
    features['median_freq'] = freqs[median_idx[0]] if len(median_idx) > 0 else 0

    # Band power features. Bands are half-open so a bin sitting exactly on
    # 80 or 150 is counted once instead of in both neighbouring bands.
    low_freq_band = (freqs >= 20) & (freqs < 80)
    mid_freq_band = (freqs >= 80) & (freqs < 150)
    high_freq_band = (freqs >= 150) & (freqs <= 250)

    features['low_band_power'] = np.sum(psd[low_freq_band]) / total_power
    features['mid_band_power'] = np.sum(psd[mid_freq_band]) / total_power
    features['high_band_power'] = np.sum(psd[high_freq_band]) / total_power

    # Peak frequency: bin with the largest power
    features['peak_freq'] = freqs[np.argmax(psd)]

    return features

print("Feature extraction functions defined!")

## 4. Create Sliding Windows and Extract Features

In [None]:
def create_sliding_windows(data, window_size=150, overlap=0.7):
    """Create overlapping fixed-size windows from continuous EMG data.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain the columns ch1, ch2, ch3 and gesture.
    window_size : int, default 150
        Number of consecutive rows per window.
    overlap : float, default 0.7
        Fraction of a window shared with the next one; must be below 1.

    Returns
    -------
    (windows, labels) : tuple of numpy.ndarray
        ``windows`` has shape (n_windows, window_size, 3); ``labels`` holds
        the gesture of each window. Windows are grouped by gesture (in order
        of first appearance) and, within a gesture, ordered by row position.

    Raises
    ------
    ValueError
        If ``window_size`` is not positive or ``overlap`` is not below 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")
    if not overlap < 1:
        raise ValueError("overlap must be smaller than 1")

    # Floating point makes e.g. 10 * (1 - 0.9) evaluate to 0.9999999999999998;
    # a bare int() would truncate that to a step of 0 and crash range().
    # The epsilon absorbs the rounding error, and the step is never below 1.
    step_size = max(1, int(window_size * (1 - overlap) + 1e-9))

    emg_columns = ['ch1', 'ch2', 'ch3']
    label_column = 'gesture'

    windows = []
    labels = []

    # Group by gesture so that every window carries exactly one label.
    # NOTE(review): all rows of a gesture are concatenated, so if a gesture
    # was recorded in several separate runs, some windows span the junction
    # between two runs -- confirm whether that is acceptable for this dataset.
    for gesture in data[label_column].unique():
        # Pull the channel samples out once per gesture instead of once per window
        gesture_signal = data.loc[data[label_column] == gesture, emg_columns].values

        for start in range(0, len(gesture_signal) - window_size + 1, step_size):
            windows.append(gesture_signal[start:start + window_size])
            labels.append(gesture)

    return np.array(windows), np.array(labels)

# Cut the recording into 150-sample windows that overlap by 70 %
print("Creating sliding windows...")
windows, labels = create_sliding_windows(data, window_size=150, overlap=0.7)
print(f"Created {len(windows)} windows")
print(f"Window shape: {windows[0].shape}")

# Tabulate how many windows each gesture contributed
unique_labels, counts = np.unique(labels, return_counts=True)
window_distribution = pd.DataFrame(
    {
        'Gesture': unique_labels,
        'Windows': counts,
    }
)
print("\nWindows per gesture:")
print(window_distribution)

In [None]:
def _single_window_features(window, fs=1000):
    """Return the ordered feature dict for one (n_samples, n_channels) window."""
    window = np.asarray(window)
    n_channels = window.shape[1]
    window_features = {}

    # Per-channel features, prefixed with the 1-based channel number
    for channel_idx in range(n_channels):
        channel_data = window[:, channel_idx]
        prefix = f'ch{channel_idx + 1}_'

        # Time domain features
        for key, value in extract_time_domain_features(channel_data).items():
            window_features[prefix + key] = value

        # Frequency domain features
        for key, value in extract_frequency_domain_features(channel_data, fs=fs).items():
            window_features[prefix + key] = value

    # Cross-channel correlation for every channel pair. For three channels
    # this yields corr_ch1_ch2, corr_ch1_ch3, corr_ch2_ch3, in that order.
    for first in range(n_channels):
        for second in range(first + 1, n_channels):
            corr = np.corrcoef(window[:, first], window[:, second])[0, 1]
            # corrcoef gives NaN when a channel is constant; store 0 instead
            window_features[f'corr_ch{first + 1}_ch{second + 1}'] = 0 if np.isnan(corr) else corr

    return window_features


def extract_features_from_windows(windows, fs=1000):
    """Extract the full feature vector from every window.

    Parameters
    ----------
    windows : iterable of 2-D arrays
        Each window is indexed as (sample, channel). Any number of channels
        is supported, as long as all windows have the same number.
    fs : float, default 1000
        Sampling frequency forwarded to ``extract_frequency_domain_features``.

    Returns
    -------
    (features, feature_names) : tuple
        ``features`` is the 2-D array of feature values (one row per window);
        ``feature_names`` lists the column names in matching order.
    """
    all_features = []

    print("Extracting features from windows...")
    for i, window in enumerate(windows):
        # Lightweight progress report
        if i % 1000 == 0:
            print(f"Processing window {i}/{len(windows)}")

        all_features.append(_single_window_features(window, fs=fs))

    # Build the table in one go; the column order follows dict insertion order
    features_df = pd.DataFrame(all_features)
    feature_names = features_df.columns.tolist()

    return features_df.values, feature_names

# Turn every window into one row of the feature matrix
features, feature_names = extract_features_from_windows(windows)
print(f"\nExtracted {features.shape[1]} features per window")
print(f"Total feature matrix shape: {features.shape}")

# Replace NaN and +/-inf entries with 0.0 so the models receive finite values only
features = np.nan_to_num(
    features,
    nan=0.0,
    posinf=0.0,
    neginf=0.0,
)
print("Features cleaned and ready for training!")

## 5. Prepare Data for Training

In [None]:
# Encode labels
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

print(f"Label mapping:")
for i, gesture in enumerate(label_encoder.classes_):
    print(f"{i}: {gesture}")

# Split data.
# Neighbouring windows overlap by 70 %, so they share most of their raw
# samples. A shuffled split would place near-duplicates of training windows
# in the test set and inflate the test accuracy (data leakage). Instead, hold
# out the LAST 20 % of each gesture's windows and drop the few training
# windows that still overlap the first held-out window.
# These values must match the create_sliding_windows call above.
WINDOW_SIZE, OVERLAP, TEST_FRACTION = 150, 0.7, 0.2
step = max(1, int(WINDOW_SIZE * (1 - OVERLAP)))
# Windows k positions apart share samples while k * step < WINDOW_SIZE
purge = int(np.ceil(WINDOW_SIZE / step)) - 1

train_idx, test_idx = [], []
for cls in np.unique(labels_encoded):
    # Windows of one gesture are stored in row order of the source data
    # (assumed to be recording order -- TODO confirm for the CSV)
    cls_idx = np.flatnonzero(labels_encoded == cls)
    n_test = max(1, int(round(len(cls_idx) * TEST_FRACTION)))
    first_test = len(cls_idx) - n_test
    train_idx.extend(cls_idx[:max(first_test - purge, 0)])
    test_idx.extend(cls_idx[first_test:])

train_idx = np.asarray(train_idx, dtype=int)
test_idx = np.asarray(test_idx, dtype=int)
assert len(train_idx) > 0, "not enough windows per gesture for a blocked split"
assert not set(train_idx) & set(test_idx), "train and test windows must be disjoint"

X_train, X_test = features[train_idx], features[test_idx]
y_train, y_test = labels_encoded[train_idx], labels_encoded[test_idx]

# Scale features (statistics are fitted on the training rows only)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"\nTraining set size: {X_train_scaled.shape}")
print(f"Test set size: {X_test_scaled.shape}")

## 6. Train and Compare Multiple Models

In [None]:
# Define models to compare
models = {
    'Random Forest': RandomForestClassifier(n_estimators=200, max_depth=20, random_state=42, n_jobs=-1),
    'Gradient Boosting': GradientBoostingClassifier(n_estimators=200, learning_rate=0.1, max_depth=5, random_state=42),
    'SVM': SVC(kernel='rbf', C=10, gamma='scale', random_state=42, probability=True),
    'Neural Network': MLPClassifier(hidden_layer_sizes=(200, 100), max_iter=1000, random_state=42)
}

# Train and evaluate each model
results = {}
trained_models = {}

for name, model in models.items():
    print(f"\n{'='*50}")
    print(f"Training {name}...")
    print(f"{'='*50}")

    # Train model
    model.fit(X_train_scaled, y_train)

    # Make predictions on the held-out test set
    y_pred = model.predict(X_test_scaled)

    # Calculate accuracy
    accuracy = accuracy_score(y_test, y_pred)

    # 5-fold cross-validation on the training data only
    cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5)

    results[name] = {
        'accuracy': accuracy,
        'cv_mean': cv_scores.mean(),
        'cv_std': cv_scores.std(),
        'predictions': y_pred
    }

    trained_models[name] = model

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"CV Accuracy: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

# Display results summary
print(f"\n{'='*60}")
print("MODEL COMPARISON SUMMARY")
print(f"{'='*60}")

results_df = pd.DataFrame({
    'Model': list(results.keys()),
    'Test Accuracy': [results[name]['accuracy'] for name in results.keys()],
    'CV Mean': [results[name]['cv_mean'] for name in results.keys()],
    'CV Std': [results[name]['cv_std'] for name in results.keys()]
})

# Rank by cross-validated accuracy. Picking the winner by test accuracy and
# then reporting that same number would bias it upwards; selecting on the
# training-side CV score keeps the test accuracy an untouched estimate.
results_df = results_df.sort_values('CV Mean', ascending=False)
print(results_df)

best_model_name = results_df.iloc[0]['Model']
best_model = trained_models[best_model_name]
print(f"\nBest Model: {best_model_name} with accuracy: {results_df.iloc[0]['Test Accuracy']:.4f}")

## 7. Detailed Analysis of Best Model

In [None]:
# Test-set predictions stored for the selected model during the comparison
best_predictions = results[best_model_name]['predictions']

# Per-gesture precision / recall / F1, labelled with the original gesture names
report_text = classification_report(
    y_test,
    best_predictions,
    target_names=label_encoder.classes_,
)

print(f"Classification Report for {best_model_name}:")
print("="*60)
print(report_text)

In [None]:
# Confusion matrix: rows are the true gestures, columns the predicted ones
cm = confusion_matrix(y_test, best_predictions)
gesture_names = label_encoder.classes_

fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(
    cm,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=gesture_names,
    yticklabels=gesture_names,
    ax=ax,
)
ax.set_title(f'Confusion Matrix - {best_model_name}')
ax.set_ylabel('True Label')
ax.set_xlabel('Predicted Label')
plt.setp(ax.get_xticklabels(), rotation=45)
plt.setp(ax.get_yticklabels(), rotation=0)
fig.tight_layout()
plt.show()

In [None]:
# Only estimators exposing feature_importances_ (the tree-based ones) can be ranked
if not hasattr(best_model, 'feature_importances_'):
    print(f"Feature importance not available for {best_model_name}")
else:
    # One row per feature, most important first
    importance_df = pd.DataFrame({
        'feature': feature_names,
        'importance': best_model.feature_importances_
    })
    importance_df = importance_df.sort_values('importance', ascending=False)

    # Horizontal bar chart of the 20 strongest features
    fig, ax = plt.subplots(figsize=(12, 8))
    sns.barplot(data=importance_df.head(20), x='importance', y='feature', ax=ax)
    ax.set_title(f'Top 20 Feature Importances - {best_model_name}')
    fig.tight_layout()
    plt.show()

    print("Top 10 most important features:")
    print(importance_df.head(10))

## 8. Save the Best Model

In [None]:
# Save the best model together with everything needed to use it again:
# the fitted scaler, the label encoder and the feature column order
model_data = {
    'model': best_model,
    'scaler': scaler,
    'label_encoder': label_encoder,
    'feature_names': feature_names,
    'model_name': best_model_name
}

model_filename = f'best_emg_gesture_model_{best_model_name.lower().replace(" ", "_")}.pkl'
joblib.dump(model_data, model_filename)
print(f"Best model saved as: {model_filename}")

# Also save a human-readable summary.
# Values are converted to native Python types first: json cannot serialise
# NumPy integer scalars, so non-string gesture labels would otherwise fail.
summary = {
    'best_model': str(best_model_name),
    'accuracy': float(results[best_model_name]['accuracy']),
    'cv_accuracy': float(results[best_model_name]['cv_mean']),
    'num_features': len(feature_names),
    'num_gestures': len(label_encoder.classes_),
    'gestures': label_encoder.classes_.tolist(),
    # Windowing parameters used when the training windows were created
    'window_size': 150,
    'overlap': 0.7
}

with open('model_summary.json', 'w') as f:
    json.dump(summary, f, indent=2)

print("Model summary saved as: model_summary.json")

## 9. Test Prediction Function

In [None]:
def predict_gesture(emg_window, model, scaler, label_encoder, feature_names, fs=1000):
    """Predict the gesture for a single EMG window.

    Parameters
    ----------
    emg_window : array-like, indexed as (sample, channel)
        Raw samples of one window.
    model : fitted classifier providing ``predict``, ``predict_proba`` and ``classes_``
    scaler : fitted transformer applied to the feature vector before prediction
    label_encoder : fitted encoder used to map the predicted class back to its name
    feature_names : list of str
        Feature order the scaler and model were fitted with.
    fs : float, default 1000
        Sampling frequency forwarded to ``extract_frequency_domain_features``.

    Returns
    -------
    (gesture_name, confidence, probability) : tuple
        ``confidence`` is the predicted probability of the returned gesture;
        ``probability`` is the full probability vector from ``predict_proba``.

    Raises
    ------
    ValueError
        If ``emg_window`` is not two-dimensional.
    KeyError
        If ``feature_names`` contains a feature this function does not compute.
    """
    emg_window = np.asarray(emg_window)
    if emg_window.ndim != 2:
        raise ValueError("emg_window must be 2-D: (n_samples, n_channels)")
    n_channels = emg_window.shape[1]

    # Extract features from the window
    window_features = {}

    # Per-channel features, prefixed with the 1-based channel number
    for channel_idx in range(n_channels):
        channel_data = emg_window[:, channel_idx]
        prefix = f'ch{channel_idx + 1}_'

        # Time domain features
        for key, value in extract_time_domain_features(channel_data).items():
            window_features[prefix + key] = value

        # Frequency domain features
        for key, value in extract_frequency_domain_features(channel_data, fs=fs).items():
            window_features[prefix + key] = value

    # Cross-channel correlation for every channel pair
    # (corr_ch1_ch2, corr_ch1_ch3, corr_ch2_ch3 for three channels)
    for first in range(n_channels):
        for second in range(first + 1, n_channels):
            corr = np.corrcoef(emg_window[:, first], emg_window[:, second])[0, 1]
            # corrcoef gives NaN when a channel is constant; store 0 instead
            window_features[f'corr_ch{first + 1}_ch{second + 1}'] = 0 if np.isnan(corr) else corr

    # Arrange the values in the order used for training, then sanitise them
    features_array = np.array([window_features[name] for name in feature_names]).reshape(1, -1)
    features_array = np.nan_to_num(features_array, nan=0.0, posinf=0.0, neginf=0.0)

    # Scale features
    features_scaled = scaler.transform(features_array)

    # Predict
    prediction = model.predict(features_scaled)[0]
    probability = model.predict_proba(features_scaled)[0]

    # Report the probability of the class predict() actually returned.
    # predict_proba columns follow model.classes_, and for some estimators
    # (e.g. SVC with probability=True) the largest probability can belong to
    # a different class than the one predict() picks.
    class_position = int(np.flatnonzero(model.classes_ == prediction)[0])
    confidence = probability[class_position]

    # Decode label
    gesture_name = label_encoder.inverse_transform([prediction])[0]

    return gesture_name, confidence, probability

# Smoke-test the prediction function on the first window that was created.
# Note: windows[0] is not guaranteed to be a held-out sample (it may have
# been used for training), so this checks the plumbing, not generalisation.
test_window = windows[0]
predicted_gesture, confidence, probabilities = predict_gesture(
    test_window,
    best_model,
    scaler,
    label_encoder,
    feature_names,
)

print("Test prediction:")
print(f"Predicted gesture: {predicted_gesture}")
print(f"Confidence: {confidence:.4f}")
print(f"Actual gesture: {labels[0]}")

# Rank the gestures by predicted probability, most likely first
prob_df = pd.DataFrame({
    'Gesture': label_encoder.classes_,
    'Probability': probabilities
})
prob_df = prob_df.sort_values('Probability', ascending=False)

print("\nProbability distribution:")
print(prob_df.head())

## 10. Model Performance Summary

In [None]:
# Recap of the whole run: data volume, feature count, selected model and artefacts
banner = "="*80
best_result = results[best_model_name]
gesture_names = label_encoder.classes_

print("\n" + banner)
print("EMG GESTURE RECOGNITION MODEL - FINAL SUMMARY")
print(banner)

print(f"Dataset: {data.shape[0]} samples, {len(gesture_names)} gestures")
print(f"Features: {len(feature_names)} features extracted from 3 EMG channels")
print(f"Windows: {len(windows)} sliding windows (size={150}, overlap=70%)")
print(f"\nBest Model: {best_model_name}")
print(f"Test Accuracy: {best_result['accuracy']:.4f}")
print(f"Cross-validation Accuracy: {best_result['cv_mean']:.4f} ± {best_result['cv_std']:.4f}")

print("\nGestures recognized:")
for i, gesture in enumerate(gesture_names):
    print(f"  {i}: {gesture}")

print(f"\nModel saved as: {model_filename}")
print("Summary saved as: model_summary.json")

print("\n" + banner)
print("MODEL READY FOR REAL-TIME GESTURE PREDICTION!")
print(banner)