In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import numpy as np
from sklearn.model_selection import train_test_split
import os
from tqdm.notebook import tqdm
import json
import pickle
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from utils import trainer


In [2]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3):
        """
        LSTM model for sequence classification.

        Args:
            input_size (int): Number of features per time step (e.g., 34 keypoints).
            hidden_size (int): Number of LSTM hidden units.
            output_size (int): Number of output units (e.g., 1 for binary classification).
            num_layers (int): Number of stacked LSTM layers.
            dropout (float): Dropout rate between LSTM layers.
        """
        super(LSTMModel, self).__init__()
        
        # Define LSTM layer(s)
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,  # Input shape: (batch, seq_len, input_size)
            dropout=dropout
        )
        
        # Fully connected layer for classification
        self.fc = nn.Linear(hidden_size, output_size)
        
        # Output activation (use sigmoid for binary classification)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """
        Forward pass of the LSTM model.

        Args:
            x (Tensor): Input tensor of shape (batch_size, seq_len, input_size).

        Returns:
            Tensor: Output tensor of shape (batch_size, output_size).
        """
        # Pass through LSTM
        out, (h_n, c_n) = self.lstm(x)
        
        # Use the last hidden state for classification
        out = self.fc(out[:, -1, :])  # Get the output of the last time step
        
        # Apply sigmoid activation for binary classification
        out = self.sigmoid(out)
        return out


In [None]:
def process_data_with_windows(data_list, window_size=200, stride=30):
    """
    Converts the input dictionary into multiple windows for MLP training.
    
    Args:
        data_dict (dict): Dictionary with video data.
            - Keys: Video IDs
            - Values: {"keypoints": np.array of shape (num_frames, 34), "diag": binary}
        window_size (int): Number of frames per window.
        stride (int): Number of frames to slide for the next window.
        
    Returns:
        X (np.array): Flattened input features for MLP of shape (num_windows, 34 * window_size)
        y (np.array): Binary labels of shape (num_windows,)
    """
    idx_vid, X, y = [], [], []
    
    for video_id, video_data in df.groupby('video'):
        diag = video_data.iloc[0].diagnosis
        vd = video_data.drop(['video', 'diagnosis'], axis=1).to_numpy()
        num_frames= len(vd)
        
        # Create windows
        for start in range(0, num_frames - window_size + 1, stride):
            window = vd[start : start + window_size]
            X.append(window)
            y.append(diag)
            idx_vid.append(video_id)
    
    # Convert to numpy arrays
    X = np.array(X)
    y = np.array(y)
    
    return idx_vid, X, y


In [None]:
datapath = r"C:\Users\chataint\Documents\projet\humanlisbet\datasets\humans\humans_train_annoted.h5"
dataval = r"C:\Users\chataint\Documents\projet\humanlisbet\datasets\humans\humans_test_annoted.h5"
out = r"C:\Users\chataint\Documents\projet\humanlisbet\baseline\out_lstm"
mapping_path = r"C:\Users\chataint\Documents\projet\humanlisbet\datasets\humans\category_mapping.json"
label_path = r"C:\Users\chataint\Documents\projet\humanlisbet\datasets\humans\humans_annoted.label.json"

os.makedirs(out, exist_ok=True)

In [None]:
# Hyperparameters
INPUT_SIZE = 34  
HIDDEN_SIZE = 64
WINDOW = 200
OUTPUT_SIZE = 1  # Binary classification
LEARNING_RATE = 1e-4
EPOCHS = 500
BATCH_SIZE = 64
seed = 42
test_ratio = 0.8
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DROPOUT = 0.3
verbose = False
NUM_LAYER=1

# Parameter dictionary
run_parameters = {
    "input_size": INPUT_SIZE,
    "hidden_size": HIDDEN_SIZE,
    "output_size": OUTPUT_SIZE,
    "window": WINDOW,
    "learning_rate": LEARNING_RATE,
    "epochs": EPOCHS,
    "batch_size": BATCH_SIZE,
    "seed": seed,
    "test_ratio": test_ratio,
    "dropout": DROPOUT,
    "num_layer": NUM_LAYER,
    "verbose": verbose
}

with open(os.path.join(out, 'parameters.json'), 'w') as fd:
    json.dump(run_parameters, fd, indent=4)

model = LSTMModel(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE, NUM_LAYER, DROPOUT).to(device)

dfm = trainer(out, run_parameters, mapping_path, label_path, datapath, device, dataval, model, process_data_with_windows)


# Training and testing

In [6]:
records, labels = load_h5_data(datapath)
rec_train, rec_test = train_test_split(
            records, test_size=test_ratio, random_state=seed, stratify=labels
        )

with open(mapping_path, 'r') as fd:
    mapping = json.load(fd)

mapping = {int(key):value for key,value in mapping.items()}

with open(os.path.join(out, "rec_test"), 'wb') as fd:
    pickle.dump(rec_test, fd)
with open(os.path.join(out, "rec_train"), 'wb') as fd:
    pickle.dump(rec_train, fd)

idx_vid_test, X_test, y_test = process_data_with_windows(rec_test)
idx_vid_train, X_train, y_train = process_data_with_windows(rec_train)

In [7]:
dataset = AutismDataset(X_train, y_train, idx_vid_train, device=device)
train_loader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

dataset_test = AutismDataset(X_test, y_test, idx_vid_test, device=device)
test_loader = DataLoader(dataset_test, batch_size=BATCH_SIZE, shuffle=True)

# Initialize the model, loss, and optimizer
model = LSTMModel(INPUT_SIZE, HIDDEN_SIZE, OUTPUT_SIZE, NUM_LAYER, DROPOUT).to(device)

pos_weight = torch.tensor(((y_train.squeeze() -1).sum() *-1) / y_train.squeeze().sum())

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) # Binary Cross-Entropy Loss
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

early_stopping = EarlyStoppingMetric(patience=20, verbose=verbose, path=os.path.join(out, 'best_model.pth'), warm_up=1)

dfm = pd.DataFrame()
# Training loop
for epoch in tqdm(range(EPOCHS), desc="Training Progress", unit="epoch"):
    metrics = {'epoch':epoch}
    model.train()
    epoch_loss = 0
    for batch_idx, (batch_X, batch_y, _) in tqdm(enumerate(train_loader), 
                                              total=len(train_loader), 
                                              desc=f"Training {epoch + 1}", 
                                              leave=False, disable=not(verbose)):
        optimizer.zero_grad()
        outputs = model(batch_X).squeeze()
        loss = criterion(outputs, batch_y.squeeze())
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    
    # Adjust learning rate
    scheduler.step(epoch_loss)

    metrics['loss']=epoch_loss / len(train_loader)
    # Validation phase
    model.eval()  # Set model to evaluation mode
    test_targets, test_predictions, videos, test_loss = [], [], [], 0
    with torch.no_grad():
        for batch_idx, (batch_X, batch_y, idx_video) in tqdm(enumerate(test_loader), 
                                              total=len(test_loader), 
                                              desc=f"Validation {epoch + 1}", 
                                              leave=False, disable=not(verbose)):
            
            # Forward pass
            outputs = model(batch_X).squeeze()
            loss = criterion(outputs, batch_y.squeeze())
            test_loss += loss
            if outputs.dim() == 0:
                outputs = outputs.unsqueeze(0)
            
            # Store predictions and targets
            test_predictions.extend(torch.round(outputs).cpu().numpy())  # Convert logits to binary predictions
            test_targets.extend(batch_y.cpu().numpy())
            videos.extend(idx_video)
    
    test_loss = test_loss.cpu().numpy() / len(test_loader)
    # Compute validation metrics
    val_accuracy = accuracy_score(test_targets, test_predictions)
    val_precision = precision_score(test_targets, test_predictions, zero_division=0)
    val_recall = recall_score(test_targets, test_predictions, zero_division=0)
    val_f1 = f1_score(test_targets, test_predictions, zero_division=0)

    if verbose:
        tqdm.write(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {epoch_loss / len(train_loader):.4f} Validation - Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")
        
    if True:
        debug_metrics(test_targets, test_predictions, videos, mapping, epoch, out)
    metrics['acc'] = val_accuracy
    metrics['prec'] = val_precision
    metrics['rec'] = val_recall
    metrics['f1'] = val_f1
    metrics['test_loss'] = test_loss

    dfm = pd.concat([dfm, pd.DataFrame(metrics, index=[0])], ignore_index=True)

    # Check early stopping
    found_best = early_stopping(metric=val_f1, model=model, epoch=epoch)
    if found_best:
        get_metrics(test_targets, test_predictions, videos, mapping, out)
    # early_stopping(test_loss=test_loss, model=model, epoch=epoch)
    if early_stopping.early_stop:
        tqdm.write(f"Early stopping triggered. Training terminated. Best model at {early_stopping.best_epoch} with score={early_stopping.best_score}")
        break

print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {epoch_loss / len(train_loader):.4f} Validation - Accuracy: {val_accuracy:.4f}, Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, F1: {val_f1:.4f}")

# Save the model
torch.save(model.state_dict(), os.path.join(out,"last_model.pth"))
dfm.to_csv(os.path.join(out, 'metrics.csv'))






Training Progress:   0%|          | 0/500 [00:00<?, ?epoch/s]

Early stopping triggered. Training terminated. Best model at 9 with score=0.6149729042617247
Epoch 30/500, Loss: 0.6558 Validation - Accuracy: 0.5090, Precision: 0.4842, Recall: 0.6198, F1: 0.5437


# Validation phase

In [8]:
records, labels = load_h5_data(dataval)

idx_vid_val, X_val, y_val = process_data_with_windows(records)
y_val = y_val.squeeze()

db_val = AutismDataset(X_val, y_val,idx_vid_val, device=device)
val_loader = DataLoader(db_val, batch_size=BATCH_SIZE)


In [9]:
y_true, y_pred, videos = [], [],[]
with torch.no_grad():
    for batch_idx, (batch_X, batch_y, idx_video) in tqdm(enumerate(val_loader), 
                                              total=len(val_loader), 
                                              desc=f"Validation", 
                                              leave=False, disable=not(verbose)):
        y_pred.extend(model(batch_X).cpu().numpy().squeeze().round())
        y_true.extend(batch_y.cpu().numpy().squeeze())
        videos.extend(idx_video)

In [10]:
compute_validation(y_true, y_pred, videos, out, mapping)

Unnamed: 0,video,y_pred,y_true
0,8104,TD,ASD
1,8117,ASD,ASD
2,8121,ASD,ASD
3,8124,TD,ASD
4,8127,TD,ASD
5,8130,TD,ASD
6,8133,ASD,ASD
7,8107,TD,ASD
8,8137,ASD,ASD
9,8155,TD,ASD
