In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, Input
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from scipy import signal

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
#acc_G1 = np.load("/content/Copy of acc_grade1_aipms.npy")
#acc_G2 = np.load("/content/Copy of acc_grade2_aipms.npy")
#gyro_G1 = np.load("/content/Copy of gyr_grade1_aipms.npy")
#gyro_G2 = np.load("/gyr_grade2_aipms.npy")
emg_G1 = np.load("/content/drive/MyDrive/Physiotherapy_USRF25/Copy of emg_grade1_aipms.npy")
emg_G2 = np.load("/content/drive/MyDrive/Physiotherapy_USRF25/Copy of emg_grade2_aipms.npy")

# Reshape and process EMG signals
chN = 5
seglenE, seglenA = 6926, 815  # Downsample target length

#emg_G1 = signal.resample(emg_G1.reshape((emg_G1.shape[0], chN, seglenE)), seglenA, axis=2)
#emg_G2 = signal.resample(emg_G2.reshape((emg_G2.shape[0], chN, seglenE)), seglenA, axis=2)

# Transpose to match shape
emg_G1 = np.transpose(emg_G1, axes=(0, 2, 1))
emg_G2 = np.transpose(emg_G2, axes=(0, 2, 1))

# Combine features (EMG, accelerometer, gyroscope)
X_G1 = emg_G1
X_G2 = emg_G2

# Merge grade 1 & 2 data
X_combined = np.vstack((X_G1, X_G2))

# Load labels
labels_G1 = np.load("/content/drive/MyDrive/Physiotherapy_USRF25/Copy of labels_activity_grade1_aipms.npy")
labels_G2 = np.load("/content/drive/MyDrive/Physiotherapy_USRF25/Copy of labels_activity_grade2_aipms.npy")

# Assign grade-specific labels
y_raw = [[activity, 10] for activity in labels_G1] + [[activity, 11] for activity in labels_G2]

# Multi-label binarization
mlb = MultiLabelBinarizer()
y_combined = mlb.fit_transform(y_raw)


In [6]:
X_combined.shape

(2000, 5, 6926)

In [8]:
y_combined

array([[1, 0, 0, ..., 0, 1, 0],
       [1, 0, 0, ..., 0, 1, 0],
       [1, 0, 0, ..., 0, 1, 0],
       ...,
       [0, 0, 0, ..., 1, 0, 1],
       [0, 0, 0, ..., 1, 0, 1],
       [0, 0, 0, ..., 1, 0, 1]])

In [7]:
y_combined.shape

(2000, 12)

In [10]:
y_combined[:,:-2].shape

(2000, 10)

In [11]:
y_combined[:,-2:].shape

(2000, 2)

In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np

# 1. EMG Signal Preprocessor
class EMGPreprocessor(nn.Module):
    def __init__(self, input_channels=X_combined.shape[1], hidden_dim=64):
        super().__init__()
        self.conv1 = nn.Conv1d(input_channels, hidden_dim, kernel_size=5, padding=2)
        self.conv2 = nn.Conv1d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        # x shape: [batch, channels, time_steps]
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        x = self.dropout(x)
        return x  # [batch, hidden_dim, time_steps//4]

# 2. Label-Aware Cross-Attention Head
class TaskSpecificAttentionHead(nn.Module):
    def __init__(self, feature_dim, label_dim, hidden_dim, num_labels):
        super().__init__()
        self.num_labels = num_labels
        self.label_embeddings = nn.Embedding(num_labels, label_dim)
        self.query = nn.Linear(label_dim, hidden_dim)
        self.key = nn.Linear(feature_dim, hidden_dim)
        self.value = nn.Linear(feature_dim, feature_dim)
        self.output_proj = nn.Linear(feature_dim, 1)

    def forward(self, x):
        batch_size, seq_len, _ = x.shape
        label_ids = torch.arange(self.num_labels).to(x.device)
        label_embs = self.label_embeddings(label_ids).unsqueeze(0).expand(batch_size, -1, -1)

        queries = self.query(label_embs)
        keys = self.key(x)
        values = self.value(x)

        attn_scores = torch.matmul(queries, keys.transpose(1, 2)) / np.sqrt(keys.size(-1))
        attn_weights = F.softmax(attn_scores, dim=-1)
        attended = torch.matmul(attn_weights, values)
        logits = self.output_proj(attended).squeeze(-1)
        return logits

