In [1]:
from IPython.display import display, Latex
import numpy as np
import pandas as pd
import os

from sklearn.metrics.pairwise import cosine_similarity
from similarity_summary_eng import read_data, cosine_search_output, prepare_search
from similarity_summary_eng import evaluate as eval_orig

# Load the dataset through the project helper; later cells reuse df_all both as
# the source of query cases and as the search corpus.
df_all = read_data()
# Echo the shape as the cell output as a quick check that loading worked.
df_all.shape

(317, 31)

In [2]:
# ANSI colour helper, used to make the printed walkthrough below easier to read
# https://stackoverflow.com/questions/39473297/how-do-i-print-colored-output-with-python-3
from typing import Any

class color_printing():
    """ANSI escape-code helpers for colouring printed text.

    Both helpers are static methods, so they can be called on the class
    (``color_printing.RGB(...)``) as well as on an instance.
    """

    END = "\x1b[0m"      # resets every attribute set by a previous escape code
    Italic = "\x1b[3m"   # switches italics on

    @staticmethod
    def RGB(R, G, B, Foreground = True): # R: 0-255  ,  G: 0-255  ,  B: 0-255
        """Return the 24-bit colour escape sequence for the given RGB triple.

        Args:
            R, G, B: Colour channels, each expected in the range 0-255.
            Foreground: When True the colour applies to the text itself;
                any other value applies it to the background instead.

        Returns:
            The escape sequence as a string; print ``color_printing.END``
            afterwards to reset the colour.
        """
        # 38 selects the foreground colour, 48 the background colour.
        FB_G = 48 if Foreground != True else 38
        return "\x1b[" + str(FB_G) + ";2;" + str(R) + ";" + str(G) + ";" + str(B) + "m"

    @staticmethod
    def print_string(text, colours):
        """Wrap ``text`` in the colour given by ``colours`` and a trailing reset.

        Args:
            text: The string to colour.
            colours: Positional arguments for ``RGB``, e.g. ``(255, 150, 0)``.

        Returns:
            The coloured string; nothing is printed by this method itself.
        """
        return color_printing.RGB(*colours) + text + color_printing.END
    
