# Wrist Movement Classification with LSTM
## Complete Training Pipeline - Generated from Google Colab
## Generated on: 2026-01-11 08:30:56

### Project Overview
This notebook implements an LSTM-based deep learning model that classifies wrist movements from IMU sensor data.

### Features:
- Data preprocessing and feature engineering
- LSTM sequence modeling
- Model training with early stopping
- Evaluation metrics and visualization
- Model deployment and prediction functions

### Requirements:
- TensorFlow 2.x
- scikit-learn
- pandas, numpy
- matplotlib, seaborn
- joblib

## 1. Installation and Setup

In [None]:
# Install required packages
!pip install tensorflow scikit-learn pandas numpy matplotlib seaborn joblib -q

print("✓ All required packages installed")

In [None]:
# Import all libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import warnings
import io
import os
import joblib
import zipfile
import json
import shutil

# Silence every Python warning for the rest of the session. This keeps the
# notebook output short, but it also hides deprecation warnings.
warnings.filterwarnings('ignore')

# Seed NumPy's and TensorFlow's global random generators so repeated runs
# start from the same random state.
np.random.seed(42)
tf.random.set_seed(42)

# Report the library versions in use, which helps when comparing runs.
print(f"TensorFlow version: {tf.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print("\n✓ All libraries imported successfully!")

## 2. Data Loading Functions

In [None]:
# Data loading utilities for Google Colab
def upload_file_colab():
    """Prompt for a file upload in Google Colab and parse the first file as CSV.

    Returns:
        The DataFrame read from the first uploaded file. ``None`` is returned
        when not running in Colab, when nothing was uploaded, or when the
        upload or parsing fails (a message is printed for the non-Colab and
        failure cases).
    """
    try:
        from google.colab import files

        uploaded = files.upload()

        # Only the first uploaded file is used; any further files are ignored.
        first_name = next(iter(uploaded.keys()), None)
        if first_name is None:
            return None

        print(f'Uploaded file: {first_name}')
        raw_bytes = uploaded[first_name]
        return pd.read_csv(io.BytesIO(raw_bytes))
    except ImportError:
        print("Not in Google Colab environment")
        return None
    except Exception as e:
        print(f"Error uploading file: {e}")
        return None

def load_from_path(file_path):
    """Read a CSV file from a local path.

    Args:
        file_path: Location of the CSV file.

    Returns:
        The parsed DataFrame, or ``None`` (after printing a message) when the
        file does not exist. Other read errors propagate to the caller.
    """
    try:
        frame = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"File not found at {file_path}")
        frame = None
    return frame

def load_from_content(content):
    """Parse CSV text that is already held in memory.

    Args:
        content: The CSV document as a string.

    Returns:
        The parsed DataFrame, or ``None`` (after printing the error) when the
        text cannot be parsed.
    """
    try:
        buffer = io.StringIO(content)
        frame = pd.read_csv(buffer)
    except Exception as e:
        print(f"Error loading from content: {e}")
        frame = None
    return frame

print("✓ Data loading functions defined")

## 3. Data Preprocessing Functions

