# Transformer Classifier with Skeleton Dataset

In [1]:
import os
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn import functional as F
import matplotlib.pyplot as plt
import random
from tqdm import tqdm
from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
import seaborn as sns
import numpy as np
import time

## Dataloader from numpy files

In [2]:
class SkeletonDataset(Dataset):
    def __init__(self, data_path, split):
        self.X = np.load(os.path.join(data_path, f'X_{split}.npy'),  allow_pickle=True)
        self.y = np.load(os.path.join(data_path, f'y_{split}.npy'),  allow_pickle=True)
        self.X = self.X.reshape(self.X.shape[0], self.X.shape[2], self.X.shape[1])
        self.y = np.where(self.y == 27, 0, self.y)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        
        return torch.tensor(self.X[idx], dtype=torch.float32), torch.tensor(self.y[idx], dtype=torch.long)

def get_dataloader(data_path, split, batch_size):
    dataset = SkeletonDataset(data_path, split)
    return DataLoader(dataset, batch_size=batch_size, shuffle=False)

# Create train, validation, and test loaders
def get_all_dataloaders(data_path, batch_size):
    train_loader = get_dataloader(data_path, 'train', batch_size)
    test_loader = get_dataloader(data_path, 'test', batch_size)
    return train_loader, test_loader

In [3]:
train_loader, test_loader = get_all_dataloaders('Skeleton_numpy', batch_size=16)

In [4]:
for data, labels in train_loader:
    print("Data shape:", data.shape)
    print("Labels shape:", labels.shape)
    break

Data shape: torch.Size([16, 125, 60])
Labels shape: torch.Size([16])


In [5]:
class TransformerClassifier(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads=8, num_layers=4, hidden_dim=256, dropout=0.1):
        super(TransformerClassifier, self).__init__()
        
        self.input_dim = input_dim
        self.num_classes = num_classes
        
        # Transformer encoder layer
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=input_dim,  # Input dimension (features per frame)
            nhead=num_heads,    # Number of attention heads
            dim_feedforward=hidden_dim,  # Feedforward hidden layer size
            dropout=dropout
        )
        
        # Stacked transformer encoder
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layers, num_layers=num_layers
        )
        
        # Classifier head
        self.fc = nn.Linear(input_dim, num_classes)  # Final layer to output class probabilities
    
    def forward(self, x):
        """
        x: (batch_size, time_steps, features)
        """
        # Transformer expects input of shape (sequence_length, batch_size, input_dim)
        x = x.permute(1, 0, 2)  # Shape: (time_steps, batch_size, features)
        
        # Apply transformer encoder
        transformer_out = self.transformer_encoder(x)
        
        # We take the output of the last time step (or average pooling across time)
        # For simplicity, let's use the last time step output (as a representation of the sequence)
        x = transformer_out[-1, :, :]  # Shape: (batch_size, features)
        
        # Classifier head to predict the class
        x = self.fc(x)
        return x

## Modified transformer architecture for better performance on this dataset

In [6]:

class EnhancedTransformerClassifier(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads=8, num_layers=6, hidden_dim=512, dropout=0.2):
        super(EnhancedTransformerClassifier, self).__init__()
        
        self.input_dim = input_dim
        self.num_classes = num_classes
        
        # Transformer encoder layer with more depth, hidden layers, and dropout
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=input_dim,  # Input dimension (features per frame)
            nhead=num_heads,    # Number of attention heads
            dim_feedforward=hidden_dim,  # Feedforward hidden layer size
            dropout=dropout
        )
        
        # Stacked transformer encoder with more layers (increased depth)
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layers, num_layers=num_layers
        )
        
        # Use global average pooling instead of just the last time step output
        self.pooling = nn.AdaptiveAvgPool1d(1)  # Global average pooling across the time dimension
        
        # Classifier head
        self.fc = nn.Linear(input_dim, num_classes)  # Final layer to output class probabilities
        
        # Dropout for regularization
        self.dropout = nn.Dropout(dropout)
        
        # Layer Normalization for better training
        self.layer_norm = nn.LayerNorm(input_dim)

    def forward(self, x):
        """
        x: (batch_size, time_steps, features)
        """
        # Transformer expects input of shape (sequence_length, batch_size, input_dim)
        x = x.permute(1, 0, 2)  # Shape: (time_steps, batch_size, features)
        
        # Apply transformer encoder
        transformer_out = self.transformer_encoder(x)
        
        # Global average pooling (across time_steps)
        x = transformer_out.mean(dim=0)  # Shape: (batch_size, features)
        
        # Alternatively, we could also try adaptive pooling
        # x = self.pooling(transformer_out.permute(1, 2, 0)).squeeze(-1)  # Apply global average pooling
        
        # Layer normalization for better stability
        x = self.layer_norm(x)
        
        # Dropout to regularize the output
        x = self.dropout(x)
        
        # Classifier head to predict the class
        x = self.fc(x)
        return x


