In [1]:
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
import cv2
import mediapipe as mp
import torch
import pandas as pd
from scipy.fft import fft, ifft
from torchvision.transforms import v2
import os
import timm


In [2]:
transform = v2.Compose(
    [
    v2.ToImage(),
    v2.Resize([100,160]),
    # v2.RandomHorizontalFlip(p=0.5),
    v2.ToTensor(),
    ])



In [3]:
# create dataset
from torch.nn.utils.rnn import pad_sequence

class VideoDataset(torch.utils.data.Dataset):
    def __init__(self, dir, transform ):
        super().__init__()
        self.dir = dir
        self.transform = transform
        self.list_of_frames, self.list_of_class_id, self.list_of_class_name = self.walk(self.dir)
        
        self.list_map_class = self.makeDictClass(dir=self.dir)
    
    def video2frame(self,videopath):
        _video = cv2.VideoCapture(videopath)
        _fps_in = _video.get(cv2.CAP_PROP_FPS)
        _count = 0
        _success = 1
        _frames = []
        
        _fps_out = 30

        _index_in = -1
        _index_out = -1
        
        # get class name
        _path = os.path.split(videopath)
        _path2class = os.path.split(_path[0])
        _className = _path2class[1]
        print(_className)
        
        # convert classname to id
        _map_class = self.makeDictClass(dir=self.dir)
        _classId = self.class2id(classname=_className, dictclass=_map_class)
        
        
        # extract video to frame
        while _success:
            _success = _video.grab()
            if not _success: break
            _index_in += 1
            _out_due = int(_index_in / _fps_in * _fps_out)
            if _out_due > _index_out:
                _success, _frame = _video.read()
                if(_success==True):
                    # print(_success)
                    _transformed = transform(_frame)
                    # print(_transformed.shape)
                    _frames.append(_transformed)
                else:
                    continue
        
        _video.release()
        
            
        return _frames, _classId, _className
    
    def makeDictClass(self, dir):
        dictClass = {}
        i = 0
        for _i in os.listdir(dir):
            if (os.path.isdir(os.path.join(dir, _i)) == True):
                # print(_i)
                dictClass[i] = _i
                i = i + 1
        return dictClass

    def class2id(self, classname, dictclass):
        id = 0
        for i in dictclass:
            if (dictclass[i] == classname):
                id = i
                break
        return id
        
    def walk(self, dir):
        list_of_frames = []
        list_of_class_name = []
        list_of_class_id = []
        
        for _i in os.listdir(dir):
           
            for _j in os.listdir(os.path.join(dir, _i)):
                _pathFile = os.path.join(dir, _i, _j)
                _pathFile = _pathFile.replace(os.sep, "/")
                _frames, _classId, _classname = self.video2frame(_pathFile)
                _frames = torch.stack(_frames)
                list_of_frames.append(_frames)
                # print(_frames.shape)
                # print(len(_frames))
                # list_of_frames = torch.stack(_frames)
                list_of_class_name.append(_classname)
                list_of_class_id.append(_classId)
        
        # list_of_frames = torch.stack(list_of_frames)
        list_of_frames_pad = pad_sequence(list_of_frames, batch_first=True)
        return list_of_frames_pad, list_of_class_id, list_of_class_name
        
    
    def __len__(self):
        return len(self.list_of_frames)
    
    def __getitem__(self, idx):
        class_name = self.list_of_class_name[idx]
        class_id = self.list_of_class_id[idx]
        frames = self.list_of_frames[idx]
        # print(frames)
        
        return frames, class_id, class_name
                
        
    
    
        

In [4]:
dir='../../dataset/WLASL100/'
data_torch = VideoDataset(dir=dir, transform=transform)


