Abderrahmane Balah - 201932370 - Section 1

This notebook has code related to developing a TRANSFORMER BASED MODEL.

# TRANSFORMER BASED SOLUTION FOR SIGN LANGUAGE INTERPRETATION

# Data Preprocessing

In [9]:
import torch
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchvision import transforms
from PIL import Image
import os

class SignLanguageDataset(Dataset):
    def __init__(self, root_dir, caption_mapping, vocab, transform=None, data=None):
        self.root_dir = root_dir
        self.caption_mapping = caption_mapping
        self.vocab = vocab
        self.transform = transform
        if(data is None):
          self.data = self._load_data()
        else:
          self.data = data

    def _load_data(self):
        data = []
        for folder_name in sorted(os.listdir(self.root_dir)):
            folder_path = os.path.join(self.root_dir, folder_name)
            for video_folder in os.listdir(folder_path):
                video_folder_path = os.path.join(folder_path, video_folder)
                frames = [os.path.join(video_folder_path, frame) for frame in sorted(os.listdir(video_folder_path))]
                caption = self.caption_mapping[folder_name]
                numericalized_caption = numericalize(caption, self.vocab)
                data.append((frames, numericalized_caption))
        return data

    def __getitem__(self, idx):
        frames_paths, numericalized_caption = self.data[idx]
        frames = [Image.open(frame_path).convert('RGB') for frame_path in frames_paths]

        if self.transform:
            frames = [self.transform(frame) for frame in frames]
        else:
            to_tensor = transforms.ToTensor()
            frames = [to_tensor(frame) for frame in frames]

        frames_tensor = torch.stack(frames)
        caption_tensor = torch.tensor(numericalized_caption)
        return frames_tensor, caption_tensor

    def __len__(self):
        return len(self.data)

    def subset(self, indices):
        subset_data = [self.data[i] for i in indices]
        return SignLanguageDataset(self.root_dir, self.caption_mapping, self.vocab, self.transform, subset_data)

def load_captions(captions_file):
    with open(captions_file, 'r', encoding='utf-8') as file:
        captions = file.read().splitlines()
    # Create a mapping from folder number to caption
    caption_mapping = {str(i).zfill(4): caption for i, caption in enumerate(captions, 1)}
    return caption_mapping

def build_vocab(captions):
    tokenized_captions = [caption.lower().split() for caption in captions]
    vocab = build_vocab_from_iterator(tokenized_captions, specials=['<unk>', '<pad>', '<sos>', '<eos>'])
    vocab.set_default_index(vocab['<unk>'])
    return vocab

def numericalize(caption, vocab):
    return [vocab['<sos>']] + [vocab[token] for token in caption.lower().split()] + [vocab['<eos>']]

def collate_fn(batch):
    # Separate frames and captions
    frames, captions = zip(*batch)
    # Pad the captions to the same length
    captions_padded = pad_sequence(captions, batch_first=True, padding_value=vocab['<eos>'])
    # Stack frames
    frames_stacked = torch.stack(frames)
    return frames_stacked, captions_padded

# Load the data with the collate function
def get_data_loader(root_dir, caption_mapping, vocab, batch_size, transform, shuffle=True, num_workers=4):
    dataset = SignLanguageDataset(root_dir=root_dir, caption_mapping=caption_mapping, vocab=vocab, transform=transform)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, collate_fn=collate_fn)



In [10]:
caption_mapping = load_captions('../input/assignment3/Assignment03/groundTruth.txt')
all_captions = list(caption_mapping.values())
vocab = build_vocab(all_captions)

train_transform = transforms.Compose([
    transforms.Resize((256, 256)),  
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  
])
test_transform = transforms.Compose([
    transforms.Resize((256, 256)), 
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) 
])

test_data_loader = get_data_loader(
    root_dir='../input/assignment3/Assignment03/test/',
    caption_mapping=caption_mapping,
    vocab=vocab,
    batch_size=1,
    transform=test_transform,
    shuffle=True,
    num_workers=2
)


