In [5]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader, TensorDataset
from collections import Counter
import os

In [6]:
import wandb 
wandb.login(key='af968dcc6f77f75c5bcd9bfc39807a51bcecab9d')

ModuleNotFoundError: No module named 'wandb'

In [12]:
# Load annotations
annotation_path = "/home/henry/robo/cis5810/final/cis5810_final_gait_classifier/data/gavd_dataset/annotations/gavd_annotations.pkl"
df = pd.read_pickle(annotation_path)
clip_label = df[['seq', 'gait_pat']].drop_duplicates()
clip_angle = df[['seq', 'cam_view']].drop_duplicates()
label_mapping = {row['seq']: row['gait_pat'] for idx, row in clip_label.iterrows()}
angle_mapping = {row['seq']: row['cam_view'] for idx, row in clip_angle.iterrows()}

# Load all sequence directories
feature_dir = "/home/henry/robo/cis5810/final/cis5810_final_gait_classifier/data/gavd_dataset/all_features_interp"
sequence_dirs = [d.split("_")[0] for d in os.listdir(feature_dir) if d.startswith('c')]
labels = [label_mapping[seq] for seq in sequence_dirs]

# Count occurrences of each label
label_counts = Counter(labels)

# Keep only classes with at least 2 samples
valid_classes = {label for label, count in label_counts.items() if count > 1}
filtered_dirs = [seq for seq, label in zip(sequence_dirs, labels) if label in valid_classes]
filtered_labels = [label for label in labels if label in valid_classes]
label_mapping = {row['seq']: row['gait_pat'] for idx, row in clip_label.iterrows()}
angle_mapping = {row['seq']: row['cam_view'] for idx, row in clip_angle.iterrows()}

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(filtered_labels)
# Create a mapping of labels to their encodings
label_to_encoding = {label: idx for idx, label in enumerate(label_encoder.classes_)}
encoding_to_label = {idx: label for label, idx in label_to_encoding.items()}
encoded_label_mapping = {idx: label_to_encoding[label] for idx, label in label_mapping.items()}

# I have too many samples labeled abnormal. Randomly remove 1/2 of them.
print(len(filtered_dirs))

