In [None]:

import os, sys
sys.path.append('../') 
import os
from helper import utils
import pandas as pd

In [None]:


def create_dict(df, columns, value=None):
    """
    Create a dictionary whose keys are tuples of values from the specified columns.

    df: The input DataFrame.
    columns: column names whose values (in the given order) form each key.
    value: value stored under every key. When None (the default), the row itself,
        as yielded by ``df.itertuples()``, is stored instead.

    return: A dictionary with tuple keys. When several rows produce the same key,
        the last row wins.

    raises ValueError: if ``columns`` is empty.
    """
    if len(columns) < 1:
        raise ValueError("At least one column must be specified.")

    # Read the key values straight from the columns instead of via getattr() on the
    # row tuples: itertuples() renames fields whose column names are not valid
    # identifiers, so attribute access would fail for such columns.
    keys = zip(*(df[column] for column in columns))

    if value is None:
        return dict(zip(keys, df.itertuples()))
    return {key: value for key in keys}


def check_save_dir(save_file):
    """
    Make sure the directory that will contain ``save_file`` exists.

    save_file: path of a file that is about to be written.
    """
    save_dir = os.path.dirname(save_file)
    # A bare file name has an empty dirname (it lives in the current directory);
    # os.makedirs("") would raise, so there is nothing to create in that case.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)



def check_run_ranking(df):
    """
    Return a copy of the run ordered by qid (ascending) and, inside each qid,
    by score (descending), with 'rank' recomputed as 1, 2, 3, ... per qid.

    df: DataFrame with at least the columns 'qid' and 'score'.

    return: the re-ranked DataFrame with a fresh 0..n-1 index; the input is not modified.
    """
    ordered = df.sort_values(by=['qid', 'score'], ascending=[True, False])

    # Position of each row inside its qid group, shifted so that ranks start at 1
    ordered = ordered.assign(rank=ordered.groupby('qid').cumcount() + 1)
    return ordered.reset_index(drop=True)


def sort_run_by_rank(run, save_path=""):
    """
    Reorder a run so that all rows of the lowest rank come first, then all rows of
    the next rank, and so on. Rows sharing a rank keep their original relative order.

    run: run to load with utils.read_run; the result must contain a 'rank' column.
    save_path: when not empty, the sorted run is written there as a header-less TSV.

    return: the sorted DataFrame with a fresh 0..n-1 index.
    """
    df = utils.read_run(run)

    # A stable sort gives the same order as concatenating one slice per rank value,
    # in a single pass, and it also works when the run has no rows at all.
    # NOTE: rows with a missing rank are kept and placed last.
    df_sorted = df.sort_values(by='rank', kind='mergesort')

    # Reset index for cleaner output
    df_sorted = df_sorted.reset_index(drop=True)
    if save_path != "":
        df_sorted.to_csv(save_path, index=False, header=False, sep="\t")

    return df_sorted



