In [1]:
print("Hello World!!")

Hello World!!


# Imports

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from transformers import ViTFeatureExtractor, ViTModel, BertTokenizer, BertModel
import os
import json
import cv2
import pandas as pd
from torchvision import transforms
from PIL import Image

# Define Classes

In [3]:
class VideoEncoder(nn.Module):
    def __init__(self, pretrained_model_name, hidden_size):
        super(VideoEncoder, self).__init__()
        self.feature_extractor = ViTFeatureExtractor.from_pretrained(pretrained_model_name)
        self.vit_model = ViTModel.from_pretrained(pretrained_model_name)
        self.hidden_size = hidden_size
    
    def forward(self, video_frames):
        # video_frames: (batch_size, num_frames, channels, height, width)
        print(f'{video_frames}')
        
        batch_size, num_frames, _, _, _ = video_frames.size()
        
        # Reshape video_frames to (batch_size * num_frames, channels, height, width)
        video_frames = video_frames.view(-1, *video_frames.shape[2:])
        
        # Extract features using ViT
        inputs = self.feature_extractor(images=video_frames, return_tensors="pt")
        inputs = {key: value.to(video_frames.device) for key, value in inputs.items()}
        
        with torch.no_grad():
            outputs = self.vit_model(**inputs)
        
        # Extract the features from the model's output
        features = outputs.last_hidden_state  # (batch_size * num_frames, seq_len, hidden_size)
        
        # Reshape features to (batch_size, num_frames, seq_len, hidden_size)
        features = features.view(batch_size, num_frames, *features.shape[1:])
        
        return features


In [4]:
class TextEncoder(nn.Module):
    def __init__(self, pretrained_model_name, hidden_size):
        super(TextEncoder, self).__init__()
        self.bert_tokenizer = BertTokenizer.from_pretrained(pretrained_model_name)
        self.bert_model = BertModel.from_pretrained(pretrained_model_name)
        self.hidden_size = hidden_size
    
    def forward(self, captions):
        # captions: (batch_size, seq_len)
        
        # Tokenize captions and get BERT embeddings
        input_ids = captions
        attention_mask = (input_ids != 0).float()  # Create attention mask (0 indicates padding)
        
        with torch.no_grad():
            outputs = self.bert_model(input_ids=input_ids, attention_mask=attention_mask)
        
        # Extract the BERT embeddings from the model's output
        embeddings = outputs.last_hidden_state  # (batch_size, seq_len, hidden_size)
        
        return embeddings

In [5]:
def similarity_loss(video_encoded, text_encoded):
    # Normalize the encodings
    video_encoded = F.normalize(video_encoded, p=2, dim=-1)
    text_encoded = F.normalize(text_encoded, p=2, dim=-1)

    # Calculate cosine similarity
    similarity = torch.matmul(video_encoded, text_encoded.transpose(1, 2))

    # Calculate cross-entropy loss
    # In this case, you want to maximize similarity, so use -log(probability) as the loss
    loss = -torch.log(similarity + 1e-8)  # Adding a small epsilon to avoid log(0)

    # Calculate the mean loss over the batch
    loss = torch.mean(loss)

    return loss

In [6]:
video_encoder = VideoEncoder(pretrained_model_name="google/vit-base-patch16-224-in21k", hidden_size=768)
text_encoder = TextEncoder(pretrained_model_name="bert-base-uncased", hidden_size=768) 



In [7]:
class VideoCaptioningModel(nn.Module):
    def __init__(self, video_encoder, text_encoder):
        super(VideoCaptioningModel, self).__init__()
        self.video_encoder = video_encoder
        self.text_encoder = text_encoder

    def forward(self, video_features, captions):
        video_encoded = self.video_encoder(video_features)
        text_encoded = self.text_encoder(captions)
        similarity = similarity_loss(video_encoded, text_encoded)
        return similarity

model = VideoCaptioningModel(video_encoder, text_encoder)