In [12]:
from sklearn.model_selection import train_test_split

full_test_dataset = SignLanguageDataset(root_dir='../input/assignment3/Assignment03/train/',
                                        caption_mapping=caption_mapping,
                                        vocab=vocab,
                                        transform=train_transform)

captions = [str(full_test_dataset.data[i][1]) for i in range(len(full_test_dataset))]

train_indices, test_indices = train_test_split(range(len(full_test_dataset)),
                                               stratify=captions,
                                               test_size=0.2)  

train_dataset = full_test_dataset.subset(train_indices)
val_dataset = full_test_dataset.subset(test_indices)

train_data_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=2, collate_fn=collate_fn)
val_data_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, num_workers=2, collate_fn=collate_fn)

# Model Building

In [13]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.models.mobilenetv2 import MobileNet_V2_Weights

## Transformer

In [30]:
import torch
import torch.nn as nn
from torchvision import models
from torch.nn import TransformerEncoder, TransformerEncoderLayer, TransformerDecoder, TransformerDecoderLayer
import math
class TransformerModel(nn.Module):
    def __init__(self, feature_size, num_layers, num_heads, hidden_dim, vocab_size, max_seq_length, dropout=0.5):
        super(TransformerModel, self).__init__()
        self.feature_size = feature_size
        mobilenet_weights = models.MobileNet_V2_Weights.DEFAULT
        mobilenet = models.mobilenet_v2(weights=mobilenet_weights)
        modules = list(mobilenet.children())[:-1]
        self.mobilenet = nn.Sequential(*modules)
        for param in self.mobilenet.parameters():
            param.requires_grad = False

        self.linear_cnn = nn.Linear(mobilenet.last_channel*8*8, feature_size)
        self.pos_encoder = PositionalEncoding(feature_size, dropout)

        # Transformer Encoder
        encoder_layers = TransformerEncoderLayer(d_model=feature_size, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout, batch_first=True)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers=num_layers)

        # Transformer Decoder
        self.embed = nn.Embedding(vocab_size, feature_size)
        decoder_layers = TransformerDecoderLayer(d_model=feature_size, nhead=num_heads, dim_feedforward=hidden_dim, dropout=dropout, batch_first=True)
        self.transformer_decoder = TransformerDecoder(decoder_layers, num_layers=num_layers)
        self.linear_vocab = nn.Linear(feature_size, vocab_size)

    def forward(self, frames, captions):
        # Extract and transform features from frames
        batch_size, num_frames, C, H, W = frames.size()
        frames = frames.view(batch_size * num_frames, C, H, W)
        cnn_features = self.mobilenet(frames)
        cnn_features = cnn_features.view(batch_size, num_frames, -1)
        cnn_features = self.linear_cnn(cnn_features)
        cnn_features = self.pos_encoder(cnn_features)
        
        # Transformer Encoder
        encoder_output = self.transformer_encoder(cnn_features)

        # Prepare captions for Transformer Decoder
        captions = self.embed(captions) * math.sqrt(self.feature_size)
        captions = self.pos_encoder(captions)
        decoder_output = self.transformer_decoder(captions, encoder_output)

        # Final Linear layer
        output = self.linear_vocab(decoder_output)
        return output

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


## Validation Function

In [31]:
def validate_model(model, val_data_loader, criterion, device):
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for frames, captions in val_data_loader:
            frames, captions = frames.to(device), captions.to(device)
            outputs = model(frames, captions[:, :-1])
            loss = criterion(outputs.view(-1, outputs.size(-1)), captions[:, 1:].contiguous().view(-1))
            total_val_loss += loss.item()
    return total_val_loss / len(val_data_loader)


## Train Loop

In [36]:
import torch
import torch.nn as nn
import torch.optim as optim

