In [1]:
import os
import csv
from towhee import ops, pipe, register
from towhee.operator import PyOperator
from towhee import DataCollection
from tqdm import tqdm
import pandas as pd
import json
import numpy as np
from helpers import milvus_utils
from helpers import eval_utils as my_eval_utils

Connected to Milvus server at port 19530


In [2]:
# CONSTANTS

# Files
MSRVTT_SAMPLES = "./MSRVTT_1K.csv"

# Database Collections
VIDEO_RET_TUNING_COLLECTION_PREFIX = "msrvtt_vid_hyp_tun_"

In [3]:

raw_samples_df = pd.read_csv(MSRVTT_SAMPLES)
raw_samples_df[['video_id', 'video_path', 'sentence']].head()

Unnamed: 0,video_id,video_path,sentence
0,video7579,./test_1k_compress/video7579.mp4,a girl wearing red top and black trouser is pu...
1,video7725,./test_1k_compress/video7725.mp4,young people sit around the edges of a room cl...
2,video9258,./test_1k_compress/video9258.mp4,a person is using a phone
3,video7365,./test_1k_compress/video7365.mp4,cartoon people are eating at a restaurant
4,video8068,./test_1k_compress/video8068.mp4,a woman on a couch talks to a a man


In [8]:
def create_loader_pipeline(uts_value, milvus_col_name):
    print(f"Creating loader pipeline for {uts_value} with collection {milvus_col_name}")
    def read_loader_csv(csv_file):
        with open(csv_file, 'r', encoding='utf-8-sig') as f:
            data = csv.DictReader(f)
            for line in data:
                yield int(line['video_id'][len('video'):]), line['video_path']

    video_loader_pipeline = (
        pipe.input('csv_file')
        .flat_map('csv_file', ('video_id', 'video_path'), read_loader_csv)
        # Create 12 evenly distributed frames per video
        .map('video_path', 'frames', ops.video_decode.ffmpeg(sample_type='uniform_temporal_subsample', 
                                                             args={'num_samples': uts_value}))
        # I have a M2 Max, so device is set to mps for better performance
        .map('frames', 'vec', ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', 
                                                                 modality='video', device='mps'))
        .map(('video_id', 'vec'), (), ops.ann_insert.milvus_client(collection_name=milvus_col_name))
        .output('video_id')
    )
    return video_loader_pipeline

def create_searcher_pipeline(milvus_col_name):
    print(f"Creating searcher pipeline with collection {milvus_col_name}")
    def read_video_search_csv(csv_file):
        with open(csv_file, 'r', encoding='utf-8-sig') as f:
            data = csv.DictReader(f)
            for line in data:
                yield line['video_id'], line['sentence']

    video_search_pipeline = (
        pipe.input('csv_file')
        .flat_map('csv_file', ('rel_video_id', 'query'), read_video_search_csv)
        .map('query', 'vec', ops.video_text_embedding.clip4clip(model_name='clip_vit_b32', modality='text', device='mps'))
        .map('vec', 'top10_raw_res', 
            ops.ann_search.milvus_client(collection_name=milvus_col_name, limit=10))
        .map('top10_raw_res', ('top1', 'top5', 'top10'), lambda x: (x[:1], x[:5], x[:10]))
        .output('rel_video_id', 'query', 'top1', 'top5', 'top10')
    )
    return video_search_pipeline
        


def load_and_query_c4c(uts_values):
    # (1) Create a new collection for each experiment
    milvus_cols = {uts_val: VIDEO_RET_TUNING_COLLECTION_PREFIX +
                   str(uts_val) for uts_val in uts_values}
    print(f"Creating collections: {list(milvus_cols.values())}")
    for uv, m_col_name in milvus_cols.items():
        milvus_utils.create_milvus_collection(m_col_name, 512)

    # (2) Create different loader pipelines for each experiment
    loader_pipelines = {}
    searcher_pipelines = {}
    search_dcs = {}
    search_results_dfs = {}
    for uv, m_col_name in milvus_cols.items():
        print(f"Creating loader pipeline for {uv}")
        loader_pipelines[uv] = create_loader_pipeline(uv, m_col_name)
        searcher_pipelines[uv] = create_searcher_pipeline(m_col_name)
    
    # (3) Call each loader pipeline
    for uv, m_col_name in milvus_cols.items():
        print(f"Loading data into {m_col_name}")
        pipe = loader_pipelines[uv]
        pipe(MSRVTT_SAMPLES)
        print(f"Finished loading data into {m_col_name}")
    
    # (4) Call each searcher pipeline
    for uv, m_col_name in milvus_cols.items():
        print(f"Searching data in {m_col_name}")
        search_pipe = searcher_pipelines[uv]
        search_dc = DataCollection(search_pipe(MSRVTT_SAMPLES))
        search_dcs[uv] = search_dc
        search_results_dfs[uv] = my_eval_utils.twohee_data_col_to_df(search_dc)
        print(f"Finished searching data in {m_col_name}")

    return search_results_dfs