abnormal_indices = [idx for idx, label in enumerate(y_encoded) if label == label_to_encoding['abnormal']]
np.random.shuffle(abnormal_indices)
abnormal_indices = abnormal_indices[:len(abnormal_indices)//2]
filtered_dirs = [d for idx, d in enumerate(filtered_dirs) if idx not in abnormal_indices]
y_encoded = [label for idx, label in enumerate(y_encoded) if idx not in abnormal_indices]

print(len(filtered_dirs))
# Stratified split
train_dirs, temp_dirs, train_labels, temp_labels = train_test_split(
    filtered_dirs, y_encoded, stratify=y_encoded, test_size=0.35, random_state=42
)
val_dirs, test_dirs, val_labels, test_labels = train_test_split(
    temp_dirs, temp_labels, stratify=temp_labels, test_size=0.5, random_state=42)

print(train_dirs)

label_counts = Counter(y_encoded)
print(label_counts)

# compute class weights for loss function
class_weights = {}
total_samples = len(y_encoded)
for label, count in label_counts.items():
    class_weights[label] = total_samples / (len(label_counts) * count)
print(class_weights)




1495
1192
['cll7nuzfe001r3o6lzh1nyjnq', 'cll8xwtqh00053o6lnmzz62kh', 'cll7p63zc00753o6lmf6n6p9i', 'cljxkr5zf002k3n6lv3aaexih', 'cljawo31o00173n6lk7goqxm1', 'cll8k24qs00583o6lbnpqmjyl', 'cljr6ohvb000k3n6lfqwsgy5d', 'clk7v0al3000j3n6l81n66a68', 'cljxm3ajs004m3n6lk7unsn04', 'cljs2vx9b001s3n6lo4tc3qwd', 'cljo7eycd00333n6loe70zhzt', 'cljwsyscb00133n6lj5h5v9v1', 'clk6lmg90004w3n6l0o506aay', 'cljwr3zxa000c3n6l3s75sppt', 'cll9v88e7005b3o6lsm78vmuo', 'cllephqq9001m3o6li43x1ynu', 'cll74opgk001w3o6lpjvqslkk', 'cll8yzbic006p3o6lmzz1rotd', 'cljarhldg00d13n6l7utw0lqn', 'cljxkrp7o002o3n6lmygdz4gg', 'cll44balt00193o6lbzpyxlqq', 'cll8yvfp4005t3o6l33zyx8ap', 'cll7phgq7001f3o6lhvyp3qkx', 'cljo19dd1002f3n6lx3t94r7n', 'cljr3vdmx002j3n6ldt042b04', 'cljwtm8x8001o3n6lz7zicp07', 'cll44n86b003l3o6l2rl66u61', 'cll8kklqc00843o6lqv7v4xnl', 'cljw77snx004c3n6lvetndlu2', 'cljapao5600503n6lhrbr4zii', 'cll7ar7am00053o6l5o5l0gu5', 'cljwspg6e00213n6ltn2y4cy8', 'clk7vuicr00413n6l97p3b0gj', 'clk7vy9ja004f3n6lhju0ini5', 'cl

In [4]:
# Initialize Dataset
train_dataset = MediapipeDataset(
    clips=train_dirs,
    feature_dir=feature_dir,
    label_mapping=encoded_label_mapping,
    angle_mapping=angle_mapping,
    sequence_length=30,
    downsample_factor=3
)
# Initialize Dataset
val_dataset = MediapipeDataset(
    clips=val_dirs,
    feature_dir=feature_dir,
    label_mapping=encoded_label_mapping,
    angle_mapping=angle_mapping,
    sequence_length=30,
    downsample_factor=3
)
# Initialize Dataset
test_dataset = MediapipeDataset(
    clips=test_dirs,
    feature_dir=feature_dir,
    label_mapping=encoded_label_mapping,
    angle_mapping=angle_mapping,
    sequence_length=30,
    downsample_factor=3
)

# Make data loaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [5]:
# Model parameters
input_size = train_dataset[0][0].shape[1]  # Number of features, assuming landmark_11 to landmark_32 (22 landmarks)
hidden_size = 512  # Size of the LSTM's hidden layer
num_layers = 3     # Number of LSTM layers
output_size = 12    # Number of output units (change this based on your task)

# Instantiate the model
model = LSTMwithAngle(input_size, hidden_size, num_layers, output_size)


In [12]:
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.utils.class_weight import compute_class_weight

# Initialize wandb

wandb.init(project='mp-lstm-gait-analysis')
wandb.log({"hidden_size": hidden_size, 
           "num_layers": num_layers,
           "sequence_length": train_dataset.sequence_length, 
           "downsampling": train_dataset.downsample_factor
           })

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Loss function
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_encoded), y=y_encoded)
class_weights = torch.tensor(class_weights, dtype=torch.float)
class_weights = class_weights.to(device)

# Use in loss
criterion = torch.nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Track the best validation loss
best_val_loss = float('inf')
best_model_weights = None  # To hold the best model's weights

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0
    correct = 0
    total = 0

    for batch in train_loader:
        sequences, labels, _, angles = batch  # Assuming train_loader returns inputs and targets
        sequences = sequences.to(device) 
        labels = labels.to(device)
        angles = angles.to(device)

        assert not torch.isnan(sequences).any(), "NaN found in inputs"
        assert not torch.isinf(sequences).any(), "Inf found in inputs"

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(sequences, angles)
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        train_loss = running_loss / len(train_loader)
        train_accuracy = 100 * correct / total

    # Log the validation loss and accuracy to wandb
    wandb.log({"epoch":epoch, "train_loss": train_loss, "train_accuracy": train_accuracy})
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

    # Validation step
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    skipped = 0
    with torch.no_grad():
        for sequences, labels, _, angles in val_loader:
            sequences = sequences.to(device)
            labels = labels.to(device)
            angles = angles.to(device)

            assert not torch.isnan(labels).any(), "NaN found in inputs"
            assert not torch.isinf(labels).any(), "Inf found in inputs"

            outputs = model(sequences,angles)
            if not torch.isnan(outputs).any():           
                loss = criterion(outputs, labels)
                val_loss += loss.item()
            else:
                skipped += 1

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= (len(val_loader)-skipped)
    val_accuracy = 100 * correct / total

    # Log the validation loss and accuracy to wandb
    wandb.log({"epoch":epoch, "val_loss": val_loss, "val_accuracy": val_accuracy})
    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

    # Save the model if the validation loss improves
    if val_loss < best_val_loss:
        best_val_loss = val_loss

        # Save the model checkpoint
        torch.save(model.state_dict(), f"big_mp_model_20_3.pth")
        print("Model saved with improved validation loss.")


