# Feature Extraction Embeddings from video
## Using VideoMAE pretrained model
Model allow us to provide only 16 frames, so we'll catch evenly distributed frames based on total frames count

In [67]:
import torch
import cv2
from transformers import AutoImageProcessor, VideoMAEModel

class VideoFeatureExtractor:
    def __init__(self, *args, **kwargs):
        # Initialize the image processor and model
        self.image_processor = AutoImageProcessor.from_pretrained("MCG-NJU/videomae-base", cache_dir="./cache")
        self.model = VideoMAEModel.from_pretrained("MCG-NJU/videomae-base", cache_dir="./cache")

        # Determine the device (GPU or CPU)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = self.model.to(self.device)

    def _preprocess_frames(self, frames):
        """
        Apply preprocessing to each frame using AutoImageProcessor.
        Resize frames to the required size and normalize them.
        """
        # Resize frames and return tensors
        inputs = self.image_processor(frames, return_tensors="pt", size=(224, 224))
        return inputs

    def _video_to_frames(self, video_path, target_frame_count):
        """
        Split video into frames using OpenCV.
        Dynamically select the number of frames based on video length and the required frame count.
        """
        vidcap = cv2.VideoCapture(video_path)
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))  # Total number of frames in the video

        # Calculate how many frames to skip for target_frame_count
        frame_skip = max(1, total_frames / target_frame_count)

        frames = []
        count = 0
        success, image = vidcap.read()
        while success:
            if count > frame_skip * len(frames):
                # Resize the frame using OpenCV if necessary
                image_resized = cv2.resize(image, (224, 224))
                frames.append(cv2.cvtColor(image_resized, cv2.COLOR_BGR2RGB))
            success, image = vidcap.read()
            count += 1

        vidcap.release()

        # If the number of selected frames is less than target_frame_count,
        # apply padding (add empty frames)
        while len(frames) < target_frame_count:
            frames.append(torch.zeros((224, 224, 3), dtype=torch.uint8).numpy())  # Add empty frame

        return frames

    def extract_features(self, video_path: str, target_frame_count: int = 16) -> torch.Tensor:
        """
        Extract features using the VideoMAE model.
        Dynamically select the number of frames based on video length.
        """
        frames = self._video_to_frames(video_path, target_frame_count)
        
        if not frames:
            raise ValueError("Failed to extract frames from video")

        # Preprocess the frames and resize to 224x224
        inputs = self._preprocess_frames(frames)
        
        # Move tensors to GPU or CPU
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Extract features using the model
        with torch.no_grad():
            outputs = self.model(**inputs)
        
        # Get embeddings
        embeddings = outputs.last_hidden_state  # or another output layer if needed
        return embeddings.mean(dim=1)

## Test for feature extraction below

In [68]:

# define extractor
fe = VideoFeatureExtractor()

In [69]:
from pathlib import Path

# Iterate videos
p = Path("../../train_dataset_tag_video/videos")
videos = [x for x in p.iterdir() if str(x).endswith(".mp4")]
results = []
for v in videos:
    print("processing", str(v))
    res = fe.extract_features(str(v))
    print(res.shape)
    results.append(
        (
            res,
            v
        )
    )

processing ../../train_dataset_tag_video/videos/1f17968167d4b0487cdebb6a67b4f148.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/3ec7c2b092514dc4ebeaa3036fe9857c.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/0c069f42ac98970c28d471d615e71f7b.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/2c5bdce3e9e2c8b9db713d9f2c196820.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/3b69f98d51c1028633cff24c7d2937e0.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/1e922b7daeef2358f82b263533d450ac.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/1e244ec5c0c85f9478a83695ac9add45.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/0ac7ed0507b2364e40030d11bf52ee5d.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/videos/02d4cc029c5a7531f36992446a24478f.mp4
torch.Size([1, 768])
processing ../../train_dataset_tag_video/video

Finding most similar videos based on embeddings.

In [70]:
import torch.nn.functional as F
import itertools

# Найдем все пары тензоров
pairs = list(itertools.combinations(results, 2))

min_sim = float('inf')
max_sim = float('-inf')
most_similar = None
most_dissimilar = None

for (tensor1, path1), (tensor2, path2) in pairs:
    if str(path1) != str(path2):
        sim = F.cosine_similarity(tensor1, tensor2)
        
        if sim < min_sim:
            min_sim = sim
            most_dissimilar = ((tensor1, path1), (tensor2, path2))
        
        if sim > max_sim:
            max_sim = sim
            most_similar = ((tensor1, path1), (tensor2, path2))

# Результаты
print(f"Most similar videos: \n\t{str(most_similar[0][1])}\n\t{str(most_similar[1][1])}")
print(f"Most dissimilar videos: \n\t{str(most_dissimilar[0][1])}\n\t{str(most_dissimilar[1][1])}")

Most similar videos: 
	../../train_dataset_tag_video/videos/2dc87cc0bbb8275d1870da2f33cbdb12.mp4
	../../train_dataset_tag_video/videos/3a8760f7de25a13acc15ffb30081f13a.mp4
Most dissimilar videos: 
	../../train_dataset_tag_video/videos/0c069f42ac98970c28d471d615e71f7b.mp4
	../../train_dataset_tag_video/videos/3bf2e36c3963425263babe36b8dbd740.mp4