In [8]:
# Define a custom dataset class for video-caption pairs
class VideoCaptionDataset(Dataset):
    def __init__(self, json_path, video_folder, transform=None):
        self.video_folder = video_folder
        self.transform = transform
        self.data = pd.DataFrame(self.load_json_data(json_path)["sentences"])
        self.data.set_index('video_id', inplace=True)

    def __len__(self):
        return len(self.data.index.unique())

    def __getitem__(self, idx):
        video_idx = f'video{idx}'
        video_path = os.path.join(self.video_folder, f'{video_idx}.mp4')
        captions = self.data.loc[video_idx]["caption"].tolist()

        # Load video frames and apply transformations
        video_frames = self.load_video_frames(video_path)

        print(f"HAHAHAHAHAH: {len(video_frames)}")

        if self.transform:
            video_frames = [self.transform(frame) for frame in video_frames]

        print(f"JAJAJAJAJAJA: {len(video_frames)}")

        # Convert the list of tensors to a single tensor
        video_frames = torch.stack(video_frames)

        return video_frames, captions

    def load_json_data(self, json_path):
        with open(json_path, 'r') as json_file:
            data = json.load(json_file)
        return data

    def load_video_frames(self, video_path):
        frames = []
        cap = cv2.VideoCapture(video_path)
        print(f"{video_path=}")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Breakup happened over here!!!")
                break
            # Convert frame to PIL image
            frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            frames.append(frame_pil)
        print(f"Frames not empty over here as len(frames): {len(frames)}")

        cap.release()
        return frames

# Setup video encoder

In [9]:
# Define transformations for video frames (you can customize these)
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Define paths and create data loaders for training and validation
json_path = 'train_val_annotation/train_val_videodatainfo.json'  # Path to your JSON file
video_folder = 'TrainValVideo'  # Path to the folder containing video files

dataset = VideoCaptionDataset(json_path, video_folder, transform=transform)

# Split the dataset into traininx`x`g and validation
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

# Create data loaders
batch_size = 32  # Adjust as needed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [10]:
model = VideoCaptioningModel(video_encoder, text_encoder)
criterion = nn.MSELoss()  # You can use any suitable loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [11]:
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0

    for batch in train_loader:
        video_features, captions = batch
        optimizer.zero_grad()

        similarity = model(video_features, captions)

        # Backpropagation
        similarity.backward()
        optimizer.step()

        total_loss += similarity.item()

        average_loss = total_loss / len(train_loader)
        print(f'Epoch [{epoch+1}/{num_epochs}, Loss: {average_loss:.4f}')



video_path='TrainValVideo/video1189.mp4'
Frames not empty over here as len(frames): 0
HAHAHAHAHAH: 0
JAJAJAJAJAJA: 0


RuntimeError: stack expects a non-empty TensorList

In [None]:
json_path = "train_val_annotation/train_val_videodatainfo.json"
with open(json_path, 'r') as json_file:
    data = json.load(json_file)


In [None]:
data["sentences"][:5]

In [None]:
df = pd.DataFrame(data["sentences"])
df

In [None]:
df.set_index('video_id', inplace=True)
df

In [None]:
len(df.index.unique())

In [None]:
df2 = pd.DataFrame(data["videos"])
df2

In [None]:
train_loader

In [14]:
def load_video_frames(video_path):
    frames = []
    cap = cv2.VideoCapture(video_path)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Convert frame to PIL image
        frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frames.append(frame_pil)

    cap.release()
    return frames

In [16]:
frames = load_video_frames('TrainValVideo/video4806.mp4')
frames

[]

In [None]:
def preprocess_frame(frame):
    # Resize frame to a fixed size (e.g., 224x224)
    frame = cv2.resize(frame, (224, 224))
    # Normalize pixel values to [0, 1] and convert to PyTorch tensor
    frame = torch.tensor(frame / 255.0, dtype=torch.float32)
    return frame


In [None]:
next(iter(train_loader))

In [None]:
len(train_dataset)

In [12]:
batch_size = 32  # Adjust as needed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In [13]:
print(next(iter(train_loader)))

video_path='TrainValVideo/video4806.mp4'
Frames not empty over here as len(frames): 0
HAHAHAHAHAH: 0
JAJAJAJAJAJA: 0


RuntimeError: stack expects a non-empty TensorList

In [None]:
train_dataset[0]

In [None]:
load