# Install required packages

In [None]:
!pip install numpy scipy tensorflow scikit-learn pywavelets tf2onnx matplotlib pandas seaborn


# Import necessary libraries

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import scipy.io as sio
from scipy.signal import butter, filtfilt, welch
import pywt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import os
import tf2onnx
import logging
from typing import Tuple, Dict, List, Optional
import zipfile
import urllib.request

# Set random seeds for reproducibility

In [None]:
# Seed NumPy's global RNG and TensorFlow's global generator with the same
# fixed value so repeated runs start from the same random state.
np.random.seed(42)
tf.random.set_seed(42)

# Configure logging

In [None]:
# Configure the root logger so INFO-level (and above) records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the loader class and the pipeline entry point.
logger = logging.getLogger(__name__)

# 1. Data Loading and Preprocessing

In [None]:
class CapgMyoDataLoader:
    """Download and load CapgMyo EMG recordings from a directory tree.

    ``load_data`` walks ``data_dir`` looking for
    ``subject*/session1|session2/gesture<N>.mat`` files and reads the
    ``'emg'`` variable from each of them.
    """

    def __init__(self, data_dir: str, window_size: int = 1000, overlap: float = 0.5, fs: int = 1000):
        """Store the loader configuration and derive the window stride.

        Args:
            data_dir: Root directory of the dataset.
            window_size: Window length in samples; must be at least 1.
            overlap: Fraction of a window shared with the next one, in
                the half-open range ``[0, 1)``.
            fs: Sampling rate in Hz.

        Raises:
            ValueError: If ``window_size`` or ``overlap`` is out of range.
        """
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if not 0 <= overlap < 1:
            # overlap >= 1 would give a zero/negative stride, which makes any
            # sliding-window loop built on it either fail or never advance.
            raise ValueError(f"overlap must satisfy 0 <= overlap < 1, got {overlap}")

        self.data_dir = data_dir
        self.window_size = window_size
        self.overlap = overlap
        self.fs = fs
        # Truncation can still yield 0 for small windows; always advance by
        # at least one sample.
        self.stride = max(1, int(window_size * (1 - overlap)))
        self.num_channels = 8
        self.num_gestures = 8

    def download_dataset(self):
        """Download and extract the dataset unless ``data_dir`` already exists.

        The archive is fetched into the current working directory and is
        removed again whether or not the download/extraction succeeds.
        ``data_dir`` is only created once the download has completed, so a
        failed download does not leave an empty directory behind that would
        make later calls skip the download.
        """
        if os.path.exists(self.data_dir):
            return

        # Note: Replace with actual dataset URL
        url = "https://example.com/capgmyo_dataset.zip"
        archive_path = "capgmyo_dataset.zip"
        logger.info("Downloading CapgMyo dataset...")

        try:
            urllib.request.urlretrieve(url, archive_path)
            os.makedirs(self.data_dir, exist_ok=True)
            with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                zip_ref.extractall(self.data_dir)
        finally:
            # Never leave a (possibly partial) archive lying around.
            if os.path.exists(archive_path):
                os.remove(archive_path)

        logger.info("Dataset downloaded and extracted successfully")

    def load_data(self) -> Tuple[np.ndarray, np.ndarray]:
        """Load every available gesture recording into one stacked array.

        Files that are missing are skipped; files that fail to load are
        logged and skipped (best effort).

        Returns:
            ``(X, y)`` where ``X`` is all ``'emg'`` arrays stacked vertically
            and ``y`` holds one zero-based gesture index per row of ``X``.
            NOTE(review): this assumes each ``'emg'`` array is laid out as
            (samples, channels) - confirm against the dataset files.

        Raises:
            ValueError: If no recording could be loaded.
        """
        logger.info("Loading CapgMyo dataset...")
        X = []
        y = []

        for subject_dir in sorted(os.listdir(self.data_dir)):
            if not subject_dir.startswith('subject'):
                continue

            subject_path = os.path.join(self.data_dir, subject_dir)
            logger.info(f"Processing {subject_dir}")

            for session in ['session1', 'session2']:
                session_path = os.path.join(subject_path, session)
                if not os.path.exists(session_path):
                    continue

                for gesture in range(self.num_gestures):
                    # Files are numbered from 1, labels from 0.
                    gesture_file = f'gesture{gesture+1}.mat'
                    file_path = os.path.join(session_path, gesture_file)

                    if not os.path.exists(file_path):
                        continue

                    try:
                        emg_data = sio.loadmat(file_path)['emg']
                    except Exception as e:
                        # Deliberate best effort: one unreadable file must
                        # not abort loading of the remaining recordings.
                        logger.error(f"Error loading {file_path}: {str(e)}")
                        continue

                    X.append(emg_data)
                    y.extend([gesture] * len(emg_data))

        if not X:
            raise ValueError("No data was loaded. Please check the data directory path.")

        X = np.vstack(X)
        y = np.array(y)

        logger.info(f"Loaded dataset shape: {X.shape}")
        return X, y