abdomen
abdomen
abdomen
abdomen
abdomen
accent
accent
accent
accent
accent
accept
accept
accept
accept
accept
accept
accept
accept
accident
accident
accident
accident
accident
accident
accident
accident
accident
accident
accident
accident
accident
affect
affect
affect
affect
affect
affect
again
again
again
again
again
again
again
again
ago
ago
ago
ago
ago
ago
ago
ago
ago
aim
aim
aim
aim
aim
already
already
already
already
already
already
already
already
annoy
annoy
annoy
annoy
appear
appear
appear
appear
appear
appear
appear
appointment
appointment
appointment
appointment
appointment
appointment
appointment
appointment
appointment
appointment
approve
approve
approve
approve
approve
approve
approve
approve
arm
arm
arm
arm
arm
arm
arm
arrest
arrest
arrest
arrest
arrest
arrest
article
article
article
article
authority
authority
authority
authority
authority
aware
aware
aware
aware
babysitter
babysitter
babysitter
babysitter
bad
bad
bad
bad
bad
bad
bad
bad
bad
bad


In [5]:
import torch.utils.data.dataloader

train_data, test_data = torch.utils.data.random_split(dataset=data_torch, lengths=([0.8,0.2]))

train_data_loader = torch.utils.data.DataLoader(train_data, batch_size=1, drop_last=True)
test_data_loader = torch.utils.data.DataLoader(test_data, batch_size=1, drop_last=True)
print(len(train_data_loader))
print(len(test_data_loader))

109
27


In [11]:
for i in test_data_loader:
    print(i[0].shape)

torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])
torch.Size([1, 92, 3, 100, 160])


In [75]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [76]:
import torch
import torch.nn as nn
import torchvision.models as models
import timm
from torch.nn import TransformerEncoder, TransformerEncoderLayer

class VideoRecognitionModel(nn.Module):
    def __init__(self, num_classes, num_frames, embed_dim, num_heads, num_layers, hidden_dim):
        super(VideoRecognitionModel, self).__init__()
        
        
        
        # Load a pre-trained MobileNet model from timm
        self.mobilenet = timm.create_model('mobilenetv3_large_100', pretrained=True, features_only=True)
        
        # Remove the last layer to get feature maps
        self.mobilenet.global_pool = nn.Identity()
        self.mobilenet.classifier = nn.Identity()
        
        # Feature dimension from MobileNet
        self.feature_dim = 960  # For mobilenetv3_large_100, adjust if using a different model
        
        # Transformer Encoder
        self.embed_dim = embed_dim
        self.positional_encoding = nn.Parameter(torch.zeros(1, num_frames, embed_dim))
        encoder_layers = TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, dim_feedforward=hidden_dim)
        self.transformer_encoder = TransformerEncoder(encoder_layers, num_layers=num_layers)
        
        # Fully connected layer for classification
        self.fc = nn.Linear(embed_dim, num_classes)
        
    def forward(self, x):
        # x shape: (batch_size, num_frames, channels, height, width)
        batch_size, num_frames, channels, height, width = x.shape
        
        # Process each frame through MobileNet
        features = []
        for t in range(num_frames):
            frame = x[:, t, :, :, :]  # Extract frame at time t
            frame_features = self.mobilenet(frame)  # Extract features using MobileNet
            frame_features = frame_features[-1]  # Use the last feature map
            frame_features = frame_features.mean([2, 3])  # Global average pooling
            features.append(frame_features)
        
        # Stack features along the time dimension
        features = torch.stack(features, dim=1)  # Shape: (batch_size, num_frames, feature_dim)
        
        # Project features to the embedding dimension
        features = nn.Linear(self.feature_dim, self.embed_dim).to(device)(features)
        
        # Add positional encoding
        features = features + self.positional_encoding
        
        # Pass through Transformer Encoder
        transformer_output = self.transformer_encoder(features)  # Shape: (batch_size, num_frames, embed_dim)
        
        # Aggregate over time (e.g., mean pooling)
        aggregated_output = transformer_output.mean(dim=1)
        
        # Final classification
        output = self.fc(aggregated_output)
        return output



In [77]:
num_classes = 20  # Number of classes for classification
num_frames = 92   # Number of frames in the video
embed_dim = 512   # Embedding dimension for Transformer
num_heads = 8     # Number of attention heads
num_layers = 2    # Number of Transformer layers
hidden_dim = 1024 # Hidden dimension in Transformer feed-forward network

