In [None]:
# !pip install -q pytorchvideo evaluate

In [None]:
# pip install -q pyarrow==14.0.1

In [None]:
# pip install -q transformers --upgrade

In [None]:
# pip install -q torch==2.0.1 torchvision==0.15.2 --extra-index-url https://download.pytorch.org/whl/cu118 xformers==0.0.21

# **Data Collection and Aggregation**

In [1]:
import os
import pandas as pd

# Dataset root on Kaggle; it holds train.csv next to the other (non-CSV) entries.
root_path = "/kaggle/input/nepali-sign-lang"
folder_list = os.listdir(root_path)
# Every entry of the root directory that is not a CSV file.
label_list = [entry for entry in folder_list if not entry.endswith(".csv")]

# Metadata table; later cells read its 'label' and 'video_name' columns.
total_df = pd.read_csv(os.path.join(root_path, "train.csv")).reset_index(drop=True)

# Number of rows per label (displayed as the cell output).
total_df['label'].value_counts()

label
म घर मा धेरै काम गर्छु ।          16
म संग धेरै पैसा छैन ।             12
मेरो साथी हरु भक्तपुर मा छन् ।    12
मेरो घर भक्तपुर मा छ ।             8
मेरो धेरै साथी हरु छन् ।           8
Name: count, dtype: int64

# **Data Splitting**

In [2]:
from sklearn.model_selection import train_test_split

def correct_file_path(file_name: str, root_path: str):
    """Return ``file_name`` joined onto ``root_path`` with ``os.path.join``."""
    full_path = os.path.join(root_path, file_name)
    return full_path

def preprocess_meta_df(df, root_path, label2id):
    """Return a copy of ``df`` with full video paths and integer labels.

    Args:
        df: Metadata frame with a ``video_name`` column and a ``label`` column
            holding label names.
        root_path: Directory that each video name is joined onto.
        label2id: Mapping from label name to integer id.

    Returns:
        A new DataFrame in which ``video_name`` is renamed to ``video_path``
        (joined onto ``root_path``) and ``label`` holds ``label2id[label]``.

    Raises:
        KeyError: If a label is missing from ``label2id`` or an expected
            column is absent.
    """
    # rename() without inplace returns a new frame, so the caller's frame
    # (typically a slice produced by train_test_split) is never mutated and
    # the cell can be re-run safely.
    out = df.rename(columns={"video_name": "video_path"})
    out['video_path'] = out['video_path'].apply(lambda name: correct_file_path(name, root_path))
    # Explicit lookup so an unknown label raises instead of becoming NaN.
    out['label'] = out['label'].apply(lambda name: label2id[name])
    return out

# Stratified 80/20 split on the label column, seeded for repeatability.
train_meta_df, test_meta_df = train_test_split(
    total_df,
    test_size=0.2,
    stratify=total_df['label'],
    random_state=42,
)

# Label <-> id lookup tables; ids follow the sorted order of the training labels.
label_list = list(set(train_meta_df['label']))
class_labels = sorted(label_list)
label2id = {name: idx for idx, name in enumerate(class_labels)}
id2label = dict(enumerate(class_labels))

print(f"Unique classes: {list(label2id.keys())}.")

# Rename the path column, resolve full paths and encode the labels for both splits.
train_meta_df, test_meta_df = (
    preprocess_meta_df(split_df, root_path, label2id)
    for split_df in (train_meta_df, test_meta_df)
)

print("Splitted data:", len(train_meta_df), len(test_meta_df))

Unique classes: ['म घर मा धेरै काम गर्छु ।', 'म संग धेरै पैसा छैन ।', 'मेरो घर भक्तपुर मा छ ।', 'मेरो धेरै साथी हरु छन् ।', 'मेरो साथी हरु भक्तपुर मा छन् ।'].
Splitted data: 44 12


#  **Model Selection and Design**

## Preparing Configuration for the final Model

In [None]:
def get_config():
    """Build and return a fresh dictionary of run settings.

    A new dict is created on every call. Within this notebook ``seq_len`` and
    ``d_model`` are read by ``get_model``.
    """
    settings = {}
    settings["batch_size"] = 1
    settings["num_epochs"] = 2
    settings["lr"] = 10**-4
    settings["seq_len"] = 3136
    settings["d_model"] = 768
    settings["lang_tgt"] = "ne"
    settings["model_folder"] = "weights"
    settings["model_basename"] = "tmodel_"
    settings["preload"] = "latest"
    settings["tokenizer_file"] = "tokenizer_{0}.json"
    # Alternative run directory kept for reference:
    # "/content/drive/MyDrive/translation/runs"
    settings["experiment_name"] = "/content/drive/MyDrive/English2Nepali/runs"
    return settings