# 2. Feature Extraction

In [None]:
class EMGFeatureExtractor:
    """Band-pass filter EMG windows and reduce them to flat feature vectors.

    Every method treats axis 0 of its input as time and reduces along it,
    so a 2-D window yields one value per column for each feature.
    """

    def __init__(self, fs: int = 1000):
        """Remember the sampling rate (Hz) used for filtering and the PSD."""
        self.fs = fs

    def bandpass_filter(self, data: np.ndarray) -> np.ndarray:
        """Apply a zero-phase 4th-order Butterworth band-pass along axis 0.

        The nominal pass band is 20-500 Hz. ``butter`` only accepts
        normalised edges strictly inside (0, 1), so the upper edge is capped
        just below Nyquist; without the cap the default ``fs=1000`` puts the
        500 Hz edge exactly at Nyquist and the filter design fails.

        Raises:
            ValueError: If the sampling rate is too low for the 20 Hz lower
                edge to sit below the (capped) upper edge.
        """
        lowcut = 20
        highcut = 500
        nyquist = 0.5 * self.fs
        low = lowcut / nyquist
        high = min(highcut / nyquist, 0.99)
        if low >= high:
            raise ValueError(
                f"Sampling rate {self.fs} Hz is too low for a {lowcut} Hz lower band edge"
            )
        b, a = butter(4, [low, high], btype='band')
        return filtfilt(b, a, data, axis=0)

    def extract_time_domain_features(self, signal: np.ndarray) -> Dict[str, np.ndarray]:
        """Compute RMS, MAV, waveform length and sign-change count per column."""
        features = {}

        # Root Mean Square (RMS)
        features['rms'] = np.sqrt(np.mean(signal ** 2, axis=0))

        # Mean Absolute Value (MAV)
        features['mav'] = np.mean(np.abs(signal), axis=0)

        # Waveform Length (WL): total absolute sample-to-sample variation
        features['wl'] = np.sum(np.abs(np.diff(signal, axis=0)), axis=0)

        # Zero crossings: diff of the boolean sign bit marks every sign
        # change, so the sum is a count (not normalised by window length).
        features['zcr'] = np.sum(np.diff(np.signbit(signal), axis=0), axis=0)

        return features

    def extract_frequency_domain_features(self, signal: np.ndarray) -> Dict[str, np.ndarray]:
        """Compute Welch PSD statistics and db4 wavelet statistics per column."""
        features = {}

        # Power Spectral Density (PSD). welch() defaults to the last axis, so
        # the time axis must be named explicitly to match the other features;
        # nperseg is capped so short windows do not exceed their own length.
        nperseg = min(256, signal.shape[0])
        _, psd = welch(signal, fs=self.fs, nperseg=nperseg, axis=0)
        features['psd_mean'] = np.mean(psd, axis=0)
        features['psd_std'] = np.std(psd, axis=0)

        # Wavelet Transform: mean/std of each level-4 db4 coefficient band
        coeffs = pywt.wavedec(signal, 'db4', level=4, axis=0)
        for i, coeff in enumerate(coeffs):
            features[f'wavelet_{i}_mean'] = np.mean(coeff, axis=0)
            features[f'wavelet_{i}_std'] = np.std(coeff, axis=0)

        return features

    def extract_features(self, window: np.ndarray) -> np.ndarray:
        """Filter one window and return all its features as a 1-D vector.

        Time-domain features come first, then frequency-domain features,
        each flattened and concatenated in dictionary insertion order.
        """
        # Apply bandpass filter
        filtered_window = self.bandpass_filter(window)

        # Extract features
        time_features = self.extract_time_domain_features(filtered_window)
        freq_features = self.extract_frequency_domain_features(filtered_window)

        # Combine features
        features = {**time_features, **freq_features}

        # Convert to feature vector
        return np.concatenate([v.flatten() for v in features.values()])


# 3. Model Definitions