def make_pool_rrf(list_of_runs, topX=500, rrf_den=60):
    """
        Fuse several runs into one pool using Reciprocal Rank Fusion (RRF).

        Every run is re-ranked per qid by check_run_ranking, cut to its first topX rows per qid,
        and each remaining row contributes 1 / (rrf_den + rank). The contributions of the same
        (qid, docno) pair are summed over all runs.

        list_of_runs = Runs to fuse; each one is loaded with utils.read_run.
        topX = Number of documents per qid. Default: 500.
        rrf_den = Value for the Reciprocal Rank Fusion denominator. Default is 60 as in the original paper:
        Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods. G. V. Cormack. University of Waterloo. Waterloo, Ontario, Canada.

        return: (pool_with_rrf, run_length, unique_cnt) where
            pool_with_rrf maps qid (str) -> {docno (str): summed rrf value}, docnos inserted in
                descending order of their fused value,
            run_length is the total number of rows kept over all runs (after the topX cut),
            unique_cnt is the number of distinct (qid, docno) pairs in the pool.
    """
    columns = ["qid", "docno", "rrf_value"]

    frames = []
    run_length = 0
    for run in list_of_runs:
        df = utils.read_run(run)
        df['qid'] = df['qid'].astype(str)
        df['docno'] = df['docno'].astype(str)
        df = check_run_ranking(df)
        # NOTE: Everything is made based on the rank col. It HAS TO start by '1'
        df["rrf_value"] = 1.0 / (rrf_den + df["rank"])
        df = df.groupby('qid').head(topX)
        run_length += len(df)
        frames.append(df[columns])

    # Concatenate once instead of growing a DataFrame inside the loop
    big_df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame(columns=columns)

    if big_df.empty:
        # Nothing to fuse: the result is an empty pool
        grouped_by_docno = big_df
    else:
        # Default strategy is the sum.
        grouped_by_docno = big_df.groupby(["qid", "docno"], as_index=False)['rrf_value'].sum()
        # Sort documents by rrf value inside each qid group
        grouped_by_docno = grouped_by_docno.sort_values(by=["qid", "rrf_value"], ascending=[True, False])

    # Transform pandas data into a dictionary
    pool_with_rrf = {}
    for qid, docno, rrf_value in grouped_by_docno[columns].itertuples(index=False, name=None):
        pool_with_rrf.setdefault(str(qid), {})[str(docno)] = rrf_value

    # Sanity check: every grouped row must have ended up as exactly one pool entry
    dict_len = sum(len(docs) for docs in pool_with_rrf.values())
    unique_cnt = len(grouped_by_docno)
    assert unique_cnt == dict_len
    print(f"Number of rows of the joined dataframe (big_df) at depth {topX} is {len(big_df)}, unique pairs = {len(grouped_by_docno)}, num queries = {len(pool_with_rrf)}")
    return pool_with_rrf, run_length, unique_cnt
    



def save_pool_as_trec_run(pool_with_rrf, run_file, tag='unified_pool', sort_by_rank=True):
    """
    Write a fused pool to run_file as tab-separated lines: qid, Q0, docno, rank, score, tag.

    pool_with_rrf: mapping qid -> {docno: rrf value}.
    run_file: output path; its directory is prepared with check_save_dir.
    tag: value written in the last column of every line.
    sort_by_rank: when True, the written file is re-ordered in place with sort_run_by_rank.
    """
    check_save_dir(run_file)

    with open(run_file, "w") as fout:
        # Queries in ascending qid order; inside a query, highest fused score first
        for qid, document_rrf in sorted(pool_with_rrf.items(), key=lambda pair: pair[0]):
            by_score = sorted(document_rrf.items(), key=lambda pair: pair[1], reverse=True)
            for rank, (docno, raw_rrf) in enumerate(by_score, start=1):
                rrf = round(raw_rrf, 6)
                fout.write(f"{str(qid)}\tQ0\t{docno}\t{rank}\t{rrf}\t{tag}\n")

    print(f"Unified run was saved to {run_file}")
    if sort_by_rank:
        sort_run_by_rank(run_file, save_path=run_file)


def map_query_var_to_orig_query(rrf_run, save_path):
    """
    Collapse query-variation ids of a run onto their original query id and keep one
    row per (original qid, docno) pair.

    A qid is split on "_": the first part becomes the new 'qid'; when there is a second
    part, "QV_" + that part is stored in the 'Q0' column, otherwise the unsplit qid is
    stored there. The first row seen for a (qid, docno) pair is the one that is kept.

    rrf_run: run to load with utils.read_run.
    save_path: where the resulting run is written as a header-less TSV.

    return: (df_unified, unique_query_doc_dict) - the de-duplicated DataFrame and a
        dictionary mapping (qid, docno) to the row that was kept.
    """
    run_df = utils.read_run(rrf_run)
    rows_by_pair = create_dict(run_df, columns=['qid', 'docno'])

    unique_query_doc_dict = {}
    kept_rows = []
    base_qids = []
    variation_labels = []
    for (full_qid, docno), row in rows_by_pair.items():
        parts = full_qid.split("_")
        label = str(full_qid) if len(parts) <= 1 else str("QV_" + parts[1])
        base_qid = str(parts[0])

        pair = (base_qid, docno)
        if pair in unique_query_doc_dict:
            # an earlier variation already contributed this document
            continue
        unique_query_doc_dict[pair] = row
        kept_rows.append(row)
        base_qids.append(base_qid)
        variation_labels.append(label)

    df_unified = pd.DataFrame(kept_rows)
    df_unified['qid'] = base_qids  # original query id
    df_unified['Q0'] = variation_labels  # which variation the row came from
    # drop the index column created by pandas
    df_unified = df_unified.drop(['Index'], axis=1)
    df_unified.to_csv(save_path, index=False, header=False, sep="\t")
    print(f"Number of unique query-document pairs after mapping query variations to original query ids is {len(unique_query_doc_dict)}")
    return df_unified, unique_query_doc_dict