def train(model, model_filename, train_data_loader, val_data_loader, device, vocab, num_epochs=10):
    model.to(device)

    # Define loss function and optimizer
    criterion = nn.CrossEntropyLoss(ignore_index=vocab['<pad>'])
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    best_val_loss = float('inf')
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0

        for batch_idx, (frames, captions) in enumerate(train_data_loader):
            frames, captions = frames.to(device), captions.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(frames, captions[:, :-1])  # Exclude the <eos> token from the input

            # Calculate the loss
            loss = criterion(outputs.reshape(-1, outputs.size(-1)), captions[:, 1:].contiguous().view(-1))
            total_loss += loss.item()

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0:  # Print loss every 10 batches
                print(f"Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(train_data_loader)}], Loss: {loss.item():.4f}")

        # Print epoch loss
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_data_loader):.4f}')

        # Validation phase
        val_loss = validate_model(model, val_data_loader, criterion, device)
        print(f'Epoch [{epoch+1}/{num_epochs}], Validation Loss: {val_loss:.4f}')

        # Check for early stopping
        if val_loss < best_val_loss:
            print(f'Saving Model! {val_loss} < {best_val_loss}')
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_filename)


### Training Transformer

In [37]:
feature_size = 512 
num_layers = 3   
num_heads = 8     
hidden_dim = 2048  
vocab_size = len(vocab)  
max_seq_length = 20 

# Initialize the Transformer model
transformer_model = TransformerModel(feature_size, num_layers, num_heads, hidden_dim, vocab_size, max_seq_length, dropout=0.5)

# Check if CUDA is available and move the model to GPU if possible
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transformer_model.to(device)

# Start training the Transformer model
model_filename = "transformer_model_v3.pth"  # File name to save the model
train(transformer_model, model_filename, train_data_loader, val_data_loader, device, vocab, num_epochs=10)


Epoch [1/10], Step [1/107], Loss: 3.7180
Epoch [1/10], Step [11/107], Loss: 2.4077
Epoch [1/10], Step [21/107], Loss: 2.4192
Epoch [1/10], Step [31/107], Loss: 3.1973
Epoch [1/10], Step [41/107], Loss: 2.4625
Epoch [1/10], Step [51/107], Loss: 2.1275
Epoch [1/10], Step [61/107], Loss: 2.0370
Epoch [1/10], Step [71/107], Loss: 1.8708
Epoch [1/10], Step [81/107], Loss: 1.8195
Epoch [1/10], Step [91/107], Loss: 1.7743
Epoch [1/10], Step [101/107], Loss: 1.8511
Epoch [1/10], Loss: 2.1932
Epoch [1/10], Validation Loss: 0.5412
Saving Model! 0.5411722356470946 < inf
Epoch [2/10], Step [1/107], Loss: 1.2802
Epoch [2/10], Step [11/107], Loss: 0.9567
Epoch [2/10], Step [21/107], Loss: 0.7685
Epoch [2/10], Step [31/107], Loss: 0.6618
Epoch [2/10], Step [41/107], Loss: 0.7344
Epoch [2/10], Step [51/107], Loss: 0.7501
Epoch [2/10], Step [61/107], Loss: 0.6570
Epoch [2/10], Step [71/107], Loss: 0.7104
Epoch [2/10], Step [81/107], Loss: 0.5879
Epoch [2/10], Step [91/107], Loss: 0.4541
Epoch [2/10], S

## Testing on testing set

In [38]:
transformer_model.eval()