class exampleCalculation():
    """Step-by-step walkthrough of the retrieval metrics for a few query cases.

    ``example`` runs a cosine-similarity search, prints how every retrieved
    case is labelled and passes the labels to ``evaluate``, which computes the
    metrics and renders the arithmetic behind each of them as LaTeX.
    Calling the instance is shorthand for calling ``example``.
    """

    # RGB colours used for the highlighted parts of the printed walkthrough.
    _VALUE_COLOUR = (255, 150, 0)
    _LABEL_COLOUR = (0, 150, 0)

    def evaluate(self, retrieval_scores, missed_retrieval_scores, top_k):
        """Compute the retrieval metrics and display how each one is derived.

        Args:
            retrieval_scores: 2-D array with one row per query and one column
                per rank in the retrieval window; an entry is non-zero when
                the case retrieved at that rank is relevant to the query.
            missed_retrieval_scores: 2-D array with one row per query whose
                row sum is the number of relevant cases counted as missed.
            top_k: Size of the retrieval window.

        Returns:
            A dict with the keys ``precision@k``, ``map@k``, ``mrr@k``,
            ``mrr_miss@k`` and ``hitrate@k`` (``k`` replaced by ``top_k``),
            each rounded to three decimals.
        """
        metrics = self._compute_metrics(retrieval_scores, missed_retrieval_scores, top_k)
        self._display_derivations(metrics, retrieval_scores, missed_retrieval_scores, top_k)
        return metrics

    def _compute_metrics(self, retrieval_scores, missed_retrieval_scores, top_k):
        """Return the metric dict for the given labels without displaying anything."""
        n_queries = len(retrieval_scores)
        metrics = {}

        #### precision: share of relevant cases in the window, averaged over queries
        metrics[f"precision@{top_k}"] = round(np.mean(np.sum(retrieval_scores, axis = 1)/top_k, 0), 3)

        #### mean average precision for a retrieval window of k = top_k
        # P(k) * rel(k) summed over all ranks and queries, then normalised by top_k and #queries
        average_precision = np.sum([(np.sum(retrieval_scores[:, :k], axis = 1)/k) * retrieval_scores[:, k-1] for k in range(1, top_k+1)])/top_k
        metrics[f"map@{top_k}"] = round(average_precision/n_queries, 3)

        #### MRR and MRR missed
        # A zero column is prepended so that argmax yields the 1-based rank of the
        # first relevant case, and 0 when the window holds no relevant case at all.
        aux_retrieval_scores = np.hstack([np.zeros((retrieval_scores.shape[0], 1)), retrieval_scores])
        rank = np.argmax(aux_retrieval_scores, axis = 1)
        has_hit = rank != 0
        # The miss penalty cannot exceed the number of non-relevant slots in the window.
        missed = np.min([top_k - np.count_nonzero(retrieval_scores, axis = 1), np.sum(missed_retrieval_scores, axis = 1)], axis = 0)
        # `out` must be given together with `where`: positions excluded by the mask
        # keep the value already stored in `out`, and without it they are
        # uninitialised memory. Queries without a hit must contribute exactly 0.
        reciprocal_rank = np.divide(1, rank, out = np.zeros(rank.shape, dtype = float), where = has_hit)
        reciprocal_rank_miss = np.divide(1, rank + missed, out = np.zeros(rank.shape, dtype = float), where = has_hit)
        metrics[f"mrr@{top_k}"] = round(np.sum(reciprocal_rank)/n_queries, 3)
        metrics[f"mrr_miss@{top_k}"] = round(np.sum(reciprocal_rank_miss)/n_queries, 3)

        #### hitrate: share of queries with at least one relevant case in the window
        metrics[f"hitrate@{top_k}"] = round(np.sum(np.any(retrieval_scores, axis = 1))/n_queries, 3)

        return metrics

    def _display_derivations(self, metrics, retrieval_scores, missed_retrieval_scores, top_k):
        """Render, as LaTeX, the arithmetic that leads to every value in ``metrics``."""
        n_queries = len(retrieval_scores)

        #### precision
        precision_terms = [r"\frac{%d}{%d}" % (np.sum(retrieved), top_k) for retrieved in retrieval_scores]
        display(Latex(
            r"Precision@%d $= \frac{\#\text{relevant retrieved cases}}{\#\text{total retrieved cases}} = " % (top_k)
            + r"\frac{{1}}{%d}\cdot(%s)" % (n_queries, '+'.join(precision_terms))
            + f"={metrics[f'precision@{top_k}']}$"
        ))

        #### MRR
        # 1-based rank of the first relevant case per query, 0 when there is none
        first_ranks = [int(np.argmax(np.hstack([[0], retrieved]))) for retrieved in retrieval_scores]
        # queries without a hit contribute 0 to the metric, so they are shown as 0 rather than 1/0
        mrr_terms = [r"\frac{%d}{%d}" % (1, first_rank) if first_rank else "0" for first_rank in first_ranks]
        display(Latex(
            r"MRR@%d $= \frac{1}{\text{\# queries}}\sum_q^Q \frac{1}{rank_q}=" % (top_k)
            + r"\frac{{1}}{%d}\cdot(%s)" % (retrieval_scores.shape[0], '+'.join(mrr_terms))
            + f"= {metrics[f'mrr@{top_k}']}$"
        ))

        ### MRR miss
        miss_counts = [np.min([top_k - np.count_nonzero(retrieved), np.sum(missed)]) for retrieved, missed in zip(retrieval_scores, missed_retrieval_scores)]
        mrr_miss_terms = [r"\frac{%d}{%d+%d}" % (1, first_rank, miss_count) if first_rank else "0" for first_rank, miss_count in zip(first_ranks, miss_counts)]
        display(Latex(
            r"MRR missed@%d $= \frac{1}{\text{\# queries}}\sum_q^Q \frac{1}{rank_q+miss_q}=" % (top_k)
            + r"\frac{{1}}{%d}\cdot(%s)" % (retrieval_scores.shape[0], '+'.join(mrr_miss_terms))
            + f"= {metrics[f'mrr_miss@{top_k}']}$"
        ))

        ### hitrate
        hit_terms = ["%d" % (np.any(retrieved)) for retrieved in retrieval_scores]
        display(Latex(
            r"Hitrate@%d $=\frac{1}{\text{\# queries}}\sum_q^Q any\_rel(q)=" % (top_k)
            + r"\frac{{1}}{%d}\cdot (%s)" % (n_queries, '+'.join(hit_terms))
            + f" = {metrics[f'hitrate@{top_k}']}$"
        ))

        ### average precision
        ap_terms = ["[" + "+".join([r"\frac{%d}{%d}\cdot%d" % (np.sum(retrieved[:k]), k, retrieved[k-1]) for k in range(1, top_k+1)]) + "]" for retrieved in retrieval_scores]
        ap_values = np.round([[np.sum(retrieved[:k])/k*retrieved[k-1] for k in range(1, top_k+1)] for retrieved in retrieval_scores], 3)
        # rounding the per-query sums again keeps float noise out of the rendered numbers
        ap_sums = np.round(np.sum(ap_values, axis = 1), 3).astype(str)
        # the 1/top_k factor is part of the metric, so it is shown in the expansion as well
        display(Latex(
            r"MAP@%d $ = \frac{1}{\text{\# queries}}\sum_q^Q \frac{1}{\text{top\_k}}\sum_{k=1}^{\text{top\_k}}P_q(k)\cdot rel_q(k)=" % (top_k)
            + r"\frac{{1}}{%d\cdot%d}(%s)" % (n_queries, top_k, '+'.join(ap_terms))
            + r"= \frac{{1}}{%d\cdot%d}" % (n_queries, top_k)
            + "(" + '+'.join(ap_sums) + ")"
            + f" = {metrics[f'map@{top_k}']}$"
        ))

    def example(self, sample_case, search_corpus, query_type, top_k, model):
        """Search ``search_corpus`` for every query case and explain the scoring.

        Args:
            sample_case: Rows used as queries; read with ``.iloc``, ``.shape``
                and column access, so a pandas DataFrame is expected.
            search_corpus: Rows that are searched, accessed the same way.
            query_type: Column holding the relevance labels; must be a key of
                the name mapping below ("celex" or "citation_article"). The
                values are combined with ``&``, so sets are expected.
            top_k: Size of the retrieval window.
            model: Column holding the embedding vectors.

        Returns:
            The metric dict produced by ``evaluate``.
        """
        # name mapping to disambiguate what the system used as the ground truth
        print(top_k)
        query_map = {
            "celex": "CELEX ID",
            "citation_article": "article-only EU citation"
        }

        query_embeddings = np.asarray(sample_case[model].tolist())
        if len(sample_case) < 2:
            # a single query must still be passed as a 2-D (1, dim) array
            query_embeddings = query_embeddings.reshape(1,-1)
        cosine_scores = cosine_similarity(query_embeddings, np.asarray(search_corpus[model].tolist()))

        def highlight(value):
            return color_printing.print_string(str(value), colours = self._VALUE_COLOUR)

        retrieval_scores = []
        missed_retrieval_scores = []

        for idx in range(sample_case.shape[0]):
            # corpus positions ordered from most to least similar
            all_matches = np.argsort(cosine_scores[idx])[::-1]

            # The best match is skipped.
            # NOTE(review): this assumes the query case itself is part of search_corpus
            # and ranks first - confirm for corpora that do not contain the queries.
            best_matches = all_matches[1:top_k+1]
            query_celex = sample_case.iloc[idx][query_type]

            print(f"CELEX ID of query case = {highlight(query_celex)}")
            print("---------------")
            label_name = color_printing.print_string(query_map[query_type], colours = self._LABEL_COLOUR)

            # label each of the top_k retrieved cases as relevant (1) or not (0)
            score = []
            for retrieved_case in best_matches:
                retrieved_celex = search_corpus.iloc[retrieved_case][query_type]
                is_relevant = len(query_celex & retrieved_celex) > 0
                score.append(int(is_relevant))

                print(f"CELEX IDs of retrieved case = {highlight(retrieved_celex)} | They share at least one {label_name} = {highlight(is_relevant)} ===>> Retrieval score becomes = {highlight(score)}")

            ### find all relevant cases that were missed
            # NOTE(review): this loop also covers the top_k retrieved cases, so the count
            # includes relevant cases inside the retrieval window although the message
            # below says otherwise - confirm against the project's own search code
            # before changing it.
            missed_matches = []
            for retrieved_case in all_matches[1:]:
                retrieved_celex = search_corpus.iloc[retrieved_case][query_type]
                missed_matches.append(int(len(query_celex & retrieved_celex) > 0))

            print("=====================")
            print(f"Final retrieval score = {highlight(score)} ==>> There are {highlight(np.sum(score))} retrieved cases which share the same {label_name} as the query case, out of k = {highlight(top_k)}")
            print(f"Number of cases relevant to the query, which are present in the entire dataset but not in the retrieval window = {highlight(np.sum(missed_matches))}; however, there are {highlight(top_k - np.count_nonzero(score))} retrieved non-relevant cases")

            retrieval_scores.append(np.asarray(score))
            missed_retrieval_scores.append(np.asarray(missed_matches))

        metrics = self.evaluate(np.asarray(retrieval_scores), np.asarray(missed_retrieval_scores), top_k = top_k)

        return metrics

    def __call__(self, *args: Any, **kwds: Any) -> Any:
        """Forward every argument to ``example`` and return its metric dict."""
        return self.example(*args, **kwds)

