# Vision Preprocessing

Features:
- Zero-shot classification (which label best describes what is happening in each sampled clip of the video)

In [1]:
import os
from urllib.parse import parse_qs, urlparse

def perror(msg):
    """Print `msg` to stdout, prefixed with "error: ".

    `msg` must be a string: it is concatenated to the prefix, so any other
    type raises `TypeError`.
    """
    prefixed = "error: " + msg
    print(prefixed)

def mkdir(path):
    """Create `path` (including missing parents) unless it already exists.

    Returns:
        The given `path` when the directory was created, otherwise `None`
        after printing "path already exists".
    """
    if os.path.exists(path):
        print("path already exists")
        return None

    os.makedirs(path)
    return path

VIDEO_URL = "https://www.youtube.com/watch?v=33pBg_UaeJk"

OUTPUT_DIR = 'cache/'  # target output for preprocessing is cache

# The video ID of a watch URL is the value of its `v` query parameter.
# Splitting on "/" alone would give "watch?v=<id>" rather than the ID itself.
# URLs without a `v` parameter fall back to their last path segment.
_parsed_url = urlparse(VIDEO_URL)
_query_ids = parse_qs(_parsed_url.query).get("v")
extracted_id = _query_ids[0] if _query_ids else _parsed_url.path.rstrip("/").split("/")[-1]

mkdir(OUTPUT_DIR)

path already exists


In [2]:
from pytube import YouTube

yt = YouTube(VIDEO_URL)

# Pick the highest-resolution progressive MP4 stream and keep a handle on it so
# the filename below comes from the stream that is actually downloaded.
stream = (
    yt.streams
    .filter(progressive=True, file_extension='mp4')
    .order_by('resolution')
    .desc()
    .first()
)
if stream is None:
    raise RuntimeError("no progressive mp4 stream available for " + VIDEO_URL)

stream.download(output_path=OUTPUT_DIR)

# `yt.streams[0]` is just the first entry of the unfiltered stream list; it is not
# guaranteed to be the downloaded stream, so its default filename may not match
# the file written to OUTPUT_DIR.
extracted_title = stream.default_filename
OUTPUT_FILE = OUTPUT_DIR + extracted_title

Sampling

In [None]:
import av
import numpy as np
from transformers import AutoProcessor, AutoModel
np.random.seed(0)

import pandas as pd

# Empty segment table: one row per sampled clip, described by its frame range
# and the matching time range.
df = pd.DataFrame(
    columns=[
        "start_frame",
        "end_frame",
        "start_time",
        "end_time",
    ]
)

def read_video_pyav(container, indices):
    '''
    Decode selected frames of a video through PyAV.

    The container is rewound to the start and decoded sequentially; decoding
    stops as soon as the frame position passes `indices[-1]`.

    Args:
        container (`av.container.input.InputContainer`): PyAV container.
        indices (`List[int]`): Frame indices to keep. The first and last entries
            bound the range that is scanned.
    Returns:
        result (np.ndarray): decoded RGB frames stacked along a new first axis,
            shape (num_frames, height, width, 3).
    '''
    container.seek(0)
    first_wanted, last_wanted = indices[0], indices[-1]

    selected = []
    for position, frame in enumerate(container.decode(video=0)):
        if position > last_wanted:
            break
        if position < first_wanted or position not in indices:
            continue
        selected.append(frame)

    decoded = [picked.to_ndarray(format="rgb24") for picked in selected]
    return np.stack(decoded)

def sample_frame_indices(clip_len, frame_sample_rate, seg_len, end_idx, verbose=False):
    '''
    Pick `clip_len` evenly spaced frame indices from the window ending at `end_idx`.

    The window covers `clip_len * frame_sample_rate` frames. It is clamped so that
    it never starts before frame 0, which keeps every returned index non-negative
    even when `end_idx` is smaller than the window length.

    Args:
        clip_len (`int`): Total number of frames to sample.
        frame_sample_rate (`int`): Sample every n-th frame.
        seg_len (`int`): Frame count of the video. It is only reported when
            `verbose` is set and does not constrain the returned indices.
        end_idx (`int`): Exclusive end of the window; the largest index returned
            is `end_idx - 1` (or 0 when `end_idx` is below 1).
        verbose (`bool`, *optional*, defaults to `False`): Print the window
            length and bounds. Off by default because this function is called
            once per DataFrame row.

    Returns:
        indices (`np.ndarray`): `clip_len` sampled frame indices, dtype `int64`.
    '''
    converted_len = int(clip_len * frame_sample_rate)

    # Clamp the window start at 0: a negative start would yield negative indices.
    start_idx = max(end_idx - converted_len, 0)
    # Keep the upper clip bound at or above the lower one for degenerate windows.
    last_idx = max(end_idx - 1, start_idx)

    if verbose:
        print("converted", converted_len)
        print("seg", seg_len)
        print("start: ", start_idx)
        print("end: ", end_idx)

    indices = np.linspace(start_idx, end_idx, num=clip_len)
    indices = np.clip(indices, start_idx, last_idx).astype(np.int64)

    return indices

Setup VideoReader

In [None]:
# decord notes:
# https://github.com/dmlc/decord?tab=readme-ov-file            aarch64: build from source to avoid shambles
# $ cd decord/python && pip install .                          

# https://github.com/huggingface/transformers/issues/21054     CUDA GPU support for inference + XClip issue
# https://github.com/huggingface/datasets/issues/5225          dataset video support
from decord import VideoReader, cpu

# Single-threaded decord reader over the downloaded file, decoding on CPU 0.
videoreader = VideoReader(
    OUTPUT_FILE,
    ctx=cpu(0),
    num_threads=1,
)