TransformerModel(
  (mobilenet): Sequential(
    (0): Sequential(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU6(inplace=True)
      )
      (1): InvertedResidual(
        (conv): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
            (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): ReLU6(inplace=True)
          )
          (1): Conv2d(32, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
      )
      (2): InvertedResidual(
        (conv): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(16, 96, kernel_size=(1,

In [40]:
def generate_caption(transformer_model, vocab, frames, device, max_length=20):
    start_token_idx = vocab['<sos>']
    sequence = torch.tensor([[start_token_idx]], dtype=torch.long).to(device)

    with torch.no_grad():
        for _ in range(max_length):
            output = transformer_model(frames, sequence)
            next_word_idx = output.argmax(2)[:, -1].item()
            sequence = torch.cat([sequence, torch.tensor([[next_word_idx]], dtype=torch.long).to(device)], dim=1)

            if next_word_idx == vocab['<eos>']:
                break

    caption = indices_to_string(sequence[0], vocab)
    return caption


def indices_to_string(indices, vocab):
    if isinstance(indices, torch.Tensor):
        indices = indices.tolist()

    # Convert each index to its corresponding word
    words = [vocab.get_itos()[idx] for idx in indices]

    # Exclude special tokens like <start>, <end>, and <pad> from the final sentence
    special_tokens = {'<sos>', '<eos>', '<pad>'}
    filtered_words = [word for word in words if word not in special_tokens]

    # Join the words to form a sentence
    sentence = ' '.join(filtered_words)

    return sentence



In [41]:
!pip install jiwer

Collecting jiwer
  Obtaining dependency information for jiwer from https://files.pythonhosted.org/packages/0d/4f/ee537ab20144811dd99321735ff92ef2b3a3230b77ed7454bed4c44d21fc/jiwer-3.0.3-py3-none-any.whl.metadata
  Downloading jiwer-3.0.3-py3-none-any.whl.metadata (2.6 kB)
Downloading jiwer-3.0.3-py3-none-any.whl (21 kB)
Installing collected packages: jiwer
Successfully installed jiwer-3.0.3


In [42]:
import jiwer

def calculate_wer(reference, hypothesis):
    return jiwer.wer(reference, hypothesis)

total_wer = 0
num_samples = 0

for frames, captions in test_data_loader:
    frames = frames.to(device)
    captions = captions.to(device)

    true_caption_indices = captions[0]
    generated_caption = generate_caption(transformer_model, vocab, frames, device)

    # Convert true caption indices to string
    true_caption = indices_to_string(true_caption_indices, vocab)

    # Calculate and accumulate WER
    wer = calculate_wer(true_caption, generated_caption)
    total_wer += wer
    num_samples += 1

    print(f"True Caption: {true_caption}")
    print(f"Generated Caption: {generated_caption}")
    print(f"WER: {wer}\n")

# Calculate average WER across the test set
average_wer = total_wer / num_samples
print(f"Average WER on the Test Set: {average_wer}")

True Caption: اليوم اقدم انتم برنامج اخر
Generated Caption: لا شرك الله
WER: 1.0

True Caption: الله اكبر
Generated Caption: لا شرك الله
WER: 1.5

True Caption: اسم الله
Generated Caption: الحمد الله
WER: 0.5

True Caption: اليوم اقدم انتم برنامج اخر
Generated Caption: لا شرك الله
WER: 1.0

True Caption: السلام عليكم رحمة الله بركة
Generated Caption: الحمد الله
WER: 0.8

True Caption: موضوع دراسة لغة الاشارة العربية
Generated Caption: لا شرك الله
WER: 1.0

True Caption: اليوم اقدم انتم برنامج اخر
Generated Caption: لا شرك الله
WER: 1.0

True Caption: لا شرك الله
Generated Caption: لا شرك الله
WER: 0.0

True Caption: اليوم اقدم انتم برنامج اخر
Generated Caption: لا شرك الله
WER: 1.0

True Caption: السلام عليكم رحمة الله بركة
Generated Caption: الحمد الله
WER: 0.8

True Caption: كلمات اليوم متفرقة في الدين
Generated Caption: لا شرك الله
WER: 1.0

True Caption: ايضا كلمات عادية
Generated Caption: لا شرك الله
WER: 1.0

True Caption: الله اكبر
Generated Caption: لا شرك الله
WER: 1.5

True C

## Average WER on the Test Set: 0.79999999999

This appears to be very bad results. Although we are performing well on both the training and validation set, it seems the testing set is giving bad results. this could be due to the vast difference in distribution between train and test sets.