# Similarities
Once the embeddings are created, the similarity between images can be calculated. This has to be done in batches, because a dataframe of shape `nr_articles x nr_articles` is too large to fit in memory. I therefore created batches of `NR_ROWS_PER_BATCH` articles each, and the similarities of each batch are written to a feather file. Afterwards, these feather files are trimmed to keep only the `NR_TO_KEEP` most similar articles. Two new feather files are created per batch: one containing the indices of the `NR_TO_KEEP` most similar articles, the other containing the corresponding similarity values. These trimmed dataframes are small enough to be joined into a single feather file per file type (one for the similarity values and one for the indices). Finally, it is more useful to have a dataframe with the `article_id`s of similar articles than one with their indices, so the indices are converted to their corresponding `article_id`.

In [None]:
import pandas as pd
import math, threading

from functions import idp, odp, create_directory_if_not_exists, list_directory_if_exists

from sklearn.metrics import pairwise_distances

In [None]:
NR_ROWS_PER_BATCH = 1000        # nr of articles (rows) per batch in the calculation phase; each batch produces one similarities_<k>.feather file
NR_FILES_PER_THREAD = 10        # nr of batch files handled by each thread in the trimming phase
NR_TO_KEEP = 100                # nr of most similar articles (and their similarity scores) stored for each article_id

In [None]:
def similarities_odp(filename, creation=False):
    """
    Build the output path of a similarity file, creating its directory first if needed.
    :param filename: name of the file inside the similarity directory
    :param creation: when True, use the 'similarities_creation' directory (intermediate batch files),
                     otherwise the 'similarities' directory
    :return: the path returned by odp for '<directory>/<filename>'
    """
    if creation:
        directory = 'similarities_creation'
    else:
        directory = 'similarities'
    create_directory_if_not_exists(directory=directory)
    relative_path = f'{directory}/{filename}'
    return odp(filename=relative_path)

In [None]:
embeddings_directories = list_directory_if_exists(directory=odp(filename='embeddings'))
similarity_directories = list_directory_if_exists(directory=odp(filename='similarities'))

# Embedding directories whose 'similarities' counterpart is not listed yet, in sorted order.
# NOTE(review): the names are rewritten 'embeddings' -> 'similarities' here, but they are later used again as
# subdirectories of 'embeddings/'; also, the similarity outputs are written as flat files directly in
# 'similarities/'. Confirm that embedding directory names never contain 'embeddings' and that this skip check
# matches the actual output layout.
directories_to_process = sorted(
    {directory_name.replace('embeddings', 'similarities') for directory_name in embeddings_directories}
    .difference(similarity_directories)
)

## Similarity pipeline
### Calculate similarities in batches

In [None]:
# Processed article metadata; its row order defines the article index used throughout this pipeline.
article_df = pd.read_feather(idp(filename='articles_processed.feather'))
nr_articles = len(article_df)

In [None]:
def calculate_similarities_for_batch(min_row_ind, max_row_ind, batch_nr, embeddings_df):
    """
    Compute the cosine similarity between embedding rows [min_row_ind, max_row_ind) and all embedding rows.
    The result is stored as 'similarities_<batch_nr>.feather' in the 'similarities_creation' directory.
    The stored dataframe has one row per article of the batch and one column per article (named '0', '1', ...).
    It also has an extra 'article_index' column with the row index of each batch article.
    :param min_row_ind: first row index of the batch (inclusive)
    :param max_row_ind: end row index of the batch (exclusive)
    :param batch_nr: nr of the batch, used in the output filename and the progress messages
    :param embeddings_df: dataframe containing the embeddings, one row per article
    """
    batch_label = f"Batch {batch_nr} ({min_row_ind} --> {max_row_ind})"
    print(f"[=>    ] Started              : {batch_label}")

    batch_embeddings = embeddings_df.iloc[min_row_ind:max_row_ind]
    # cosine similarity = 1 - cosine distance
    cosine_distances = pairwise_distances(batch_embeddings, embeddings_df, metric='cosine')
    similarity_df = pd.DataFrame(1 - cosine_distances)
    similarity_df['article_index'] = list(range(min_row_ind, max_row_ind))
    # feather only accepts string column names; assigning in place avoids copying the large frame
    similarity_df.columns = similarity_df.columns.astype(str)

    output_path = similarities_odp(filename=f'similarities_{batch_nr}.feather', creation=True)
    similarity_df.to_feather(output_path)
    print(f"[=====>] Finished             : {batch_label}")