In [None]:
def preprocess_data(df):
    """Preprocess the dataset for wrist movement classification.

    Selects the sensor feature columns, fills missing feature values with the
    column mean and integer-encodes the ``Movement`` label column.

    Args:
        df: DataFrame holding one row per sample. It must contain a
            ``Movement`` column with the class label of each row.

    Returns:
        A tuple ``(X, y_encoded, le, features)`` where ``X`` is the 2-D array
        of feature values, ``y_encoded`` the integer-encoded labels, ``le``
        the fitted ``LabelEncoder`` and ``features`` the list of column names
        used, in the column order of ``X``.

    Raises:
        KeyError: If ``df`` has no ``Movement`` column.
        ValueError: If no usable numeric feature column can be found.
    """
    if 'Movement' not in df.columns:
        raise KeyError("Input data must contain a 'Movement' label column")

    data = df.copy()

    # Display dataset info
    print(f"Dataset shape: {data.shape}")
    print(f"Columns: {data.columns.tolist()}")
    print(f"Movement types: {data['Movement'].unique()}")
    print(f"Movement distribution:\n{data['Movement'].value_counts()}")

    # Known sensor column names, in the order they are fed to the model.
    possible_features = [
        'AccelX', 'AccelY', 'AccelZ',
        'GyRIGHToX', 'GyRIGHToY', 'GyRIGHToZ',
        'GyroX', 'GyroY', 'GyroZ',
        'AngleX', 'AngleY', 'AngleZ',
        'FlexionAngle', 'DeviationAngle'
    ]

    # Select only features that exist in the dataset
    features = [col for col in possible_features if col in data.columns]

    if not features:
        # Fall back to every numeric column that is not metadata or the label.
        # Any numeric dtype is accepted (float32, int32, ...), not only
        # float64/int64; boolean columns are left out.
        exclude_cols = {'Timestamp', 'Gender', 'Age', 'Hand', 'Status', 'Movement'}
        features = [
            col for col in data.columns
            if col not in exclude_cols
            and pd.api.types.is_numeric_dtype(data[col])
            and not pd.api.types.is_bool_dtype(data[col])
        ]

    if not features:
        raise ValueError("No usable numeric feature columns found in the dataset")

    print(f"\nSelected features ({len(features)}): {features}")

    # Check for missing values and impute them with the column mean.
    missing_values = data[features].isnull().sum().sum()
    if missing_values > 0:
        print(f"Warning: {missing_values} missing values found in features")
        for col in features:
            data[col] = data[col].fillna(data[col].mean())

    # Extract features and labels
    X = data[features].values
    y = data['Movement'].values

    # Encode labels
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    print(f"\nClasses: {le.classes_}")
    print(f"Class mapping: {dict(zip(le.classes_, range(len(le.classes_))))}")

    return X, y_encoded, le, features

print("✓ Preprocessing functions defined")

## 4. Sequence Creation for LSTM