# This cell runs before the cell that imports torch, so import it here;
# otherwise a fresh kernel fails with NameError on `torch`.
import torch

config = get_config()
# Prefer the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

## Pre-trained Encoder Model (Video Vision Transformer)

In [13]:
import torch
import pytorchvideo.data
from torch.utils.data import Dataset
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
from transformers import VivitImageProcessor, VivitModel, VivitForVideoClassification

from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    RemoveKey,
    ShortSideScale,
    UniformTemporalSubsample,
)

from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    RandomHorizontalFlip,
    Resize,
)

# Checkpoint id shared by the image processor and the encoder loaded below.
model_checkpoint = "google/vivit-b-16x2-kinetics400"
image_processor = VivitImageProcessor.from_pretrained(model_checkpoint)

# The classification-head variant is left commented out; only the bare
# VivitModel is loaded and used as the video encoder.
# model = VivitForVideoClassification.from_pretrained(model_checkpoint)
vivit_model=VivitModel.from_pretrained(model_checkpoint)

Some weights of VivitModel were not initialized from the model checkpoint at google/vivit-b-16x2-kinetics400 and are newly initialized: ['vivit.pooler.dense.bias', 'vivit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Don't Need the Final Pooler Layer

In [14]:
#Replace pooler layer with the identity function, it just returns what it gets

class Identity(torch.nn.Module):
    """Pass-through module: ``forward`` hands back its argument untouched."""

    def forward(self, x):
        # No parameters and no computation; the very same object is returned.
        return x
    

# Swap the pooler for Identity so that it returns whatever it is given.
vivit_model.pooler=Identity()
# model.classifier=Identity()
# model.classifier=torch.nn.Linear(768,5)
# Bare expression: shows the module tree as the cell output.
vivit_model

VivitModel(
  (embeddings): VivitEmbeddings(
    (patch_embeddings): VivitTubeletEmbeddings(
      (projection): Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))
    )
    (dropout): Dropout(p=0.0, inplace=False)
  )
  (encoder): VivitEncoder(
    (layer): ModuleList(
      (0-11): 12 x VivitLayer(
        (attention): VivitAttention(
          (attention): VivitSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
          (output): VivitSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.0, inplace=False)
          )
        )
        (intermediate): VivitIntermediate(
          (dense): Linear(in_features=768, out_features=3072, bias=True)
          (dropout): D

## Custom Decoder Model

In [16]:
import math

class InputEmbeddings(torch.nn.Module):
    """Token-id embedding table whose output is scaled by ``sqrt(d_model)``."""

    def __init__(self, d_model: int=768, vocab_size: int=20) -> None:
        super().__init__()
        self.d_model, self.vocab_size = d_model, vocab_size
        self.embedding = torch.nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Map ids of shape (batch, seq_len) to vectors of shape (batch, seq_len, d_model)."""
        vectors = self.embedding(x)
        # Scale the looked-up vectors by sqrt(d_model).
        return vectors * math.sqrt(self.d_model)


class PositionalEncoding(torch.nn.Module):
    """Adds fixed sinusoidal position codes to a sequence, then applies dropout.

    The table is precomputed for ``seq_len`` positions and stored as the
    buffer ``pe`` with shape (1, seq_len, d_model).
    """

    def __init__(self, d_model: int=768, seq_len: int=3136, dropout: float=0.1) -> None:
        super().__init__()
        self.d_model = d_model
        self.seq_len = seq_len
        self.dropout = torch.nn.Dropout(dropout)

        # Column of position indices: (seq_len, 1).
        positions = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
        # 10000 ** (-2i / d_model) for every even feature index 2i: (d_model / 2).
        inv_freq = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # position / 10000 ** (2i / d_model): (seq_len, d_model / 2).
        angles = positions * inv_freq

        table = torch.zeros(seq_len, d_model)
        table[:, 0::2] = torch.sin(angles)  # even feature indices
        table[:, 1::2] = torch.cos(angles)  # odd feature indices

        # Leading batch axis -> (1, seq_len, d_model); stored as a buffer, not a parameter.
        self.register_buffer('pe', table.unsqueeze(0))

    def forward(self, x):
        """Add the codes for the first ``x.shape[1]`` positions to ``x`` (batch, seq_len, d_model)."""
        codes = self.pe[:, :x.shape[1], :].requires_grad_(False)
        return self.dropout(x + codes)
    
    
    
class MultiHeadAttentionBlock(torch.nn.Module):
    """Multi-head scaled dot-product attention with bias-free linear projections.

    ``d_model`` is split evenly across ``h`` heads of width ``d_k``. The
    attention weights of the latest ``forward`` call are kept on
    ``self.attention_scores``.
    """

    def __init__(self, d_model: int=768, h: int=8, dropout: float=0.1) -> None:
        super().__init__()
        # The heads partition the embedding, so d_model must be a multiple of h.
        assert d_model % h == 0, "d_model is not divisible by h"
        self.d_model = d_model  # embedding width
        self.h = h  # number of heads
        self.d_k = d_model // h  # width seen by each head

        # Query / key / value / output projections (created in this order).
        self.w_q = torch.nn.Linear(d_model, d_model, bias=False)
        self.w_k = torch.nn.Linear(d_model, d_model, bias=False)
        self.w_v = torch.nn.Linear(d_model, d_model, bias=False)
        self.w_o = torch.nn.Linear(d_model, d_model, bias=False)
        self.dropout = torch.nn.Dropout(dropout)

    @staticmethod
    def attention(query, key, value, mask, dropout: torch.nn.Dropout):
        """Scaled dot-product attention over tensors shaped (batch, h, seq_len, d_k).

        Positions where ``mask == 0`` are filled with -1e9 before the softmax.
        Returns ``(weighted values, attention weights)``; the weights are the
        post-dropout ones when ``dropout`` is given.
        """
        head_dim = query.shape[-1]
        # (batch, h, seq_len, d_k) @ (batch, h, d_k, seq_len) -> (batch, h, seq_len, seq_len)
        scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(head_dim)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = torch.softmax(scores, dim=-1)
        if dropout is not None:
            weights = dropout(weights)
        # (batch, h, seq_len, seq_len) @ (batch, h, seq_len, d_k) -> (batch, h, seq_len, d_k)
        return torch.matmul(weights, value), weights

    def _split_heads(self, projected):
        """(batch, seq_len, d_model) -> (batch, h, seq_len, d_k)."""
        batch, length = projected.shape[0], projected.shape[1]
        return projected.view(batch, length, self.h, self.d_k).transpose(1, 2)

    def forward(self, q, k, v, mask):
        """Project q/k/v, attend per head, merge the heads and apply ``w_o``."""
        query = self._split_heads(self.w_q(q))
        key = self._split_heads(self.w_k(k))
        value = self._split_heads(self.w_v(v))

        context, self.attention_scores = MultiHeadAttentionBlock.attention(
            query, key, value, mask, self.dropout)

        # (batch, h, seq_len, d_k) -> (batch, seq_len, h, d_k) -> (batch, seq_len, d_model)
        batch = context.shape[0]
        merged = context.transpose(1, 2).contiguous().view(batch, -1, self.h * self.d_k)
        return self.w_o(merged)
    

class LayerNormalization(torch.nn.Module):
    """Normalizes the last axis with a learnable scale (``alpha``) and shift (``bias``)."""

    def __init__(self, features: int, eps: float = 10**-6) -> None:
        super().__init__()
        self.eps = eps
        self.alpha = torch.nn.Parameter(torch.ones(features))  # learnable scale
        self.bias = torch.nn.Parameter(torch.zeros(features))  # learnable shift

    def forward(self, x):
        """Normalize ``x`` (batch, seq_len, hidden_size) along its last axis."""
        # keepdim=True leaves a trailing axis of size 1 so the statistics broadcast.
        centered = x - x.mean(dim=-1, keepdim=True)
        # eps is added to the standard deviation to avoid dividing by ~0.
        denom = x.std(dim=-1, keepdim=True) + self.eps
        scaled = self.alpha * centered
        return scaled / denom + self.bias


class FeedForwardBlock(torch.nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Dropout -> Linear."""

    def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
        super().__init__()
        self.linear_1 = torch.nn.Linear(d_model, d_ff)  # expansion (w1, b1)
        self.dropout = torch.nn.Dropout(dropout)
        self.linear_2 = torch.nn.Linear(d_ff, d_model)  # contraction (w2, b2)

    def forward(self, x):
        """(batch, seq_len, d_model) -> (batch, seq_len, d_ff) -> (batch, seq_len, d_model)."""
        hidden = torch.relu(self.linear_1(x))
        hidden = self.dropout(hidden)
        return self.linear_2(hidden)


class ResidualConnection(torch.nn.Module):
    """Pre-norm residual wrapper: ``x + dropout(sublayer(norm(x)))``.

    Args:
        features: Size of the last axis, passed to ``LayerNormalization``.
        dropout: Dropout probability applied to the sublayer output.
    """

    def __init__(self, features: int, dropout: float) -> None:
        super().__init__()
        self.dropout = torch.nn.Dropout(dropout)
        self.norm = LayerNormalization(features)

    def forward(self, x, sublayer):
        """Apply ``sublayer`` (attention or feed-forward) around a skip connection.

        The input is normalized first, then passed through the sublayer, and
        the result is added back onto the un-normalized input. The previous
        code normalized the sublayer *output* instead, which contradicted the
        description that accompanied it.
        """
        return x + self.dropout(sublayer(self.norm(x)))



class DecoderBlock(torch.nn.Module):
    """One decoder layer: self-attention, cross-attention, feed-forward.

    Each of the three sublayers is wrapped in its own ``ResidualConnection``.
    """

    def __init__(self, features: int, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
        super().__init__()
        self.self_attention_block = self_attention_block
        self.cross_attention_block = cross_attention_block
        self.feed_forward_block = feed_forward_block
        # One residual wrapper per sublayer, in the order they are applied.
        wrappers = [ResidualConnection(features, dropout) for _ in range(3)]
        self.residual_connections = torch.nn.ModuleList(wrappers)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Run ``x`` through the three wrapped sublayers and return the result."""
        self_residual, cross_residual, ff_residual = self.residual_connections

        def self_attend(t):
            # Queries, keys and values all come from the target sequence.
            return self.self_attention_block(t, t, t, tgt_mask)

        def cross_attend(t):
            # Queries from the target; keys and values from the encoder output.
            return self.cross_attention_block(t, encoder_output, encoder_output, src_mask)

        x = self_residual(x, self_attend)
        x = cross_residual(x, cross_attend)
        return ff_residual(x, self.feed_forward_block)


class Decoder(torch.nn.Module):
    """Stack of decoder layers followed by a final ``LayerNormalization``."""

    def __init__(self, features: int, layers: torch.nn.ModuleList) -> None:
        super().__init__()
        self.layers = layers
        self.norm = LayerNormalization(features)

    def forward(self, x, encoder_output, src_mask, tgt_mask):
        """Feed ``x`` through every layer in order, then normalize the result."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden, encoder_output, src_mask, tgt_mask)
        return self.norm(hidden)


class ProjectionLayer(torch.nn.Module):
    """Linear map from decoder features to vocabulary-sized scores.

    Args:
        d_model: Width of the incoming feature vectors.
        vocab_size: Number of output scores per position.
    """

    def __init__(self, d_model, vocab_size) -> None:
        super().__init__()
        self.proj = torch.nn.Linear(d_model, vocab_size)

    def forward(self, x) -> torch.Tensor:
        """(batch, seq_len, d_model) --> (batch, seq_len, vocab_size).

        The return annotation used to be ``None`` although a tensor is returned.
        """
        return self.proj(x)

## Building Video2Text Transformer Architecture

In [17]:
class Video2Text(torch.nn.Module):
    """Video encoder + text decoder wrapper.

    Args:
        encoder: Video encoder module, called as ``encoder(pixel_values=...)``.
        src_video: Default clip used by ``encode()`` when no clip is passed.
        decoder: Decoder stack applied to the embedded target tokens.
        tgt_embed: Target-token embedding module.
        tgt_pos: Positional encoding applied after ``tgt_embed``.
        projection_layer: Maps decoder features to vocabulary scores.
    """

    def __init__(self, encoder, src_video, decoder: Decoder, tgt_embed: InputEmbeddings, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
        super().__init__()
        self.video_encoder = encoder
        self.decoder = decoder
        # Stored as a plain attribute (not a buffer): it does not follow .to(device).
        self.src_video=src_video
        self.tgt_embed = tgt_embed
        self.tgt_pos = tgt_pos
        self.projection_layer = projection_layer

    def encode(self, src_video=None):
        """Encode a clip and return the encoder's per-token features.

        Args:
            src_video: Optional clip to encode; when omitted, the clip given
                at construction time is used.

        Returns:
            ``outputs.last_hidden_state`` when the encoder output has that
            field, otherwise ``outputs.logits`` (for encoders with a
            classification head).
        """
        video = self.src_video if src_video is None else src_video
        # Swap axes 1 and 2 so the result is
        # (batch, num_frames, num_channels, height, width).
        # Assumes the clip arrives as (batch, channels, frames, H, W) — TODO confirm.
        pixel_values = video.permute(0, 2, 1, 3, 4)

        outputs = self.video_encoder(pixel_values=pixel_values)
        # A bare encoder exposes `last_hidden_state`, not `logits`; reading
        # `.logits` unconditionally fails for it.
        hidden = getattr(outputs, "last_hidden_state", None)
        if hidden is None:
            hidden = outputs.logits
        return hidden

    def decode(self, encoder_output: torch.Tensor, src_mask: torch.Tensor, tgt: torch.Tensor, tgt_mask: torch.Tensor):
        """Embed ``tgt``, add positions and run the decoder -> (batch, seq_len, d_model)."""
        tgt = self.tgt_embed(tgt)
        tgt = self.tgt_pos(tgt)
        return self.decoder(tgt, encoder_output, src_mask, tgt_mask)

    def project(self, x):
        """Project decoder features to (batch, seq_len, vocab_size)."""
        return self.projection_layer(x)

In [19]:
def build_transformer(encoder_model,src_video,tgt_vocab_size: int, tgt_seq_len: int, d_model: int = 512, N: int = 6, h: int = 8, dropout: float = 0.1, d_ff: int = 2048) -> Video2Text:
    """Assemble a ``Video2Text`` model around an existing video encoder.

    Args:
        encoder_model: Video encoder module; used as-is, its weights are not touched.
        src_video: Default clip stored on the model for ``encode()``.
        tgt_vocab_size: Size of the target vocabulary.
        tgt_seq_len: Number of positions in the positional-encoding table.
        d_model: Feature width of the decoder.
        N: Number of decoder blocks.
        h: Number of attention heads per block.
        dropout: Dropout probability used throughout the decoder.
        d_ff: Hidden width of each feed-forward block.

    Returns:
        The assembled ``Video2Text`` module, with Xavier-initialized decoder,
        embedding and projection weights.
    """
    # Target-side embedding and positional encoding.
    tgt_embed = InputEmbeddings(d_model, tgt_vocab_size)
    tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout)

    # Decoder blocks: self-attention, cross-attention and feed-forward each.
    decoder_blocks = []
    for _ in range(N):
        decoder_self_attention_block = MultiHeadAttentionBlock(
            d_model, h, dropout)
        decoder_cross_attention_block = MultiHeadAttentionBlock(
            d_model, h, dropout)
        feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout)
        decoder_block = DecoderBlock(d_model, decoder_self_attention_block,
                                     decoder_cross_attention_block, feed_forward_block, dropout)
        decoder_blocks.append(decoder_block)

    video_encoder = encoder_model
    decoder = Decoder(d_model, torch.nn.ModuleList(decoder_blocks))
    projection_layer = ProjectionLayer(d_model, tgt_vocab_size)

    transformer = Video2Text(
        encoder=video_encoder,src_video=src_video, decoder=decoder,tgt_embed=tgt_embed, tgt_pos=tgt_pos, projection_layer=projection_layer)

    # Xavier-initialize only the modules created here. Looping over
    # transformer.parameters() would also overwrite the encoder's
    # (pretrained) weights.
    for fresh_module in (decoder, tgt_embed, projection_layer):
        for p in fresh_module.parameters():
            if p.dim() > 1:
                torch.nn.init.xavier_uniform_(p)

    return transformer

In [20]:
def get_model(config,enc_model,src_video,vocab_tgt_len):
    """Build a Video2Text model, taking ``seq_len`` and ``d_model`` from ``config``."""
    return build_transformer(
        encoder_model=enc_model,
        src_video=src_video,
        tgt_vocab_size=vocab_tgt_len,
        tgt_seq_len=config['seq_len'],
        d_model=config['d_model'],
    )

In [21]:
# Build the Video2Text model around the ViViT encoder, with a random
# (1, 3, 32, 224, 224) tensor as the stored clip and a target vocabulary of 20.
v2t_model=get_model(config,vivit_model,torch.randn(1,3,32,224,224),20)
# Bare expression: shows the module tree as the cell output.
v2t_model

Video2Text(
  (video_encoder): VivitModel(
    (embeddings): VivitEmbeddings(
      (patch_embeddings): VivitTubeletEmbeddings(
        (projection): Conv3d(3, 768, kernel_size=(2, 16, 16), stride=(2, 16, 16))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): VivitEncoder(
      (layer): ModuleList(
        (0-11): 12 x VivitLayer(
          (attention): VivitAttention(
            (attention): VivitSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): VivitSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): VivitIntermediate(
            (dense):

# **Apply Necessary Transform and Prepare Dataset**

In [None]:
class CustomVideoDataset(Dataset):
    """Map-style dataset yielding ``(video_path, label)`` pairs from a DataFrame."""

    def __init__(self, dataframe):
        self.dataframe = dataframe

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return the ``video_path`` and ``label`` of the row at position ``idx``."""
        record = self.dataframe.iloc[idx]
        return record['video_path'], record['label']

# Normalization statistics taken from the image processor.
mean = image_processor.image_mean
std = image_processor.image_std

# The processor's size is given either as a single shortest edge or as height/width.
if "shortest_edge" in image_processor.size:
    height = width = image_processor.size["shortest_edge"]
else:
    height = image_processor.size["height"]
    width = image_processor.size["width"]

# `model` is never assigned in this notebook (its loading line is commented
# out), so read the input resolution from the encoder that was loaded.
resize_to = (vivit_model.config.image_size, vivit_model.config.image_size)

# num_frames_to_sample = model.config.num_frames
num_frames_to_sample = 32
clip_duration = 8

# Training pipeline for the "video" entry of each sample: subsample frames,
# scale to [0, 1], normalize, random short-side scale, resize, random flip.
train_transform = Compose(
    [
        ApplyTransformToKey(
            key="video",
            transform=Compose(
                [
                    UniformTemporalSubsample(num_frames_to_sample),
                    Lambda(lambda x: x / 255.0),
                    Normalize(mean, std),
                    RandomShortSideScale(min_size=256, max_size=320),
                    Resize(resize_to),
                    RandomHorizontalFlip(p=0.5),
                ]
            ),
        ),
    ]
)

# Validation pipeline: same as above without the random augmentations.
val_transform = Compose(
    [
        ApplyTransformToKey(
            key="video",
            transform=Compose(
                [
                    UniformTemporalSubsample(num_frames_to_sample),
                    Lambda(lambda x: x / 255.0),
                    Normalize(mean, std),
                    Resize(resize_to),
                ]
            ),
        ),
    ]
)

# (video_path, {"label": ...}) pairs for each split.
train_custom_dataset = CustomVideoDataset(train_meta_df)
train_labeled_video_paths = [(video_path, {'label': label}) for video_path, label in train_custom_dataset]

test_custom_dataset = CustomVideoDataset(test_meta_df)
test_labeled_video_paths = [(video_path, {'label': label}) for video_path, label in test_custom_dataset]

In [None]:
import imageio
import numpy as np
from IPython.display import Image

# Training clips: the "random" clip sampler with `clip_duration`, no audio,
# passed through the training transform.
train_dataset = pytorchvideo.data.LabeledVideoDataset(
    labeled_video_paths =train_labeled_video_paths,
    clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration),
    decode_audio=False,
    transform=train_transform,
)

# Test clips: the "uniform" clip sampler with the same `clip_duration`,
# passed through the validation transform.
test_dataset = pytorchvideo.data.LabeledVideoDataset(
    labeled_video_paths =test_labeled_video_paths,
    clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
    decode_audio=False,
    transform=val_transform,
)

def unnormalize_img(img, img_mean=None, img_std=None):
    """Undo mean/std normalization and return a uint8 image.

    Args:
        img: Normalized image array.
        img_mean: Mean to add back; defaults to the module-level ``mean``.
        img_std: Std to multiply by; defaults to the module-level ``std``.

    Returns:
        ``(img * std + mean) * 255`` clipped to [0, 255] and cast to uint8.
    """
    img_mean = mean if img_mean is None else img_mean
    img_std = std if img_std is None else img_std
    img = (img * img_std) + img_mean
    # Clip BEFORE the cast: casting first lets out-of-range values wrap
    # around in uint8, after which clipping has no effect.
    return (img * 255).clip(0, 255).astype("uint8")

def create_gif(video_tensor, filename="sample.gif"):
    """Write the frames of ``video_tensor`` to ``filename`` as a GIF and return the name.

    Each frame is moved to channels-last, converted to NumPy and un-normalized
    before being written.
    """
    frames = [
        unnormalize_img(frame.permute(1, 2, 0).numpy())
        for frame in video_tensor
    ]
    imageio.mimsave(filename, frames, "GIF", duration=5)
    return filename

def display_gif(video_tensor, gif_name="sample.gif"):
    """Save ``video_tensor`` as a GIF and return an IPython ``Image`` pointing at it."""
    # Swap the first two axes so that iterating the tensor yields one frame at a time.
    frames_first = video_tensor.permute(1, 0, 2, 3)
    return Image(filename=create_gif(frames_first, gif_name))

# Pull one sample from the training dataset and keep its clip tensor.
sample_video = next(iter(train_dataset))
video_tensor = sample_video["video"]
# print(id2label[sample_video['label']])
# display_gif(video_tensor)

In [None]:
# Show the label of one sample drawn from the training dataset.
next(iter(train_dataset))['label']

In [None]:
class CustomVideoDataset2(Dataset):
    """Map-style wrapper around an iterable video dataset.

    Args:
        dataframe: Despite the name, an iterable dataset that yields dicts
            with ``'video'`` and ``'label'`` entries and exposes ``num_videos``.
        tokenizer_tgt: Tokenizer providing ``token_to_id`` for the special tokens.
        tgt_lang: Target language code (stored, not used yet).
        seq_len: Target sequence length (stored, not used yet).
    """

    def __init__(self, dataframe, tokenizer_tgt, tgt_lang, seq_len):
        super().__init__()
        self.seq_len = seq_len

        self.dataframe = dataframe
        self.tokenizer_tgt = tokenizer_tgt
        self.tgt_lang = tgt_lang

        # Ids of the special tokens, each as a one-element int64 tensor.
        self.sos_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[SOS]")], dtype=torch.int64)
        self.eos_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[EOS]")], dtype=torch.int64)
        self.pad_token = torch.tensor(
            [tokenizer_tgt.token_to_id("[PAD]")], dtype=torch.int64)

    def __len__(self):
        """Number of videos reported by the wrapped dataset."""
        return self.dataframe.num_videos

    def __getitem__(self, idx):
        """Return one ``{"video", "label"}`` sample from the wrapped dataset.

        ``idx`` is ignored: the wrapped dataset is iterable, so the sample is
        whatever it yields next.
        """
        # Draw ONE sample and read both fields from it. Calling
        # next(iter(...)) once per field pulled two different samples, so the
        # video and its label could belong to different clips (and every item
        # was decoded twice).
        sample = next(iter(self.dataframe))
        video = sample['video']
        label = sample['label']

        # TODO: tokenize the target sentence with self.tokenizer_tgt and also
        # return "decoder_input" ([SOS] + tokens + [PAD]...), a token-level
        # "label" (tokens + [EOS] + [PAD]...), "decoder_mask" (padding mask
        # combined with causal_mask) and "tgt_text", all padded to self.seq_len.
        return {
            "video":video,
            "label":label,
        }

def causal_mask(size):
    """Return a (1, size, size) boolean mask that is True on and below the diagonal."""
    # Ones strictly above the diagonal mark the blocked positions ...
    blocked = torch.triu(torch.ones((1, size, size)), diagonal=1).type(torch.int)
    # ... so the allowed positions are exactly where that matrix is zero.
    allowed = blocked.eq(0)
    return allowed

In [None]:
from tokenizers import Tokenizer

# Target-side tokenizer loaded from its JSON file.
target_tokenizer = Tokenizer.from_file('/kaggle/input/nepali-tokenizer/tokenizer_sign_lang_ne.json')

# Wrap both pytorchvideo datasets; arguments are (dataset, tokenizer, tgt_lang, seq_len).
new_train_dataset = CustomVideoDataset2(train_dataset, target_tokenizer, 'ne', 20)
new_val_dataset = CustomVideoDataset2(test_dataset, target_tokenizer, 'ne', 20)

# print(next(iter(new_train_dataset))['video'].shape,next(iter(new_train_dataset))['label'])
# print(next(iter(new_val_dataset))['video'].shape,next(iter(new_val_dataset))['label'])

# **Training Without Trainer**

In [None]:
from torch.utils.data import Dataset, DataLoader, random_split

## Prepare and Test Dataloader

In [None]:
# One sample per batch for both splits.
train_dataloader = DataLoader(new_train_dataset, batch_size=1)
val_dataloader = DataLoader(new_val_dataset, batch_size=1)

In [None]:
# Fetch only the first training batch and print the shape of its video tensor.
# `data` stays bound to that batch and is reused by later cells.
for data in train_dataloader:
    print(data['video'].shape)
    break


In [None]:
# The result is only displayed; `data` itself is not reassigned.
data['video'].to(device)

## Optimizer and Loss Function

In [None]:
# NOTE(review): `model` is not assigned in any cell visible in this notebook
# (its loading line is commented out); it must exist before this cell runs.
optimizer = torch.optim.Adam(model.parameters(), lr=10**-4, eps=1e-9)
# Cross-entropy with label smoothing of 0.1.
loss_fn = torch.nn.CrossEntropyLoss(label_smoothing=0.1).to(device)

In [None]:
# for p in model.parameters():
#     p.requires_grad=False
    
# for p in model.classifier.parameters():
#     p.requires_grad=True

## Training Loop

In [None]:
from tqdm import tqdm

# NOTE(review): `model` is not assigned in any cell visible in this notebook, and
# `run_inference` is defined in a later cell ("Inference on Single Video");
# both must exist in the kernel before this cell runs.
loss_list_train = []
loss_list_val = []

# Batches are moved to `device` below, so the weights have to live there too.
model.to(device)

for epoch in range(config["num_epochs"]):
    # Public API for releasing cached CUDA memory; skipped on CPU-only runs.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # ---- training pass ----
    model.train()
    batch_iterator = tqdm(train_dataloader, desc=f"Processing Epoch {epoch:02d}")
    acc_loss = 0
    for step, batch in enumerate(batch_iterator, start=1):
        # TODO: switch to the encoder/decoder path (model.encode -> model.decode ->
        # model.project) once the dataset yields decoder_input and decoder_mask.
        logits = run_inference(model, batch['video'].to(device))

        # Compare the output with the label using cross entropy.
        label = batch['label'].to(device)
        loss = loss_fn(logits, label.view(-1))
        acc_loss += loss.item()

        # Backpropagate, update the weights, then clear the gradients.
        loss.backward()
        optimizer.step()
        optimizer.zero_grad(set_to_none=True)

        # Update the bar while it is still running (running mean of the loss).
        batch_iterator.set_postfix({"loss": f"{acc_loss / step:6.3f}"})

    loss_list_train.append(acc_loss / len(train_dataloader))

    # ---- validation pass ----
    model.eval()
    acc_loss = 0
    with torch.no_grad():
        for batchv in val_dataloader:
            logits = run_inference(model, batchv['video'].to(device))
            label = batchv['label'].to(device)

            loss = loss_fn(logits, label.view(-1))
            acc_loss += loss.item()

    loss_list_val.append(acc_loss / len(val_dataloader))

    print(f"Training Loss: {loss_list_train[-1]}")
    print(f"Validation Loss: {loss_list_val[-1]}")
                
                

In [None]:
# Mean training and validation loss of the last epoch.
loss_list_train[-1], loss_list_val[-1]


In [None]:
# trained_model = VivitForVideoClassification.from_pretrained('/kaggle/working/vivit-b-16x2-kinetics400-nepali_sign_lang/checkpoint-88')

# **Running Tests**

## Plot Each Frame Extracted from the Video

In [None]:
import matplotlib.pyplot as plt

# NOTE(review): `frame` is not assigned in any cell visible in this notebook;
# presumably a dataset sample whose 'video' is (channels, frames, H, W) — confirm.
frames = frame['video'].permute(1, 0, 2, 3)  # frame axis first
num_frames = frames.shape[0]

# 7 x 7 grid: room for up to 49 frames.
fig, axs = plt.subplots(7, 7, figsize=(12, 12))

# Start at frame 0 and stop at the real frame count, so the first frame is
# no longer skipped and a hard-coded bound can no longer index past the clip.
for f, ax in enumerate(axs.flat):
    if f < num_frames:
        ax.imshow(frames[f].permute(1, 2, 0))  # channels-last for imshow
        ax.set_title(f'frame: {f}')
    ax.axis('off')  # Hide axis (also blanks the unused cells)

plt.tight_layout()
plt.show()

## Inference on Single Video

In [None]:
def run_inference(model, video):
    """Run ``model`` on an already preprocessed ``video`` and return its logits.

    Axes 1 and 2 of ``video`` are swapped so the model receives
    (batch, num_frames, num_channels, height, width). Gradient tracking is not
    disabled here; that is left to the caller.
    """
    frames_first = video.permute(0, 2, 1, 3, 4)
    return model(pixel_values=frames_first).logits

In [None]:

# Run the model on the batch kept from the dataloader check, without gradients.
# print(logits.shape)
model.eval()
with torch.no_grad():
    logits = run_inference(model.to(device), data['video'].to(device))

# empty_cache() only frees cached memory that is no longer referenced.
# Use the public API instead of the private torch._C hook, and skip it on
# CPU-only runs.
if torch.cuda.is_available():
    torch.cuda.empty_cache()