def get_model_runs(tr_model, runs_dir):
    """
    Recursively collect the '.tsv' files under runs_dir whose file name starts with tr_model.

    tr_model: required file-name prefix.
    runs_dir: directory to walk.

    return: list of matching file paths, in os.walk order.
    """
    matches = []
    for folder, _subdirs, names in os.walk(runs_dir):
        matches.extend(
            os.path.join(folder, name)
            for name in names
            if name.startswith(tr_model) and name.endswith(".tsv")
        )
    return matches





def convert_query_var_to_separate_runs(rrf_run, save_dir):
    """
    Split a run into one file per qid, grouped in one sub-directory per original query.

    For every distinct qid in the run, the part before the first "_" is taken as the
    original query id. The rows of that qid are written to
    ``{save_dir}/{original qid}/{qid}.tsv`` (header-less TSV) with the 'qid' column set
    to the original query id and the 'Q0' column set to the full qid.

    rrf_run: run to load with utils.read_run.
    save_dir: root directory for the per-query sub-directories.
    """
    df = utils.read_run(rrf_run)

    # Group on a string view of the qids so that splitting on "_" also works when the
    # qid column was loaded as numbers. The loaded frame itself is left untouched.
    qid_labels = df['qid'].astype(str).to_numpy()
    for qid, df_q in df.groupby(qid_labels, sort=False):
        qid_orig = qid.split("_")[0]
        df_q = df_q.copy()
        df_q['qid'] = qid_orig
        df_q['Q0'] = qid
        run_path = f"{save_dir}/{qid_orig}/{qid}.tsv"
        check_save_dir(run_path)
        df_q.to_csv(run_path, sep='\t', index=False, header=False)

    return

def get_runs_in_dir(runs_dir):
    """
    Recursively collect every '.tsv' file under runs_dir.

    runs_dir: directory to walk.

    return: list of file paths, in os.walk order.
    """
    tsv_paths = []
    for folder, _subdirs, names in os.walk(runs_dir):
        tsv_paths.extend(os.path.join(folder, name) for name in names if name.endswith(".tsv"))
    return tsv_paths


def fuse_runs_per_topic(model_runs_dir, fused_save_dir):
    """
    Fuse, topic by topic, the runs stored in the sub-directories of model_runs_dir.

    Every sub-directory of model_runs_dir is treated as one topic: the '.tsv' runs found
    in it are fused with make_pool_rrf (depth 1000, RRF denominator 60) and the result is
    written to ``{fused_save_dir}/{topic}.tsv``.

    model_runs_dir: directory holding one sub-directory of runs per topic.
    fused_save_dir: directory where the fused run of each topic is saved.
    """
    # Sub-directories only; sorted so that topics are processed in a repeatable order
    model_topics_dir = []
    for entry in sorted(os.listdir(model_runs_dir)):
        entry_path = os.path.join(model_runs_dir, entry)
        if os.path.isdir(entry_path):
            model_topics_dir.append((entry, entry_path))

    total_len = 0
    total_unique = 0
    for (topic, topic_runs_dir) in model_topics_dir:
        print(f"Merging the runs of topic {topic} from directory: {topic_runs_dir}")
        topic_runs = get_runs_in_dir(topic_runs_dir)
        topic_fused_run = f"{fused_save_dir}/{topic}.tsv"
        check_save_dir(topic_fused_run)
        # make_pool_rrf names its first parameter 'list_of_runs'
        pool_with_rrf, run_length, unique_cnt = make_pool_rrf(list_of_runs=topic_runs, topX=1000, rrf_den=60)
        total_len += run_length
        total_unique += unique_cnt
        save_pool_as_trec_run(pool_with_rrf, run_file=topic_fused_run, tag='fused_topic', sort_by_rank=False)

    print(f"total_len = {total_len}, total_unique= {total_unique}")
    return