In [None]:
def create_cnn_model(input_shape: Tuple[int, ...], num_classes: int) -> tf.keras.Model:
    """Build an uncompiled 1-D CNN classifier.

    Three Conv1D + BatchNormalization stages (64, 128, 256 filters) with max
    pooling after the first two, global average pooling, then a 128-unit
    dense layer with dropout and a softmax over ``num_classes``.
    """
    stack = [layers.Input(shape=input_shape)]

    # (filters, whether a pooling layer follows this stage)
    for filters, pooled in ((64, True), (128, True), (256, False)):
        stack.append(layers.Conv1D(filters, kernel_size=3, activation='relu', padding='same'))
        stack.append(layers.BatchNormalization())
        if pooled:
            stack.append(layers.MaxPooling1D(pool_size=2))

    stack.append(layers.GlobalAveragePooling1D())
    stack.append(layers.Dense(128, activation='relu'))
    stack.append(layers.Dropout(0.3))
    stack.append(layers.Dense(num_classes, activation='softmax'))

    return models.Sequential(stack)

def create_lstm_model(input_shape: Tuple[int, ...], num_classes: int) -> tf.keras.Model:
    """Build an uncompiled stacked-LSTM classifier.

    Two LSTM layers (64 units returning sequences, then 32 units), each
    followed by batch normalisation, then a 64-unit dense layer with dropout
    and a softmax over ``num_classes``.
    """
    stack = [layers.Input(shape=input_shape)]

    # Only the first recurrent layer hands a full sequence to the next one.
    for units, keep_sequence in ((64, True), (32, False)):
        stack.append(layers.LSTM(units, return_sequences=keep_sequence))
        stack.append(layers.BatchNormalization())

    stack.append(layers.Dense(64, activation='relu'))
    stack.append(layers.Dropout(0.3))
    stack.append(layers.Dense(num_classes, activation='softmax'))

    return models.Sequential(stack)

def create_hybrid_model(input_shape: Tuple[int, ...], num_classes: int) -> tf.keras.Model:
    """Build an uncompiled two-branch CNN + LSTM classifier.

    The same input feeds a convolutional branch and a recurrent branch; their
    outputs are concatenated and passed through a 128-unit dense layer with
    dropout and a softmax over ``num_classes``.
    """
    inputs = layers.Input(shape=input_shape)

    # Convolutional branch
    conv_path = inputs
    for layer in (
        layers.Conv1D(64, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling1D(pool_size=2),
        layers.Conv1D(128, kernel_size=3, activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling1D(),
    ):
        conv_path = layer(conv_path)

    # Recurrent branch
    recurrent_path = inputs
    for layer in (
        layers.LSTM(64, return_sequences=True),
        layers.BatchNormalization(),
        layers.LSTM(32),
        layers.BatchNormalization(),
    ):
        recurrent_path = layer(recurrent_path)

    # Merge both branches and classify
    merged = layers.Concatenate()([conv_path, recurrent_path])
    merged = layers.Dense(128, activation='relu')(merged)
    merged = layers.Dropout(0.3)(merged)
    outputs = layers.Dense(num_classes, activation='softmax')(merged)

    return models.Model(inputs=inputs, outputs=outputs)


# 4. Training and Evaluation

In [None]:
class ModelTrainer:
    """Compile, train and evaluate a Keras classifier on integer labels."""

    def __init__(self, model: tf.keras.Model, learning_rate: float = 0.001):
        """Compile ``model`` with Adam, categorical cross-entropy and accuracy."""
        self.model = model
        self.model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss='categorical_crossentropy',
            metrics=['accuracy']
        )

    def train(self, X_train: np.ndarray, y_train: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray,
              batch_size: int = 32, epochs: int = 50) -> Dict[str, List[float]]:
        """Fit the model and return the Keras history dictionary.

        Args:
            X_train, y_train: Training inputs and integer class labels.
            X_val, y_val: Validation inputs and integer class labels.
            batch_size: Mini-batch size passed to ``fit``.
            epochs: Maximum number of epochs (early stopping may end sooner).

        Returns:
            ``history.history`` as produced by ``model.fit``.

        Side effects:
            Writes the best checkpoint (by validation accuracy) to
            ``best_model.h5`` in the current working directory.
        """
        # Pin the one-hot width to the model's output size. Without this,
        # to_categorical infers the width from the largest label present in
        # each array, so a split missing the highest class would produce
        # targets that do not match the softmax layer.
        num_classes = self.model.output_shape[-1]
        y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=num_classes)
        y_val_onehot = tf.keras.utils.to_categorical(y_val, num_classes=num_classes)

        # Define callbacks
        callbacks = [
            tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=10,
                restore_best_weights=True
            ),
            tf.keras.callbacks.ReduceLROnPlateau(
                monitor='val_loss',
                factor=0.5,
                patience=5,
                min_lr=1e-6
            ),
            tf.keras.callbacks.ModelCheckpoint(
                'best_model.h5',
                monitor='val_accuracy',
                save_best_only=True
            )
        ]

        # Train model
        history = self.model.fit(
            X_train, y_train_onehot,
            validation_data=(X_val, y_val_onehot),
            batch_size=batch_size,
            epochs=epochs,
            callbacks=callbacks,
            verbose=1
        )

        return history.history

    def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, object]:
        """Predict on ``X_test`` and score against integer labels ``y_test``.

        Returns:
            A dict with ``'accuracy'``, weighted ``'f1_score'`` and the
            ``'confusion_matrix'`` array.
        """
        y_pred = self.model.predict(X_test)
        # Class with the highest predicted probability per sample.
        y_pred_classes = np.argmax(y_pred, axis=1)

        metrics = {
            'accuracy': accuracy_score(y_test, y_pred_classes),
            'f1_score': f1_score(y_test, y_pred_classes, average='weighted'),
            'confusion_matrix': confusion_matrix(y_test, y_pred_classes)
        }

        return metrics