In [None]:
def create_sequences(X, y, sequence_length=50, step_size=25):
    """
    Create fixed-length, possibly overlapping windows for the LSTM.

    Every window starts ``step_size`` rows after the previous one and is
    labelled with the most frequent label inside it (on a tie the smallest
    label, in ``np.unique`` order, wins). The final window that ends exactly
    on the last row is included.

    Args:
        X: Sequence of per-timestep feature rows.
        y: Sequence of per-timestep labels, same length as ``X``.
        sequence_length: number of timesteps in each sequence
        step_size: how many timesteps to move forward for next sequence

    Returns:
        ``(X_sequences, y_sequences)`` as NumPy arrays. Both are empty when
        ``X`` has fewer than ``sequence_length`` rows.

    Raises:
        ValueError: If ``sequence_length`` or ``step_size`` is not positive,
            or if ``X`` and ``y`` differ in length.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be a positive integer")
    if step_size < 1:
        raise ValueError("step_size must be a positive integer")
    if len(X) != len(y):
        raise ValueError(
            f"X and y must have the same length, got {len(X)} and {len(y)}"
        )

    X_sequences = []
    y_sequences = []

    # The last valid start index is len(X) - sequence_length, so the range end
    # must be one past it; otherwise the final full window is dropped.
    last_start = len(X) - sequence_length
    for start in range(0, last_start + 1, step_size):
        stop = start + sequence_length
        X_sequences.append(X[start:stop])

        # Use the majority label in the sequence
        unique, counts = np.unique(y[start:stop], return_counts=True)
        y_sequences.append(unique[np.argmax(counts)])

    return np.array(X_sequences), np.array(y_sequences)

print("✓ Sequence creation function defined")

## 5. LSTM Model Architectures

In [None]:
def build_lstm_model(input_shape, num_classes):
    """Create and compile the two-layer LSTM classifier.

    Args:
        input_shape: Shape of one input sample, passed to the first LSTM layer.
        num_classes: Number of output classes (width of the softmax layer).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    network = keras.Sequential()

    # Recurrent stack: the first LSTM returns the full sequence so the second
    # one can consume it; dropout follows every layer.
    network.add(layers.LSTM(64, return_sequences=True, input_shape=input_shape))
    network.add(layers.Dropout(0.3))
    network.add(layers.LSTM(32))
    network.add(layers.Dropout(0.3))

    # Classification head.
    network.add(layers.Dense(32, activation='relu'))
    network.add(layers.Dropout(0.3))
    network.add(layers.Dense(num_classes, activation='softmax'))

    adam = keras.optimizers.Adam(learning_rate=0.001)
    network.compile(
        optimizer=adam,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

def build_advanced_lstm_model(input_shape, num_classes):
    """Create and compile the three-layer LSTM classifier with batch norm.

    Args:
        input_shape: Shape of one input sample, passed to the first LSTM layer.
        num_classes: Number of output classes (width of the softmax layer).

    Returns:
        The compiled Keras ``Sequential`` model.
    """
    # (units, extra keyword arguments) for each recurrent layer, in order.
    # Only the last LSTM collapses the sequence to a single vector.
    recurrent_specs = (
        (64, {'return_sequences': True, 'input_shape': input_shape}),
        (32, {'return_sequences': True}),
        (16, {}),
    )

    stack = []
    for units, extra in recurrent_specs:
        stack.append(layers.LSTM(units, **extra))
        stack.append(layers.BatchNormalization())
        stack.append(layers.Dropout(0.2))

    # Classification head.
    stack.append(layers.Dense(32, activation='relu'))
    stack.append(layers.BatchNormalization())
    stack.append(layers.Dropout(0.2))
    stack.append(layers.Dense(num_classes, activation='softmax'))

    network = keras.Sequential(stack)
    adam = keras.optimizers.Adam(learning_rate=0.001)
    network.compile(
        optimizer=adam,
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network

print("✓ LSTM model architectures defined")

## 6. Visualization Functions

In [None]:
def plot_training_history(history):
    """Draw training vs. validation accuracy and loss side by side.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss``.
    """
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # Left panel shows accuracy, right panel shows loss; both share one layout.
    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))
    for ax, (metric_key, metric_title) in zip(axes, panels):
        train_curve = history.history[metric_key]
        val_curve = history.history[f'val_{metric_key}']

        ax.plot(train_curve, label=f'Train {metric_title}', linewidth=2)
        ax.plot(val_curve, label=f'Val {metric_title}', linewidth=2)
        ax.set_title(f'Model {metric_title}', fontsize=14)
        ax.set_xlabel('Epoch', fontsize=12)
        ax.set_ylabel(metric_title, fontsize=12)
        ax.legend(fontsize=12)
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, class_names):
    """Plot the confusion matrix and print per-class precision/recall/F1.

    Args:
        y_true: True labels of the evaluated samples.
        y_pred: Predicted labels of the evaluated samples.
        class_names: Display name of each class. When the labels are integer
            indices in ``range(len(class_names))``, position ``i`` names
            class ``i``.
    """
    n_classes = len(class_names)

    # When labels are integer class indices, pin the matrix to one row/column
    # per class name. Without this a class absent from both y_true and y_pred
    # shrinks the matrix, which mislabels the heatmap ticks and makes the
    # per-class loop below index out of range. Any other label encoding keeps
    # the default behaviour (labels inferred from the data).
    label_ids = None
    observed = np.union1d(y_true, y_pred)
    if (
        observed.size
        and np.issubdtype(observed.dtype, np.integer)
        and observed.min() >= 0
        and observed.max() < n_classes
    ):
        label_ids = np.arange(n_classes)

    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title('Confusion Matrix - Test Set', fontsize=14)
    plt.ylabel('True Label', fontsize=12)
    plt.xlabel('Predicted Label', fontsize=12)
    plt.tight_layout()
    plt.show()

    # Calculate per-class metrics
    print("\nPer-class metrics:")
    print("-" * 50)
    for i, class_name in enumerate(class_names):
        true_positives = cm[i, i]
        predicted_total = cm[:, i].sum()  # column sum: everything predicted as i
        actual_total = cm[i, :].sum()     # row sum: everything truly of class i

        # A class that was never predicted / never present scores 0 instead of
        # dividing by zero.
        precision = true_positives / predicted_total if predicted_total > 0 else 0
        recall = true_positives / actual_total if actual_total > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        print(f"{class_name:20s} Precision: {precision:.3f} | Recall: {recall:.3f} | F1-Score: {f1:.3f}")

print("✓ Visualization functions defined")

## 7. Main Training Pipeline