In [17]:
experiment_query_results = load_and_query_c4c([6, 9, 12])

Creating collections: ['msrvtt_vid_hyp_tun_6', 'msrvtt_vid_hyp_tun_9', 'msrvtt_vid_hyp_tun_12']
Creating loader pipeline for 6
Creating loader pipeline for 6 with collection msrvtt_vid_hyp_tun_6
Creating searcher pipeline with collection msrvtt_vid_hyp_tun_6
Creating loader pipeline for 9
Creating loader pipeline for 9 with collection msrvtt_vid_hyp_tun_9
Creating searcher pipeline with collection msrvtt_vid_hyp_tun_9
Creating loader pipeline for 12
Creating loader pipeline for 12 with collection msrvtt_vid_hyp_tun_12
Creating searcher pipeline with collection msrvtt_vid_hyp_tun_12


2025-04-16 18:43:43,696 - 20599304192 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:43:43,696 - 25954381824 - node.py-node:167 - INFO: Begin to run Node-read_loader_csv-0
2025-04-16 18:43:43,696 - 25971208192 - node.py-node:167 - INFO: Begin to run Node-video-decode/ffmpeg-1
2025-04-16 18:43:43,696 - 20599304192 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-2
2025-04-16 18:43:43,697 - 25988034560 - node.py-node:167 - INFO: Begin to run Node-ann-insert/milvus-client-3
2025-04-16 18:43:43,697 - 26004860928 - node.py-node:167 - INFO: Begin to run Node-_output


Loading data into msrvtt_vid_hyp_tun_6


2025-04-16 18:44:37,545 - 26356314112 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:44:37,546 - 26373140480 - node.py-node:167 - INFO: Begin to run Node-read_loader_csv-0
2025-04-16 18:44:37,546 - 26389966848 - node.py-node:167 - INFO: Begin to run Node-video-decode/ffmpeg-1
2025-04-16 18:44:37,547 - 26406793216 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-2
2025-04-16 18:44:37,547 - 26423619584 - node.py-node:167 - INFO: Begin to run Node-ann-insert/milvus-client-3
2025-04-16 18:44:37,547 - 26356314112 - node.py-node:167 - INFO: Begin to run Node-_output


Finished loading data into msrvtt_vid_hyp_tun_6
Loading data into msrvtt_vid_hyp_tun_9


2025-04-16 18:45:39,864 - 27344547840 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:45:39,864 - 27361374208 - node.py-node:167 - INFO: Begin to run Node-read_loader_csv-0
2025-04-16 18:45:39,865 - 27378200576 - node.py-node:167 - INFO: Begin to run Node-video-decode/ffmpeg-1
2025-04-16 18:45:39,865 - 27344547840 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-2
2025-04-16 18:45:39,866 - 27548266496 - node.py-node:167 - INFO: Begin to run Node-ann-insert/milvus-client-3
2025-04-16 18:45:39,866 - 27531440128 - node.py-node:167 - INFO: Begin to run Node-_output


Finished loading data into msrvtt_vid_hyp_tun_9
Loading data into msrvtt_vid_hyp_tun_12


2025-04-16 18:46:33,961 - 18687864832 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:46:33,961 - 26088796160 - node.py-node:167 - INFO: Begin to run Node-read_video_search_csv-0
2025-04-16 18:46:33,962 - 26105622528 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-1
2025-04-16 18:46:33,962 - 18687864832 - node.py-node:167 - INFO: Begin to run Node-ann-search/milvus-client-2
2025-04-16 18:46:33,963 - 26122448896 - node.py-node:167 - INFO: Begin to run Node-lambda-3
2025-04-16 18:46:33,963 - 26139275264 - node.py-node:167 - INFO: Begin to run Node-_output


Finished loading data into msrvtt_vid_hyp_tun_12
Searching data in msrvtt_vid_hyp_tun_6