# 3. Complete Multi-Label EMG Model
class EMGMultiLabelClassifier(nn.Module):
    def __init__(self, input_channels=emg_G1.shape[1], time_steps=emg_G1.shape[2],
                 activity_classes=10, fatigue_classes=2):
        super().__init__()

        # Signal processing
        self.preprocessor = EMGPreprocessor(input_channels)

        # Temporal encoder (BiLSTM)
        self.temporal_encoder = nn.LSTM(
            input_size=64,  # Must match EMGPreprocessor output
            hidden_size=128,
            num_layers=2,
            bidirectional=True,
            batch_first=True
        )

        # Attention heads
        self.activity_head = TaskSpecificAttentionHead(
            feature_dim=256,  # 2*128 for bidirectional
            label_dim=64,
            hidden_dim=128,
            num_labels=activity_classes
        )

        self.fatigue_head = TaskSpecificAttentionHead(
            feature_dim=256,
            label_dim=32,
            hidden_dim=64,
            num_labels=fatigue_classes
        )

    def forward(self, x):
        # x shape: [batch, channels, time_steps]
        x = self.preprocessor(x)

        # Permute for LSTM [batch, time_steps, features]
        x = x.permute(0, 2, 1)

        # Temporal encoding
        x, _ = self.temporal_encoder(x)

        # Get predictions from both heads
        activity_logits = self.activity_head(x)
        fatigue_logits = self.fatigue_head(x)

        return activity_logits, fatigue_logits

# 4. Custom Dataset and Training Setup
class EMGDataset(Dataset):
    def __init__(self, signals, activity_labels, fatigue_labels):
        self.signals = X_combined  # [n_samples, channels, time_steps]
        self.activity_labels = y_combined[:,:-2]  # [n_samples, activity_classes]
        self.fatigue_labels = y_combined[:,-2:]  # [n_samples, fatigue_classes]

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        return (
            torch.FloatTensor(self.signals[idx]),
            torch.FloatTensor(self.activity_labels[idx]),
            torch.FloatTensor(self.fatigue_labels[idx])
        )

# 5. Training Loop Example
#def train_model(model, dataloader, epochs=50):
#   device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#   model = model.to(device)

#   # Multi-task loss
#    activity_criterion = nn.BCEWithLogitsLoss()
#    fatigue_criterion = nn.BCEWithLogitsLoss()

#   optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

#   for epoch in range(epochs):
#        model.train()
#        total_loss = 0

#       for signals, activity_labels, fatigue_labels in dataloader:
#            signals = signals.to(device)
#            activity_labels = activity_labels.to(device)
#            fatigue_labels = fatigue_labels.to(device)

#            optimizer.zero_grad()

            # Forward pass
#            act_logits, fat_logits = model(signals)

            # Compute losses
#            act_loss = activity_criterion(act_logits, activity_labels)
#            fat_loss = fatigue_criterion(fat_logits, fatigue_labels)
#            loss = act_loss + 0.5 * fat_loss  # Weighted sum

            # Backward pass
#            loss.backward()
#            optimizer.step()

#            total_loss += loss.item()

#        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(dataloader):.4f}")


In [13]:
def compute_accuracy(preds, labels, threshold=0.5):
    """
    preds: raw logits (before sigmoid), shape [batch, num_classes]
    labels: binary ground-truths, shape [batch, num_classes]
    """
    preds = torch.sigmoid(preds)  # convert logits to probabilities
    preds_binary = (preds > threshold).float()
    correct = (preds_binary == labels).float()
    acc = correct.sum() / correct.numel()  # average over all labels and samples
    return acc.item()