In [None]:
def train_wrist_movement_model(df=None, model_type='simple'):
    """
    Main training pipeline for wrist movement classification.

    Runs preprocessing, scaling, windowing, a 70/15/15 stratified split,
    model construction, training with early stopping, evaluation and plotting,
    then saves the model, scaler and label encoder to the current directory.

    Args:
        df: DataFrame with the sensor readings and a ``Movement`` column. When
            ``None`` the user is asked to upload a CSV through Colab.
        model_type: ``'simple'`` builds the two-layer LSTM; any other value
            builds the advanced batch-normalised model.

    Returns:
        ``None`` when no data could be loaded. Otherwise a dict with the keys
        ``model``, ``scaler``, ``label_encoder``, ``feature_names``,
        ``test_accuracy``, ``test_loss``, ``predict_function``,
        ``sequence_length`` and ``class_names``.

    Raises:
        ValueError: If the dataset has fewer than 10 rows, which is too few to
            derive a window length.
    """
    print("=" * 60)
    print("WRIST MOVEMENT CLASSIFICATION - LSTM MODEL")
    print("=" * 60)

    # Load data
    if df is None:
        print("\nUpload your wrist movement dataset...")
        df = upload_file_colab()
        if df is None:
            print("No data loaded. Exiting...")
            return None

    # 1. Preprocess data
    print("\n[1/7] Preprocessing data...")
    X, y_encoded, label_encoder, feature_names = preprocess_data(df)

    # 2. Normalize features
    # NOTE(review): the scaler is fitted on all rows before the train/val/test
    # split, and overlapping windows are split at random, so test statistics
    # can leak into training. Left unchanged to keep results comparable.
    print("\n[2/7] Normalizing features...")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # 3. Create sequences
    print("\n[3/7] Creating sequences for LSTM...")
    SEQUENCE_LENGTH = min(50, len(X_scaled) // 10)
    if SEQUENCE_LENGTH < 1:
        raise ValueError(
            f"Dataset too small: need at least 10 rows to build sequences, "
            f"got {len(X_scaled)}"
        )
    # A window length of 1 would otherwise give a step of 0, which range()
    # rejects with an unhelpful error.
    STEP_SIZE = max(1, SEQUENCE_LENGTH // 2)
    X_sequences, y_sequences = create_sequences(X_scaled, y_encoded, SEQUENCE_LENGTH, STEP_SIZE)
    print(f"  Sequence shape: {X_sequences.shape}")
    print(f"  Labels shape: {y_sequences.shape}")

    # 4. Split data: 70% train, then the remainder halved into val and test.
    print("\n[4/7] Splitting data...")
    X_train, X_temp, y_train, y_temp = train_test_split(
        X_sequences, y_sequences, test_size=0.3, random_state=42, stratify=y_sequences
    )
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
    )
    print(f"  Train: {X_train.shape}")
    print(f"  Validation: {X_val.shape}")
    print(f"  Test: {X_test.shape}")

    # 5. Build model
    print("\n[5/7] Building LSTM model...")
    input_shape = (SEQUENCE_LENGTH, len(feature_names))
    num_classes = len(label_encoder.classes_)

    if model_type == 'simple':
        model = build_lstm_model(input_shape, num_classes)
    else:
        model = build_advanced_lstm_model(input_shape, num_classes)

    model.summary()

    # 6. Train model
    print("\n[6/7] Training model...")
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=20,
        restore_best_weights=True,
        verbose=1
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=10,
        min_lr=0.00001,
        verbose=1
    )

    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=100,
        batch_size=min(32, len(X_train)),
        callbacks=[early_stopping, reduce_lr],
        verbose=1
    )

    # 7. Evaluate model
    print("\n[7/7] Evaluating model...")
    test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"\nTest Accuracy: {test_accuracy:.4f}")
    print(f"Test Loss: {test_loss:.4f}")

    # Predictions
    y_pred = np.argmax(model.predict(X_test, verbose=0), axis=1)

    # Classification report
    # `labels` is pinned to every encoded class: a class that never becomes a
    # window's majority label is absent from y_test/y_pred, and without
    # `labels` the report would reject `target_names` for having a different
    # length. zero_division=0 scores such classes as 0.
    print("\nClassification Report:")
    print("=" * 60)
    print(classification_report(
        y_test, y_pred,
        labels=np.arange(num_classes),
        target_names=label_encoder.classes_,
        zero_division=0,
    ))

    # Visualizations
    plot_training_history(history)
    plot_confusion_matrix(y_test, y_pred, label_encoder.classes_)

    # Save model
    model.save('wrist_movement_model.keras')
    joblib.dump(scaler, 'wrist_scaler.pkl')
    joblib.dump(label_encoder, 'wrist_label_encoder.pkl')

    print("\n✓ Model saved as 'wrist_movement_model.keras'")
    print("✓ Scaler saved as 'wrist_scaler.pkl'")
    print("✓ Label encoder saved as 'wrist_label_encoder.pkl'")

    # Create prediction function
    def predict_movement(new_data, sequence_length=SEQUENCE_LENGTH):
        """Predict the movement for one window of new sensor data.

        Args:
            new_data: DataFrame containing the training feature columns, or a
                NumPy array whose columns follow ``feature_names`` order (a
                1-D array is treated as a single timestep).
            sequence_length: Number of timesteps fed to the model. Only the
                first ``sequence_length`` rows are used; shorter input is
                padded by repeating its last row. The model was built for
                the default value.

        Returns:
            Dict with ``predicted_movement``, ``confidence`` and the
            per-class ``probabilities``.

        Raises:
            ValueError: If ``new_data`` has no rows.
        """
        if isinstance(new_data, np.ndarray):
            if new_data.ndim == 1:
                new_data = new_data.reshape(1, -1)
            new_data = pd.DataFrame(new_data, columns=feature_names)

        # Scale with the statistics learned during training.
        X_new = scaler.transform(new_data[feature_names].values)

        if len(X_new) == 0:
            raise ValueError("new_data contains no rows")

        # Create sequence
        if len(X_new) < sequence_length:
            padding_needed = sequence_length - len(X_new)
            X_new = np.pad(X_new, ((0, padding_needed), (0, 0)), 'edge')

        sequence = X_new[:sequence_length].reshape(1, sequence_length, -1)
        prediction_proba = model.predict(sequence, verbose=0)
        prediction_idx = np.argmax(prediction_proba)
        prediction_label = label_encoder.inverse_transform([prediction_idx])[0]
        confidence = np.max(prediction_proba)

        return {
            'predicted_movement': prediction_label,
            'confidence': confidence,
            'probabilities': prediction_proba[0]
        }

    results = {
        'model': model,
        'scaler': scaler,
        'label_encoder': label_encoder,
        'feature_names': feature_names,
        'test_accuracy': test_accuracy,
        'test_loss': test_loss,
        'predict_function': predict_movement,
        'sequence_length': SEQUENCE_LENGTH,
        'class_names': label_encoder.classes_
    }

    print("\n" + "=" * 60)
    print("TRAINING COMPLETE!")
    print("=" * 60)

    return results