2025-04-16 18:47:40,700 - 26197651456 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:47:40,701 - 26214477824 - node.py-node:167 - INFO: Begin to run Node-read_video_search_csv-0
2025-04-16 18:47:40,701 - 26231304192 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-1
2025-04-16 18:47:40,702 - 26248130560 - node.py-node:167 - INFO: Begin to run Node-ann-search/milvus-client-2
2025-04-16 18:47:40,702 - 26197651456 - node.py-node:167 - INFO: Begin to run Node-lambda-3
2025-04-16 18:47:40,702 - 26264956928 - node.py-node:167 - INFO: Begin to run Node-_output


Finished searching data in msrvtt_vid_hyp_tun_6
Searching data in msrvtt_vid_hyp_tun_9


2025-04-16 18:48:54,391 - 26323890176 - node.py-node:167 - INFO: Begin to run Node-_input
2025-04-16 18:48:54,392 - 26440445952 - node.py-node:167 - INFO: Begin to run Node-read_video_search_csv-0
2025-04-16 18:48:54,392 - 26457272320 - node.py-node:167 - INFO: Begin to run Node-video-text-embedding/clip4clip-1
2025-04-16 18:48:54,393 - 26474098688 - node.py-node:167 - INFO: Begin to run Node-ann-search/milvus-client-2
2025-04-16 18:48:54,393 - 26323890176 - node.py-node:167 - INFO: Begin to run Node-lambda-3
2025-04-16 18:48:54,393 - 26490925056 - node.py-node:167 - INFO: Begin to run Node-_output


Finished searching data in msrvtt_vid_hyp_tun_9
Searching data in msrvtt_vid_hyp_tun_12
Finished searching data in msrvtt_vid_hyp_tun_12


In [35]:
!mkdir hyperparam_tuning

In [None]:
# TODO save these results to CSV file

TypeError: Object of type DataFrame is not JSON serializable

In [None]:
for uv, res in experiment_query_results.items():
    print(f"Results for uts value = {uv}")
    print(get_all_eval_scores(res))

Results for uts value = 6
{'recall@1': 0.411, 'recall@5': 0.708, 'recall@10': 0.795, 'map': 0.5326599206349206, 'ndcg@1': 0.411, 'ndcg@5': 0.5676151896815294, 'ndcg@10': 0.595778355143179}
Results for uts value = 9
{'recall@1': 0.417, 'recall@5': 0.705, 'recall@10': 0.805, 'map': 0.5381325396825392, 'ndcg@1': 0.417, 'ndcg@5': 0.5695032823417581, 'ndcg@10': 0.6021801975658738}
Results for uts value = 12
{'recall@1': 0.426, 'recall@5': 0.716, 'recall@10': 0.814, 'map': 0.5456543650793645, 'ndcg@1': 0.426, 'ndcg@5': 0.5780321865313451, 'ndcg@10': 0.6100154270801753}


In [33]:
# Present results in a pandas dataframe table
experiment_metrics = {uv: get_all_eval_scores(res) for uv, res in experiment_query_results.items()}
experiment_metrics_df = pd.DataFrame(experiment_metrics)
# flip rows and columns
experiment_metrics_df = experiment_metrics_df.transpose()
experiment_metrics_df = experiment_metrics_df.rename(columns={'uts_value': 'UTS Value'})
experiment_metrics_df

Unnamed: 0,recall@1,recall@5,recall@10,map,ndcg@1,ndcg@5,ndcg@10
6,0.411,0.708,0.795,0.53266,0.411,0.567615,0.595778
9,0.417,0.705,0.805,0.538133,0.417,0.569503,0.60218
12,0.426,0.716,0.814,0.545654,0.426,0.578032,0.610015


In [26]:
import pandas as pd
import numpy as np


def twohee_data_col_to_df(twohee_data_collection):
    res_list = twohee_data_collection.to_list()
    res_obj_list = []
    for r in res_list:
        res_obj = vars(r)
        res_obj_list.append(res_obj)
    res_df = pd.DataFrame(res_obj_list)
    # Add ground truth column
    res_df['ground_truth'] = res_df['rel_video_id'].apply(
        lambda x: int(x[len('video'):]))

    # TODO add conditional for frame_id here
    return res_df.copy()


def average_precision(ground_truth, predictions):
    """
    Calculate the Average Precision (AP) for a single query.

    Args:
        ground_truth (int): The ground truth video ID.
        predictions (list): List of predicted video IDs.

    Returns:
        float: The Average Precision (AP) score for the query.
    """
    hits = 0
    sum_precision = 0
    for i, pred in enumerate(predictions):
        if pred == ground_truth:
            hits += 1
            sum_precision += hits / (i + 1)
    return sum_precision / hits if hits > 0 else 0