def fuse_topics_into_run(topics_dir, run_path, num_of_parts=1):
    '''
    Merge the per-topic runs into one run ordered by rank and save it in num_of_parts files.

    topics_dir: directory containing the runs of each topic (should have 100 files per transcription model)
    run_path: path prefix of the combined run; part i is saved to "{run_path}_p{i}.tsv"
    num_of_parts: number of files to split the combined run into (at least 1).

    raises ValueError: if num_of_parts is smaller than 1.
    '''
    if num_of_parts < 1:
        raise ValueError("num_of_parts must be at least 1.")

    model_runs = get_runs_in_dir(topics_dir)
    # Read everything first and concatenate once instead of growing a DataFrame in a loop
    frames = [utils.read_run(run) for run in model_runs]
    df_merged = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    print(f"Number of rows is {len(df_merged)} after merging runs from {topics_dir}")
    if df_merged.empty:
        # No rows to sort or split, so no part file is written
        return

    # sort the merged dataframe by rank so that top scored segment per query comes first, and then second top segment and so on.
    # The sort is stable, so rows sharing a rank keep the order in which they were read.
    df_merged = df_merged.sort_values(by='rank', kind='mergesort')

    total_rows = len(df_merged)
    # Ceiling division: the parts are contiguous, cover every row exactly once,
    # and only the last ones can be shorter (or empty).
    step = -(-total_rows // num_of_parts)
    for i in range(num_of_parts):
        start_idx = min(i * step, total_rows)
        end_idx = min(total_rows, start_idx + step)  # exclusive, as used by iloc
        run_part = f"{run_path}_p{i+1}.tsv"
        check_save_dir(run_part)
        df_merged.iloc[start_idx:end_idx].to_csv(run_part, index=False, sep='\t', header=False)
        print(f"part {i+1} will be from row {start_idx} to {end_idx} and will be saved to {run_part}")



def form_pairs(input_file, query_dict, doc_dict, result, logger, input_type='run', save_format='jsonl', clean_text=True):
    """
    Attach the query description and the document text to every row of a run or qrels
    file and save the (qid, docno, q_description, doc_text) records.

    input_file: file to load with utils.read_run (input_type 'run') or utils.read_qrels (anything else).
    query_dict: mapping qid -> query description.
    doc_dict: mapping docno -> document text.
    result: output path; its directory is created when missing.
    logger: logger used for progress messages.
    input_type: 'run' to read a run, any other value to read qrels.
    save_format: 'jsonl' for JSON lines, any other value for a TSV with a header row.
    clean_text: when True, utils.clean_string is applied to the document text.

    NOTE: ids that are absent from query_dict / doc_dict are mapped to missing values.
    """
    if input_type == "run":
        df_input = utils.read_run(input_file)
    else:
        df_input = utils.read_qrels(input_file)

    logger.info("Loaded the query and input files successfully.")
    df_input['q_description'] = df_input['qid'].map(query_dict)
    df_input['doc_text'] = df_input['docno'].map(doc_dict)
    if clean_text:
        df_input['doc_text'] = df_input['doc_text'].apply(utils.clean_string)

    save_columns = ["qid", "docno", "q_description", "doc_text"]

    # The writers below do not create missing directories
    if isinstance(result, (str, os.PathLike)):
        result_dir = os.path.dirname(result)
        if result_dir:
            os.makedirs(result_dir, exist_ok=True)

    if save_format == 'jsonl':
        # orient='records' never writes the index; passing index=False explicitly is
        # rejected by older pandas releases, so it is left out.
        df_input[save_columns].to_json(result, orient='records', lines=True)
    else:  # tsv
        df_input[save_columns].to_csv(result, index=False, sep='\t')

    logger.info(f"Done writing {len(df_input)} pairs to {result}")


def load_query(query_file, clean_text=True):
    """
    Load the queries and return them as a qid -> description dictionary.

    query_file: file to load with utils.read_query; 'qid' and 'description' columns are used.
    clean_text: when True, utils.clean_string is applied to every description.

    return: dictionary mapping each qid to its description.
    """
    queries = utils.read_query(query_file)
    if clean_text:
        queries['description'] = queries['description'].apply(utils.clean_string)
    return {entry.qid: entry.description for entry in queries.itertuples()}


def load_corpus(corpus_file, logger):
    """
    Load the corpus and return it as an id -> text dictionary.

    corpus_file: file to load with utils.read_jsonl; the 'id' and 'seg_words' fields are used.
    logger: logger used to report that loading finished.

    return: dictionary mapping each 'id' to its 'seg_words' value.
    """
    corpus_df = utils.read_jsonl(corpus_file)
    logger.info(f"Done loading corpus from {corpus_file}.")
    return {entry.id: entry.seg_words for entry in corpus_df.itertuples()}





    

In [None]:

# Transcription models whose pooled runs are processed; run file names start with these prefixes
tr_models = ["spotify" ,
        "whisperX-base" ,
        "whisperX-large-v3",
        "silero-large",
        "silero-small"
        ]

# Input / output locations (set project_dir before running)
project_dir = "your_project_dir"
pool_dir=f"{project_dir}/data/runs/pool"
rrf_dir = f"{project_dir}/data/runs/pool_rrf"
final_run_dir = f"{rrf_dir}/st4_final_runs"
pairs_dir = f"{rrf_dir}/st5_judgement_pairs"
query_file = f"{project_dir}/data/queries/podcasts_2020_and_2021_topics_test_with_description.tsv"
corpus_dir = f"{project_dir}/data/corpus"
log_dir =f"{project_dir}/data/logs/pool"

query_dict = load_query(query_file)


# Steps 1-4: build one final fused run (possibly split in parts) per transcription model
for tr_model in tr_models:
    model_runs = get_model_runs(tr_model, pool_dir)
    rrf_run = f'{rrf_dir}/st1_fuse_system_runs/{tr_model}_rrf.tsv'
    separated_topics_dir = f"{rrf_dir}/st2_fuse_query_variations/{tr_model}"
    fused_topics_dir = f"{rrf_dir}/st3_fuse_topics_runs/{tr_model}"
    # Path prefix inside final_run_dir; fuse_topics_into_run appends "_p<N>.tsv"
    final_run = f"{final_run_dir}/{tr_model}"

    print(f"Forming unified run for {tr_model} out of {len(model_runs)} runs and unified run will be saved to {rrf_run}")
    # 1. Fuse the pool runs using RRF (make_pool_rrf names its first parameter 'list_of_runs')
    pool_with_rrf, _, _ = make_pool_rrf(list_of_runs=model_runs, topX=10, rrf_den=60)
    save_pool_as_trec_run(pool_with_rrf, run_file=rrf_run, tag='unified_pool', sort_by_rank=True)

    # 2. Converted query variations to their original query id and saved them as separate runs
    convert_query_var_to_separate_runs(rrf_run, save_dir=separated_topics_dir)

    # 3. Fused runs per topic
    fuse_runs_per_topic(model_runs_dir=separated_topics_dir, fused_save_dir=fused_topics_dir)

    # 4. Fuse the topics runs into one run (or multiple parts), and sort the result by rank, i.e., top scored segments first
    # change num_of_parts parameter value as needed
    fuse_topics_into_run(topics_dir=fused_topics_dir, run_path=final_run, num_of_parts=1)



# Step 5: turn every final run into a file of (query description, segment text) pairs
for tr_model in tr_models:
    model_runs = get_model_runs(tr_model, final_run_dir)
    corpus=f"{corpus_dir}/{tr_model}_120_60_time_segment.jsonl"
    log_file = f"{log_dir}/form_pairs_{tr_model}.log"
    logger = utils.get_logger(log_file)
    doc_dict = load_corpus(corpus, logger)

    for run in model_runs:
        run_name = os.path.basename(run).split('.')[0]+"_pairs.jsonl"
        pairs_file = f"{pairs_dir}/{run_name}"
        logger.info(f"forming the pairs from run on: {run}, and pairs file will be saved to {pairs_file}")
        # 5. form the pairs files for LLM judgement, by including the query description and segment txt
        form_pairs(input_file=run, query_dict=query_dict, doc_dict=doc_dict, result=pairs_file, logger=logger,
                input_type='run', save_format='jsonl', clean_text=True)
    