In [70]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import torch.optim as optim
import os
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler

In [71]:
# Load data
df = pd.read_csv('./Datasets/master_csv_bicep_0.csv')

# Choose features
imu_features = [
    'accel_y', 'accel_x', 'accel_z',
    'gyro_z', 'gyro_y', 'gyro_x',
]
target_column = 'part'

# Use segment UIDs to split at the segment level
segment_uids = df['Segment UID'].unique()

# 80-20 split
train_uids, val_uids = train_test_split(segment_uids, test_size=0.2, random_state=42)

# Prepare training data
train_imu_segments_bicep = []
train_emg_segments_bicep = []

for segment_uid in train_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    train_imu_segments_bicep.append(imu_data)
    train_emg_segments_bicep.append(emg_data)

# Prepare validation data
val_imu_segments_bicep = []
val_emg_segments_bicep = []

for segment_uid in val_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    val_imu_segments_bicep.append(imu_data)
    val_emg_segments_bicep.append(emg_data)

print(f"Number of training segments: {len(train_imu_segments_bicep)}")
print(f"Number of validation segments: {len(val_imu_segments_bicep)}")


Number of training segments: 140
Number of validation segments: 35


In [72]:
# Load data
df = pd.read_csv('./Datasets/master_csv_tricep_0.csv')

# Choose features
imu_features = [
    'accel_y', 'accel_x', 'accel_z',
    'gyro_z', 'gyro_y', 'gyro_x',
]
target_column = 'part'

# Use segment UIDs to split at the segment level
segment_uids = df['Segment UID'].unique()

# 80-20 split
train_uids, val_uids = train_test_split(segment_uids, test_size=0.2, random_state=42)

# Prepare training data
train_imu_segments_tricep = []
train_emg_segments_tricep = []

for segment_uid in train_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    train_imu_segments_tricep.append(imu_data)
    train_emg_segments_tricep.append(emg_data)

# Prepare validation data
val_imu_segments_tricep = []
val_emg_segments_tricep = []

for segment_uid in val_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    val_imu_segments_tricep.append(imu_data)
    val_emg_segments_tricep.append(emg_data)

print(f"Number of training segments: {len(train_imu_segments_tricep)}")
print(f"Number of validation segments: {len(val_imu_segments_tricep)}")


Number of training segments: 140
Number of validation segments: 35


In [73]:
# Load data
df = pd.read_csv('./Datasets/master_csv_Supination_1.csv')

# Choose features
imu_features = [
    'accel_y', 'accel_x', 'accel_z',
    'gyro_z', 'gyro_y', 'gyro_x',
]
target_column = 'part'

# Use segment UIDs to split at the segment level
segment_uids = df['Segment UID'].unique()

# 80-20 split
train_uids, val_uids = train_test_split(segment_uids, test_size=0.2, random_state=42)

# Prepare training data
train_imu_segments_supination = []
train_emg_segments_supination = []

for segment_uid in train_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    train_imu_segments_supination.append(imu_data)
    train_emg_segments_supination.append(emg_data)

# Prepare validation data
val_imu_segments_supination = []
val_emg_segments_supination = []

for segment_uid in val_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    val_imu_segments_supination.append(imu_data)
    val_emg_segments_supination.append(emg_data)

print(f"Number of training segments: {len(train_imu_segments_supination)}")
print(f"Number of validation segments: {len(val_imu_segments_supination)}")


Number of training segments: 140
Number of validation segments: 35


In [74]:
# Load data
df = pd.read_csv('./Datasets/master_csv_Pronation_1.csv')

# Choose features
imu_features = [
    'accel_y', 'accel_x', 'accel_z',
    'gyro_z', 'gyro_y', 'gyro_x',
]
target_column = 'part'

# Use segment UIDs to split at the segment level
segment_uids = df['Segment UID'].unique()

# 80-20 split
train_uids, val_uids = train_test_split(segment_uids, test_size=0.2, random_state=42)

# Prepare training data
train_imu_segments_pronation = []
train_emg_segments_pronation = []

for segment_uid in train_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    train_imu_segments_pronation.append(imu_data)
    train_emg_segments_pronation.append(emg_data)

# Prepare validation data
val_imu_segments_pronation = []
val_emg_segments_pronation = []

for segment_uid in val_uids:
    segment_df = df[df['Segment UID'] == segment_uid]
    imu_data = segment_df[imu_features].values
    emg_data = segment_df[target_column].values
    val_imu_segments_pronation.append(imu_data)
    val_emg_segments_pronation.append(emg_data)

