## Transformer Encoder-Decoder

In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import os
import matplotlib.pyplot as plt
import copy
from typing import Optional

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
video_directory = 'Latest-WLASL-100'

# Every entry directly under the dataset root is treated as one gesture class.
gesture_folder = np.array(os.listdir(video_directory))

# Count the sub-directories inside each gesture folder; each one is counted as a video.
total = 0
for gestures in gesture_folder:
    class_dir = os.path.join(video_directory, gestures)
    gesture = [
        fname
        for fname in os.listdir(class_dir)
        if os.path.isdir(os.path.join(class_dir, fname))
    ]
    total += len(gesture)

print("Total gestures: ", len(gesture_folder), "; Total videos: ", total)

Total gestures:  100 ; Total videos:  4086


In [3]:
# Map each gesture name to its integer class index (its position in gesture_folder).
label_map = dict(zip(gesture_folder, range(len(gesture_folder))))
len(label_map)

100

In [4]:
import torch
import torch.nn as nn

class CustomTransformerEncDec(nn.Module):
    """Encoder-decoder Transformer with learnable positional encodings.

    Source and target features are linearly projected to ``d_model``, summed with
    learnable positional encodings, passed through ``nn.Transformer`` (batch-first)
    with a causal mask on the target, and finally projected to ``target_size``.

    Args:
        input_size: Feature dimension of each source time step.
        target_size: Feature dimension of each target time step; also the size of
            the output produced for every target position.
        d_model: Internal model dimension of the Transformer.
        nhead: Number of attention heads.
        num_encoder_layers: Number of stacked encoder layers.
        num_decoder_layers: Number of stacked decoder layers.
        dim_feedforward: Hidden size of the position-wise feed-forward blocks.
        dropout: Dropout probability used inside the Transformer.
        max_len: Maximum source/target sequence length supported by the
            positional-encoding tables. Defaults to 5000 (the previous fixed value).

    Raises:
        ValueError: If ``max_len`` is smaller than 1.
    """

    def __init__(self, input_size, target_size, d_model=64, nhead=8,
                 num_encoder_layers=3, num_decoder_layers=3, dim_feedforward=128, dropout=0.1,
                 max_len=5000):
        super().__init__()
        if max_len < 1:
            raise ValueError(f"max_len must be >= 1, got {max_len}")
        self.max_len = max_len

        # Projection layers for source (encoder) and target (decoder)
        self.src_projection = nn.Linear(input_size, d_model)
        self.tgt_projection = nn.Linear(target_size, d_model)

        # Learnable positional encodings (zero-initialised) for source and target sequences
        self.src_positional_encoding = nn.Parameter(torch.zeros(1, max_len, d_model))
        self.tgt_positional_encoding = nn.Parameter(torch.zeros(1, max_len, d_model))

        # Transformer module with encoder and decoder stacks
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True  # Ensures the batch dimension comes first
        )

        # Final linear layer to map the decoder output to the target size
        self.fc = nn.Linear(d_model, target_size)

    def forward(self, src, tgt):
        """Run the encoder-decoder and return per-position outputs.

        Args:
            src: Tensor of shape [batch_size, src_seq_length, input_size]
            tgt: Tensor of shape [batch_size, tgt_seq_length, target_size]
                 Typically, during training, tgt is the ground-truth sequence shifted right.

        Returns:
            Tensor of shape [batch_size, tgt_seq_length, target_size].

        Raises:
            ValueError: If either sequence is longer than ``max_len``. Without this
                check the positional-encoding slice would silently be too short and
                the addition would fail with an opaque shape-mismatch error.
        """
        src_seq_len = src.size(1)
        tgt_seq_len = tgt.size(1)
        if src_seq_len > self.max_len or tgt_seq_len > self.max_len:
            raise ValueError(
                f"Sequence length exceeds max_len={self.max_len} "
                f"(src: {src_seq_len}, tgt: {tgt_seq_len}); "
                "construct the model with a larger max_len."
            )

        # Project the source and target inputs to the model dimension
        src = self.src_projection(src)
        tgt = self.tgt_projection(tgt)

        # Add positional encodings (sliced to the actual sequence lengths)
        src = src + self.src_positional_encoding[:, :src_seq_len, :]
        tgt = tgt + self.tgt_positional_encoding[:, :tgt_seq_len, :]

        # Causal mask so each target position cannot attend to subsequent positions
        tgt_mask = self.transformer.generate_square_subsequent_mask(tgt_seq_len).to(tgt.device)

        # The transformer returns a tensor of shape [batch_size, tgt_seq_length, d_model]
        output = self.transformer(src, tgt, tgt_mask=tgt_mask)

        # Project the output to the target dimension (e.g., vocabulary size or number of classes)
        return self.fc(output)


In [7]:
# Load the pre-computed landmark arrays and their labels
X = np.load('train/X_TRAIN_landmarks_normalized.npy')
y = np.load('train/y_TRAIN_landmarks_normalized.npy')

print(f"Data shapes - X: {X.shape}, y: {y.shape}")

# One-hot encode the labels; keep the integer class ids for stratified splitting
y = tf.keras.utils.to_categorical(y, num_classes=len(gesture_folder))
y_labels = y.argmax(axis=1)

# First split: 80% train / 20% hold-out, stratified by class
X_train_ori, X_holdout, y_train_ori, y_holdout = train_test_split(
    X, y, test_size=0.2, stratify=y_labels, random_state=42
)
# Second split: divide the hold-out evenly into test and validation sets
X_test_ori, X_val_ori, y_test_ori, y_val_ori = train_test_split(
    X_holdout, y_holdout, test_size=0.5, stratify=y_holdout.argmax(axis=1), random_state=42
)


