In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from tqdm import tqdm
import sys

# Put the parent of this file's directory on sys.path, presumably so that the
# project-local `models` package imported below can be resolved.
# Notebook kernels do not define `__file__`, so fall back to the current
# working directory there (assumes the kernel was started in the notebook's
# own directory -- TODO confirm for your setup).
try:
    _THIS_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    _THIS_DIR = os.getcwd()
PROJECT_ROOT = os.path.dirname(_THIS_DIR)
if PROJECT_ROOT not in sys.path:  # keep the cell idempotent when re-run
    sys.path.append(PROJECT_ROOT)
from models.cnn_lstm import ASLTranslator, ASLDataLoader
from models.resnet50_bilstm import ResNet50BiLSTM

In [None]:
class ASLDataset(Dataset):
    """Video dataset laid out on disk as ``data_dir/<class_name>/<video file>``.

    Each sub-directory of ``data_dir`` is one class; class indices follow the
    sorted directory names. Inside a class directory every file whose name
    ends in ``.mp4``, ``.avi`` or ``.mov`` becomes one sample.

    Args:
        data_dir: Root directory containing one sub-directory per class.
        transform: Optional object passed through to ``ASLDataLoader`` for
            every video that is loaded.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # Only directories are classes. A stray file in data_dir (README,
        # .DS_Store, ...) would otherwise be counted as a class and crash the
        # per-class os.listdir() call below.
        self.classes = sorted(
            entry for entry in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, entry))
        )
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        # List of (video_path, class_name) pairs.
        self.samples = []
        for class_name in self.classes:
            class_dir = os.path.join(data_dir, class_name)
            # os.listdir() order is arbitrary; sort so sample indices are
            # stable across runs and filesystems.
            for video_file in sorted(os.listdir(class_dir)):
                if video_file.endswith(('.mp4', '.avi', '.mov')):
                    self.samples.append((os.path.join(class_dir, video_file), class_name))

    def __len__(self):
        """Return the number of video samples found under ``data_dir``."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load the video at position ``idx``.

        Returns:
            ``(frames, label)`` where ``frames`` is whatever
            ``ASLDataLoader.load_video()`` returns (presumably a tensor of
            frames -- TODO confirm) and ``label`` is the integer class index.
        """
        video_path, class_name = self.samples[idx]
        loader = ASLDataLoader(video_path, self.transform)
        frames = loader.load_video()
        label = self.class_to_idx[class_name]
        return frames, label

In [None]:
def collate_fn_padd(batch):
    """Collate variable-sized ``(inputs, label)`` samples into one padded batch.

    Each ``inputs`` tensor is zero-padded on the right along its first
    dimension and along its last dimension, up to the largest size found in
    the batch, so that all tensors can be stacked. Every other dimension must
    already agree across the batch.

    Args:
        batch: Non-empty list of ``(inputs, label)`` pairs. ``inputs`` is a
            tensor with at least one dimension (the pipeline in this file
            presumably yields ``[seq_len, channels, height, width]`` --
            TODO confirm); ``label`` is a number.

    Returns:
        ``(padded_inputs, labels)``: the padded tensors stacked along a new
        leading batch dimension, and the labels as a 1-D tensor.
    """
    # Target sizes: longest first dimension and widest last dimension.
    max_len = max(item[0].size(0) for item in batch)
    max_last_dim = max(item[0].size(-1) for item in batch)

    padded_inputs = []
    labels = []
    for inputs, label in batch:
        pad_seq = max_len - inputs.size(0)
        pad_last = max_last_dim - inputs.size(-1)

        # F.pad takes (left, right) pairs starting from the LAST dimension and
        # working backwards. So: pad the last dim on the right, leave every
        # middle dim untouched, then pad the first dim on the right. For a
        # 4-D tensor this yields (0, pad_last, 0, 0, 0, 0, 0, pad_seq).
        if inputs.dim() == 1:
            # First and last dimension are the same axis.
            pad = (0, pad_seq)
        else:
            pad = (0, pad_last) + (0, 0) * (inputs.dim() - 2) + (0, pad_seq)

        padded_inputs.append(torch.nn.functional.pad(inputs, pad))
        labels.append(label)

    return torch.stack(padded_inputs), torch.tensor(labels)

In [None]:
def _run_epoch(model, loader, criterion, device, desc, optimizer=None):
    """Run one pass over ``loader``; train if ``optimizer`` is given, else evaluate.

    Returns:
        ``(loss_sum, correct, total)``: the summed per-sample loss, the number
        of correct top-1 predictions and the number of samples seen.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    correct = 0
    total = 0
    # Losses with reduction='sum' already return a per-batch total; anything
    # else is treated as a per-batch mean and re-weighted by the batch size so
    # that loss_sum / total is a true per-sample average.
    loss_is_sum = getattr(criterion, 'reduction', 'mean') == 'sum'

    with torch.set_grad_enabled(training):
        pbar = tqdm(loader, desc=desc)
        for inputs, labels in pbar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = labels.size(0)
            loss_sum += loss.item() if loss_is_sum else loss.item() * batch_size
            _, predicted = outputs.max(1)
            total += batch_size
            correct += predicted.eq(labels).sum().item()

            pbar.set_postfix({'loss': loss_sum / total, 'acc': 100. * correct / total})

    return loss_sum, correct, total


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device,
                save_path='models/best_model.pth'):
    """Train ``model`` and checkpoint its weights whenever validation accuracy improves.

    Each epoch runs one training pass over ``train_loader`` followed by one
    gradient-free evaluation pass over ``val_loader``. When the validation
    accuracy exceeds the best value seen so far, ``model.state_dict()`` is
    written to ``save_path``.

    Args:
        model: Module mapping a batch of inputs to per-class scores of shape
            ``(batch, num_classes)`` (``outputs.max(1)`` is used for top-1).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of epochs to run.
        device: Device the batches are moved to.
        save_path: Destination of the best checkpoint. Missing parent
            directories are created.
    """
    best_val_acc = 0.0

    for epoch in range(num_epochs):
        _run_epoch(model, train_loader, criterion, device,
                   desc=f'Epoch {epoch+1}/{num_epochs} [Train]', optimizer=optimizer)
        _, val_correct, val_total = _run_epoch(model, val_loader, criterion, device,
                                               desc=f'Epoch {epoch+1}/{num_epochs} [Val]')

        if val_total == 0:
            # No validation samples: accuracy is undefined, so there is
            # nothing to compare against and no checkpoint to write.
            print('Validation loader produced no samples; skipping checkpoint for this epoch.')
            continue

        val_acc = 100. * val_correct / val_total
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # torch.save does not create parent directories.
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            torch.save(model.state_dict(), save_path)
            print(f'New best model saved with validation accuracy: {val_acc:.2f}%')