In [78]:
model = VideoRecognitionModel(num_classes, num_frames, embed_dim, num_heads, num_layers, hidden_dim)


Unexpected keys (classifier.bias, classifier.weight, conv_head.bias, conv_head.weight) found while loading pretrained weights. This may be expected if model is being adapted.


In [80]:


# Assuming that we are on a CUDA machine, this should print a CUDA device:

print(device)
model.to(device=device)

cuda:0


VideoRecognitionModel(
  (mobilenet): MobileNetV3Features(
    (conv_stem): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (act1): Hardswish()
    (blocks): Sequential(
      (0): Sequential(
        (0): DepthwiseSeparableConv(
          (conv_dw): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
          (bn1): BatchNormAct2d(
            16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
            (drop): Identity()
            (act): ReLU(inplace=True)
          )
          (aa): Identity()
          (se): Identity()
          (conv_pw): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn2): BatchNormAct2d(
            16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True
            (drop): Identity()
            (act): Identity()
          )
          (drop_path)

In [81]:
dummy_input = torch.randn(1, num_frames, 3, 224, 224)
aa = model(dummy_input.to(device))
print(aa)

tensor([[ 0.3968,  0.0950, -0.0312, -1.3888, -0.0100, -0.0032,  0.7392,  0.6924,
          0.6920,  0.2462,  0.3348, -0.0523, -0.5238, -1.1940, -0.4603,  0.1687,
          0.3902, -0.3159, -0.1865,  0.8156]], device='cuda:0',
       grad_fn=<AddmmBackward0>)


In [86]:
learning_rate = 0.001
momentum = 0.9
weight_decay = 0.0001

num_steps = len(train_data_loader)
iterator = iter(train_data_loader)
count_steps = 1   
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate, momentum=momentum, weight_decay=weight_decay) 

# print(train_data_loader.batch_size)

for i in range(0,200):
    running_loss = 0.
    last_loss = 0.
    for a,b in enumerate(train_data_loader):
        frames = b[0].to(device)
        target = b[1].to(device)
        optimizer.zero_grad()
        # print(b[0].shape)
        output = model(frames.float())
       
        loss = loss_fn(output, target)
        # print(output, b[1])
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        print("class", target)
        print("pred", output)
        print(loss)
    last_loss = running_loss / 1000
    print(running_loss)

class tensor([15], device='cuda:0')
pred tensor([[ 0.5859, -0.4739, -1.4033, -3.0098,  1.1526, -1.1138, -0.2305, -0.2424,
         -0.8428,  1.0511, -0.4722, -0.5178,  1.5815, -0.4632, -0.2127,  0.2286,
         -0.2402,  0.2237, -0.2527, -0.5069]], device='cuda:0',
       grad_fn=<AddmmBackward0>)
tensor(2.9186, device='cuda:0', grad_fn=<NllLossBackward0>)
class tensor([8], device='cuda:0')
pred tensor([[ 2.2782, -1.3372, -1.4782,  1.4679,  0.5165,  0.4883, -1.1585,  0.7047,
          1.1832,  0.4331, -0.5837, -1.3998,  1.0021, -0.9567, -0.6248, -1.3765,
          0.0225,  0.7187, -0.7950,  0.5833]], device='cuda:0',
       grad_fn=<AddmmBackward0>)
tensor(2.3738, device='cuda:0', grad_fn=<NllLossBackward0>)
class tensor([12], device='cuda:0')
pred tensor([[ 1.1732, -0.7650,  0.0911,  1.4074,  1.0134, -0.7325, -0.7429,  0.2196,
          0.3273, -0.1509,  2.2693, -1.6234,  0.1930, -1.7144,  0.5763, -1.8095,
          0.4832, -1.6478, -0.8735,  0.8641]], device='cuda:0',
       grad_fn

KeyboardInterrupt: 