print(f"Number of training segments: {len(train_imu_segments_pronation)}")
print(f"Number of validation segments: {len(val_imu_segments_pronation)}")


Number of training segments: 140
Number of validation segments: 35


In [75]:
class IMU_EMG_Dataset(Dataset):
    def __init__(self, imu_segments, emg_segments):
        self.imu_segments = imu_segments
        self.emg_segments = emg_segments

    def __len__(self):
        return len(self.imu_segments)

    def __getitem__(self, idx):
        imu = torch.tensor(self.imu_segments[idx], dtype=torch.float32)   # shape: (seq_len, imu_features)
        emg = torch.tensor(self.emg_segments[idx], dtype=torch.float32)   # shape: (seq_len,)
        emg = emg.unsqueeze(-1)  # Make it (seq_len, 1) to match output shape
        return imu, emg


In [76]:
def collate_fn(batch):
    imu_batch, emg_batch = zip(*batch)
    
    # Pad sequences to the max length in this batch
    imu_batch_padded = pad_sequence(imu_batch, batch_first=True)     # (batch_size, max_seq_len, imu_features)
    emg_batch_padded = pad_sequence(emg_batch, batch_first=True)     # (batch_size, max_seq_len, 1)

    # Create padding mask: shape (batch_size, max_seq_len)
    # False where there's real data, True where there's padding
    lengths = torch.tensor([x.shape[0] for x in imu_batch])
    max_len = imu_batch_padded.shape[1]
    pad_mask = torch.arange(max_len).expand(len(lengths), max_len) >= lengths.unsqueeze(1)
    
    return imu_batch_padded, emg_batch_padded, pad_mask



In [77]:
train_imu_segments = train_imu_segments_bicep + train_imu_segments_tricep + train_imu_segments_supination + train_imu_segments_pronation
train_emg_segments = train_emg_segments_bicep + train_emg_segments_tricep + train_emg_segments_supination + train_emg_segments_pronation

val_imu_segments = val_imu_segments_bicep + val_imu_segments_tricep + val_imu_segments_supination + val_imu_segments_pronation
val_emg_segments = val_emg_segments_bicep + val_emg_segments_tricep + val_emg_segments_supination + val_emg_segments_pronation

In [78]:
train_dataset = IMU_EMG_Dataset(train_imu_segments, train_emg_segments)
val_dataset = IMU_EMG_Dataset(val_imu_segments, val_emg_segments)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

In [79]:
# Updated classification model and dataset for bicep/tricep classification

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import numpy as np

class IMU_Classification_Dataset(Dataset):
    def __init__(self, imu_segments, labels):
        """
        Dataset for IMU sequence classification
        Args:
            imu_segments: List of IMU data segments (each segment is numpy array of shape [seq_len, features])
            labels: List of labels (0 for bicep, 1 for tricep, 2 for supination, 3 for pronation)
        """
        self.imu_segments = imu_segments
        self.labels = labels

    def __len__(self):
        return len(self.imu_segments)

    def __getitem__(self, idx):
        imu = torch.tensor(self.imu_segments[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return imu, label

def classification_collate_fn(batch):
    """Collate function for classification dataset"""
    imu_batch, labels_batch = zip(*batch)
    
    # Pad sequences to the max length in this batch
    imu_batch_padded = pad_sequence(imu_batch, batch_first=True)
    labels_batch = torch.stack(labels_batch)
    
    # Create padding mask
    lengths = torch.tensor([x.shape[0] for x in imu_batch])
    max_len = imu_batch_padded.shape[1]
    pad_mask = torch.arange(max_len).expand(len(lengths), max_len) >= lengths.unsqueeze(1)
    
    return imu_batch_padded, labels_batch, pad_mask

class IMU_Classifier(nn.Module):
    def __init__(self, input_size=6, hidden_size=128, num_layers=2, num_classes=4, dropout=0.3):
        """
        LSTM-based classifier for IMU sequences
        Args:
            input_size: Number of IMU features (6: accel_x,y,z + gyro_x,y,z)
            hidden_size: Hidden size of LSTM
            num_layers: Number of LSTM layers
            num_classes: Number of output classes (4 for bicep/tricep/supination/pronation)
            dropout: Dropout rate
        """
        super(IMU_Classifier, self).__init__()
        
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        # LSTM layers
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        
        # Attention mechanism
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_size * 2,  # *2 for bidirectional
            num_heads=8,
            dropout=dropout,
            batch_first=True
        )
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden_size * 2, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes)
        )
        
    def forward(self, x, pad_mask=None):
        """
        Forward pass
        Args:
            x: Input tensor of shape (batch_size, seq_len, input_size)
            pad_mask: Padding mask of shape (batch_size, seq_len)
        Returns:
            logits: Classification logits of shape (batch_size, num_classes)
        """
        batch_size, seq_len = x.size(0), x.size(1)
        
        # LSTM forward pass
        lstm_out, (hidden, cell) = self.lstm(x)  # (batch_size, seq_len, hidden_size*2)
        
        # Apply attention mechanism
        if pad_mask is not None:
            # Convert pad_mask to attention mask (inverted)
            attn_mask = pad_mask  # True for padding positions
        else:
            attn_mask = None
            
        attended_out, attention_weights = self.attention(
            lstm_out, lstm_out, lstm_out,
            key_padding_mask=attn_mask
        )
        
        # Global average pooling over sequence dimension, ignoring padded positions
        if pad_mask is not None:
            # Mask out padded positions
            mask = (~pad_mask).float().unsqueeze(-1)  # (batch_size, seq_len, 1)
            attended_out = attended_out * mask
            # Compute mean only over non-padded positions
            seq_lengths = mask.sum(dim=1, keepdim=True)  # (batch_size, 1, 1)
            pooled = attended_out.sum(dim=1) / seq_lengths.squeeze(-1)  # (batch_size, hidden_size*2)
        else:
            pooled = attended_out.mean(dim=1)  # (batch_size, hidden_size*2)
        
        # Classification
        logits = self.classifier(pooled)  # (batch_size, num_classes)
        
        return logits

