In [2]:
import os
import cv2
import glob
import timm
import torch
import itertools
import numpy as np
import pandas as pd
import torch.nn as nn
import albumentations as A
import matplotlib.pyplot as plt
import torch.nn.functional as F
# from vit_pytorch.vivit import ViT
from tqdm.autonotebook import tqdm
from transformers import DistilBertModel, DistilBertConfig, DistilBertTokenizer
import pandas as pd
from sklearn.model_selection import train_test_split
import time
import torchvision.models as models


In [3]:
class CFG:
    debug = False
    video_path = "testing"
    captions_path = "."
    batch_size = 1
    num_workers = 4
    head_lr = 1e-3
    video_encoder_lr = 1e-4
    text_encoder_lr = 1e-5
    classification_encoder_lr = 1e-4
    weight_decay = 1e-3
    patience = 1
    factor = 0.8
    epochs = 20
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    frame_size = 224
    video_len = 16
    # video_embedding = 1024
    video_embedding = 400
    text_encoder_model = "distilbert-base-uncased"
    text_embedding = 768
    text_tokenizer = "distilbert-base-uncased"
    max_length = 200

    pretrained = True # for both image encoder and text encoder
    trainable = True # for both image encoder and text encoder
    temperature = 1.0

    # image size
    size = 224

    # for projection head; used for both image and text encoders
    num_projection_layers = 1
    projection_dim = 256 
    dropout = 0.1


In [4]:
class AvgMeter:
    def __init__(self, name="Metric"):
        self.name = name
        self.reset()

    def reset(self):
        self.avg, self.sum, self.count = [0] * 3

    def update(self, val, count=1):
        self.count += count
        self.sum += val * count
        self.avg = self.sum / self.count

    def __repr__(self):
        text = f"{self.name}: {self.avg:.4f}"
        return text

def get_lr(optimizer):
    for param_group in optimizer.param_groups:
        return param_group["lr"]

# %%
class CLIPDataset(torch.utils.data.Dataset):
    def __init__(self, video_data, tokenizer, transforms, video_len):
        """
        image_filenames and cpations must have the same length; so, if there are
        multiple captions for each image, the image_filenames must have repetitive
        file names 
        """
        video_filenames = []
        prompts = []
        for vid_data in video_data:
            raw_data = vid_data.split('|')
            video_filenames.append(raw_data[0])
            prompts.append(raw_data[1].replace('\n',''))

        self.video_filenames = video_filenames
        self.captions = list(prompts)
        self.encoded_captions = tokenizer(
            list(prompts), padding=True, truncation=True, max_length=CFG.max_length
        )
        self.transforms = transforms
        self.video_len = video_len

    def read_video_as_tensor(self, video_path):
        cap = cv2.VideoCapture(video_path)

        frames = []
        padd_frame = np.zeros((CFG.frame_size, CFG.frame_size, 3), dtype=np.uint8)

        frame_idx = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # Convert to RGB
            frame = cv2.resize(frame,(CFG.frame_size, CFG.frame_size))
            frame_tensor = torch.from_numpy(frame)  # Convert to PyTorch tensor
            frames.append(frame_tensor)
            frame_idx += 1
        while frame_idx < CFG.video_len:
            frame_tensor = torch.from_numpy(padd_frame)  # Convert to PyTorch tensor
            frames.append(frame_tensor)
            frame_idx += 1

        cap.release()
        video_tensor = torch.stack(frames)  # Stack frames to create video tensor
        return video_tensor

    def __getitem__(self, idx):
        item = {
            key: torch.tensor(values[idx])
            for key, values in self.encoded_captions.items()
        }
        video = self.read_video_as_tensor(f"{CFG.video_path}/{self.video_filenames[idx]}")
        
        item['video'] = torch.tensor(video).permute(3, 0, 1, 2).float()
        item['caption'] = self.captions[idx]
        item['filename'] = self.video_filenames[idx]

        return item


    def __len__(self):
        return len(self.captions)