def _as_features(array):
    """Convert a NumPy feature array to a float32 tensor."""
    return torch.tensor(array, dtype=torch.float32)


def _as_class_ids(one_hot):
    """Convert one-hot labels to an int64 tensor of class indices."""
    return torch.tensor(one_hot.argmax(axis=1), dtype=torch.long)


# Convert to PyTorch tensors
X_train, y_train = _as_features(X_train_ori), _as_class_ids(y_train_ori)
X_val, y_val = _as_features(X_val_ori), _as_class_ids(y_val_ori)
X_test, y_test = _as_features(X_test_ori), _as_class_ids(y_test_ori)

print(f"Training set - X: {X_train.shape}, y: {y_train.shape}")
print(f"Validation set - X: {X_val.shape}, y: {y_val.shape}")
print(f"Test set - X: {X_test.shape}, y: {y_test.shape}")


Data shapes - X: (4086, 512, 258), y: (4086,)
Training set - X: torch.Size([3268, 512, 258]), y: torch.Size([3268])
Validation set - X: torch.Size([409, 512, 258]), y: torch.Size([409])
Test set - X: torch.Size([409, 512, 258]), y: torch.Size([409])


In [8]:
# Wrap each split in a TensorDataset, then build the loaders.
# Only the training loader shuffles; validation and test keep the original order.
batch_size = 32  # Reduced batch size to avoid memory issues

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [9]:
# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [10]:
# Model, loss, and optimizer
input_size = X_train.size(-1)
num_classes = len(label_map)
model = CustomTransformerEncDec(input_size, num_classes).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop parameters
num_epochs = 400
loss_history = []
val_loss_history = []
loss_threshold = 0.1


def forward_logits(model, X_batch):
    """Return class logits of shape [batch_size, num_classes] for a batch of sequences.

    The decoder is driven by a dummy all-zero target of sequence length 1, so the
    model emits a single output step per sample, which is squeezed away here.
    """
    dummy_tgt = torch.zeros(X_batch.size(0), 1, num_classes, device=X_batch.device)
    return model(X_batch, dummy_tgt).squeeze(1)


def evaluate(model, loader):
    """Return (mean per-batch loss, accuracy) of ``model`` over ``loader``.

    Puts the model in eval mode and runs without gradient tracking.
    """
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = forward_logits(model, X_batch)
            loss_sum += criterion(outputs, y_batch).item()
            correct += (outputs.argmax(dim=1) == y_batch).sum().item()
            total += y_batch.size(0)
    return loss_sum / len(loader), correct / total


for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0

    for X_batch, y_batch in train_loader:
        # Move batch to the training device
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()

        outputs = forward_logits(model, X_batch)  # shape: [batch_size, num_classes]
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_loss = epoch_loss / len(train_loader)
    loss_history.append(avg_loss)

    # Validation phase
    avg_val_loss, val_accuracy = evaluate(model, val_loader)
    val_loss_history.append(avg_val_loss)

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}, Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

    # NOTE: the stopping criterion is based on the *training* loss, not the validation loss.
    if avg_loss < loss_threshold:
        print(f'Loss threshold of {loss_threshold} reached. Stopping training.')
        break

# Evaluate the model
model.eval()
with torch.no_grad():
    # Run the test set through the model in mini-batches instead of one huge
    # forward pass: attention memory grows with batch size, so pushing the whole
    # split at once risks running out of memory. test_loader does not shuffle,
    # so the concatenated logits line up with y_test.
    test_outputs = torch.cat(
        [forward_logits(model, X_batch.to(device)) for X_batch, _ in test_loader]
    )

    # NOTE(review): the original cell rebound X_test / y_test to the device; kept
    # in case later cells rely on that — drop if nothing downstream needs it.
    X_test = X_test.to(device)
    y_test = y_test.to(device)

    test_loss = criterion(test_outputs, y_test)
    accuracy = (test_outputs.argmax(dim=1) == y_test).float().mean()
    print(f'Test Loss: {test_loss.item():.4f}, Test Accuracy: {accuracy.item():.4f}')


Epoch [1/400], Loss: 4.6250, Val Loss: 4.3750, Val Accuracy: 0.0171
Epoch [2/400], Loss: 4.2203, Val Loss: 4.0577, Val Accuracy: 0.0416
Epoch [3/400], Loss: 3.9243, Val Loss: 3.8457, Val Accuracy: 0.0660
Epoch [4/400], Loss: 3.7738, Val Loss: 3.7382, Val Accuracy: 0.0807
Epoch [5/400], Loss: 3.6434, Val Loss: 3.6367, Val Accuracy: 0.1002
Epoch [6/400], Loss: 3.4883, Val Loss: 3.6366, Val Accuracy: 0.0880
Epoch [7/400], Loss: 3.3333, Val Loss: 3.3726, Val Accuracy: 0.1271
Epoch [8/400], Loss: 3.2545, Val Loss: 3.2914, Val Accuracy: 0.1443
Epoch [9/400], Loss: 3.0980, Val Loss: 3.2352, Val Accuracy: 0.1394
Epoch [10/400], Loss: 3.0058, Val Loss: 3.0193, Val Accuracy: 0.1956
Epoch [11/400], Loss: 2.9122, Val Loss: 2.9195, Val Accuracy: 0.2054
Epoch [12/400], Loss: 2.7350, Val Loss: 2.9961, Val Accuracy: 0.1956
Epoch [13/400], Loss: 2.6758, Val Loss: 2.8271, Val Accuracy: 0.2176
Epoch [14/400], Loss: 2.5907, Val Loss: 2.8232, Val Accuracy: 0.2445
Epoch [15/400], Loss: 2.4435, Val Loss: 2.6