# LSTM Model for EMG to Finger Position Prediction

This notebook trains an LSTM model to predict finger positions from EMG sensor data.

## Pipeline Overview

1. Load and clean data
1. Create temporal sequences
1. Split into train/test sets (80/20)
1. Train LSTM model with proper batching
1. Evaluate on unseen test data
1. Visualize predictions

## 1. Import Libraries

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from sklearn.preprocessing import StandardScaler

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {'GPU' if torch.cuda.is_available() else 'CPU'}")

## 2. Load and Clean Data

Load CSV data and handle corrupted rows where sensor values are missing or misaligned.

In [None]:
# Load data
path = os.path.join("../../data", "integrated_data_20251030_195505.csv")
df = pd.read_csv(path)

print("="*60)
print("RAW DATA INFO")
print("="*60)
print(f"Original shape: {df.shape}")
print(f"Columns: {list(df.columns)}")
print(f"\nFirst 3 rows:\n{df.head(3)}")

# Define required numeric columns (excluding timestamp and hand_label)
numeric_columns = ['iteration', 'env0', 'raw0', 'env1', 'raw1', 'env2', 'raw2', 'env3', 'raw3', 
                   'thumb_tip', 'thumb_base', 'index', 'middle', 'ring', 'pinky']

# Convert all numeric columns, coercing errors to NaN (handles corrupted rows)
for col in numeric_columns:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

# Remove rows with any NaN values in numeric columns
df_clean = df.dropna(subset=numeric_columns)

print(f"\nRows removed (corrupted/missing data): {len(df) - len(df_clean)}")
print(f"Clean data shape: {df_clean.shape}")

# Extract only numeric data (drop timestamp and hand_label)
data = df_clean[numeric_columns].values.astype(np.float32)

print(f"\nFinal numeric data shape: {data.shape}")
print(f"Data statistics:\n{pd.DataFrame(data, columns=numeric_columns).describe()}")
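
To see what the cleaning step does to a malformed row, here is a minimal sketch on a toy frame. The column names `env0` and `index` are borrowed from the notebook; the values, including the stray `abc` token, are invented for illustration.

```python
import numpy as np
import pandas as pd

# Toy frame: row 1 holds a non-numeric token, row 2 holds a missing value
toy = pd.DataFrame({
    "env0": ["0.12", "abc", "0.30", "0.41"],
    "index": ["10", "20", None, "40"],
})

toy_numeric = toy.copy()
for col in ["env0", "index"]:
    # Unparseable entries become NaN instead of raising an error
    toy_numeric[col] = pd.to_numeric(toy_numeric[col], errors="coerce")

# Drop every row that has a NaN in any of the required columns
toy_clean = toy_numeric.dropna(subset=["env0", "index"])
toy_values = toy_clean.values.astype(np.float32)

print(toy_clean.index.tolist())  # labels of the surviving rows
print(toy_values.shape)
```

Note that `dropna` keeps the original row labels, and the rows on either side of a dropped row become neighbours in the resulting array. A sliding window built later can therefore span a gap in time.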

## 3. Prepare Data for LSTM

Define which columns are sensor inputs and which are finger outputs. The index lists built here are used to slice the sequences after the train/test split.

In [None]:
# Define sensor (input) and finger (output) columns
sensor_columns = ['env0', 'raw0', 'env1', 'raw1', 'env2', 'raw2', 'env3', 'raw3']
finger_columns = ['thumb_tip', 'thumb_base', 'index', 'middle', 'ring', 'pinky']

# Get column indices in the data array
sensor_indices = [numeric_columns.index(col) for col in sensor_columns]
finger_indices = [numeric_columns.index(col) for col in finger_columns]

print("="*60)
print("DATA CONFIGURATION")
print("="*60)
print(f"Sensor columns (inputs): {sensor_columns}")
print(f"Sensor indices: {sensor_indices}")
print(f"Number of input features: {len(sensor_indices)}")
print(f"\nFinger columns (outputs): {finger_columns}")
print(f"Finger indices: {finger_indices}")
print(f"Number of output features: {len(finger_indices)}")

## 4. Create Temporal Sequences

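An LSTM consumes a sequence of time steps. Each example is a sliding window of `seq_length` consecutive rows, and its target is the row that immediately follows the window. The windows keep all columns at this stage; sensor and finger columns are selected after the split.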

In [None]:
def create_sequences(data, seq_length):
    """
    Create sequences for LSTM training.
    
    Args:
        data: numpy array of shape (num_samples, num_features)
        seq_length: length of input sequence
    
    Returns:
        X: sequences of shape (num_sequences, seq_length, num_features)
        y: targets of shape (num_sequences, num_features)
    """
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        x = data[i:i + seq_length, :]  # Sequence of seq_length time steps
        y = data[i + seq_length, :]    # Next time step (target)
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

# Create sequences
seq_length = 15
X, y = create_sequences(data, seq_length)

print(f"\nSequence length: {seq_length} time steps")
print(f"Total sequences created: {len(X)}")
print(f"X shape: {X.shape} (num_sequences, seq_length, num_features)")
print(f"y shape: {y.shape} (num_sequences, num_features)")
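
For larger recordings the Python loop can become slow. The sketch below shows one possible vectorized alternative based on NumPy's `sliding_window_view` and checks it against a loop version on toy data. The function names `create_sequences_loop` and `create_sequences_vectorized` are illustrative, not part of the notebook.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

def create_sequences_loop(data, seq_length):
    # Same logic as the notebook's create_sequences
    n = len(data) - seq_length
    xs = [data[i:i + seq_length] for i in range(n)]
    ys = [data[i + seq_length] for i in range(n)]
    return np.array(xs), np.array(ys)

def create_sequences_vectorized(data, seq_length):
    # sliding_window_view puts the window axis last: (n_windows, features, seq_length)
    windows = sliding_window_view(data, window_shape=seq_length, axis=0)
    # Drop the final window (no row follows it) and move time before features
    X = windows[:-1].transpose(0, 2, 1)
    y = data[seq_length:]
    return X, y

toy = np.arange(20, dtype=np.float32).reshape(10, 2)  # 10 time steps, 2 features
X_loop, y_loop = create_sequences_loop(toy, seq_length=3)
X_vec, y_vec = create_sequences_vectorized(toy, seq_length=3)

print(X_loop.shape, y_loop.shape)
print(np.array_equal(X_loop, X_vec), np.array_equal(y_loop, y_vec))
```

The vectorized `X` is a read-only view onto the original array, so copy it (for example with `np.ascontiguousarray`) before modifying it in place.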

## 5. Train/Test Split and Feature Selection

Split data temporally (80/20) and separate sensor inputs from finger outputs.

In [None]:
# Split data temporally (80% train, 20% test)
split_ratio = 0.8
split_index = int(len(X) * split_ratio)

X_train, X_test = X[:split_index], X[split_index:]
y_train, y_test = y[:split_index], y[split_index:]

print("="*60)
print("TRAIN/TEST SPLIT")
print("="*60)
print(f"Total sequences: {len(X)}")
print(f"Training sequences: {len(X_train)} ({split_ratio*100:.0f}%)")
print(f"Testing sequences: {len(X_test)} ({(1-split_ratio)*100:.0f}%)")

# Extract sensor and finger features
trainX = torch.tensor(X_train[:, :, sensor_indices], dtype=torch.float32)
trainY = torch.tensor(y_train[:, finger_indices], dtype=torch.float32)
testX = torch.tensor(X_test[:, :, sensor_indices], dtype=torch.float32)
testY = torch.tensor(y_test[:, finger_indices], dtype=torch.float32)

print(f"\nTraining data:")
print(f"  trainX: {trainX.shape} (sequences, seq_length, input_features)")
print(f"  trainY: {trainY.shape} (sequences, output_features)")
print(f"\nTesting data:")
print(f"  testX: {testX.shape}")
print(f"  testY: {testY.shape}")

print(f"\nInput features: {len(sensor_indices)}")
print(f"Output features: {len(finger_indices)}")
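
Because neighbouring windows overlap, the last training windows and the first test windows share raw time steps when the split is a single cut. The training loop also monitors its loss on the test split. One possible alternative, sketched here under assumed fractions of 70/15/15 (not the notebook's values), is a chronological three-way split with a gap of `seq_length` windows between the parts. The helper `chronological_split` is hypothetical.

```python
def chronological_split(n_sequences, seq_length, train_frac=0.7, val_frac=0.15):
    """Return (train, val, test) slices separated by gaps of seq_length windows."""
    train_end = int(n_sequences * train_frac)
    # Window i touches rows i .. i + seq_length, so skipping seq_length
    # windows guarantees that the two sides share no raw rows
    val_start = train_end + seq_length
    val_end = val_start + int(n_sequences * val_frac)
    test_start = val_end + seq_length
    if test_start >= n_sequences:
        raise ValueError("not enough sequences for the requested fractions and gaps")
    return (slice(0, train_end),
            slice(val_start, val_end),
            slice(test_start, n_sequences))

train_sl, val_sl, test_sl = chronological_split(n_sequences=1000, seq_length=15)
print(train_sl, val_sl, test_sl)
```

With slices like these, `X[train_sl]`, `X[val_sl]` and `X[test_sl]` could replace the two-way split, with the validation part used for scheduling and early stopping and the test part touched only once at the end.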

## 6. Normalize Data

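Standardize inputs and outputs to zero mean and unit variance for better training stability. Both scalers are fit on the training split only and then applied to the test split, so no test statistics leak into training.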

In [None]:
# Normalize sensor data (inputs)
scaler_X = StandardScaler()
trainX_reshaped = trainX.reshape(-1, trainX.shape[-1]).numpy()
trainX_scaled = scaler_X.fit_transform(trainX_reshaped).reshape(trainX.shape)
trainX = torch.tensor(trainX_scaled, dtype=torch.float32)

testX_reshaped = testX.reshape(-1, testX.shape[-1]).numpy()
testX_scaled = scaler_X.transform(testX_reshaped).reshape(testX.shape)
testX = torch.tensor(testX_scaled, dtype=torch.float32)

# Normalize finger data (outputs)
scaler_Y = StandardScaler()
trainY_scaled = scaler_Y.fit_transform(trainY.numpy())
trainY = torch.tensor(trainY_scaled, dtype=torch.float32)

testY_scaled = scaler_Y.transform(testY.numpy())
testY = torch.tensor(testY_scaled, dtype=torch.float32)

print("✓ Data normalized using StandardScaler")
print(f"  Sensor mean: {scaler_X.mean_[:3].round(2)} (first 3 features)")
print(f"  Sensor std: {scaler_X.scale_[:3].round(2)} (first 3 features)")
print(f"  Finger mean: {scaler_Y.mean_[:3].round(2)} (first 3 features)")
print(f"  Finger std: {scaler_Y.scale_[:3].round(2)} (first 3 features)")
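
The reshape before `fit_transform` is what makes the statistics per feature rather than per time step. A NumPy-only sketch of the same idea on synthetic windows follows; the array sizes and the random data are assumptions made for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
# Synthetic windows with shape (sequences, seq_length, features)
train_windows = rng.normal(loc=5.0, scale=2.0, size=(200, 15, 8)).astype(np.float32)
test_windows = rng.normal(loc=5.0, scale=2.0, size=(50, 15, 8)).astype(np.float32)

# Fold the time axis into the sample axis so each feature gets one mean and std
flat_train = train_windows.reshape(-1, train_windows.shape[-1])
mean = flat_train.mean(axis=0)
std = flat_train.std(axis=0)  # population std (ddof=0), as StandardScaler uses

train_scaled = (train_windows - mean) / std  # broadcasts over the last axis
test_scaled = (test_windows - mean) / std    # reuses the training statistics

print(train_scaled.reshape(-1, 8).mean(axis=0).round(3))
print(train_scaled.reshape(-1, 8).std(axis=0).round(3))
```

The test windows will not have exactly zero mean and unit variance after scaling, which is expected: they are transformed with statistics they did not contribute to.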

## 7. Define LSTM Model

In [None]:
class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, dropout=0.2):
        """
        LSTM model for sequence prediction.
        
        Args:
            input_dim: Number of input features
            hidden_dim: Number of hidden units in LSTM
            layer_dim: Number of LSTM layers
            output_dim: Number of output features
            dropout: Dropout probability for regularization
        """
        super(LSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        
        # LSTM layer with dropout between layers
        self.lstm = nn.LSTM(
            input_dim, 
            hidden_dim, 
            layer_dim, 
            batch_first=True,
            dropout=dropout if layer_dim > 1 else 0
        )
        
        # Dropout layer
        self.dropout = nn.Dropout(dropout)
        
        # Fully connected output layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Initialize hidden and cell states (set to zeros)
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim).to(x.device)
        
        # LSTM forward pass
        out, (hn, cn) = self.lstm(x, (h0, c0))
        
        # Take the last time step output
        out = out[:, -1, :]
        
        # Apply dropout
        out = self.dropout(out)
        
        # Fully connected layer
        out = self.fc(out)
        
        return out

# Model hyperparameters
input_dim = len(sensor_indices)  # Number of sensor features
hidden_dim = 128                  # Number of hidden units
layer_dim = 2                     # Number of LSTM layers
output_dim = len(finger_indices)  # Number of finger features
dropout = 0.2                     # Dropout probability

# Initialize model
model = LSTMModel(input_dim, hidden_dim, layer_dim, output_dim, dropout)

print("="*60)
print("MODEL ARCHITECTURE")
print("="*60)
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"Trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")
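
As a sanity check on the printed parameter count, the total can be worked out by hand. This sketch assumes PyTorch's usual LSTM layout (four gates per layer, each with input weights, recurrent weights, and two bias vectors) and the hyperparameters set above; the helper names are illustrative.

```python
def lstm_param_count(input_dim, hidden_dim, layer_dim):
    """Parameters of a unidirectional multi-layer LSTM (assumed PyTorch layout)."""
    total = 0
    for layer in range(layer_dim):
        # Only the first layer sees the raw input; later layers see hidden states
        in_features = input_dim if layer == 0 else hidden_dim
        total += 4 * (in_features * hidden_dim   # input-to-hidden weights
                      + hidden_dim * hidden_dim  # hidden-to-hidden weights
                      + 2 * hidden_dim)          # two bias vectors
    return total

def linear_param_count(in_features, out_features):
    # Weight matrix plus bias vector
    return in_features * out_features + out_features

n_lstm = lstm_param_count(input_dim=8, hidden_dim=128, layer_dim=2)
n_fc = linear_param_count(128, 6)
print(n_lstm, n_fc, n_lstm + n_fc)
```

If the layout assumption holds, the sum should agree with the "Total parameters" line printed by the cell above. The dropout layers add no parameters.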

## 8. Setup Training

In [None]:
# Loss function and optimizer
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

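# Learning rate scheduler: halve the learning rate when validation loss plateaus.
# The `verbose` argument is deprecated in recent PyTorch releases, so it is omitted.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, 
    mode='min', 
    factor=0.5, 
    patience=10
)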

# Create DataLoader for batching
batch_size = 32
train_dataset = TensorDataset(trainX, trainY)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

print("="*60)
print("TRAINING CONFIGURATION")
print("="*60)
print(f"Loss function: MSELoss")
print(f"Optimizer: Adam (lr=0.001)")
print(f"Scheduler: ReduceLROnPlateau (factor=0.5, patience=10)")
print(f"Batch size: {batch_size}")
print(f"Batches per epoch: {len(train_loader)}")
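
To make the scheduler's behaviour concrete, here is a simplified pure-Python sketch of the plateau rule: the learning rate is multiplied by `factor` once the monitored loss has failed to improve for more than `patience` consecutive epochs. It deliberately ignores the improvement threshold, cooldown, and minimum learning rate that the real `ReduceLROnPlateau` supports, and the function name `plateau_schedule` is hypothetical.

```python
def plateau_schedule(losses, lr=0.001, factor=0.5, patience=10):
    """Return the learning rate in effect after each epoch's loss is observed."""
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in losses:
        if loss < best:
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        # Reduce only after more than `patience` epochs without improvement
        if bad_epochs > patience:
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history

toy_losses = [1.0, 0.9, 0.9, 0.9, 0.9]  # improvement stalls after the second epoch
lrs = plateau_schedule(toy_losses, patience=2)
print(lrs)
```

With `patience=10` as configured above and early stopping after 20 stalled epochs, the learning rate can be halved at most a small number of times before training stops.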

## 9. Train Model

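Train in mini-batches with gradient clipping, learning rate scheduling, and early stopping. Note that the validation loss in this loop is computed on the test split, so model selection sees the test data and the test metrics reported later are optimistic. A separate validation split would avoid this.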

In [None]:
num_epochs = 100
train_losses = []
val_losses = []
best_val_loss = float('inf')
patience = 20
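patience_counter = 0

# Make sure the checkpoint directory exists before the first save
os.makedirs("../../models", exist_ok=True)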

print("="*60)
print("TRAINING STARTED")
print("="*60)

for epoch in range(num_epochs):
    # Training phase
    model.train()
    epoch_loss = 0
    
    for batch_X, batch_Y in train_loader:
        # Zero gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs = model(batch_X)
        
        # Compute loss
        loss = criterion(outputs, batch_Y)
        
        # Backward pass
        loss.backward()
        
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        # Update weights
        optimizer.step()
        
        epoch_loss += loss.item()
    
    # Average training loss
    avg_train_loss = epoch_loss / len(train_loader)
    train_losses.append(avg_train_loss)
    
    # Validation phase
    model.eval()
    with torch.no_grad():
        val_outputs = model(testX)
        val_loss = criterion(val_outputs, testY)
        val_losses.append(val_loss.item())
    
    # Learning rate scheduling
    scheduler.step(val_loss)
    
    # Early stopping check
    if val_loss.item() < best_val_loss:
        best_val_loss = val_loss.item()
        patience_counter = 0
        # Save best model
        torch.save(model.state_dict(), "../../models/best_lstm_model.pth")
    else:
        patience_counter += 1
    
    # Print progress
    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {avg_train_loss:.4f}, '
              f'Val Loss: {val_loss.item():.4f}, '
              f'Best Val: {best_val_loss:.4f}')
    
    # Early stopping
    if patience_counter >= patience:
        print(f"\nEarly stopping triggered at epoch {epoch+1}")
        print(f"Best validation loss: {best_val_loss:.4f}")
        break

print("\n" + "="*60)
print("TRAINING COMPLETED")
print("="*60)
print(f"Final train loss: {train_losses[-1]:.4f}")
print(f"Final validation loss: {val_losses[-1]:.4f}")
print(f"Best validation loss: {best_val_loss:.4f}")

# Load best model
model.load_state_dict(torch.load("../../models/best_lstm_model.pth"))
print("\n✓ Best model loaded")
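
The gradient clipping call in the loop rescales all gradients together rather than clipping each value. A NumPy sketch of that global-norm rule, as I understand `clip_grad_norm_` to apply it, is shown below; `clip_by_global_norm` is an illustrative helper, not a PyTorch function.

```python
import numpy as np

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale a list of gradient arrays so their combined L2 norm is at most max_norm."""
    total_norm = np.sqrt(sum(float(np.sum(g ** 2)) for g in grads))
    # Gradients already within the limit are left unchanged (scale capped at 1)
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm

toy_grads = [np.array([3.0, 0.0]), np.array([[0.0, 4.0]])]  # combined norm is 5
clipped, norm_before = clip_by_global_norm(toy_grads, max_norm=1.0)
norm_after = np.sqrt(sum(float(np.sum(g ** 2)) for g in clipped))
print(norm_before, round(norm_after, 4))
```

Because every array is multiplied by the same factor, the direction of the overall update is preserved and only its length is limited.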

## 10. Plot Training History

In [None]:
plt.figure(figsize=(12, 5))

plt.plot(train_losses, label='Train Loss', alpha=0.8, linewidth=2)
plt.plot(val_losses, label='Validation Loss', alpha=0.8, linewidth=2)
plt.axhline(y=best_val_loss, color='r', linestyle='--', label=f'Best Val Loss ({best_val_loss:.4f})', alpha=0.5)

plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss (MSE)', fontsize=12)
plt.title('Training vs Validation Loss', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

# Rough overfitting check on the last epoch. The training loss is averaged over
# batches with dropout active, so the gap is indicative only.
final_gap = val_losses[-1] - train_losses[-1]
if final_gap > 0.1:
    print(f"⚠ Warning: Possible overfitting detected (gap: {final_gap:.4f})")
else:
    print(f"✓ No large train-val gap (gap: {final_gap:.4f})")

## 11. Evaluate on Test Set

In [None]:
# Generate predictions on test set
model.eval()
with torch.no_grad():
    predictions_scaled = model(testX).numpy()
    actual_scaled = testY.numpy()

# Inverse transform to original scale
predictions = scaler_Y.inverse_transform(predictions_scaled)
actual = scaler_Y.inverse_transform(actual_scaled)

# Calculate metrics
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

print("="*60)
print("TEST SET EVALUATION (Per Finger)")
print("="*60)
print(f"{'Finger':<15} {'MAE':<10} {'RMSE':<10} {'R²':<10}")
print("-"*60)

for i, finger in enumerate(finger_columns):
    mae = mean_absolute_error(actual[:, i], predictions[:, i])
    rmse = np.sqrt(mean_squared_error(actual[:, i], predictions[:, i]))
    r2 = r2_score(actual[:, i], predictions[:, i])
    print(f"{finger:<15} {mae:<10.4f} {rmse:<10.4f} {r2:<10.4f}")

# Overall metrics
overall_mae = mean_absolute_error(actual, predictions)
overall_rmse = np.sqrt(mean_squared_error(actual, predictions))
overall_r2 = r2_score(actual.flatten(), predictions.flatten())

print("-"*60)
print(f"{'OVERALL':<15} {overall_mae:<10.4f} {overall_rmse:<10.4f} {overall_r2:<10.4f}")
print("="*60)
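
The three metrics reduce to a few lines of NumPy, which also makes it easier to see how the flattened overall R² behaves. The sketch below uses made-up values for two toy "fingers" with very different ranges; `regression_metrics` is an illustrative helper.

```python
import numpy as np

def regression_metrics(actual, predicted):
    """MAE, RMSE and R² for 1-D arrays."""
    err = actual - predicted
    mae = np.mean(np.abs(err))
    rmse = np.sqrt(np.mean(err ** 2))
    ss_res = np.sum(err ** 2)                       # residual sum of squares
    ss_tot = np.sum((actual - actual.mean()) ** 2)  # total sum of squares
    return mae, rmse, 1.0 - ss_res / ss_tot

# Column 0 has small values with some error; column 1 has large values, no error
actual = np.array([[1.0, 100.0], [2.0, 200.0], [3.0, 300.0], [4.0, 400.0]])
predicted = np.array([[1.1, 100.0], [1.9, 200.0], [3.2, 300.0], [3.8, 400.0]])

r2_per_finger = [regression_metrics(actual[:, i], predicted[:, i])[2] for i in range(2)]
r2_flattened = regression_metrics(actual.flatten(), predicted.flatten())[2]
print(np.round(r2_per_finger, 4), round(r2_flattened, 6))
```

In this toy case the flattened R² is higher than the weaker column's R², because the spread between columns inflates the total sum of squares. The per-finger rows are therefore the more informative part of the table when finger ranges differ.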

## 12. Visualize Predictions

Plot predicted vs actual finger positions on test set.

In [None]:
# Plot predictions for each finger
fig, axes = plt.subplots(3, 2, figsize=(16, 12))
axes = axes.flatten()

for i, finger in enumerate(finger_columns):
    ax = axes[i]
    
    # Plot actual and predicted
    ax.plot(actual[:, i], label='Actual', alpha=0.7, linewidth=2)
    ax.plot(predictions[:, i], label='Predicted', linestyle='--', alpha=0.7, linewidth=2)
    
    # Calculate metrics for this finger
    mae = mean_absolute_error(actual[:, i], predictions[:, i])
    r2 = r2_score(actual[:, i], predictions[:, i])
    
    ax.set_title(f'{finger} (MAE: {mae:.4f}, R²: {r2:.3f})', fontsize=12, fontweight='bold')
    ax.set_xlabel('Time Step', fontsize=10)
    ax.set_ylabel('Position', fontsize=10)
    ax.legend(fontsize=9)
    ax.grid(True, alpha=0.3)

plt.suptitle('Finger Position Predictions on Test Set', fontsize=16, fontweight='bold', y=1.00)
plt.tight_layout()
plt.show()

## 13. Scatter Plots (Predicted vs Actual)

In [None]:
# Scatter plots to visualize prediction accuracy
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
axes = axes.flatten()

for i, finger in enumerate(finger_columns):
    ax = axes[i]
    
    # Scatter plot
    ax.scatter(actual[:, i], predictions[:, i], alpha=0.5, s=20)
    
    # Perfect prediction line
    min_val = min(actual[:, i].min(), predictions[:, i].min())
    max_val = max(actual[:, i].max(), predictions[:, i].max())
    ax.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='Perfect Prediction')
    
    # Calculate R²
    r2 = r2_score(actual[:, i], predictions[:, i])
    
    ax.set_xlabel('Actual', fontsize=10)
    ax.set_ylabel('Predicted', fontsize=10)
    ax.set_title(f'{finger} (R² = {r2:.3f})', fontsize=11, fontweight='bold')
    ax.legend(fontsize=8)
    ax.grid(True, alpha=0.3)

plt.suptitle('Predicted vs Actual (Scatter Plots)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.show()

## 14. Save Final Model

In [None]:
# Save model and scalers
model_save_path = "../../models/lstm_model_final.pth"
torch.save({
    'model_state_dict': model.state_dict(),
    'scaler_X_mean': scaler_X.mean_,
    'scaler_X_scale': scaler_X.scale_,
    'scaler_Y_mean': scaler_Y.mean_,
    'scaler_Y_scale': scaler_Y.scale_,
    'input_dim': input_dim,
    'hidden_dim': hidden_dim,
    'layer_dim': layer_dim,
    'output_dim': output_dim,
    'seq_length': seq_length,
    'sensor_columns': sensor_columns,
    'finger_columns': finger_columns
}, model_save_path)

print("="*60)
print("MODEL SAVED")
print("="*60)
print(f"Model saved to: {model_save_path}")
print(f"Best validation loss: {best_val_loss:.4f}")
print(f"Overall test R²: {overall_r2:.4f}")
print("\n✓ Training complete!")
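
The saved means and scales are enough to reproduce both scalers at inference time without scikit-learn. The sketch below uses a stand-in dictionary with invented values in place of the loaded checkpoint; the helpers `scale_inputs` and `unscale_outputs` are hypothetical. Depending on the PyTorch version, loading a checkpoint that contains NumPy arrays may require `torch.load(..., weights_only=False)`.

```python
import numpy as np

# Stand-in for the dict written above; all values are invented for illustration
checkpoint = {
    "scaler_X_mean": np.array([0.5, 10.0]),
    "scaler_X_scale": np.array([0.1, 2.0]),
    "scaler_Y_mean": np.array([90.0]),
    "scaler_Y_scale": np.array([30.0]),
    "seq_length": 3,
}

def scale_inputs(window, ckpt):
    # Same arithmetic as scaler_X.transform: (x - mean) / scale, per feature
    return (window - ckpt["scaler_X_mean"]) / ckpt["scaler_X_scale"]

def unscale_outputs(pred_scaled, ckpt):
    # Same arithmetic as scaler_Y.inverse_transform: y * scale + mean
    return pred_scaled * ckpt["scaler_Y_scale"] + ckpt["scaler_Y_mean"]

raw_window = np.array([[0.5, 10.0], [0.6, 12.0], [0.4, 8.0]])  # (seq_length, features)
scaled_window = scale_inputs(raw_window, checkpoint)
fake_prediction = np.array([[0.5]])  # placeholder for the model's scaled output
restored = unscale_outputs(fake_prediction, checkpoint)
print(scaled_window)
print(restored)
```

In real use, `scaled_window` would be converted to a tensor with a leading batch dimension, passed through the model, and the output fed to `unscale_outputs`.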

## Summary

This notebook implemented a complete LSTM pipeline for EMG to finger position prediction:

1. ✓ Data cleaning (removed corrupted rows)
1. ✓ Temporal train/test split (80/20)
1. ✓ Data normalization (StandardScaler)
1. ✓ Sequence creation for temporal modeling
1. ✓ LSTM with dropout for regularization
1. ✓ Mini-batch training with zero-initialized hidden state per batch and gradient clipping
1. ✓ Learning rate scheduling
1. ✓ Early stopping
1. ✓ Comprehensive evaluation metrics
1. ✓ Visualization of predictions

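If training succeeded, the predictions should track the measured finger positions rather than collapse to flat lines. The per-finger R² values from section 11 indicate how well they do.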