In [5]:
def get_transforms(mode="train"):
    if mode == "train":
        return A.Compose(
            [
                A.Resize(CFG.size, CFG.size, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )
    else:
        return A.Compose(
            [
                A.Resize(CFG.size, CFG.size, always_apply=True),
                A.Normalize(max_pixel_value=255.0, always_apply=True),
            ]
        )

# %%
class VideoEncoder(nn.Module):
    """
    Encode images to a fixed size vector
    """

    def __init__(
        self, pretrained=CFG.pretrained, trainable=CFG.trainable
    ):
        super().__init__()
        self.model = models.video.r3d_18(pretrained=True)
        
        # # video = torch.randn(4, 3, 16, 128, 128) # (batch, channels, frames, height, width)
        # for p in self.model.parameters():
        #     p.requires_grad = trainable
        # self.model = swin3d_t(pretrained=True)
        # self.model = torch.nn.Sequential(*list(model.children())[:-2])
        # video = torch.randn(4, 3, 16, 128, 128) # (batch, channels, frames, height, width)
        for p in self.model.parameters():
            p.requires_grad = trainable

    def forward(self, x):
        return self.model(x)

# %%
class TextEncoder(nn.Module):
    def __init__(self, model_name=CFG.text_encoder_model, pretrained=CFG.pretrained, trainable=CFG.trainable):
        super().__init__()
        if pretrained:
            self.model = DistilBertModel.from_pretrained(model_name)
        else:
            self.model = DistilBertModel(config=DistilBertConfig())
            
        for p in self.model.parameters():
            p.requires_grad = trainable

        # we are using the CLS token hidden representation as the sentence's embedding
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        output = self.model(input_ids=input_ids, attention_mask=attention_mask)
        last_hidden_state = output.last_hidden_state
        return last_hidden_state[:, self.target_token_idx, :]


# %%
class ProjectionHead(nn.Module):
    def __init__(
        self,
        embedding_dim,
        projection_dim=CFG.projection_dim,
        dropout=CFG.dropout
    ):
        super().__init__()
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)
    
    def forward(self, x):
        projected = self.projection(x)
        x = self.gelu(projected)
        x = self.fc(x)
        x = self.dropout(x)
        x = x + projected
        x = self.layer_norm(x)
        return x


In [6]:
class CLIPModel(nn.Module):
    def __init__(
        self,
        temperature=CFG.temperature,
        video_embedding=CFG.video_embedding,
        text_embedding=CFG.text_embedding,
    ):
        super().__init__()
        self.video_encoder = VideoEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=video_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.classification_model = torch.nn.Sequential( 
                torch.nn.Linear(in_features = 256, out_features = 1), 
                torch.nn.Sigmoid() 
            )
        
        self.temperature = temperature
        self.cos = nn.CosineSimilarity(dim=1, eps=1e-6)
        self.mse_loss = nn.MSELoss()

    def forward(self, batch):
        # Getting Image and Text Features
        video_features = self.video_encoder(batch["video"])
        
        text_features = self.text_encoder(
            input_ids=batch["input_ids"], attention_mask=batch["attention_mask"]
        )
        # Getting Image and Text Embeddings (with same dimension)
        video_embeddings = self.image_projection(video_features)
        text_embeddings = self.text_projection(text_features)
        

        # output_linear = self.classification_model(video_embeddings) 
        
        # Calculating the Loss
        embeddings_similarity = (self.cos(video_embeddings, text_embeddings)+1)/2
        vec_product = video_embeddings*text_embeddings
        magnitude1 = torch.norm(video_embeddings)
        magnitude2 = torch.norm(text_embeddings)
        normalized_dot = vec_product / (magnitude1 * magnitude2)
        output_linear = self.classification_model(normalized_dot) 
        
        
        output = (embeddings_similarity + (3*output_linear))/4
        
        return output

def build_loaders(data, tokenizer, mode):
    transforms = get_transforms(mode=mode)
    dataset = CLIPDataset(
        data,
        tokenizer=tokenizer,
        transforms=transforms,
        video_len=CFG.video_len
    )
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=CFG.batch_size,
        num_workers=CFG.num_workers,
        shuffle=True if mode == "train" else False,
    )
    return dataloader

In [7]:
with open('test.txt','r') as f:
    df_valid = f.readlines()

In [14]:
# def main():
    # train_df, valid_df = make_train_valid_dfs()
tokenizer = DistilBertTokenizer.from_pretrained(CFG.text_tokenizer)
valid_loader = build_loaders(df_valid, tokenizer, mode="valid")


model = CLIPModel().to(CFG.device)

model.load_state_dict(torch.load('model.pth'))
model.eval()
tqdm_object = tqdm(valid_loader, total=len(valid_loader))
final_scores = ""
start = time.time()
for batch in tqdm_object:
    batch_model = {k: v.to(CFG.device) for k, v in batch.items() if k != "caption" and k != "filename"}
    model_output = model(batch_model)
    score = model_output.detach().cpu().numpy()[0][0]
    filename = batch['filename'][0]
    final_scores += filename+","+str(score*100)+"\n"
end = time.time()


Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertModel: ['vocab_projector.weight', 'vocab_projector.bias', 'vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_transform.weight', 'vocab_transform.bias']
- This IS expected if you are initializing DistilBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
  item['video'] = torch.tensor(video).permute(3, 0, 1, 2).float()
  item['video'] = torch.tensor(video).permute(3, 0, 1, 2).float()
  item['video'] = torch.tensor(video).permute(3, 0, 1, 2).float()
  item['video'] = torch.tensor(video).permute(3, 0, 1, 2).float()
100%|

In [15]:
f = open("output.txt", "w")
f.write(final_scores)
f.close()

In [9]:
(end-start)/2000

0.022273842930793764

In [11]:
!zip submission.zip readme.txt  output.txt

updating: readme.txt (deflated 29%)
updating: output.txt (deflated 59%)