def run_batch_similarity_calculation(embeddings_df):
    """
    Calculate the similarities for all embeddings, NR_ROWS_PER_BATCH rows at a time.
    A full nr_articles x nr_articles similarity matrix does not fit in limited RAM.
    The rows are therefore processed in consecutive batches [0, B), [B, 2B), ... with B = NR_ROWS_PER_BATCH.
    The last batch is cut off at nr_articles.
    Batch k (1-based) is written to 'similarities_<k>.feather'.
    This produces math.ceil(nr_articles / NR_ROWS_PER_BATCH) files, the count the trimming and joining steps expect.
    :param embeddings_df: dataframe containing the embeddings, one row per article
                          (assumed to be row-aligned with article_df - TODO confirm)
    """
    for batch_nr, min_row_ind in enumerate(range(0, nr_articles, NR_ROWS_PER_BATCH), start=1):
        # Clamp the upper bound so a batch never extends past the last article.
        # This also covers nr_articles < NR_ROWS_PER_BATCH. Otherwise the row slice and the article_index list
        # written for the batch would have different lengths.
        max_row_ind = min(min_row_ind + NR_ROWS_PER_BATCH, nr_articles)
        calculate_similarities_for_batch(
            min_row_ind=min_row_ind, max_row_ind=max_row_ind, batch_nr=batch_nr, embeddings_df=embeddings_df
        )

### Trim similarities using multithreading

In [None]:
def trim_similarities_of_file(file_name, column_names, trimmed_column_names):
    """
    Keep only the NR_TO_KEEP most similar articles (and their similarity scores) for each article of one batch file.
    The full similarities are spread over many large files. Articles that are not similar at all are of no
    interest, so each batch file is reduced to two much smaller files:
        - '<file>_indices.feather': the article indices (integers) of the most similar articles
        - '<file>_values.feather' : the corresponding similarity scores
    Both files are ordered by descending similarity.
    Both carry an 'index' column with the position of each article within its batch (0, 1, ...).
    :param file_name: the name of the batch file to process (inside the 'similarities_creation' directory)
    :param column_names: similarity columns to take into account; each name is an article index as a string ('0', '1', ...)
    :param trimmed_column_names: column names for the trimmed dataframes, one per kept article
    """
    similarities_df = pd.read_feather(similarities_odp(filename=file_name, creation=True))
    # After transposing there is one row per candidate article (labelled by its index as a string, e.g. '42')
    # and one column per article of this batch.
    similarities_df = similarities_df[column_names].T

    result_values, result_indices = [], []
    for batch_column in similarities_df.columns:
        most_similar = similarities_df[batch_column].sort_values(ascending=False).head(NR_TO_KEEP)
        result_values.append(most_similar.to_numpy())
        # Store the article indices as integers, not the string labels.
        # The conversion to article_ids looks them up with integer keys.
        result_indices.append(most_similar.index.astype(int).to_numpy())

    # head() keeps fewer than NR_TO_KEEP entries when there are fewer candidate articles
    nr_kept = min(NR_TO_KEEP, similarities_df.shape[0])
    kept_column_names = list(trimmed_column_names)[:nr_kept]

    sim_df = pd.DataFrame(data=result_values, columns=kept_column_names).reset_index()
    ind_df = pd.DataFrame(data=result_indices, columns=kept_column_names).reset_index()
    # feather requires string column names
    sim_df.columns = sim_df.columns.astype(str)
    ind_df.columns = ind_df.columns.astype(str)

    # store the dataframes next to the batch file
    sim_df.to_feather(similarities_odp(filename=file_name.replace('.feather', '_values.feather'), creation=True))
    ind_df.to_feather(similarities_odp(filename=file_name.replace('.feather', '_indices.feather'), creation=True))

def similarity_trimming_thread_function(filename_list, thread_nr, column_names, trimmed_column_names):
    """
    Worker body of one trimming thread: trim every file in filename_list, in order.
    A progress line is printed before and after each file.
    :param filename_list: names of the batch files this thread must trim
    :param thread_nr: number of the thread (progress printing only)
    :param column_names: column names to take into account in the original dataframe
    :param trimmed_column_names: column names for the trimmed dataframe
    """
    for current_file in filename_list:
        progress = f"Thread {thread_nr} - file {current_file}"
        print(f"[=>    ] Started              : {progress}")
        trim_similarities_of_file(
            file_name=current_file,
            column_names=column_names,
            trimmed_column_names=trimmed_column_names,
        )
        print(f"[=====>] Finished             : {progress}")