print("✓ Main training pipeline defined")

## 8. Utility Functions

In [None]:
def quick_test():
    """Build a reproducible random dataset shaped like the real sensor data.

    Returns:
        A 1000-row DataFrame with the metadata columns, a random ``Movement``
        label and eleven normally distributed sensor columns.
    """
    np.random.seed(42)
    n_samples = 1000

    sample_features = [
        'AccelX', 'AccelY', 'AccelZ',
        'GyRIGHToX', 'GyRIGHToY', 'GyRIGHToZ',
        'AngleX', 'AngleY', 'AngleZ',
        'FlexionAngle', 'DeviationAngle',
    ]
    movements = ['STEADY', 'FLEXION', 'EXTENSION', 'RADIAL_DEVIATION', 'ULNAR_DEVIATION']

    # The draws below all consume the same seeded global generator, so their
    # order determines the generated values and must not be rearranged.
    readings = np.random.randn(n_samples, len(sample_features))
    labels = np.random.choice(movements, n_samples)
    genders = np.random.choice(['M', 'F'], n_samples)
    ages = np.random.randint(10, 20, n_samples)
    hands = np.random.choice(['LEFT', 'RIGHT'], n_samples)

    # Metadata and label columns come first, followed by the sensor columns.
    metadata = pd.DataFrame({
        'Timestamp': np.arange(n_samples),
        'Gender': genders,
        'Age': ages,
        'Hand': hands,
        'Status': 'H',
        'Movement': labels,
    })
    sensors = pd.DataFrame(readings, columns=sample_features)
    df = pd.concat([metadata, sensors], axis=1)

    print(f"Sample dataset created: {df.shape}")
    print(f"Movement distribution:\n{df['Movement'].value_counts()}")

    return df

def load_trained_model(model_path='wrist_movement_model.keras',
                      scaler_path='wrist_scaler.pkl',
                      encoder_path='wrist_label_encoder.pkl'):
    """Restore a saved model together with its scaler and label encoder.

    Args:
        model_path: Path of the saved Keras model.
        scaler_path: Path of the joblib-dumped scaler.
        encoder_path: Path of the joblib-dumped label encoder.

    Returns:
        ``(model, scaler, label_encoder)``, or ``(None, None, None)`` after
        printing the error when anything fails to load.
    """
    # NOTE: joblib files are pickles; only load artefacts from a trusted source.
    try:
        artefacts = (
            keras.models.load_model(model_path),
            joblib.load(scaler_path),
            joblib.load(encoder_path),
        )
    except Exception as e:
        print(f"Error loading model: {e}")
        return None, None, None

    print("✓ Model loaded successfully!")
    return artefacts