# 5. Model Export

In [None]:
def export_model(model: tf.keras.Model, export_dir: str):
    """Write ``model`` into ``export_dir`` in three formats.

    Produces ``tf_model`` (via ``model.save``), ``model.onnx`` (via tf2onnx)
    and ``model.tflite`` (via the TFLite converter), creating the directory
    first if needed.

    NOTE(review): ``model.save`` is given a path without a file extension;
    confirm the installed Keras version accepts that.
    """
    os.makedirs(export_dir, exist_ok=True)

    def artifact(name):
        # Path of one exported file inside the export directory.
        return os.path.join(export_dir, name)

    # Save TensorFlow model
    model.save(artifact('tf_model'))

    # Export to ONNX, keeping the batch dimension dynamic
    spec = tf.TensorSpec((None,) + model.input_shape[1:], tf.float32)
    onnx_proto, _ = tf2onnx.convert.from_keras(model, input_signature=(spec,))
    with open(artifact('model.onnx'), 'wb') as onnx_file:
        onnx_file.write(onnx_proto.SerializeToString())

    # Export to TFLite
    tflite_bytes = tf.lite.TFLiteConverter.from_keras_model(model).convert()
    with open(artifact('model.tflite'), 'wb') as tflite_file:
        tflite_file.write(tflite_bytes)


# 6. Visualization

In [None]:
 def plot_training_history(history: Dict[str, List[float]], save_path: str):
    """Save a side-by-side accuracy/loss figure to ``save_path``.

    ``history`` must provide the ``'accuracy'``, ``'val_accuracy'``,
    ``'loss'`` and ``'val_loss'`` series. The figure is closed after saving.
    """
    # (train key, train label, val key, val label, title, y-axis label)
    panels = (
        ('accuracy', 'Training Accuracy', 'val_accuracy', 'Validation Accuracy',
         'Model Accuracy', 'Accuracy'),
        ('loss', 'Training Loss', 'val_loss', 'Validation Loss',
         'Model Loss', 'Loss'),
    )

    plt.figure(figsize=(12, 4))

    for position, panel in enumerate(panels, start=1):
        train_key, train_label, val_key, val_label, title, y_label = panel
        plt.subplot(1, 2, position)
        plt.plot(history[train_key], label=train_label)
        plt.plot(history[val_key], label=val_label)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

def plot_confusion_matrix(cm: np.ndarray, save_path: str):
    """Render ``cm`` as an annotated blue heatmap and save it to ``save_path``.

    Cells are annotated with integer formatting; the figure is closed after
    saving.
    """
    figure = plt.figure(figsize=(10, 8))
    # heatmap() draws on the current axes, i.e. those of the figure above.
    axes = sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    axes.set_title('Confusion Matrix')
    axes.set_xlabel('Predicted')
    axes.set_ylabel('True')
    figure.tight_layout()
    figure.savefig(save_path)
    plt.close(figure)