0,1
downsampling,▁
hidden_size,▁
num_layers,▁
sequence_length,▁

0,1
downsampling,3
hidden_size,512
num_layers,3
sequence_length,30


Train Loss: 2.3337, Train Accuracy: 5.97%
Val Loss: 2.3938, Val Accuracy: 4.38%
Model saved with improved validation loss.
Train Loss: 2.2231, Train Accuracy: 7.87%
Val Loss: 2.3604, Val Accuracy: 4.93%
Model saved with improved validation loss.
Train Loss: 2.1679, Train Accuracy: 9.58%
Val Loss: 2.3263, Val Accuracy: 28.52%
Model saved with improved validation loss.
Train Loss: 2.1121, Train Accuracy: 10.91%
Val Loss: 2.3396, Val Accuracy: 6.63%
Train Loss: 2.0454, Train Accuracy: 11.04%
Val Loss: 2.3557, Val Accuracy: 5.60%
Train Loss: 1.9706, Train Accuracy: 12.42%
Val Loss: 2.2856, Val Accuracy: 9.86%
Model saved with improved validation loss.
Train Loss: 1.8931, Train Accuracy: 14.79%
Val Loss: 2.3181, Val Accuracy: 7.18%
Train Loss: 1.8027, Train Accuracy: 16.58%
Val Loss: 2.2033, Val Accuracy: 12.54%
Model saved with improved validation loss.
Train Loss: 1.7484, Train Accuracy: 19.56%
Val Loss: 2.2918, Val Accuracy: 9.90%
Train Loss: 1.6748, Train Accuracy: 20.49%
Val Loss: 2.20

In [13]:
torch.save(model.state_dict(), f"final_big_mpmodel_30_3_angles.pth")