In [14]:
@torch.no_grad()
def evaluate(model, dataloader, device):
    model.eval()
    total_loss, total_act_acc, total_fat_acc = 0, 0, 0

    criterion_act = nn.BCEWithLogitsLoss()
    criterion_fat = nn.BCEWithLogitsLoss()

    for signals, activity_labels, fatigue_labels in dataloader:
        signals = signals.to(device)
        activity_labels = activity_labels.to(device)
        fatigue_labels = fatigue_labels.to(device)

        act_logits, fat_logits = model(signals)

        act_loss = criterion_act(act_logits, activity_labels)
        fat_loss = criterion_fat(fat_logits, fatigue_labels)
        loss = act_loss + 0.5 * fat_loss

        total_loss += loss.item()
        total_act_acc += compute_accuracy(act_logits, activity_labels)
        total_fat_acc += compute_accuracy(fat_logits, fatigue_labels)

    avg_loss = total_loss / len(dataloader)
    avg_act_acc = total_act_acc / len(dataloader)
    avg_fat_acc = total_fat_acc / len(dataloader)

    print(f"[Test] Loss: {avg_loss:.4f} | Activity Acc: {avg_act_acc:.4f} | Fatigue Acc: {avg_fat_acc:.4f}")


In [15]:
def train_model(model, dataloader, epochs=50, test_loader=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    activity_criterion = nn.BCEWithLogitsLoss()
    fatigue_criterion = nn.BCEWithLogitsLoss()

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        model.train()
        total_loss, total_act_acc, total_fat_acc = 0, 0, 0

        for signals, activity_labels, fatigue_labels in dataloader:
            signals = signals.to(device)
            activity_labels = activity_labels.to(device)
            fatigue_labels = fatigue_labels.to(device)

            optimizer.zero_grad()

            # Forward
            act_logits, fat_logits = model(signals)

            # Loss
            act_loss = activity_criterion(act_logits, activity_labels)
            fat_loss = fatigue_criterion(fat_logits, fatigue_labels)
            loss = act_loss + 0.5 * fat_loss

            # Backward
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            total_act_acc += compute_accuracy(act_logits, activity_labels)
            total_fat_acc += compute_accuracy(fat_logits, fatigue_labels)

        avg_loss = total_loss / len(dataloader)
        avg_act_acc = total_act_acc / len(dataloader)
        avg_fat_acc = total_fat_acc / len(dataloader)

        print(f"[Train] Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f} | "
              f"Activity Acc: {avg_act_acc:.4f} | Fatigue Acc: {avg_fat_acc:.4f}")

        # Optional test evaluation
        if test_loader:
            evaluate(model, test_loader, device)


In [16]:
# Example Usage
if __name__ == "__main__":
    # Example data dimensions
    signals = X_combined  # [n_samples, channels, time_steps]
    activity_labels = y_combined[:,:-2]  # [n_samples, activity_classes]
    fatigue_labels = y_combined[:,-2:]  # [n_samples, fatigue_classes]

    n_samples = 2000
    channels = X_combined.shape[1]
    time_steps = X_combined.shape[2]
    activity_classes = 10
    fatigue_classes = 2

    # Generate data
    #signals = emg_G1
    #act_labels = np.random.randint(0, 2, (n_samples, activity_classes))
    #fat_labels = np.random.randint(0, 2, (n_samples, fatigue_classes))

    # Create dataset and dataloader
    dataset = EMGDataset(signals, activity_labels, fatigue_labels)
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Initialize and train model
    model = EMGMultiLabelClassifier(input_channels=channels,
                                  time_steps=time_steps,
                                  activity_classes=activity_classes,
                                  fatigue_classes=fatigue_classes)

    train_dataset, test_dataset = torch.utils.data.random_split(dataset, [int(0.8*len(dataset)), len(dataset) - int(0.8*len(dataset))])
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Train model
    train_model(model, train_loader, epochs=20, test_loader=test_loader)
#train_model(model, dataloader, epochs=20)

[Train] Epoch 1/20 | Loss: 0.7062 | Activity Acc: 0.8840 | Fatigue Acc: 0.5000
[Test] Loss: 0.6717 | Activity Acc: 0.9000 | Fatigue Acc: 0.5000
[Train] Epoch 2/20 | Loss: 0.6717 | Activity Acc: 0.9000 | Fatigue Acc: 0.5000
[Test] Loss: 0.6717 | Activity Acc: 0.9000 | Fatigue Acc: 0.5000
[Train] Epoch 3/20 | Loss: 0.6717 | Activity Acc: 0.9000 | Fatigue Acc: 0.4978
[Test] Loss: 0.6717 | Activity Acc: 0.9000 | Fatigue Acc: 0.5048


KeyboardInterrupt: 