In [None]:
import os 
from IPython.display import clear_output

# First run only: clone the dev branch of the project and install its
# requirements. The check on the checkout directory makes re-running this cell
# skip the clone/install step.
if not os.path.exists('/content/clip-video-embedder'):
    !git clone -b dev https://github.com/abreza/clip-video-embedder.git
    %cd /content/clip-video-embedder
    !pip install -r requirements.txt
    # Wipe the clone/install logs from the cell output.
    clear_output()

# Always work from the repository root: later cells open files with paths
# relative to it (e.g. 'datasets/ActivityNet/...').
%cd /content/clip-video-embedder

/content/clip-video-embedder


In [None]:
import json
import matplotlib.pyplot as plt
import time
import numpy as np
import math
import cv2
import random

In [None]:
def load_dataset(dataset_name):
    """Read one of the ActivityNet JSON annotation files from ``datasets/ActivityNet``.

    ``'ActivityNet Captions'`` reads ``train.json`` and returns its parsed
    content unchanged. Every other name reads ``activity_net.v1-3.min.json``;
    for the exact name ``'ActivityNet'`` the result is narrowed to the entries
    of the ``"database"`` section whose ``'subset'`` equals ``'training'``.
    """
    if dataset_name == 'ActivityNet Captions':
        json_file = 'train'
    else:
        json_file = 'activity_net.v1-3.min'

    with open(f'datasets/ActivityNet/{json_file}.json') as handle:
        data = json.load(handle)

    if dataset_name != 'ActivityNet':
        return data

    database = data["database"]
    return {
        video_key: entry
        for video_key, entry in database.items()
        if entry['subset'] == 'training'
    }
 

def load_video_informations(video_id):
    """Gather the caption data and the class label of one video.

    The captions file is indexed with ``video_id`` itself, while the
    classification database is indexed with ``video_id[2:]`` (the id without
    its first two characters). Both annotation files are re-read on every call.

    Returns:
        ``(sentences, timestamps, duration, corresponding_label)`` where the
        label is taken from the first annotation of the database entry.
    """
    caption_entry = load_dataset('ActivityNet Captions')[video_id]
    duration, sentences, timestamps = (
        caption_entry[field] for field in ("duration", "sentences", "timestamps")
    )

    label_entry = load_dataset('ActivityNet')[video_id[2:]]
    corresponding_label = label_entry["annotations"][0]["label"]

    return sentences, timestamps, duration, corresponding_label


def choice_video(data, length, var=10):
    """Pick a random video id whose duration lies strictly inside ``length ± var``.

    NOTE(review): this function is defined a second time later in the notebook;
    the later definition is the one in effect once both cells have run.

    Args:
        data: mapping of video id -> record holding a numeric ``"duration"``.
        length: target duration.
        var: half-width of the accepted window; both bounds are exclusive.

    Returns:
        One id drawn uniformly from the videos inside the window.

    Raises:
        ValueError: if no video falls inside the window. The previous
            rejection-sampling loop never terminated in that case, and raised
            ``UnboundLocalError`` when its ``-1`` sentinel was itself inside
            the window.
    """
    min_length = length - var
    max_length = length + var

    # Filter first, then draw once: same uniform distribution over the
    # matching videos, but guaranteed to terminate.
    candidates = [
        video_id
        for video_id, record in data.items()
        if min_length < record["duration"] < max_length
    ]
    if not candidates:
        raise ValueError(
            f"no video with a duration strictly between {min_length} and {max_length}"
        )

    return random.choice(candidates)

# ActivityNet Captions

In [None]:
import random

# Captions annotation file, given relative to the current working directory.
activityNet_train_path = 'datasets/ActivityNet/train.json'

# Parsed once into the module-level `data`; `video_analysis` reads this global
# when it picks a video.
with open(activityNet_train_path) as f:
  data = json.load(f)

In [None]:
import torch
from transformers import CLIPProcessor, CLIPModel
from torch.nn.utils.rnn import pad_sequence

from utils.video_loader import download_video_from_youtube
from dataloaders.rawvideo_util import RawVideoExtractor

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Clear whatever this cell printed so far.
clear_output()

In [None]:
# Pretrained CLIP checkpoint, moved to `device` and put in eval mode.
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device).eval()
# Processor of the same checkpoint; `clip_inference` uses it to prepare the
# text and image inputs for `model`.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
# Clear whatever loading the checkpoint printed.
clear_output()