print("✓ Utility functions defined")

## 9. Complete Usage Example

In [None]:
# This cell only prints instructions. Every real call is left commented out,
# so running the cell as-is trains nothing and loads nothing.

# Example 1: Train with sample data
print("Example 1: Training with sample data")
print("-" * 40)

# Uncomment to run (generates the synthetic dataset, then trains on it):
# sample_df = quick_test()
# results = train_wrist_movement_model(df=sample_df, model_type='simple')

# Example 2: Train with uploaded data
print("\nExample 2: Training with uploaded data")
print("-" * 40)
print("To train with your own data:")
print("1. Uncomment the code below")
print("2. Upload your CSV file when prompted")
print("3. Choose model type ('simple' or 'advanced')")

# Uncomment to run (df is omitted, so the pipeline asks for an upload):
# results = train_wrist_movement_model(model_type='simple')  # or 'advanced'

# Example 3: Load and use trained model
print("\nExample 3: Loading trained model")
print("-" * 40)
print("To load and use a trained model:")
print("1. Make sure model files exist in the current directory")
print("2. Uncomment and run the code below")

# Uncomment to run:
# model, scaler, label_encoder = load_trained_model()
# if model is not None:
#     print("Model loaded successfully!")
#     # Create sample data for prediction
#     sample_data = np.random.randn(50, 11)  # 50 timesteps, 11 features
#     print(f"Sample data shape: {sample_data.shape}")

print("\n✓ Usage examples ready")

## 10. Download Notebook and Models

In [None]:
# Create a zip file with everything
def create_complete_package():
    """Create a zip file with the notebook and all saved model artefacts.

    Only files that exist in the current working directory are added; missing
    ones are reported and skipped, so the archive is still produced when the
    notebook has not been saved or no model has been trained yet.

    Returns:
        The name of the zip file written to the current directory.
    """
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = f'wrist_movement_complete_{timestamp}.zip'

    # Model artefacts first, then the notebook, matching the archive order.
    candidate_files = [
        'wrist_movement_model.keras',
        'wrist_scaler.pkl',
        'wrist_label_encoder.pkl',
        'wrist_movement_notebook.ipynb',
    ]

    # Create the zip
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for file in candidate_files:
            if os.path.exists(file):
                zipf.write(file)
                print(f"✓ Added: {file}")
            else:
                print(f"⚠ Skipped (not found): {file}")

    print(f"\n✓ Complete package created: {zip_filename}")
    return zip_filename

# Download function
def download_all_files():
    """Package everything into a zip and download it through Colab.

    After the zip download the user may additionally fetch the notebook or
    the individual model files. Outside Colab only the local file names are
    printed. Any other failure is reported and swallowed.
    """
    try:
        from google.colab import files

        print("Creating complete package...")
        archive_name = create_complete_package()

        print("\nDownloading files...")
        files.download(archive_name)

        # Also offer individual downloads
        for menu_line in (
            "\nWould you like to download individual files?",
            "1. Download just the notebook",
            "2. Download model files",
            "3. Skip individual downloads",
        ):
            print(menu_line)

        selection = input("Enter choice (1-3): ").strip()

        if selection == '1':
            files.download('wrist_movement_notebook.ipynb')
            print("✓ Notebook downloaded")
        elif selection == '2':
            artefact_names = ('wrist_movement_model.keras', 'wrist_scaler.pkl', 'wrist_label_encoder.pkl')
            for artefact in artefact_names:
                if not os.path.exists(artefact):
                    continue
                files.download(artefact)
                print(f"✓ Downloaded: {artefact}")
        else:
            print("Skipping individual downloads")

        print("\n✓ All downloads complete!")

    except ImportError:
        print("Not in Google Colab. Files saved locally.")
        print("Notebook: wrist_movement_notebook.ipynb")
        print("Model: wrist_movement_model.keras (if exists)")
    except Exception as e:
        print(f"Error downloading files: {e}")

print("✓ Download functions defined")