In [None]:
def main(seed=42):
    """Build the data pipeline and model, then train on the videos in ``data/processed``.

    Splits the dataset 80/20 into training and validation subsets, wraps them
    in padded-batch data loaders and trains a ``ResNet50BiLSTM`` classifier
    for 50 epochs with Adam and cross-entropy loss.

    Args:
        seed: Seed for torch's global RNG and for the train/validation split,
            so the split and weight initialisation are repeatable between runs.

    Raises:
        ValueError: If fewer than two samples are found, since both the
            training and the validation subset need at least one sample.
    """
    torch.manual_seed(seed)

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    # Per-frame transforms: resize to 224x224 and normalise each channel.
    # NOTE(review): the mean/std values look like the usual ImageNet
    # statistics -- confirm they match the backbone's pretraining.
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Create datasets
    data_dir = 'data/processed'
    dataset = ASLDataset(data_dir, transform=transform)
    if len(dataset) < 2:
        # With 0 or 1 samples the training subset is empty and the shuffling
        # DataLoader fails with a far less helpful error.
        raise ValueError(
            f'Need at least 2 video samples under {data_dir!r}, found {len(dataset)}'
        )

    # 80/20 split, driven by a dedicated seeded generator so it is repeatable.
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    split_rng = torch.Generator().manual_seed(seed)
    train_dataset, val_dataset = torch.utils.data.random_split(
        dataset, [train_size, val_size], generator=split_rng
    )

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4, collate_fn=collate_fn_padd)
    val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=4, collate_fn=collate_fn_padd)

    # Initialize model
    num_classes = len(dataset.classes)
    model = ResNet50BiLSTM(num_classes=num_classes).to(device)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train the model
    num_epochs = 50
    train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device)

In [None]:
# Entry point: start training only when this code is executed directly
# (i.e. `__name__` is '__main__'), not when it is imported as a module.
if __name__ == '__main__':
    main() 