In [15]:
# Test the model
model.eval()  # Set the model to evaluation mode
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
    for batch in test_loader:
        inputs, targets, _, angles = batch
        inputs = inputs.to(device)
        targets = targets.to(device)
        angles = angles.to(device)

        outputs = model(inputs, angles)
        loss = criterion(outputs, targets)
        test_loss += loss.item()

        # Get predictions
        _, predicted = torch.max(outputs, 1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")


Test Accuracy: 53.51%


In [16]:
from sklearn.metrics import confusion_matrix
from collections import defaultdict
import numpy as np

# Extract true labels, clip IDs, and predictions
y_true = [x[1] for x in test_dataset]
clip_ids = [x[2] for x in test_dataset]
angles = torch.stack([x[3] for x in test_dataset]).to(device)
x_test = torch.stack([x[0] for x in test_dataset]).to(device)


# Predict using the model
y_pred = torch.argmax(model(x_test, angles), dim=1).cpu().numpy()

# Group predictions and true labels by video (clip ID)
clip_predictions = defaultdict(list)
clip_labels = {}

for pred, true_label, clip in zip(y_pred, y_true, clip_ids):
    clip_predictions[clip].append(pred)
    clip_labels[clip] = true_label  # True label for each video

# Determine majority vote prediction and true label per video
video_preds = []
video_labels = []

for clip, preds in clip_predictions.items():
    majority_vote = np.bincount(preds).argmax()  # Most common prediction
    video_preds.append(majority_vote)
    video_labels.append(clip_labels[clip])

correct = np.equal(video_preds,video_labels).sum()
print(f"Test Accuracy: {100 * correct / len(video_preds):.2f}%")

# Generate the per-video confusion matrix
per_video_conf_matrix = confusion_matrix(video_labels, video_preds)
print("Per-Video Confusion Matrix:\n", per_video_conf_matrix)

# Assuming you have a label encoder to get class names
print("Class Labels:", label_encoder.classes_)


Test Accuracy: 53.90%
Per-Video Confusion Matrix:
 [[83  1  2  7  1  3  5  0  2  6  1]
 [ 1  4  0  0  0  0  0  0  1  0  0]
 [ 4  0  6  0  0  0  0  0  1  0  0]
 [21  1  0 11  2  1  2  0  0  1  0]
 [ 0  0  0  0  1  0  1  0  0  0  0]
 [ 7  1  0  2  0 18  1  1  2  0  0]
 [16  2  2  1  0  3 15  0  3  0  2]
 [ 5  0  0  0  0  0  0  2  0  0  0]
 [ 4  0  0  0  0  1  0  0  2  0  0]
 [ 2  1  0  0  0  1  0  0  0  9  0]
 [ 4  0  0  2  0  2  1  0  0  0  1]]
Class Labels: ['abnormal' 'antalgic' 'cerebral palsy' 'exercise' 'inebriated'
 'myopathic' 'normal' 'parkinsons' 'pregnant' 'prosthetic' 'stroke'
 'style']


In [17]:
from sklearn.metrics import confusion_matrix

# Generate the confusion matrix
conf_matrix = confusion_matrix(y_true, y_pred)
print("Confusion Matrix:\n", conf_matrix)

print(label_encoder.classes_)


Confusion Matrix:
 [[738  16  27  70   5  56  53  31   6  40  36   6]
 [  7  23   0   0   0   7   0   3   0   8   0   0]
 [ 31   3  50   1   1   0   5   0   0   6   6   6]
 [114   8  20 142  11  14  17   0   0   3  11   9]
 [  1   0   0   1   8   0   2   0   0   2   5   0]
 [ 43   9   2  15   0 107  19   4   0  13   3   5]
 [ 87   7  15  25   3  25  70   2   0  21   8   8]
 [ 52   0   0  12   0   2   3  13   0   3   2   0]
 [  0   0   0   0   0   0   0   0   0   0   0   0]
 [ 31   0   0   0   0   7   1   0   0  31   2   0]
 [ 32   5   6   6   0  19  18   6   0   0 186   0]
 [ 11   0   0   5   0   9   3   0   0   0   0  10]]
['abnormal' 'antalgic' 'cerebral palsy' 'exercise' 'inebriated'
 'myopathic' 'normal' 'parkinsons' 'pregnant' 'prosthetic' 'stroke'
 'style']


In [18]:
import numpy as np

# Calculate per-class accuracy
class_accuracy = {}
num_classes = per_video_conf_matrix.shape[0]

for i in range(num_classes):
    true_positives = per_video_conf_matrix[i, i]  # Correctly predicted instances for class i
    total_instances = np.sum(per_video_conf_matrix[i, :])  # Total instances for class i
    if total_instances > 0:
        class_accuracy[label_encoder.classes_[i]] = true_positives / total_instances * 100  # Percent correct
    else:
        class_accuracy[label_encoder.classes_[i]] = 0.0  # Handle cases with no instances of class i

#print("Original labels:", label_encoder.classes_)

# Print per-class accuracy
for class_label, accuracy in class_accuracy.items():
    print(f"Class {class_label}: {accuracy:.2f}%")


Class abnormal: 74.77%
Class antalgic: 66.67%
Class cerebral palsy: 54.55%
Class exercise: 28.21%
Class inebriated: 50.00%
Class myopathic: 56.25%
Class normal: 34.09%
Class parkinsons: 28.57%
Class pregnant: 28.57%
Class prosthetic: 69.23%
Class stroke: 10.00%


In [None]:
import torch.backends.cudnn as cudnn
import shap

# Ensure the model is on the appropriate device
model.to(device)

# Use a small subset of your data as background for SHAP
background = x_test[:20]  # Choose a reasonable size to avoid memory issues

# Temporarily disable cudnn for RNNs
cudnn.enabled = False
cudnn.benchmark = False

try:
    # Set the model to training mode temporarily
    model.train()

    # Initialize the SHAP explainer
    explainer = shap.DeepExplainer(model, background)

    # Generate SHAP values for your test data
    shap_values = explainer.shap_values(x_test)
finally:
    # Restore cudnn settings and evaluation mode
    cudnn.enabled = True
    cudnn.benchmark = True
    model.eval()

# Visualize SHAP values (example for a single prediction)
shap.summary_plot(shap_values, x_test.cpu().numpy())




KeyboardInterrupt: 