In [3]:
# Walk through the metric calculation for three query cases (positions 7-9 of
# df_all), searching the whole dataset with a retrieval window of k = 5 and
# using the "citation_article" column as the relevance label.
example_calc = exampleCalculation()
example_calc(df_all.iloc[7:10], df_all, "citation_article", 5, "embedding_multi-qa-mpnet-base-dot-v1")

5
CELEX ID of query case = [38;2;255;150;0m{'32002F0584'}[0m
---------------
CELEX IDs of retrieved case = [38;2;255;150;0m{'32002F0584'}[0m | They share at least one [38;2;0;150;0marticle-only EU citation[0m = [38;2;255;150;0mTrue[0m ===>> Retrieval score becomes = [38;2;255;150;0m[1][0m
CELEX IDs of retrieved case = [38;2;255;150;0m{'32002F0584'}[0m | They share at least one [38;2;0;150;0marticle-only EU citation[0m = [38;2;255;150;0mTrue[0m ===>> Retrieval score becomes = [38;2;255;150;0m[1, 1][0m
CELEX IDs of retrieved case = [38;2;255;150;0m{'32002F0584'}[0m | They share at least one [38;2;0;150;0marticle-only EU citation[0m = [38;2;255;150;0mTrue[0m ===>> Retrieval score becomes = [38;2;255;150;0m[1, 1, 1][0m
CELEX IDs of retrieved case = [38;2;255;150;0m{'32002F0584.Article_4 bis'}[0m | They share at least one [38;2;0;150;0marticle-only EU citation[0m = [38;2;255;150;0mFalse[0m ===>> Retrieval score becomes = [38;2;255;150;0m[1, 1, 1, 0][0m
CELE

<IPython.core.display.Latex object>

<IPython.core.display.Latex object>

<IPython.core.display.Latex object>

<IPython.core.display.Latex object>

<IPython.core.display.Latex object>

In [11]:
import unittest

class unitTestSimilarity(unittest.TestCase):
    def __init__(self, methodName: str = "runTest") -> None:
        super().__init__(methodName)
        self._get_results()

    def _get_results(self):
        embds_query, embds_search = prepare_search(df_all, "embedding_multi-qa-mpnet-base-dot-v1")
        self.retrieval_scores, self.missed_retrieval_scores, self.cosine_scores, self.idx_retrieved_cases = cosine_search_output(embds_query, embds_search, "celex", df_all, 5, 0, True)
    
    def test_similarity_order(self):
        # check if similarity of retrieved scores is indeed from most similar to least
        for (idx_query, idx_list) in self.idx_retrieved_cases:
            for idx_1, idx_2 in zip(idx_list[:-1], idx_list[1:]):
                self.assertGreaterEqual(self.cosine_scores[idx_query][idx_1], self.cosine_scores[idx_query][idx_2])

    def test_retrieval_label(self):
        # check if label of a query case and a retrieved one do share at least one label
        for (idx_query, idx_list) in self.idx_retrieved_cases:
            for idx, label in zip(idx_list, self.retrieval_scores[idx_query]):
                if label: # only assert if there is a positive label in the first place
                    self.assertGreater(len(df_all.iloc[idx]["celex"] & df_all.iloc[idx_query]["celex"]), 0)

# argv=[''] keeps unittest from parsing the kernel's own command-line arguments,
# and exit=False stops it from calling sys.exit() once the tests have finished.
unittest.main(argv=[''], verbosity=2, exit=False)

test_retrieval_label (__main__.unitTestSimilarity.test_retrieval_label) ... ok
test_similarity_order (__main__.unitTestSimilarity.test_similarity_order) ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.038s

OK


<unittest.main.TestProgram at 0x70b06d183d50>