# Imports

In [None]:
!pip install torcheval

Collecting torcheval
  Downloading torcheval-0.0.7-py3-none-any.whl.metadata (8.6 kB)
Downloading torcheval-0.0.7-py3-none-any.whl (179 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/179.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m179.2/179.2 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
import torch
import torch.nn as nn
import torchvision
from torchvision.datasets import ImageFolder
from torchvision.transforms import ToTensor
from torcheval.metrics import WordErrorRate
from torch.utils.data import DataLoader
from torch.nn.functional import log_softmax
import matplotlib.pyplot as plt
import numpy as np
import zipfile
import os
import glob
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Downloading training and testing folders

In [None]:

# Path to the zip file on Google Drive (update this path)
zip_file_path = '/content/drive/MyDrive/ICS471 - HW3/train.zip'

# Destination folder where the unzipped content will be placed
destination_folder = '/content/unziped_train/'

# Create the destination folder if it doesn't exist
os.makedirs(destination_folder, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(destination_folder)

print(f'Files extracted to {destination_folder}')



zip_file_path = '/content/drive/MyDrive/ICS471 - HW3/test.zip'


destination_folder = '/content/unziped_test/'


os.makedirs(destination_folder, exist_ok=True)

# Unzip the file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(destination_folder)

print(f'Files extracted to {destination_folder}')



Files extracted to /content/unziped_train/
Files extracted to /content/unziped_test/


# processing the dataset and putting it in a valid form for training and testing

In [None]:
class NumericLabelDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (str): Root directory with all video folders.
            transform (callable, optional): Transform to apply to each frame.
        """
        self.root_dir = root_dir
        self.video_folders = []
        self.labels = []
        self.transform = transform

        # Collect all third-level folders and extract labels
        for folder1 in sorted(os.listdir(root_dir)):
            folder1_path = os.path.join(root_dir, folder1)
            if os.path.isdir(folder1_path):
                for folder2 in sorted(os.listdir(folder1_path)):
                    folder2_path = os.path.join(folder1_path, folder2)
                    print()
                    if os.path.isdir(folder2_path) and len(os.listdir(folder2_path)) == 80:
                        self.video_folders.append(folder2_path)
                        # Extract label from folder name (e.g., '01_0001_(...)')
                        label = int(folder1)  # Assuming label is in the second-level folder name
                        self.labels.append(label)

    def __len__(self):
        return len(self.video_folders)

    def __getitem__(self, idx):
        video_folder = self.video_folders[idx]
        frame_paths = sorted(glob.glob(os.path.join(video_folder, "*.jpg")))  # Adjust file extension if needed

        frames = []
        for frame_path in frame_paths:
            image = Image.open(frame_path).convert("RGB")
            if self.transform:
                image = self.transform(image)
            frames.append(image)

        # Stack frames into a tensor of shape (num_frames, channels, height, width)
        video_tensor = torch.stack(frames)
        label = self.labels[idx]
        return video_tensor, label

# Example Usage
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to MobileNetV2 input size
    transforms.ToTensor(),          # Convert to PyTorch tensor
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # ImageNet normalization
])

train_dataset = NumericLabelDataset(root_dir="/content/unziped_train/train_full", transform=transform)
test_dataset = NumericLabelDataset(root_dir="/content/unziped_test/test", transform=transform)








































































































































































































































































































































































































































































































































































































































In [None]:
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=True)

In [None]:
def build_vocab(sentences):
    vocab = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}  # Special tokens
    for sentence in sentences:
        for word in sentence.split():
            if word not in vocab:
                vocab[word] = len(vocab)
    return vocab

# Example sentences from the uploaded file
sentences = [
    "اسم الله",
    "جميع الناس العرب",
    "السلام عليكم ورحمة الله وبركاته",
    "اليوم العالم يقدم برنامج أخر",
    "موضوع دراستنا عن الإشارات التعليمية",
    "كلمات اليوم مبتذلة في الدين",
    "إبقى كلمات هادئة",
    "لا تهتم",
    "الله أكبر"
]

vocab = build_vocab(sentences)
filepath = "/content/glove.6B.100d.txt"


def load_glove_embeddings(filepath, vocab, embedding_dim=100):
    embeddings_index = {}

    # Load pre-trained GloVe vectors
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            word = values[0]
            coefs = np.asarray(values[1:], dtype='float32')
            embeddings_index[word] = coefs

    # Create embedding matrix for your vocabulary
    embedding_matrix = np.zeros((len(vocab), embedding_dim))
    for i, word in enumerate(vocab):
        if word in embeddings_index:
            embedding_matrix[i] = embeddings_index[word]
        else:
            embedding_matrix[i] = np.random.uniform(-0.1, 0.1, embedding_dim)  # Random initialization

    return torch.tensor(embedding_matrix, dtype=torch.float)




In [None]:
def tokenize_sentence(sentence, vocab):
    """
    Tokenize a sentence into a list of token indices based on the provided vocabulary.

    Args:
        sentence (str): The sentence to tokenize.
        vocab (dict): A mapping from words to token indices.

    Returns:
        List[int]: A list of token indices representing the sentence.
    """
    tokens = ['<sos>'] + sentence.split() + ['<eos>']  # Add <sos> and <eos> tokens
    return [vocab.get(word, vocab['<unk>']) for word in tokens]

# Example vocabulary
vocab = {
    '<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3,
    'اسم': 4, 'الله': 5, 'الحمد': 6, 'جميع': 7, 'الصم': 8,
    'العرب': 9, 'السامع': 10, 'السلام': 11, 'عليكم': 12, 'رحمة': 13,
    'بركة': 14, 'اليوم': 15, 'اقدم': 16, 'انتم': 17, 'برنامج': 18,
    'اخر': 19, 'موضوع': 20, 'دراسة': 21, 'لغة': 22, 'الاشارة': 23,
    'العربية': 24, 'كلمات': 25, 'متفرقة': 26, 'في': 27, 'الدين': 28,
    'ايضا': 29, 'عادية': 30, 'لا': 31, 'شرك': 32, 'اكبر': 33
}


# Mapping video numbers to sentences
video_to_sentence = {
    1: "اسم الله",
    2: "الحمد الله",
    3: "جميع الصم العرب السامع",
    4: "السلام عليكم رحمة الله بركة",
    5: "اليوم اقدم انتم برنامج اخر",
    6: "موضوع دراسة لغة الاشارة العربية",
    7: "كلمات اليوم متفرقة في الدين",
    8: "ايضا كلمات عادية",
    9: "لا شرك الله",
    10: "الله اكبر"
}


# Tokenize each sentence
video_to_tokens = {
    video_id: tokenize_sentence(sentence, vocab)
    for video_id, sentence in video_to_sentence.items()
}

# Print tokenized mapping
print(video_to_tokens)


{1: [1, 4, 5, 2], 2: [1, 6, 5, 2], 3: [1, 7, 8, 9, 10, 2], 4: [1, 11, 12, 13, 5, 14, 2], 5: [1, 15, 16, 17, 18, 19, 2], 6: [1, 20, 21, 22, 23, 24, 2], 7: [1, 25, 15, 26, 27, 28, 2], 8: [1, 29, 25, 30, 2], 9: [1, 31, 32, 5, 2], 10: [1, 5, 33, 2]}


In [None]:
embedding_matrix = load_glove_embeddings(filepath, vocab)

# First Model Impementation

In [None]:

class EncoderLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=1, bidirectional=False):
        super(EncoderLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers,
                            bidirectional=bidirectional, batch_first=True)

    def forward(self, features):
        # Input: (batch_size, seq_len, input_dim)
        outputs, (hidden, cell) = self.lstm(features)
        # Outputs: (batch_size, seq_len, hidden_dim * num_directions)
        # Hidden: (num_layers * num_directions, batch_size, hidden_dim)
        # Cell:   (num_layers * num_directions, batch_size, hidden_dim)
        return outputs, (hidden, cell)


In [None]:
class DecoderLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers=1, embedding_matrix=None):
        super(DecoderLSTM, self).__init__()

        # Embedding Layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        if embedding_matrix is not None:
            self.embedding.weight.data.copy_(embedding_matrix)  # Load pre-trained embeddings
            self.embedding.weight.requires_grad = False  # Freeze embeddings (optional)

        # LSTM Layer
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)

        # Fully Connected Layer
        self.fc_out = nn.Linear(hidden_dim, vocab_size)

    def forward(self, input_token, hidden, cell):
        """
        Forward pass of the decoder.

        Args:
            input_token (torch.Tensor): Current input token (batch_size,).
            hidden (torch.Tensor): Hidden state from the previous time step.
            cell (torch.Tensor): Cell state from the previous time step.

        Returns:
            output (torch.Tensor): Predicted probabilities for the next token (batch_size, vocab_size).
            hidden (torch.Tensor): Updated hidden state.
            cell (torch.Tensor): Updated cell state.
        """
        # 1. Embed the input token
        embedded = self.embedding(input_token).unsqueeze(1)  # Shape: (batch_size, 1, embedding_dim)

        # 2. Pass the embedded token through the LSTM
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))  # Output: (batch_size, 1, hidden_dim)

        # 3. Predict the next token
        prediction = self.fc_out(output.squeeze(1))  # Shape: (batch_size, vocab_size)

        return prediction, hidden, cell


In [None]:
class VideoClassificationModel(nn.Module):
    def __init__(self):
        super().__init__()
        # MobileNetV2 as feature extractor
        self.feature_extractor = torchvision.models.mobilenet_v2(weights=torchvision.models.MobileNet_V2_Weights.DEFAULT)
        self.feature_extractor = nn.Sequential(
            *list(self.feature_extractor.features),  # Keep the feature layers
            nn.AdaptiveAvgPool2d((1, 1))  # Add global average pooling
        )

    def forward(self, video):
        batch_size, num_frames, C, H, W = video.size()  # (B, 80, 3, 224, 224)

        # Reshape to process each frame individually
        video = video.view(-1, C, H, W)  # (B * num_frames, 3, 224, 224)

        # Extract features
        with torch.no_grad():
            features = self.feature_extractor(video)  # (B * num_frames, 1280, 1, 1)

        # Remove the last two dimensions (1, 1) from global pooling
        features = features.view(batch_size, num_frames, -1)  # (B, 80, 1280)

        return features


In [None]:
from torch.nn.utils.rnn import pad_sequence
import torch

def process_batch_trg(batch_labels, video_to_tokens, vocab):
    """
    Convert a batch of video labels into padded tokenized sequences.

    Args:
        batch_labels (torch.Tensor): A batch of video labels (e.g., [1, 3, 2, 5, 8])
        video_to_tokens (dict): A dictionary mapping video IDs to tokenized sequences.
        vocab (dict): Vocabulary mapping words to indices.

    Returns:
        torch.Tensor: Padded tokenized sequences (batch_size, max_seq_len).
    """
    # Convert video labels to tokenized sequences
    tokenized_sequences = [torch.tensor(video_to_tokens[label.item()]) for label in batch_labels]

    # Pad the sequences to the same length (max_seq_len)
    padded_sequences = pad_sequence(tokenized_sequences, batch_first=True, padding_value=vocab['<pad>'])

    return padded_sequences


# Train

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Loss function and optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"

encoder = EncoderLSTM(input_dim=1280 , hidden_dim=512).to(device)
decoder = DecoderLSTM(vocab_size=len(vocab), embedding_dim=100, hidden_dim=512).to(device)
feautre_extractor = VideoClassificationModel().to(device)

Epoch = 10
criterion = nn.CrossEntropyLoss()
params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.Adam(params, lr=0.001)

teacher_forcing_ratio = 1 # Probability of using ground truth token as input

# Training Loop
for epoch in range(Epoch):
    encoder.train()
    decoder.train()

    epoch_loss = 0

    for src, trg in train_dataloader:
        # Step 1: Process `trg` (convert video IDs to tokenized sequences)
        trg = process_batch_trg(trg, video_to_tokens, vocab)

        # Step 2: Move data to device
        src, trg = src.to(device), trg.to(device)

        # Step 3: Reset gradients
        optimizer.zero_grad()

        feature = feautre_extractor(src)

        # Step 4: Forward pass through encoder
        output,(hidden, cell) = encoder(feature)

        # Step 5: Initialize decoder input with <sos> token
        input_token = trg[:, 0]  # First token in every sequence (batch_size,)

        loss = 0

        # Step 6: Loop through the target sequence
        for t in range(1, trg.size(1)):  # Start from the second token
            output, hidden, cell = decoder(input_token, hidden,cell)  # Forward pass through decoder

            # Compute loss
            loss += criterion(output, trg[:, t])  # Compare output with ground truth token

            # Teacher forcing: Use ground truth or predicted token as next input
            teacher_force = random.random() < teacher_forcing_ratio
            input_token = trg[:, t] if teacher_force else output.argmax(1)

        # Step 7: Backpropagation and optimization
        loss.backward()
        optimizer.step()

        # Normalize loss by sequence length and accumulate
        epoch_loss += loss.item() / trg.size(1)

    # Print epoch loss
    print(f"Epoch {epoch+1}/{Epoch}, Loss: {epoch_loss / len(train_dataloader):.4f}")


Downloading: "https://download.pytorch.org/models/mobilenet_v2-7ebf99e0.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-7ebf99e0.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 218MB/s]


Epoch 1/10, Loss: 0.9590
Epoch 2/10, Loss: 0.2950
Epoch 3/10, Loss: 0.2284
Epoch 4/10, Loss: 0.2012
Epoch 5/10, Loss: 0.1394
Epoch 6/10, Loss: 0.1085
Epoch 7/10, Loss: 0.0653
Epoch 8/10, Loss: 0.0515
Epoch 9/10, Loss: 0.0336
Epoch 10/10, Loss: 0.0264


In [None]:
def evaluate_model_with_wer(encoder, decoder,extractor,test_dataloader, video_to_tokens, vocab, max_len=50):
    """
    Evaluate the model on the test set and compute WER using TorchEval.

    Args:
        encoder (nn.Module): Encoder model.
        decoder (nn.Module): Decoder model.
        test_dataloader (DataLoader): Dataloader for the test set.
        video_to_tokens (dict): Mapping of video IDs to tokenized sequences.
        vocab (dict): Vocabulary mapping indices to words.
        max_len (int): Maximum length for generated sentences.

    Returns:
        float: Average WER across the test set.
    """
    reverse_vocab = {idx: word for word, idx in vocab.items()}  # Reverse vocab for decoding
    wer_metric = WordErrorRate()  # Initialize WER metric

    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for src, trg_labels in test_dataloader:
            # Move data to device
            src, trg_labels = src.to(device), trg_labels.to(device)

            # Convert ground truth labels to tokenized sentences
            trg = process_batch_trg(trg_labels, video_to_tokens, vocab)

            feature = extractor(src)
            # Forward pass through encoder
            encoder_outputs, (hidden, cell) = encoder(feature)

            # Initialize input token (<sos>)
            input_token = torch.tensor([vocab['<sos>']] * src.size(0)).to(device)

            # Generate predictions
            predictions = []
            for _ in range(max_len):
                output, hidden, cell = decoder(input_token, hidden, cell)
                top1 = output.argmax(1)  # Get most probable token
                predictions.append(top1)
                input_token = top1  # Use predicted token as next input

            # Stack predictions into sequence
            predictions = torch.stack(predictions, dim=1)  # (batch_size, max_len)

            # Convert predictions and ground truths to strings
            predicted_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in prediction if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for prediction in predictions
            ]
            reference_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in reference if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for reference in trg
            ]

            # Update WER metric with current batch
            wer_metric.update(predicted_sentences, reference_sentences)

    # Compute the final WER
    return wer_metric.compute()


In [None]:
acc = evaluate_model_with_wer(encoder,decoder,feautre_extractor,test_dataloader,video_to_tokens,vocab)

In [None]:
decoder1 = decoder.to(device)
encoder1 = encoder.to(device)

In [None]:
torch.save(decoder1.state_dict(), 'decoder_1.pth')
torch.save(encoder1.state_dict(), 'encoder_1.pth')

RuntimeError: Parent directory content does not exist.

In [None]:
print(f"WER of an LSTM model : {acc} ")

WER of an LSTM model : 0.9788135886192322 


# Trying more Layers to check if it will help

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Loss function and optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"

encoder = EncoderLSTM(input_dim=1280 ,num_layers=2, hidden_dim=512).to(device)
decoder = DecoderLSTM(vocab_size=len(vocab),num_layers = 2, embedding_dim=100, hidden_dim=512).to(device)
feautre_extractor = VideoClassificationModel().to(device)

Epoch = 10
criterion = nn.CrossEntropyLoss()
params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.Adam(params, lr=0.001)

teacher_forcing_ratio = 1 # Probability of using ground truth token as input

# Training Loop
for epoch in range(Epoch):
    encoder.train()
    decoder.train()

    epoch_loss = 0

    for src, trg in train_dataloader:
        # Step 1: Process `trg` (convert video IDs to tokenized sequences)
        trg = process_batch_trg(trg, video_to_tokens, vocab)

        # Step 2: Move data to device
        src, trg = src.to(device), trg.to(device)

        # Step 3: Reset gradients
        optimizer.zero_grad()

        feature = feautre_extractor(src)

        # Step 4: Forward pass through encoder
        output,(hidden, cell) = encoder(feature)

        # Step 5: Initialize decoder input with <sos> token
        input_token = trg[:, 0]  # First token in every sequence (batch_size,)

        loss = 0

        # Step 6: Loop through the target sequence
        for t in range(1, trg.size(1)):  # Start from the second token
            output, hidden, cell = decoder(input_token, hidden,cell)  # Forward pass through decoder

            # Compute loss
            loss += criterion(output, trg[:, t])  # Compare output with ground truth token

            # Teacher forcing: Use ground truth or predicted token as next input
            teacher_force = random.random() < teacher_forcing_ratio
            input_token = trg[:, t] if teacher_force else output.argmax(1)

        # Step 7: Backpropagation and optimization
        loss.backward()
        optimizer.step()

        # Normalize loss by sequence length and accumulate
        epoch_loss += loss.item() / trg.size(1)

    # Print epoch loss
    print(f"Epoch {epoch+1}/{Epoch}, Loss: {epoch_loss / len(train_dataloader):.4f}")


Epoch 1/10, Loss: 1.0780
Epoch 2/10, Loss: 0.3207
Epoch 3/10, Loss: 0.2860
Epoch 4/10, Loss: 0.2549
Epoch 5/10, Loss: 0.2246
Epoch 6/10, Loss: 0.2002
Epoch 7/10, Loss: 0.1473
Epoch 8/10, Loss: 0.1455
Epoch 9/10, Loss: 0.0984
Epoch 10/10, Loss: 0.1151


In [None]:
acc = evaluate_model_with_wer(encoder,decoder,feautre_extractor,test_dataloader,video_to_tokens,vocab)

NameError: name 'evaluate_model_with_wer' is not defined

In [None]:
print(f"WER of an LSTM model with 2 Layers : {acc} ")

# With Attention

In [None]:
import torch
import torch.nn.functional as F

class Attention(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, hidden, encoder_outputs):
        """
        Compute dot-product attention.

        Args:
            hidden (torch.Tensor): Decoder hidden state (batch_size, decoder_hidden_dim).
            encoder_outputs (torch.Tensor): Encoder outputs (batch_size, seq_len, encoder_hidden_dim).

        Returns:
            context (torch.Tensor): Context vector (batch_size, encoder_hidden_dim).
            attention_weights (torch.Tensor): Attention weights (batch_size, seq_len).
        """
        # Compute dot product between hidden state and encoder outputs
        # hidden: (batch_size, decoder_hidden_dim)
        # encoder_outputs: (batch_size, seq_len, encoder_hidden_dim)
        if hidden.dim() == 3 and hidden.size(0) == 1:
            hidden = hidden.squeeze(0)
        attention_scores = torch.bmm(encoder_outputs, hidden.unsqueeze(2)).squeeze(2)  / (hidden.size(-1) ** 0.5)# (batch_size, seq_len)

        # Normalize scores with softmax
        attention_weights = F.softmax(attention_scores, dim=1)  # (batch_size, seq_len)

        # Compute context vector as weighted sum of encoder outputs
        context = torch.bmm(attention_weights.unsqueeze(1), encoder_outputs).squeeze(1)  # (batch_size, encoder_hidden_dim)

        return context, attention_weights


In [None]:
class DecoderWithAttention(nn.Module):
    def __init__(self, vocab_size, embedding_dim, encoder_hidden_dim, decoder_hidden_dim, embedding_matrix = None):
        super().__init__()

        # Embedding layer


        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        if embedding_matrix is not None:
            self.embedding.weight.data.copy_(embedding_matrix)  # Load pre-trained embeddings
            self.embedding.weight.requires_grad = False  # Freeze embeddings (optional)

        # LSTM layer
        self.lstm = nn.LSTM(embedding_dim + encoder_hidden_dim, decoder_hidden_dim, batch_first=True)

        # Fully connected layer
        self.fc_out = nn.Linear(decoder_hidden_dim, vocab_size)

        # Attention mechanism
        self.attention = Attention()

    def forward(self, input_token, hidden, cell, encoder_outputs):
        """
        Forward pass for the decoder with attention.
        """
        # Ensure hidden and cell have the correct shape
        if hidden.dim() == 4:
            hidden = hidden.squeeze(0)  # Remove the extra dimension
        if cell.dim() == 4:
            cell = cell.squeeze(0)

        # Embed the input token
        embedded = self.embedding(input_token).unsqueeze(1)  # (batch_size, 1, embedding_dim)

        # Compute attention
        context, attention_weights = self.attention(hidden, encoder_outputs)

        # Concatenate context and embedded input
        lstm_input = torch.cat((embedded, context.unsqueeze(1)), dim=2)  # (batch_size, 1, embedding_dim + encoder_hidden_dim)


        # Pass through LSTM
        output, (hidden, cell) = self.lstm(lstm_input, (hidden, cell))

        # Predict next token
        prediction = self.fc_out(output.squeeze(1))  # (batch_size, vocab_size)

        return prediction, hidden, cell, attention_weights


# Train

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import random

# Loss function and optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"

encoder = EncoderLSTM(input_dim=1280, hidden_dim=512).to(device)
decoder = DecoderWithAttention(vocab_size=len(vocab),
                               embedding_dim=100,
                               encoder_hidden_dim=512,
                               decoder_hidden_dim=512,
                               embedding_matrix=embedding_matrix).to(device)
feautre_extractor = VideoClassificationModel().to(device)

Epoch = 10
criterion = nn.CrossEntropyLoss()
params = list(encoder.parameters()) + list(decoder.parameters())
optimizer = optim.Adam(params, lr=0.01)

teacher_forcing_ratio = 0.5 # Probability of using ground truth token as input

# Training Loop
for epoch in range(Epoch):
    encoder.train()
    decoder.train()

    epoch_loss = 0

    for src, trg in train_dataloader:
        # Step 1: Process `trg` (convert video IDs to tokenized sequences)
        trg = process_batch_trg(trg, video_to_tokens, vocab)

        # Step 2: Move data to device
        src, trg = src.to(device), trg.to(device)

        # Step 3: Reset gradients
        optimizer.zero_grad()

        feature = feautre_extractor(src)

        # Step 4: Forward pass through encoder
        encoder_output,(hidden, cell) = encoder(feature)

        # Step 5: Initialize decoder input with <sos> token
        input_token = trg[:, 0]  # First token in every sequence (batch_size,)

        loss = 0

        # Step 6: Loop through the target sequence
        for t in range(1, trg.size(1)):  # Start from the second token

            decoder_output, hidden, cell, attention = decoder(input_token, hidden,cell,encoder_output)  # Forward pass through decoder

            # Compute loss
            loss += criterion(decoder_output, trg[:, t])  # Compare output with ground truth token

            # Teacher forcing: Use ground truth or predicted token as next input
            teacher_force = random.random() < teacher_forcing_ratio
            input_token = trg[:, t] if teacher_force else decoder_output.argmax(1)

        # Step 7: Backpropagation and optimization
        loss.backward()
        optimizer.step()

        # Normalize loss by sequence length and accumulate
        epoch_loss += loss.item() / trg.size(1)

    # Print epoch loss
    print(f"Epoch {epoch+1}/{Epoch}, Loss: {epoch_loss / len(train_dataloader):.4f}")


Epoch 1/10, Loss: 2.0175
Epoch 2/10, Loss: 1.6782
Epoch 3/10, Loss: 1.4863
Epoch 4/10, Loss: 1.3510
Epoch 5/10, Loss: 1.2655
Epoch 6/10, Loss: 1.1841
Epoch 7/10, Loss: 1.2468
Epoch 8/10, Loss: 1.0771
Epoch 9/10, Loss: 1.0831
Epoch 10/10, Loss: 1.0700


# Test

In [None]:
from torcheval.metrics import WordErrorRate

def evaluate_model_attention_with_wer(encoder, decoder,extractor,test_dataloader, video_to_tokens, vocab, max_len=50):
    """
    Evaluate the model on the test set and compute WER using TorchEval.

    Args:
        encoder (nn.Module): Encoder model.
        decoder (nn.Module): Decoder model.
        test_dataloader (DataLoader): Dataloader for the test set.
        video_to_tokens (dict): Mapping of video IDs to tokenized sequences.
        vocab (dict): Vocabulary mapping indices to words.
        max_len (int): Maximum length for generated sentences.

    Returns:
        float: Average WER across the test set.
    """
    reverse_vocab = {idx: word for word, idx in vocab.items()}  # Reverse vocab for decoding
    wer_metric = WordErrorRate()  # Initialize WER metric

    encoder.eval()
    decoder.eval()

    with torch.no_grad():
        for src, trg_labels in test_dataloader:
            # Move data to device
            src, trg_labels = src.to(device), trg_labels.to(device)

            # Convert ground truth labels to tokenized sentences
            trg = process_batch_trg(trg_labels, video_to_tokens, vocab)

            feature = extractor(src)
            # Forward pass through encoder
            encoder_outputs, (hidden, cell) = encoder(feature)

            # Initialize input token (<sos>)
            input_token = torch.tensor([vocab['<sos>']] * src.size(0)).to(device)

            # Generate predictions
            predictions = []
            for _ in range(max_len):
                output, hidden, cell,att = decoder(input_token, hidden, cell,encoder_outputs)
                top1 = output.argmax(1)  # Get most probable token
                predictions.append(top1)
                input_token = top1  # Use predicted token as next input

            # Stack predictions into sequence
            predictions = torch.stack(predictions, dim=1)  # (batch_size, max_len)

            # Convert predictions and ground truths to strings
            predicted_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in prediction if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for prediction in predictions
            ]
            reference_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in reference if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for reference in trg
            ]

            # Update WER metric with current batch
            wer_metric.update(predicted_sentences, reference_sentences)

    # Compute the final WER
    return wer_metric.compute()


In [None]:
decoder2 = decoder.to(device)
encoder2 = encoder.to(device)

In [None]:
torch.save(decoder2.state_dict(), 'decoder_2.pth')
torch.save(encoder2.state_dict(), 'encoder_2.pth')

In [None]:
acc1 = evaluate_model_attention_with_wer(encoder,decoder,feautre_extractor,test_dataloader,video_to_tokens,vocab)

In [None]:
print(f"WER on LSTM with Attention : {acc1}")

WER on LSTM with Attention : 1.0677965879440308


# Transformer

In [None]:
class TransformerVideoToText(nn.Module):
    def __init__(self, feature_dim, vocab_size, embed_dim, num_heads, num_encoder_layers, num_decoder_layers, dropout=0.1, max_seq_len=80):
        super(TransformerVideoToText, self).__init__()

        # Embedding for tokens
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # Positional Encoding for video features and token embeddings
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_len, embed_dim))

        # Linear layer to map video features to the embedding dimension
        self.feature_projection = nn.Linear(feature_dim, embed_dim)

        # Transformer
        self.transformer = nn.Transformer(
            d_model=embed_dim,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout,
            batch_first=True,
        )

        # Linear layer for vocabulary prediction
        self.fc_out = nn.Linear(embed_dim, vocab_size)

        # Dropout
        self.dropout = nn.Dropout(dropout)

    def forward(self, src, trg):
        """
        Args:
            src (Tensor): Encoder inputs (batch_size, seq_len, feature_dim).
            trg (Tensor): Decoder inputs (batch_size, seq_len).
        Returns:
            Tensor: Log probabilities for each token (batch_size, seq_len, vocab_size).
        """
        # Project video features to the embedding dimension
        src = self.feature_projection(src)

        # Add positional encoding to the encoder inputs
        src = self.dropout(src + self.positional_encoding[:, :src.size(1), :])

        # Add positional encoding to the decoder inputs
        trg = self.embedding(trg) + self.positional_encoding[:, :trg.size(1), :]
        trg = self.dropout(trg)

        # Transformer forward pass
        transformer_out = self.transformer(src, trg)

        # Output predictions
        output = self.fc_out(transformer_out)
        return log_softmax(output, dim=-1)  # Log probabilities


In [None]:
# Loss function and optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"

# Define model
feature_extractor = VideoClassificationModel().to(device)
transformer_model = TransformerVideoToText(
    feature_dim=1280 ,
    vocab_size=len(vocab),
    embed_dim=512,
    num_heads=8,
    num_encoder_layers=4,
    num_decoder_layers=4,
    dropout=0.1,
    max_seq_len=80
).to(device)

criterion = nn.CrossEntropyLoss(ignore_index=vocab['<pad>'])
optimizer = torch.optim.Adam(transformer_model.parameters(), lr=0.0001)

teacher_forcing_ratio = 0.5  # Probability of using ground truth token as input

# Training Loop
Epoch = 10
for epoch in range(Epoch):
    transformer_model.train()
    epoch_loss = 0

    for src, trg in train_dataloader:

        # Step 1: Process `trg` (convert video IDs to tokenized sequences)
        trg = process_batch_trg(trg, video_to_tokens, vocab)

        # Step 2: Move data to device
        src, trg = src.to(device), trg.to(device)


        feature = feature_extractor(src)

        # Step 3: Reset gradients
        optimizer.zero_grad()

        # Step 4: Prepare input and target sequences
        trg_input = trg[:, :-1]  # Input tokens for the decoder (<sos> to last token)
        trg_output = trg[:, 1:]  # Target tokens for loss computation (second token to <eos>)

        # Step 5: Forward pass
        outputs = transformer_model(feature, trg_input)

        # Step 6: Compute loss
        outputs = outputs.reshape(-1, outputs.size(-1))  # Flatten outputs for loss computation
        trg_output = trg_output.reshape(-1)  # Flatten targets
        loss = criterion(outputs, trg_output)

        # Step 7: Backpropagation and optimization
        loss.backward()
        optimizer.step()

        # Normalize loss by sequence length and accumulate
        epoch_loss += loss.item()

    # Print epoch loss
    print(f"Epoch {epoch + 1}/{Epoch}, Loss: {epoch_loss / len(train_dataloader):.4f}")


Downloading: "https://download.pytorch.org/models/mobilenet_v2-7ebf99e0.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-7ebf99e0.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 79.8MB/s]


Epoch 1/10, Loss: 1.0660
Epoch 2/10, Loss: 0.0131
Epoch 3/10, Loss: 0.0064
Epoch 4/10, Loss: 0.0044
Epoch 5/10, Loss: 0.0033
Epoch 6/10, Loss: 0.0026
Epoch 7/10, Loss: 0.0022
Epoch 8/10, Loss: 0.0018
Epoch 9/10, Loss: 0.0015
Epoch 10/10, Loss: 0.0013


In [None]:
from torcheval.metrics import WordErrorRate

def test_model(transformer_model, feature_extractor, test_dataloader, vocab, video_to_tokens, max_len=50):
    """
    Test the Transformer model on the test set and compute WER.

    Args:
        transformer_model (nn.Module): Transformer model.
        feature_extractor (nn.Module): Video feature extractor.
        test_dataloader (DataLoader): Dataloader for the test set.
        vocab (dict): Vocabulary mapping tokens to indices.
        video_to_tokens (dict): Mapping from video IDs to tokenized sequences.
        max_len (int): Maximum sequence length for decoding.

    Returns:
        float: Word Error Rate (WER) on the test set.
    """
    reverse_vocab = {idx: word for word, idx in vocab.items()}  # Reverse vocab for decoding
    wer_metric = WordErrorRate()  # Initialize WER metric

    transformer_model.eval()
    feature_extractor.eval()

    with torch.no_grad():
        for src, trg in test_dataloader:
            # Step 1: Move data to device
            src, trg = src.to(device), trg.to(device)

            # Step 2: Extract features and reshape `src`
            feature = feature_extractor(src)  # (batch_size, channels, height, width)
            batch_size, frames, features = feature.size()

            src = feature.view(batch_size, frames, features)  # Reshape to (batch_size, seq_len, feature_dim)

            # Step 3: Convert `trg` to tokenized sequences
            trg = process_batch_trg(trg, video_to_tokens, vocab)

            # Step 4: Initialize decoding variables
            input_token = torch.tensor([vocab['<sos>']] * batch_size).unsqueeze(1).to(device)  # Start with <sos>
            predictions = []

            # Step 5: Generate sequences (greedy decoding)
            for _ in range(max_len):
                outputs = transformer_model(src, input_token)  # Forward pass
                next_token = outputs[:, -1, :].argmax(dim=-1).unsqueeze(1)  # Get most probable token
                predictions.append(next_token)
                input_token = torch.cat([input_token, next_token], dim=1)  # Append token

                # Stop decoding if all sequences generate <eos>
                if (next_token == vocab['<eos>']).all():
                    break

            # Step 6: Stack predictions and remove special tokens
            predictions = torch.cat(predictions, dim=1)  # (batch_size, seq_len)
            predicted_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in pred if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for pred in predictions
            ]
            reference_sentences = [
                " ".join([reverse_vocab[idx.item()] for idx in ref if idx.item() not in {vocab['<pad>'], vocab['<sos>'], vocab['<eos>']}])
                for ref in trg
            ]

            # Step 7: Update WER metric
            wer_metric.update(predicted_sentences, reference_sentences)

    # Compute final WER
    return wer_metric.compute()


In [None]:
acc2 = test_model(transformer_model,feature_extractor,test_dataloader,vocab,video_to_tokens)

In [None]:
print(f"The WER for the Transformer is: {acc2}")

The WER for the Transformer is: 1.0