In [None]:
# frame sample size
# SAMPLE_SIZE is the number of frames per sampled clip (and the segment step);
# FRAME_SAMPLE_RATE is the stride between sampled frames.
SAMPLE_SIZE = 8
FRAME_SAMPLE_RATE = 1

container = av.open(OUTPUT_FILE)
# PyAV reports 0 frames when the container does not record a frame count.
# In that case fall back to the length of the decord reader, which is the
# reader the clips are decoded with later on.
SEGMENT_LENGTH = container.streams.video[0].frames or len(videoreader)

print(SEGMENT_LENGTH)


Setup frame indices

In [None]:
# Average frame rate reported by the reader; used to turn frame indices into seconds.
avg_fps = videoreader.get_avg_fps()

# Collect rows in a list and build the table once, so re-running this cell
# rebuilds `df` instead of appending duplicate rows to it.
segment_rows = []

# Full segments of exactly SAMPLE_SIZE frames each.
for i in range(SEGMENT_LENGTH // SAMPLE_SIZE):
    start_frame = i * SAMPLE_SIZE
    end_frame = start_frame + SAMPLE_SIZE
    start_time = round(start_frame / avg_fps, 2)
    end_time = round(end_frame / avg_fps, 2)

    segment_rows.append([start_frame, end_frame, start_time, end_time])

# Account for any clipping: frames left over after the last full segment.
# The row is only added when it spans at least one frame, which also covers
# videos shorter than SAMPLE_SIZE (no full segment at all).
last_value = (SEGMENT_LENGTH // SAMPLE_SIZE) * SAMPLE_SIZE
last_frame = SEGMENT_LENGTH - 1
if last_value < last_frame:
    # Parenthesised on purpose: the time of the last frame is (N - 1) / fps.
    segment_rows.append([
        last_value,
        last_frame,
        round(last_value / avg_fps, 2),
        round(last_frame / avg_fps, 2),
    ])

df = pd.DataFrame(segment_rows, columns=['start_frame', 'end_frame', 'start_time', 'end_time'])

df

Get Video

In [None]:
def get_video(row):
    """Fetch the frames of one segment as a NumPy array.

    Frame indices are sampled for the window ending at `row['end_frame']` and
    read from the notebook-level `videoreader` in a single batch.
    """
    frame_indices = sample_frame_indices(
        clip_len=SAMPLE_SIZE,
        frame_sample_rate=FRAME_SAMPLE_RATE,
        seg_len=container.streams.video[0].frames,
        end_idx=row['end_frame'],
    )
    clip = videoreader.get_batch(frame_indices)
    return clip.asnumpy()

# One decoded clip per segment row.
df['video'] = df.apply(get_video, axis=1)

df

# XClip Model

X-CLIP takes a list of text prompts and scores how well each one matches the video.

In [None]:
from transformers import XCLIPProcessor, XCLIPModel

# Load the processor and the model from the same pretrained checkpoint.
processor, model = (
    loader.from_pretrained("microsoft/xclip-base-patch32")
    for loader in (AutoProcessor, AutoModel)
)

# Inference

Classification

In [None]:
# Prompts - essentially a lookup table of visual interests.
# If an input passes a specific threshold, assign greater weight/importance to the frame buffer
# for clipping.

# TODO: consider a pipeline that sends user input to an LLM to generate this list of prompts
# Disabled alternative prompt list, kept as a bare string literal for reference.
'''
interesting_moments = [
    "A surprising or unexpected answer",
    "A candid or emotional confession",
    "A witty one-liner or humorous remark",
    "A memorable quote or sound bite",
    "A dramatic pause or intense gaze",
    "A revealing or insightful comment about their craft",
    "A heartfelt or inspiring message to fans",
    "A surprising admission or revelation",
    "A passionate defense of a particular issue",
    "A unique or quirky habit or ritual",
    "A fascinating story from their personal life",
    "A memorable moment when they first got into the industry",
    "A prediction or forecast for future events",
    "A candid critique of themselves or others",
    "A moving tribute to a mentor or idol",
    "A surprising revelation about their own strengths or weaknesses",
]
'''

# Labels every clip is scored against; the best-scoring one becomes its classification.
video_labels = ["Highlight Worthy", "Neutral", "Not Highlight Worthy"]

In [None]:
import torch

# TODO MAJOR perf bottleneck right here, batch this with torch dataset loader or something
def get_classification(row):
    """Return the entry of `video_labels` that the model scores highest for one clip.

    Args:
        row: A row of the segment table; `row['video']` holds the clip's frames.

    Returns:
        The label string from `video_labels` with the highest probability.
    """
    # numpy is imported as `np` in this notebook; the bare name `numpy` is not bound.
    # `asarray` avoids copying when the clip already is an ndarray.
    video = np.asarray(row['video'])
    inputs = processor(text=video_labels, videos=list(video), return_tensors="pt", padding=True)
    # forward pass; gradients are not needed for inference
    with torch.no_grad():
        outputs = model(**inputs)

    # One probability per label for the single clip in this batch.
    probs = outputs.logits_per_video.softmax(dim=1)
    predicted = probs.argmax(dim=1)

    return video_labels[predicted.item()]

df['vision_classification'] = df.apply(get_classification, axis=1)


Drop the `video` column because it is huge

In [None]:
# Remove the decoded frames before the table is displayed and exported.
df = df.drop(labels=['video'], axis='columns')

In [None]:
# Display the segment table.
df

## Export

In [None]:
# Write the segment table next to the downloaded video, without the index column.
export_path = OUTPUT_DIR + 'out_vision_preprocessing.csv'
df.to_csv(export_path, index=False)