## Training model

In [7]:
def train_model(model, train_loader, device, epochs=10, lr=1e-4):
    criterion = nn.CrossEntropyLoss()  # For classification task
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    model.to(device)  # Move model to CUDA if available

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to device
            
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(X_batch)
            
            # Compute loss
            loss = criterion(outputs, y_batch)
            running_loss += loss.item()
            
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            
            # Calculate accuracy
            _, predicted = torch.max(outputs, 1)
            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

        epoch_loss = running_loss / len(train_loader)
        epoch_acc = 100 * correct / total
        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.2f}%")

In [8]:
input_dim = 60  # Number of features (e.g., 75 landmarks × 3 coordinates)
num_classes = 27 #len(set([item['y'] for item in train_loader.dataset]))  # Number of unique glosses
num_heads = 12
print(num_classes)

# Initialize model
model = EnhancedTransformerClassifier(input_dim=input_dim, num_heads = num_heads, num_classes=num_classes)
print(model)

# Select device (GPU if available, otherwise CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
# Train the model
train_model(model, train_loader, device=device, epochs=2000, lr=1e-4)

# Test the model on test data
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to device
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")




Epoch 1/2000, Loss: 3.4006, Accuracy: 3.71%
Epoch 2/2000, Loss: 3.3383, Accuracy: 3.71%
Epoch 3/2000, Loss: 3.3389, Accuracy: 4.45%
Epoch 4/2000, Loss: 3.3095, Accuracy: 5.01%
Epoch 5/2000, Loss: 3.3144, Accuracy: 3.15%
Epoch 6/2000, Loss: 3.3312, Accuracy: 5.19%
Epoch 7/2000, Loss: 3.3168, Accuracy: 5.19%
Epoch 8/2000, Loss: 3.2876, Accuracy: 6.12%
Epoch 9/2000, Loss: 3.2210, Accuracy: 5.75%
Epoch 10/2000, Loss: 3.1259, Accuracy: 8.53%
Epoch 11/2000, Loss: 3.1465, Accuracy: 6.68%
Epoch 12/2000, Loss: 3.0794, Accuracy: 8.16%
Epoch 13/2000, Loss: 3.0222, Accuracy: 9.83%
Epoch 14/2000, Loss: 3.0300, Accuracy: 10.76%
Epoch 15/2000, Loss: 3.0456, Accuracy: 10.02%
Epoch 16/2000, Loss: 3.0102, Accuracy: 8.53%
Epoch 17/2000, Loss: 2.9963, Accuracy: 9.28%
Epoch 18/2000, Loss: 2.9723, Accuracy: 11.32%
Epoch 19/2000, Loss: 2.9738, Accuracy: 12.80%
Epoch 20/2000, Loss: 2.9303, Accuracy: 13.54%
Epoch 21/2000, Loss: 2.8850, Accuracy: 15.03%
Epoch 22/2000, Loss: 2.8539, Accuracy: 16.51%
Epoch 23/200

## Test results

In [9]:
model.eval()
correct = 0
total = 0
start=time.time()
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to device
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()
end = time.time()
print(f"Test Accuracy: {100 * correct / total:.2f}%")
print("Time per sample:", (end-start)*1000/total)


Test Accuracy: 57.45%
Time per sample: 0.2052635879990477


## Print metrics

In [10]:
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)  # Move data to device
        outputs = model(X_batch)
        _, predicted = torch.max(outputs, 1)

        all_preds.extend(predicted.cpu().numpy())  # Collect predictions
        all_labels.extend(y_batch.cpu().numpy())  # Collect true labels

# Calculate precision, recall, F1 score (macro averaged), and accuracy
precision, recall, f1, _ = precision_recall_fscore_support(all_labels, all_preds, average='macro')
accuracy = accuracy_score(all_labels, all_preds)

print(f"Test Accuracy: {accuracy * 100}%")
print(f"Test Precision (Macro): {precision}")
print(f"Test Recall (Macro): {recall}")
print(f"Test F1 Score (Macro): {f1}")


Test Accuracy: 57.453416149068325%
Test Precision (Macro): 0.6085554877221543
Test Recall (Macro): 0.5754769921436589
Test F1 Score (Macro): 0.555776354190826