# Main execution
def _segment_windows(X: np.ndarray, y: np.ndarray,
                     window_size: int, stride: int) -> Tuple[np.ndarray, np.ndarray]:
    """Cut sample-level data into fixed-length windows with one label each.

    Args:
        X: 2-D array with one row per sample.
        y: 1-D array with one label per row of ``X``.
        window_size: Number of consecutive rows per window.
        stride: Step, in rows, between successive window starts.

    Returns:
        ``(windows, labels)``: ``windows`` has shape
        ``(n_windows, window_size, X.shape[1])`` and ``labels`` holds the
        single label shared by every sample of the matching window.

    Raises:
        ValueError: On invalid sizes, mismatched lengths, or when no complete
            single-label window exists.
    """
    if window_size < 1 or stride < 1:
        raise ValueError("window_size and stride must both be >= 1")
    if len(X) != len(y):
        raise ValueError(f"X has {len(X)} rows but y has {len(y)} labels")

    windows = []
    labels = []
    for start in range(0, len(X) - window_size + 1, stride):
        stop = start + window_size
        segment_labels = y[start:stop]
        # A window whose labels differ straddles two recordings; drop it
        # rather than train on a mixed-gesture sample.
        if segment_labels.min() != segment_labels.max():
            continue
        windows.append(X[start:stop])
        labels.append(segment_labels[0])

    if not windows:
        raise ValueError("No complete single-gesture windows could be extracted.")

    return np.stack(windows), np.asarray(labels)


def _to_serializable(value):
    """Recursively convert NumPy arrays/scalars into plain JSON-ready types."""
    if isinstance(value, dict):
        return {str(key): _to_serializable(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_serializable(item) for item in value]
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    return value


def main():
    """Run the whole pipeline: load, window, featurise, train, evaluate, export.

    Writes plots and ``all_results.json`` under ``results/`` and exported
    models under ``exported_models/``. Any exception is logged and re-raised.
    """
    # Configuration
    DATA_DIR = "capgmyo_dataset"
    WINDOW_SIZE = 1000
    OVERLAP = 0.5
    SAMPLING_RATE = 1000
    BATCH_SIZE = 32
    EPOCHS = 50
    LEARNING_RATE = 0.001

    # Create output directories
    os.makedirs('results', exist_ok=True)
    os.makedirs('exported_models', exist_ok=True)

    try:
        # Load and preprocess data
        data_loader = CapgMyoDataLoader(DATA_DIR, WINDOW_SIZE, OVERLAP, SAMPLING_RATE)
        data_loader.download_dataset()
        X, y = data_loader.load_data()

        # Labels arrive per sample, so windows and their labels must be built
        # together; a bare reshape would leave y at sample granularity and
        # would fail unless the sample count were a multiple of the window.
        windows, window_labels = _segment_windows(X, y, WINDOW_SIZE, data_loader.stride)

        # Extract features
        feature_extractor = EMGFeatureExtractor(SAMPLING_RATE)
        X_features = np.array([feature_extractor.extract_features(window)
                               for window in windows])
        # Conv1D and LSTM layers expect (steps, channels) per sample, so give
        # each flat feature vector a trailing channel axis of size 1.
        X_features = X_features[..., np.newaxis]

        # Labels are zero-based indices; size the softmax by the largest one
        # so one-hot targets line up even if some gesture index is absent.
        num_classes = int(window_labels.max()) + 1

        # Split data
        # NOTE: with overlapping windows a random split can place windows
        # that share samples on both sides of the split.
        X_train, X_test, y_train, y_test = train_test_split(
            X_features, window_labels, test_size=0.2, random_state=42,
            stratify=window_labels
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )

        # Train different models
        models_to_train = {
            'cnn': create_cnn_model,
            'lstm': create_lstm_model,
            'hybrid': create_hybrid_model
        }

        results = {}
        for model_name, model_fn in models_to_train.items():
            logger.info(f"Training {model_name} model...")

            # Create and train model
            model = model_fn(X_train.shape[1:], num_classes)
            trainer = ModelTrainer(model, LEARNING_RATE)
            history = trainer.train(X_train, y_train, X_val, y_val, BATCH_SIZE, EPOCHS)

            # Evaluate model
            metrics = trainer.evaluate(X_test, y_test)
            results[model_name] = {
                'history': history,
                'metrics': metrics
            }

            # Plot results
            plot_training_history(
                history,
                f'results/{model_name}_training_history.png'
            )
            plot_confusion_matrix(
                metrics['confusion_matrix'],
                f'results/{model_name}_confusion_matrix.png'
            )

            # Export model
            export_model(model, f'exported_models/{model_name}')

            logger.info(f"{model_name} model results:")
            logger.info(f"Accuracy: {metrics['accuracy']:.4f}")
            logger.info(f"F1 Score: {metrics['f1_score']:.4f}")

        # Save all results. The confusion matrix is an ndarray and the
        # history may hold NumPy scalars, neither of which json can encode.
        import json
        with open('results/all_results.json', 'w') as f:
            json.dump(_to_serializable(results), f, indent=4)

        logger.info("Pipeline completed successfully!")

    except Exception as e:
        logger.error(f"Error in pipeline: {str(e)}")
        raise

# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()