## 11. Main Execution Menu

In [None]:
def _print_menu_section(title):
    """Print a section title framed by separator lines."""
    print("\n" + "=" * 70)
    print(title)
    print("=" * 70)


def _ask_model_type():
    """Ask which architecture to train; anything but '2' selects 'simple'."""
    try:
        model_choice = input("\nChoose model type (1 for simple, 2 for advanced): ").strip()
    except EOFError:
        # No more input available: fall back to the default architecture.
        model_choice = ''
    return 'advanced' if model_choice == '2' else 'simple'


def _save_notebook_file(notebook_file='wrist_movement_notebook.ipynb'):
    """Write the generated notebook JSON to disk and return its file name."""
    # NOTE(review): create_complete_colab_notebook is not defined in the part
    # of the file visible here; presumably it is provided elsewhere - confirm.
    notebook_content = create_complete_colab_notebook()

    with open(notebook_file, 'w', encoding='utf-8') as f:
        json.dump(notebook_content, f, indent=2, ensure_ascii=False)

    print(f"✓ Notebook saved as '{notebook_file}'")
    return notebook_file


def main_menu():
    """Main menu for the wrist movement classification system.

    Repeatedly shows the menu and runs the chosen action until the user
    selects exit. A closed input stream (EOF) is treated as a request to
    exit instead of crashing with ``EOFError``.
    """
    print("=" * 70)
    print("WRIST MOVEMENT CLASSIFICATION WITH LSTM")
    print("=" * 70)

    while True:
        print("\nMAIN MENU:")
        print("1. Train model with uploaded data")
        print("2. Train model with sample data")
        print("3. Download complete notebook (.ipynb)")
        print("4. Download everything (notebook + models)")
        print("5. Exit")

        try:
            choice = input("\nEnter your choice (1-5): ").strip()
        except EOFError:
            print("\nNo input available. Exiting...")
            break

        if choice == '1':
            _print_menu_section("TRAINING WITH UPLOADED DATA")

            model_type = _ask_model_type()

            results = train_wrist_movement_model(model_type=model_type)
            if results:
                print("\n✓ Training completed successfully!")

        elif choice == '2':
            _print_menu_section("TRAINING WITH SAMPLE DATA")

            # The sample data is generated before the prompt, as before.
            sample_df = quick_test()
            model_type = _ask_model_type()

            results = train_wrist_movement_model(df=sample_df, model_type=model_type)
            if results:
                print("\n✓ Sample training completed successfully!")

        elif choice == '3':
            _print_menu_section("DOWNLOADING NOTEBOOK")

            # First save the notebook
            notebook_file = _save_notebook_file()

            # Download it
            try:
                from google.colab import files
                print("\nDownloading notebook file...")
                files.download(notebook_file)
                print("✓ Notebook downloaded successfully!")
            except ImportError:
                print(f"\nNot in Colab. Notebook saved at: {notebook_file}")
            except Exception as e:
                print(f"Error downloading notebook: {e}")

        elif choice == '4':
            _print_menu_section("DOWNLOADING COMPLETE PACKAGE")

            # First save the notebook so the package can include it
            _save_notebook_file()

            # Download everything
            download_all_files()

        elif choice == '5':
            print("\nExiting... Thank you for using Wrist Movement Classification!")
            break

        else:
            print("\nInvalid choice. Please enter a number between 1 and 5.")

    print("\nProgram terminated.")

# Run the main menu when this code is executed directly.
# NOTE(review): a later cell also calls main_menu(); if both run in the same
# session the menu is presumably shown twice - confirm this is intended.
if __name__ == "__main__":
    main_menu()

## How to Use This Notebook

### Option 1: Run Everything at Once
Run all cells above, then execute the cell below to start the interactive menu.

In [None]:
# Start the interactive menu
print("Starting Wrist Movement Classification System...")
print("-" * 50)

# Save the notebook first so the download options have a file to work with.
# NOTE(review): create_complete_colab_notebook is not defined in the part of
# the file visible here; presumably it is provided elsewhere - confirm.
notebook_content = create_complete_colab_notebook()
with open('wrist_movement_notebook.ipynb', 'w', encoding='utf-8') as f:
    json.dump(notebook_content, f, indent=2, ensure_ascii=False)
print("✓ Notebook saved as 'wrist_movement_notebook.ipynb'")

# Start the main menu (blocks until the user chooses to exit)
main_menu()