In [None]:
def frame_sampler(video_path, framerate):
    """Extract the frames of ``video_path`` with ``RawVideoExtractor`` and move them to ``device``.

    A new extractor is built for the requested ``framerate`` on every call and
    the ``'video'`` entry of its result is returned.
    NOTE(review): presumably a tensor of sampled frames; confirm against
    ``RawVideoExtractor.get_video_data``.
    """
    extracted = RawVideoExtractor(framerate=framerate).get_video_data(video_path)
    return extracted['video'].to(device)

def get_video_dimensions(video_path):
    """Return the frame width and height of a video as decimal strings.

    The values are returned as strings so that the caller can join them
    directly (``"x".join(...)``).

    Args:
        video_path: path handed to ``cv2.VideoCapture``.

    Returns:
        ``(width, height)`` as a tuple of ``str``.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        width = str(int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)))
        height = str(int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
    finally:
        # Release the capture even if a property query fails.
        cap.release()

    # No window is ever opened here, so the former cv2.destroyAllWindows()
    # call was dropped (NOTE(review): it is also reported to raise on
    # GUI-less OpenCV builds; confirm if that matters for your environment).
    return (width, height)

def choice_video(data, length, var=10):
    """Pick a random video id whose duration lies strictly inside ``length ± var``.

    NOTE(review): this is the second definition of ``choice_video`` in the
    notebook; being the later one, it is the definition in effect once both
    cells have run.

    Args:
        data: mapping of video id -> record holding a numeric ``"duration"``.
        length: target duration.
        var: half-width of the accepted window; both bounds are exclusive.

    Returns:
        One id drawn uniformly from the videos inside the window.

    Raises:
        ValueError: if no video falls inside the window. The previous
            rejection-sampling loop never terminated in that case, and raised
            ``UnboundLocalError`` when its ``-1`` sentinel was itself inside
            the window.
    """
    min_length = length - var
    max_length = length + var

    # Filter first, then draw once: same uniform distribution over the
    # matching videos, but guaranteed to terminate.
    candidates = [
        video_id
        for video_id, record in data.items()
        if min_length < record["duration"] < max_length
    ]
    if not candidates:
        raise ValueError(
            f"no video with a duration strictly between {min_length} and {max_length}"
        )

    return random.choice(candidates)

def extract_frames(video_path, framerate):
    """Sample the frames of a video and return them as one zero-padded batch.

    The result of ``frame_sampler`` is passed through ``pad_sequence`` with
    ``batch_first=True`` and a padding value of 0.
    """
    return pad_sequence(
        frame_sampler(video_path, framerate),
        batch_first=True,
        padding_value=0,
    )

In [None]:
def trim_sentences(sentences, max_seq_len=77):
    """Cap every sentence at ``max_seq_len`` whitespace-separated words.

    Each sentence is stripped and split on whitespace, at most the first
    ``max_seq_len`` words are kept, and the words are joined back with single
    spaces. Returns a new list in the same order as the input.
    """
    # Slicing is a no-op for sentences that are already short enough.
    return [
        " ".join(sentence.strip().split()[:max_seq_len])
        for sentence in sentences
    ]

def truncate_sentence(sentence, max_display_length=120):
    """Strip ``sentence`` and shorten it for display.

    Text longer than ``max_display_length`` characters is cut at that length
    and suffixed with ``" ..."``; shorter text is returned stripped only.
    """
    stripped = sentence.strip()
    if len(stripped) <= max_display_length:
        return stripped
    return stripped[:max_display_length] + " ..."

def generate_clippable_text(sentences, corresponding_label, random_sentences=None):
    """Build the list of text prompts that is compared against the video frames.

    Order of the returned list:
      0. all descriptions joined into one string,
      1. the bare label,
      2. the prompt ``"It's a video of <label>."``,
      3. every description, in order,
      4. every entry of ``random_sentences`` (when given and non-empty).

    Every prompt is finally limited to 67 words by ``trim_sentences``.
    """
    header = [
        ' '.join(sentences),
        corresponding_label,
        f"It's a video of {corresponding_label}.",
    ]
    prompts = header + sentences
    if random_sentences:
        prompts = prompts + random_sentences
    return trim_sentences(prompts,  max_seq_len=67)

In [None]:
def plot_video_frames(video_path, frames):
    """Display the requested frames of a video side by side.

    Compared with the previous version this draws one subplot per requested
    frame (the count used to be hard-coded to 4), converts OpenCV's BGR frames
    to RGB before handing them to matplotlib, and tolerates frames that cannot
    be decoded.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.
        frames: iterable of frame indices to show.
    """
    frame_numbers = list(frames)
    if not frame_numbers:
        # Nothing to draw; plt.subplots cannot build a grid with 0 columns.
        return

    cap = cv2.VideoCapture(video_path)
    frames_data = []
    try:
        for frame_num in frame_numbers:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            ok, frame = cap.read()
            # read() yields BGR images while imshow expects RGB.
            frames_data.append(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ok else None)
    finally:
        cap.release()

    count = len(frame_numbers)
    # squeeze=False keeps `axs` two-dimensional even for a single frame.
    _fig, axs = plt.subplots(1, count, figsize=(4 * count, 4), squeeze=False)
    for ax, frame_num, image in zip(axs[0], frame_numbers, frames_data):
        if image is None:
            ax.set_title(f"Frame {frame_num} (unreadable)")
            ax.axis('off')
            continue
        ax.imshow(image)
        ax.set_title(f"Frame {frame_num}")
    plt.show()

In [None]:
def print_line(length=75):
    """Print a horizontal rule made of ``length`` dashes."""
    print(''.join('-' for _ in range(length)))


def print_sentences(corresponding_label, sentences, random_sentences=None):
    """Print the label and the numbered descriptions of a video.

    Descriptions (and, when given, the random sentences) are shortened with
    ``truncate_sentence``; the sections are separated by ``print_line`` rules.
    """
    print_line()
    print(f"Label: {corresponding_label}")
    print_line()

    for number, sentence in enumerate(sentences, start=1):
        print(f"Description {number:2d}: {truncate_sentence(sentence)}")

    if random_sentences:
        print_line()
        for number, random_sent in enumerate(random_sentences, start=1):
            print(f'Random Description {number:2d}: {truncate_sentence(random_sent)}')

    print_line()

In [None]:
def _tick_interval(duration):
    """Return the x-axis tick spacing used for a plot spanning ``duration`` seconds."""
    for limit, step in ((25, 1), (50, 2), (100, 5), (360, 10)):
        if duration < limit:
            return step
    return 20


def plot_list_of_lists(outputs, sentences, random_sentences=None,
                       timestamps=None, framerate=1, max_display_length=100,
                       force_separate_subplots = False,
                       show_random_plots = False,
                       show_corresponding_label_plot=False,
                       show_prompt_label_plot = False,
                       show_concate_descriptions_plot=False,
                       show_average_plot = False,
                       show_each_plot=False):
    """Plot text-frame similarity curves over time.

    Args:
        outputs: 2-D array indexed as ``outputs[row]`` with ``outputs.shape[1]``
            samples per row. Row 0 is the concatenated descriptions, row 1 the
            label, row 2 the label prompt, and the rows from 3 on are one per
            entry of ``sentences`` followed by one per entry of
            ``random_sentences``.
        sentences: descriptions matching the rows from 3 on.
        random_sentences: optional unrelated sentences plotted for comparison.
        timestamps: optional ``[start, end]`` pair per description; drawn as
            dashed vertical lines in separate-subplot mode. Descriptions
            without a pair simply get no markers.
        framerate: samples per second, used to convert sample index to time.
        max_display_length: maximum length of the subplot titles. This argument
            used to be accepted but ignored (titles were always cut at the
            ``truncate_sentence`` default).
        force_separate_subplots: draw one subplot per sentence. Only
            ``show_random_plots`` is honoured in this mode; the other
            ``show_*`` flags apply to the single combined plot.
        show_random_plots, show_corresponding_label_plot,
        show_prompt_label_plot, show_concate_descriptions_plot,
        show_average_plot, show_each_plot: select the curves to draw.
    """
    concat_output = outputs[0].tolist()
    corresponding_label_output = outputs[1].tolist()
    prompt_label_output = outputs[2].tolist()
    list_of_lists = np.array([output.tolist() for output in outputs[3:]])

    if timestamps is None:
        timestamps = []
    plot_random = bool(random_sentences) and show_random_plots

    sample_count = outputs.shape[1]
    seconds = np.arange(sample_count) / framerate
    duration = sample_count / framerate
    xticks = np.arange(0, round(duration) + 1, _tick_interval(duration))
    plot_w = 12 if duration < 150 else 18 if duration < 350 else 22

    if force_separate_subplots:
        number_of_subplots = len(sentences) + (len(random_sentences) if plot_random else 0)
        if number_of_subplots == 0:
            # plt.subplots cannot create an empty grid.
            return

        # squeeze=False keeps the axes indexable even with a single subplot.
        fig, axes_grid = plt.subplots(number_of_subplots, 1,
                                      figsize=(plot_w, number_of_subplots * 2),
                                      squeeze=False)
        axs = axes_grid[:, 0]

        for i, sentence in enumerate(sentences):
            axs[i].plot(seconds, list_of_lists[i])
            axs[i].set_title(truncate_sentence(sentence, max_display_length))
            axs[i].set_xlabel("Time (s)")
            axs[i].set_ylabel("Similarity")
            if i < len(timestamps):
                axs[i].axvline(x=timestamps[i][0], color='r', linestyle='--')
                axs[i].axvline(x=timestamps[i][1], color='r', linestyle='--')
            axs[i].set_xticks(xticks)

        if plot_random:
            for i, random_sent in enumerate(random_sentences):
                index = i + len(sentences)
                axs[index].plot(seconds, list_of_lists[index], color='orange')
                axs[index].set_title(
                    truncate_sentence('Random Sentence: ' + random_sent, max_display_length))
                axs[index].set_xlabel("Time (s)")
                axs[index].set_ylabel("similarity")
                axs[index].set_xticks(xticks)

    else:
        fig, ax = plt.subplots(1, 1, figsize=(plot_w, 4))

        if show_concate_descriptions_plot:
            ax.plot(seconds, concat_output, label='Concatenated Descriptions')

        if show_corresponding_label_plot:
            ax.plot(seconds, corresponding_label_output, label='Corresponding Label')

        if show_prompt_label_plot:
            ax.plot(seconds, prompt_label_output, label="Prompt: It's a video of {Label}")

        if show_average_plot:
            mean_of_lists = np.mean(list_of_lists[:len(sentences)], axis=0)
            ax.plot(seconds, mean_of_lists, label='Average Plot')

        if show_each_plot:
            for i in range(len(sentences)):
                ax.plot(seconds, list_of_lists[i], label=f'Description {i+1}')

        if plot_random:
            if len(random_sentences) == 1:
                ax.plot(seconds, list_of_lists[len(sentences)],
                        label='Random Description', linestyle='--')
            else:
                for i in range(len(random_sentences)):
                    index = len(sentences) + i
                    ax.plot(seconds, list_of_lists[index],
                            label=f'Random Description {i+1}', linestyle='--')

        ax.set_title("CLIP Text-Frame Cosine Similarity")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("text-frame similarity")
        ax.set_xticks(xticks)
        # Only build a legend when at least one labelled curve was drawn.
        if ax.get_legend_handles_labels()[0]:
            ax.legend()

    fig.subplots_adjust(hspace=1)
    plt.show()

In [None]:
def clip_inference(sentences, tensors):
    """Run the global CLIP ``model`` on a batch of texts and images.

    ``processor`` encodes ``sentences`` and ``tensors`` (with padding, as
    PyTorch tensors), every encoded entry is moved to ``device``, and the model
    is evaluated without gradient tracking. The raw model output is returned.
    """
    encoded = processor(text=sentences, images=tensors, return_tensors="pt", padding=True)
    on_device = {name: encoded[name].to(device) for name in list(encoded.keys())}

    with torch.no_grad():
        return model(**on_device)

In [None]:
def find_available_video(data, length, max_attempts=None):
    """Keep drawing videos of roughly ``length`` until one can be downloaded.

    Args:
        data: captions mapping handed to ``choice_video``.
        length: target duration handed to ``choice_video``.
        max_attempts: optional cap on the number of failed attempts. ``None``
            (the default) retries without limit, as before.

    Returns:
        ``(video_id, video_path)`` where ``video_path`` is whatever
        ``download_video_from_youtube`` returned for the chosen video.

    Raises:
        RuntimeError: if ``max_attempts`` attempts failed; the last error is
            chained as the cause.
    """
    number_of_failure = 0
    last_error = None

    while max_attempts is None or number_of_failure < max_attempts:
        # Selection happens outside the try block: an error raised while
        # picking a video is not fixed by retrying, so it must propagate
        # instead of being swallowed in an endless loop.
        video_id = choice_video(data, length)
        try:
            # Called only for its side effect of failing when the annotations
            # of this video are incomplete.
            load_video_informations(video_id)
            video_path = download_video_from_youtube(video_id[2:], './videos/')
        except Exception as error:
            # Deliberately broad best-effort retry: any failure for this video
            # just means another one is tried.
            number_of_failure += 1
            last_error = error
            clear_output()
            continue

        print(f'Number of Failure: {number_of_failure}')
        print_line()
        return video_id, video_path

    raise RuntimeError(
        f'no downloadable video found after {number_of_failure} failed attempts'
    ) from last_error

In [None]:
def summarize_settings(settings):
    """Condense a plot-settings dict into a short tag.

    Returns ``"multiple"`` when ``force_separate_subplots`` is truthy.
    Otherwise returns ``"single(...)"`` listing, joined by ``+`` and in a fixed
    order, the tag of every truthy ``show_*`` flag. Missing keys count as False.
    """
    if settings.get('force_separate_subplots', False):
        return "multiple"

    tag_by_flag = (
        ('show_random_plots', "random"),
        ('show_corresponding_label_plot', "label"),
        ('show_prompt_label_plot', "prompt"),
        ('show_concate_descriptions_plot', "concat"),
        ('show_average_plot', "avg"),
        ('show_each_plot', "all"),
    )
    enabled = [tag for flag, tag in tag_by_flag if settings.get(flag, False)]

    return "single({})".format("+".join(enabled))

In [None]:
def video_analysis(length, random_sentences = None, framerate=1):
    """Pick a video, sample its frames and score them against its text prompts.

    The video is chosen from the module-level ``data`` with
    ``find_available_video``. A one-line summary (id, duration, resolution,
    sentence count, fps and the time spent on frame extraction and on CLIP
    inference) is printed, followed by the label and descriptions.

    Returns:
        ``(outputs, video_id)`` where ``outputs`` is the model's
        ``logits_per_text`` converted to a NumPy array; its rows follow the
        prompt order produced by ``generate_clippable_text``.
    """
    video_id, video_path = find_available_video(data, length=length)
    sentences, timestamps, duration, corresponding_label = load_video_informations(video_id)

    resolution = "x".join(get_video_dimensions(video_path))
    summary = (
        f'ID: {video_id} | Duration: {duration:.2f} sec | {resolution}'
        f' | #sent: {len(sentences):2d} | fps: {framerate}'
    )
    print(summary, end='')

    started = time.time()
    tensors = extract_frames(video_path, framerate)
    extraction_seconds = time.time() - started
    print(f' | Frame extraction: {extraction_seconds:2.2f} sec', end='')

    # Prompt building is timed together with the inference itself.
    started = time.time()
    prompts = generate_clippable_text(sentences, corresponding_label, random_sentences)
    outputs = clip_inference(prompts, tensors).logits_per_text.detach().cpu().numpy()
    inference_seconds = time.time() - started
    print(f' | CLIP inference ({device}): {inference_seconds:2.2f} sec')

    print_sentences(corresponding_label, sentences, random_sentences)

    return outputs, video_id

In [None]:
# Unrelated sentence(s) scored alongside the real descriptions for comparison.
random_sentences = ['None']
# Frame sampling rate handed to the frame extractor and used for the time axis.
framerate = 10

torch.cuda.empty_cache()
# Picks a video close to the requested length from the module-level `data`
# and scores its frames against the text prompts.
outputs, video_id = video_analysis(length=329, random_sentences = random_sentences, framerate = framerate)

setting = {
           'force_separate_subplots'  : True, # when True, one subplot per sentence is drawn; of the flags below only show_random_plots is still honoured
           'show_random_plots': True,
           'show_corresponding_label_plot' : True,
           'show_prompt_label_plot' : True,
           'show_concate_descriptions_plot': True,
           'show_average_plot':True,
           'show_each_plot': True,
           }

# Re-read the annotations of the selected video; video_analysis only returns
# the scores and the id.
sentences, timestamps, duration, corresponding_label = load_video_informations(video_id)

# print_sentences(corresponding_label, sentences, random_sentences)
plot_list_of_lists(outputs, sentences, random_sentences, timestamps, framerate=framerate, **setting)

# Suggested file name for the figure; nothing is saved here, the name is only printed.
print(f"{video_id[2:]}-{framerate}fps-{summarize_settings(setting)}.png")
print(f'\nURL: https://www.youtube.com/watch?v={video_id[2:]}')

Downloading YouTube video 8z8FprjMNbI.
Download complete.
Number of Failure: 14
---------------------------------------------------------------------------
ID: v_8z8FprjMNbI | Duration: 332.45 sec | 568x320 | #sent:  3 | fps: 10

# Statistical analysis of the data 

In [None]:
import math
def compute_size(tensor_shape, bytes_per_element=4):
    """Print the memory footprint of a dense tensor of the given shape.

    Args:
        tensor_shape: iterable of dimension sizes.
        bytes_per_element: storage size of one element. Defaults to 4, the
            value that used to be hard-coded.

    The size is printed in MB (1 MB = 1024 * 1024 bytes) with two decimals,
    or in GB once it reaches 1024 MB. Nothing is returned.
    """
    size = math.prod(tensor_shape) * bytes_per_element / 1024 / 1024

    if size / 1024 < 1:
        print(f"Size: {size:.2f} MB")
    else:
        print(f"Size: {size/1024:.2f} GB")

In [None]:
# Example footprint estimate.
# NOTE(review): presumably 224x224 RGB frames x 200 frames x 5 items — confirm what each dimension stands for.
compute_size((224,224,3,200,5))

Size: 574.22 MB


In [None]:
# ActivityNet v1.3 annotation file, given relative to the current working directory.
data_path = f'datasets/ActivityNet/activity_net.v1-3.min.json'

with open(data_path) as f:
  data = json.load(f)

# Keep only the "taxonomy" section of the file.
# NOTE(review): this rebinds the module-level `data` that `video_analysis`
# reads; re-running the video analysis cell after this one will pick videos
# from the taxonomy instead of the captions. Consider a distinct name.
data = data["taxonomy"]

In [None]:
def build_tree(nodes, parent_id=None):
    """Turn a flat list of taxonomy nodes into nested dicts keyed by node name.

    Every node is a mapping with ``'nodeId'``, ``'parentId'`` and
    ``'nodeName'``. The nodes whose ``'parentId'`` equals ``parent_id`` become
    the keys of the returned dict, ordered by name, and each maps to the tree
    built from its own children. Leaves map to an empty dict.
    """
    direct_children = sorted(
        (node for node in nodes if node['parentId'] == parent_id),
        key=lambda node: node['nodeName'],
    )
    return {
        child['nodeName']: build_tree(nodes, child['nodeId'])
        for child in direct_children
    }

def print_tree(tree, level=0, indicators=['-','○','+','•']):
    """Print a nested-dict tree, one node per line, indented two spaces per level.

    The bullet in front of each name is picked from ``indicators`` by depth,
    cycling when the tree is deeper than the list. Children are printed right
    below their parent, one level deeper.
    """
    for name, subtree in tree.items():
        marker = indicators[level % len(indicators)]
        print(f"{'  ' * level}{marker} {name}")

        # Only descend into non-empty subtrees.
        if subtree:
            print_tree(subtree, level + 1, indicators)

In [None]:
def get_min_max_time_segments(segments):
    """Return ``[earliest start, latest end]`` over all ``segments``.

    Each segment is indexed as ``segment[0]`` (start) and ``segment[1]`` (end).
    """
    earliest_start = min(segment[0] for segment in segments)
    latest_end = max(segment[1] for segment in segments)
    return [earliest_start, latest_end]

def get_total_length(segments):
    """Return the sum of ``segment[1] - segment[0]`` over all ``segments``."""
    total = 0
    for segment in segments:
        total = total + (segment[1] - segment[0])
    return total

def get_ratio(min_max_array,total_length):
    """Return the span ``min_max_array[1] - min_max_array[0]`` divided by ``total_length``."""
    lower, upper = min_max_array[0], min_max_array[1]
    span = upper - lower
    return span / total_length