# Training function
def train_classifier(model, train_loader, val_loader, epochs=50, lr=0.001, device='cpu'):
    """Training loop for the classification model"""
    
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
    
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    
    best_val_acc = 0
    best_model_state = None
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        train_correct = 0
        train_total = 0
        
        for batch_idx, (imu_data, labels, pad_mask) in enumerate(train_loader):
            imu_data = imu_data.to(device)
            labels = labels.to(device)
            pad_mask = pad_mask.to(device)
            
            optimizer.zero_grad()
            
            # Forward pass
            logits = model(imu_data, pad_mask)
            loss = criterion(logits, labels)
            
            # Backward pass
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            # Statistics
            train_loss += loss.item()
            _, predicted = torch.max(logits.data, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()
        
        # Validation phase
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0
        
        with torch.no_grad():
            for imu_data, labels, pad_mask in val_loader:
                imu_data = imu_data.to(device)
                labels = labels.to(device)
                pad_mask = pad_mask.to(device)
                
                logits = model(imu_data, pad_mask)
                loss = criterion(logits, labels)
                
                val_loss += loss.item()
                _, predicted = torch.max(logits.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()
        
        # Calculate metrics
        train_acc = 100 * train_correct / train_total
        val_acc = 100 * val_correct / val_total
        train_loss_avg = train_loss / len(train_loader)
        val_loss_avg = val_loss / len(val_loader)
        
        # Store metrics
        train_losses.append(train_loss_avg)
        val_losses.append(val_loss_avg)
        train_accuracies.append(train_acc)
        val_accuracies.append(val_acc)
        
        # Save best model
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model_state = model.state_dict().copy()
        
        # Learning rate scheduling
        scheduler.step(val_loss_avg)
        
        # Print progress
        if (epoch + 1) % 1 == 0:
            print(f'Epoch [{epoch+1}/{epochs}]')
            print(f'Train Loss: {train_loss_avg:.4f}, Train Acc: {train_acc:.2f}%')
            print(f'Val Loss: {val_loss_avg:.4f}, Val Acc: {val_acc:.2f}%')
            print(f'Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')
            print('-' * 50)
    
    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
        print(f'Best validation accuracy: {best_val_acc:.2f}%')
    
    return model, {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_accuracies': train_accuracies,
        'val_accuracies': val_accuracies
    }

# Updated data preparation code
def prepare_classification_data(train_imu_segments_bicep, train_imu_segments_tricep, train_emg_segments_supination, train_emg_segments_pronation,
                              val_imu_segments_bicep, val_imu_segments_tricep, val_imu_segments_supination, val_imu_segments_pronation):
    """Prepare data for classification"""
    
    # Combine segments and create labels
    train_imu_segments = train_imu_segments_bicep + train_imu_segments_tricep + train_emg_segments_supination + train_emg_segments_pronation
    train_labels = [0] * len(train_imu_segments_bicep) + [1] * len(train_imu_segments_tricep) + [2] * len(train_emg_segments_supination) + [3] * len(train_emg_segments_pronation)
    
    val_imu_segments = val_imu_segments_bicep + val_imu_segments_tricep + val_imu_segments_supination + val_imu_segments_pronation
    val_labels = [0] * len(val_imu_segments_bicep) + [1] * len(val_imu_segments_tricep) + [2] * len(val_imu_segments_supination) + [3] * len(val_imu_segments_pronation)
    
    # Create datasets
    train_dataset = IMU_Classification_Dataset(train_imu_segments, train_labels)
    val_dataset = IMU_Classification_Dataset(val_imu_segments, val_labels)
    
    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, 
                            collate_fn=classification_collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, 
                          collate_fn=classification_collate_fn)
    
    return train_loader, val_loader

In [80]:
# After your existing data loading code, add:
train_loader, val_loader = prepare_classification_data(
    train_imu_segments_bicep, train_imu_segments_tricep, train_imu_segments_supination, train_imu_segments_pronation,
    val_imu_segments_bicep, val_imu_segments_tricep, val_imu_segments_supination, val_imu_segments_pronation
)

# Create and train model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = IMU_Classifier(input_size=6, hidden_size=128, num_layers=2)
trained_model, history = train_classifier(model, train_loader, val_loader, 
                                        epochs=2, device=device)

Epoch [1/2]
Train Loss: 0.2776, Train Acc: 95.54%
Val Loss: 0.0000, Val Acc: 100.00%
Learning Rate: 0.001000
--------------------------------------------------
Epoch [2/2]
Train Loss: 0.0000, Train Acc: 100.00%
Val Loss: 0.0000, Val Acc: 100.00%
Learning Rate: 0.001000
--------------------------------------------------
Best validation accuracy: 100.00%


In [81]:
trained_model_path = './model_checkpoint/classification_model.pth'
os.makedirs(os.path.dirname(trained_model_path), exist_ok=True)
torch.save(trained_model.state_dict(), trained_model_path)

In [82]:
def predict_segment(model, imu_segment, device='cpu'):
    """
    Predict the class for a single IMU segment
    
    Args:
        model: Trained IMU_Classifier model
        imu_segment: Single IMU segment as numpy array of shape (seq_len, 6) or tensor
        device: Device to run inference on ('cpu' or 'cuda')
    
    Returns:
        dict: Contains prediction, confidence, and probabilities
    """
    model.eval()
    model = model.to(device)
    
    with torch.no_grad():
        # Convert to tensor if numpy array
        if isinstance(imu_segment, np.ndarray):
            imu_tensor = torch.tensor(imu_segment, dtype=torch.float32)
        else:
            imu_tensor = imu_segment.clone().detach().float()
        
        # Add batch dimension: (1, seq_len, features)
        imu_tensor = imu_tensor.unsqueeze(0).to(device)
        
        # No padding mask needed for single sequence
        logits = model(imu_tensor, pad_mask=None)
        
        # Get probabilities
        probabilities = torch.softmax(logits, dim=1)
        
        # Get prediction
        predicted_class = torch.argmax(logits, dim=1).item()
        confidence = probabilities[0, predicted_class].item()
        
        # Class names
        class_names = ['bicep', 'tricep', 'supination', 'pronation']
        predicted_label = class_names[predicted_class]
        
        return {
            'prediction': predicted_class,
            'label': predicted_label,
            'confidence': confidence,
            'probabilities': {
                'bicep': probabilities[0, 0].item(),
                'tricep': probabilities[0, 1].item(),
                'supination' : probabilities[0, 2].item(),
                'pronation' : probabilities[0, 3].item()
            }
        }

In [83]:
model = IMU_Classifier(input_size=6, hidden_size=128, num_layers=2)
model.load_state_dict(torch.load('./model_checkpoint/classification_model.pth'))

<All keys matched successfully>

In [84]:
result = predict_segment(model, train_imu_segments_supination[9], device='cpu')
print(f"Prediction: {result['label']}")
print(f"Confidence: {result['confidence']:.3f}")
print(f"Probabilities: {result['probabilities']}")

Prediction: supination
Confidence: 1.000
Probabilities: {'bicep': 4.9547470076447e-14, 'tricep': 9.189292527097556e-18, 'supination': 1.0, 'pronation': 1.1669586466849652e-13}