def run_threaded_similarity_trimming():
    """
    Trim all batch similarity files, spreading them over threads of NR_FILES_PER_THREAD files each.
    The batch files are 'similarities_1.feather' ... 'similarities_<k>.feather' with
    k = math.ceil(nr_articles / NR_ROWS_PER_BATCH). They are split into consecutive chunks of
    NR_FILES_PER_THREAD names, and each chunk is trimmed by its own thread.
    NOTE(review): the trimming is mostly CPU-bound pandas work and Python threads share the GIL, so the actual
    speed-up depends on how much of that work releases it.
    :raises RuntimeError: if trimming failed in any thread (raised after all threads have finished,
                          chained to the first failure)
    """
    nr_batches = math.ceil(nr_articles / NR_ROWS_PER_BATCH)
    filenames = [f'similarities_{batch_nr}.feather' for batch_nr in range(1, nr_batches + 1)]

    # Consecutive chunks of NR_FILES_PER_THREAD filenames.
    # The last chunk holds the remainder; there is never an empty chunk.
    thread_filenames = [
        filenames[start:start + NR_FILES_PER_THREAD]
        for start in range(0, len(filenames), NR_FILES_PER_THREAD)
    ]

    column_names = [str(i) for i in range(nr_articles)]             # there is one column for each article, with column names 0, 1, 2,... (article indices)
    trimmed_column_names = [str(i) for i in range(NR_TO_KEEP)]      # same as columns but as we only keep the NR_TO_KEEP most similar articles

    # An exception inside a thread would otherwise only be printed by the thread machinery.
    # The pipeline would then continue with missing (or stale) trimmed files.
    errors = []

    def run_thread(filename_chunk, thread_nr):
        try:
            similarity_trimming_thread_function(filename_chunk, thread_nr, column_names, trimmed_column_names)
        except Exception as error:  # re-raised in the main thread once all threads are joined
            errors.append((thread_nr, error))

    # create threads
    threads = []
    for thread_nr, filename_chunk in enumerate(thread_filenames, start=1):
        print(f"Main    : created and started thread {thread_nr}.")
        thread = threading.Thread(target=run_thread, args=(filename_chunk, thread_nr))
        threads.append(thread)
        thread.start()

    # join threads
    for thread_nr, thread in enumerate(threads, start=1):
        print(f"Main    : next thread to join: {thread_nr}")
        thread.join()
        print(f"Main    : thread {thread_nr} done")

    if errors:
        failed_thread_nr, first_error = errors[0]
        raise RuntimeError(f"Similarity trimming failed in thread {failed_thread_nr}") from first_error

### Join similarities

In [None]:
def join_similarity_files(sim_type, extended=False):
    """
    Join the per-batch trimmed similarity files of one type into a single file covering all articles.
    Batch file 'similarities_<k>_<sim_type>.feather' (k = 1 .. ceil(nr_articles / NR_ROWS_PER_BATCH)) holds the
    NR_TO_KEEP most similar articles for the articles of batch k. It also has a column with each article's
    position within its batch.
    That position is turned into the global article index NR_ROWS_PER_BATCH * (k - 1) + position and stored in
    an 'article_index' column.
    The result is written to '[extended_]similarities_<sim_type>.feather' in the 'similarities' directory.
    NOTE(review): the output filename does not depend on the embedding directory being processed, so processing
    several embedding directories overwrites earlier results - confirm this is intended.
    :param sim_type: either 'indices' or 'values'.
    :param extended: True for extended embeddings (image embedding plus article properties); prefixes the
                     output filename with 'extended_'
    :raises KeyError: if a batch file has no column giving the position of its articles
    """
    similarities_list = []
    for batch_index in range(math.ceil(nr_articles / NR_ROWS_PER_BATCH)):
        batch_filename = f'similarities_{batch_index + 1}_{sim_type}.feather'
        sim = pd.read_feather(similarities_odp(filename=batch_filename, creation=True))
        # The trimming step stores the in-batch position in an 'index' column (created by reset_index).
        # 'article_index' and 'Unnamed: 0' (pandas' name for an unnamed index column read from csv) are
        # accepted too.
        position_column = next(
            (column for column in ('article_index', 'Unnamed: 0', 'index') if column in sim.columns), None
        )
        if position_column is None:
            raise KeyError(f"no article position column found in {batch_filename}")
        sim = sim.rename(columns={position_column: 'article_index'})
        sim['article_index'] = NR_ROWS_PER_BATCH * batch_index + sim['article_index']
        similarities_list.append(sim)

    all_similarities = pd.concat(similarities_list, ignore_index=True)
    # drop any position column that was not the one turned into 'article_index'
    leftover_columns = [column for column in ('Unnamed: 0', 'index') if column in all_similarities.columns]
    all_similarities = all_similarities.drop(columns=leftover_columns)
    all_similarities.to_feather(similarities_odp(filename=f'{"extended_" if extended else ""}similarities_{sim_type}.feather'))

