In [None]:
# !pip install av numpy transformers torch

# Importing necessary libraries

In [None]:
import av
import numpy as np
from transformers import VivitImageProcessor, VivitModel
import torch
import os
import numpy as np
import gc

# Loading cropped videos

In [None]:
base_path="/kaggle/input/vid-deepfake/cropped_celeb_df_fake"
# Sort the listing: os.listdir returns entries in arbitrary order, and the seeded
# random frame sampling used later in this notebook is only repeatable when the
# videos are visited in a fixed order.
celeb_df_fake_vids = sorted(
    os.path.join(base_path, vid) for vid in os.listdir(base_path) if vid.endswith(".mp4")
)
# Preview only the first few paths instead of dumping the whole list into the output.
celeb_df_fake_vids[:5]

# Function to read cropped videos and sample frames

In [None]:
# Seed NumPy's global RNG so the random clip positions drawn by
# sample_frame_indices (via np.random.randint) are repeatable across runs.
np.random.seed(0)


def read_video_pyav(container, indices):
    """Decode the requested frames of a video into one stacked RGB array.

    Args:
        container: Opened PyAV container; it is rewound with ``seek(0)`` and
            its first video stream is decoded.
        indices: Ordered sequence of frame positions to keep. Only the first
            and last entries are used as the decoding window bounds.

    Returns:
        ``np.ndarray`` built by stacking the ``rgb24`` array of every selected
        frame along a new leading axis.
    """
    container.seek(0)
    first_wanted, last_wanted = indices[0], indices[-1]

    selected = []
    for frame_no, frame in enumerate(container.decode(video=0)):
        # Nothing past the last requested position is needed; stop decoding.
        if frame_no > last_wanted:
            break
        if frame_no < first_wanted or frame_no not in indices:
            continue
        selected.append(frame)

    rgb_arrays = [picked.to_ndarray(format="rgb24") for picked in selected]
    return np.stack(rgb_arrays)

def sample_frame_indices(clip_len, frame_sample_rate, seg_len):
    """Pick ``clip_len`` frame positions from a randomly placed window.

    The window spans ``int(clip_len * frame_sample_rate)`` frames and its end
    is drawn with ``np.random.randint`` so that it fits inside ``seg_len``.

    Args:
        clip_len: Number of indices to return.
        frame_sample_rate: Multiplier applied to ``clip_len`` to get the
            window length in frames.
        seg_len: Total number of frames available.

    Returns:
        ``np.ndarray`` of ``np.int64`` with ``clip_len`` evenly spaced
        positions inside the window.

    Raises:
        ValueError: If the window is not strictly shorter than ``seg_len``.
    """
    window = int(clip_len * frame_sample_rate)
    if seg_len <= window:
        raise ValueError("Not enough frames in the video to sample the specified number of frames.")

    window_end = np.random.randint(window, seg_len)
    window_start = window_end - window
    # linspace includes window_end itself; clipping keeps the last position
    # at window_end - 1.
    evenly_spaced = np.linspace(window_start, window_end, num=clip_len)
    return np.clip(evenly_spaced, window_start, window_end - 1).astype(np.int64)


# Function to extract video embeddings with the Kinetics-400 pretrained ViViT model

In [7]:
def extract_video_embeddings(video_path, model, processor, device, clip_len=32, frame_sample_rate=1):
    """Compute the model's last hidden state for one randomly placed clip of a video.

    Args:
        video_path: Path of the video file, opened with PyAV.
        model: Object called as ``model(**inputs)`` whose output exposes
            ``last_hidden_state`` (this notebook passes a ``VivitModel``).
        processor: Callable that turns a list of frames into model inputs
            (this notebook passes a ``VivitImageProcessor``).
        device: Device the processed inputs are moved to; presumably it has to
            match the device the model lives on.
        clip_len: Number of frames to sample from the video.
        frame_sample_rate: Sampling rate forwarded to ``sample_frame_indices``.

    Returns:
        ``outputs.last_hidden_state`` for the sampled clip, or ``None`` when
        frame sampling raised ``ValueError`` (e.g. the video is too short).
    """
    # The context manager closes the container on every exit path, including
    # the early return below; without it each video leaks an open file handle.
    with av.open(video_path) as container:
        try:
            indices = sample_frame_indices(clip_len, frame_sample_rate, container.streams.video[0].frames)
        except ValueError as e:
            print(f"Skipping {video_path} due to error: {e}")
            return None
        # Frames are converted to NumPy arrays here, so nothing below needs
        # the container to stay open.
        video = read_video_pyav(container=container, indices=indices)

    inputs = processor(list(video), return_tensors="pt").to(device)
    # Inference only: skip gradient tracking.
    with torch.no_grad():
        outputs = model(**inputs)
    return outputs.last_hidden_state

# Making output directory

In [8]:
output_dir="/kaggle/working/final_celeb_df_fake_video_embeddings"
# exist_ok=True keeps this cell re-runnable: without it a second run raises
# FileExistsError because the directory is already there.
os.makedirs(output_dir, exist_ok=True)

# Extracting embeddings

In [None]:
def store_embeddings(embeddings, filename):
    """Serialize ``embeddings`` to ``filename`` using ``torch.save``.

    Args:
        embeddings: Object to persist (this notebook passes a tensor).
        filename: Destination path handed to ``torch.save``.
    """
    torch.save(obj=embeddings, f=filename)


# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Load the Kinetics-400 ViViT checkpoint: the processor prepares frames,
# the model produces the hidden states that are saved as embeddings.
image_processor = VivitImageProcessor.from_pretrained("google/vivit-b-16x2-kinetics400")
model = VivitModel.from_pretrained("google/vivit-b-16x2-kinetics400").to(device)

total=len(celeb_df_fake_vids)

# Iterate through video paths, extract embeddings and save one .pt file per video.
for c, video_path in enumerate(celeb_df_fake_vids, start=1):
    # Bind the name before the try block: if extraction raises, the original
    # `del embeddings` in `finally` hit an unbound name and the resulting
    # NameError aborted the whole loop.
    embeddings = None
    try:
        embeddings = extract_video_embeddings(video_path, model, image_processor, device)
        if embeddings is not None:
            # The [:-8] slice drops the last 8 characters of the file name,
            # presumably a fixed suffix plus ".mp4" - TODO confirm.
            # NOTE(review): the tensor is saved on the device it was computed
            # on; loading on a CPU-only machine will need map_location.
            store_embeddings(embeddings, os.path.join(output_dir, f"embeddings_{os.path.basename(video_path)[:-8]}.pt"))
            print("done with ", c, " out of ", total)
        else:
            print("embeddings none for....",video_path)
    except RuntimeError as e:
        print(f"Error processing {video_path}: {e}")
    finally:
        # Drop the reference first so the cache flush below can return the
        # memory it occupied.
        del embeddings
        gc.collect()
        torch.cuda.empty_cache()
print("Finished processing all videos!")

In [None]:
# Recursively pack the embeddings directory into a single zip archive written to the current working directory.
!zip -r final_celeb_df_fake_video_embeddings.zip /kaggle/working/final_celeb_df_fake_video_embeddings