def calculate_mean_average_precision(df):
    """
    Calculate the Mean Average Precision (MAP) for the given dataframe.

    Args:
        df (pd.DataFrame): DataFrame containing columns 'query', 'ground_truth', 'top1', 'top5', 'top10'.

    Returns:
        float: The Mean Average Precision (MAP) score.
    """
    # Calculate AP for each query
    ap_scores = []
    for _, row in df.iterrows():
        ground_truth = row['ground_truth']
        predictions_with_scores = row['top10']
        predictions = [pred[0] for pred in predictions_with_scores]
        ap_scores.append(average_precision(ground_truth, predictions))

    # Calculate MAP
    mean_ap = sum(ap_scores) / len(ap_scores) if ap_scores else 0
    return mean_ap


def calculate_recall(df):
    """
    Calculate recall@1, recall@5, and recall@10 for the given dataframe.

    Args:
        df (pd.DataFrame): DataFrame containing columns 'query', 'ground_truth', 'top1', 'top5', 'top10'.

    Returns:
        dict: A dictionary containing recall@1, recall@5, and recall@10.
    """
    recall_at_1 = 0
    recall_at_5 = 0
    recall_at_10 = 0
    total_queries = len(df)

    for _, row in df.iterrows():
        ground_truth = row['ground_truth']
        if ground_truth in [pred[0] for pred in row['top1']]:
            recall_at_1 += 1
        if ground_truth in [pred[0] for pred in row['top5']]:
            recall_at_5 += 1
        if ground_truth in [pred[0] for pred in row['top10']]:
            recall_at_10 += 1

    return {
        'recall@1': recall_at_1 / total_queries,
        'recall@5': recall_at_5 / total_queries,
        'recall@10': recall_at_10 / total_queries
    }


def ndcg_score(ground_truth, predictions, k=10):
    """
    Calculate the Normalized Discounted Cumulative Gain (NDCG) for a single query.

    Args:
        ground_truth (int): The ground truth video ID.
        predictions (list): List of predicted video IDs with scores [(id, score), ...].
        k (int): The number of top predictions to consider.

    Returns:
        float: The NDCG score for the query.
    """
    def dcg(relevance_scores):
        return sum(rel / np.log2(idx + 2) for idx, rel in enumerate(relevance_scores))

    # Relevance scores: 1 if the prediction matches the ground truth, else 0
    relevance_scores = [1 if pred[0] ==
                        ground_truth else 0 for pred in predictions[:k]]

    # Calculate DCG and IDCG
    actual_dcg = dcg(relevance_scores)
    ideal_dcg = dcg(sorted(relevance_scores, reverse=True))

    # Return NDCG
    return actual_dcg / ideal_dcg if ideal_dcg > 0 else 0

# call this function to get the NDCG score for each query


def calculate_ndcg(df, k=10):
    """
    Calculate NDCG for the given dataframe.

    Args:
        df (pd.DataFrame): DataFrame containing columns 'query', 'ground_truth', 'top1', 'top5', 'top10'.
        k (int): The number of top predictions to consider.

    Returns:
        float: The mean NDCG score.
    """
    ndcg_scores = []
    for _, row in df.iterrows():
        ground_truth = row['ground_truth']
        predictions_with_scores = row['top10']
        ndcg_scores.append(ndcg_score(
            ground_truth, predictions_with_scores, k))

    return sum(ndcg_scores) / len(ndcg_scores) if ndcg_scores else 0


def get_all_eval_scores(df):
    """Return a dataframe with all evaluation scores: Recall@1, Recall@5, Recall@10, MAP, NDCG@1, NDCG@5, NDCG@10"""
    recall_scores = calculate_recall(df)
    map_score = calculate_mean_average_precision(df)
    ndcg_score_1 = calculate_ndcg(df, k=1)
    ndcg_score_5 = calculate_ndcg(df, k=5)
    ndcg_score_10 = calculate_ndcg(df, k=10)

    eval_scores = {
        'recall@1': recall_scores['recall@1'],
        'recall@5': recall_scores['recall@5'],
        'recall@10': recall_scores['recall@10'],
        'map': map_score,
        'ndcg@1': ndcg_score_1,
        'ndcg@5': ndcg_score_5,
        'ndcg@10': ndcg_score_10
    }

    return eval_scores