### Convert indices to article_ids

In [None]:
def convert_indices_to_article_ids(filename):
    """
    Replace the article indices in a joined similarity-indices file by the corresponding article_ids.
    During the similarity calculation an article is identified by its row index in article_df.
    Working with article_ids directly is easier further on, so every index in the 'article_index' column and in
    the similar-article columns '0', '1', ... is mapped to article_df['article_id'] at that position.
    The result is written to the same path with the last occurrence of 'indices' replaced by 'article_ids'.
    :param filename: the name of the file that contains the indices
    :return: the dataframe containing the article_ids instead of the indices
    :raises KeyError: if an index does not correspond to a row of article_df
    """
    article_ids = article_df['article_id'].to_numpy()

    def to_article_ids(index_column):
        # indices may be stored as integers or as their string form ('42'); normalise to int64 positions
        positions = index_column.astype('int64').to_numpy()
        if positions.size and (positions.min() < 0 or positions.max() >= len(article_ids)):
            raise KeyError(f"article index out of range in {filename}")
        return article_ids[positions]

    similarity_ind_df = pd.read_feather(filename)

    # the file holds 'article_index' plus one column per similar article, named '0', '1', ...
    columns_to_convert = ['article_index'] + [str(i) for i in range(similarity_ind_df.shape[1] - 1)]
    for column in columns_to_convert:
        similarity_ind_df[column] = to_article_ids(similarity_ind_df[column])

    # only rename the file name part: replace the last occurrence of 'indices'
    output_filename = 'article_ids'.join(filename.rsplit('indices', 1))
    similarity_ind_df.to_feather(output_filename)
    return similarity_ind_df

### Assembly
The whole pipeline is now defined; execute it for each embedding directory that has not been processed yet.

In [None]:
def process_embedding_file(filename):
    """
    Execute the similarity calculation pipeline for a single file.
    Steps: 1 load the embeddings, 2 calculate similarities in batches, 3 trim them, 4 join the values,
    5 join the indices, 6 convert the indices to article_ids.
    A file whose *name* contains 'extended' is treated as extended embeddings; its outputs get an 'extended_' prefix.
    :param filename: path of the feather file containing the embeddings
    """
    import os

    # Decide on the file name only. A directory in the path that happens to contain 'extended' must not
    # make plain embeddings write to (and overwrite) the extended outputs.
    extended = 'extended' in os.path.basename(filename)
    output_prefix = 'extended_' if extended else ''

    print(f'START {filename} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
    embeddings_df = pd.read_feather(filename)
    print(f'DONE step 1 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    run_batch_similarity_calculation(embeddings_df=embeddings_df)
    print(f'DONE step 2 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    run_threaded_similarity_trimming()
    print(f'DONE step 3 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    join_similarity_files(sim_type='values', extended=extended)
    print(f'DONE step 4 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    join_similarity_files(sim_type='indices', extended=extended)
    print(f'DONE step 5 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    convert_indices_to_article_ids(filename=similarities_odp(filename=f'{output_prefix}similarities_indices.feather'))
    print(f'DONE step 6 / 6 ---------------------------------------------------------------------------------------------------------------------------------------')
    print(f'END {filename} >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')

In [None]:
# For every pending embedding directory, process the plain embeddings first and then the extended ones.
for directory_to_process in directories_to_process:
    embedding_files = [
        odp(filename=f'embeddings/{directory_to_process}/{embedding_name}')
        for embedding_name in ('embeddings.feather', 'extended_embeddings.feather')
    ]
    for embedding_file in embedding_files:
        process_embedding_file(filename=embedding_file)

In [None]:
# IPython magic: clear the interactive namespace (-f: without asking for confirmation)
%reset -f