# Learning with Massive Data
<p>
Assignment 3 - Similarity search for document pairs<br>
Giovanni Costa - 880892
</p>

<p>
<b>SPARK VERSION</b>
</p>

Contents (TODO: update this list):
- [Document sparse representation](#doc_repr)
- [Sequential Implementation](#s_impl)
    - [Exact similarity search](#exact_s)
    - [Approximate similarity search](#approx_s)
- [Evaluations](#eval)

In [None]:
import os
import numpy as np
import pandas as pd
from utils import compute_sparse_repr, compute_cosine_similarity, eval_sol
from pyspark.sql import SparkSession
from pyspark.ml.linalg import SparseVector
from scipy.sparse import save_npz, load_npz

# Python executables Spark should launch. Raw strings keep the Windows
# backslashes literal instead of relying on invalid escape sequences
# (which raise a SyntaxWarning on recent Python versions).
os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\mambaforge\envs\ML-base\python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = r'C:\ProgramData\mambaforge\envs\ML-base\Scripts\ipython.exe'

# Path prefix for every artifact this notebook writes or reads back.
results = "results/"

<a id="doc_repr"></a>
## Document sparse representation of subset

In [None]:
# JSON-lines corpora, in the order they are indexed below (0, then 1).
datasets = [
    "datasets/nfcorpus/corpus.jsonl",
    "datasets/scifact/corpus.jsonl",
]

In [None]:
# Draw a reproducible 500-row sample (fixed random_state) from each corpus.
df_sampled1 = (
    pd.read_json(datasets[0], lines=True)
    .sample(n=500, axis=0, random_state=5)
)
df_sampled2 = (
    pd.read_json(datasets[1], lines=True)
    .sample(n=500, axis=0, random_state=5)
)

In [None]:
# compute_sparse_repr comes from the project's utils module; it returns a pair
# that is unpacked here (presumably the sparse matrix and its vocabulary -
# confirm against utils).
sparse_repr_sampled1, vocab_sampled1 = compute_sparse_repr(df_sampled1)
sparse_repr_sampled2, vocab_sampled2 = compute_sparse_repr(df_sampled2)

In [None]:
# Persist the sampled document ids (parquet) and sparse matrices (npz) so the
# later cells can reload them from disk.
pd.DataFrame(df_sampled1["_id"]).to_parquet(results + "ids_nfcorpus_sampled.parquet")
save_npz(results + "sparse_repr_nfcorpus_sampled.npz", sparse_repr_sampled1)

pd.DataFrame(df_sampled2["_id"]).to_parquet(results + "ids_scifact_sampled.parquet")
save_npz(results + "sparse_repr_scifact_sampled.npz", sparse_repr_sampled2)

## Computing ground truth

In [None]:
# Candidate similarity thresholds; only `threshold` is read by the cells below.
thresholds = [0.3, 0.5, 0.8]
# Minimum similarity used by the ground truth and by the Spark functions.
threshold = 0.2

In [None]:
# Reload the "_id" column of each sampled corpus.
df_id1 = pd.read_parquet(results + "ids_nfcorpus_sampled.parquet")["_id"]
df_id2 = pd.read_parquet(results + "ids_scifact_sampled.parquet")["_id"]

In [None]:
# Reload the sparse matrices saved earlier and print their shapes.
sparse_repr1 = load_npz(results + "sparse_repr_nfcorpus_sampled.npz")
sparse_repr2 = load_npz(results + "sparse_repr_scifact_sampled.npz")
print(sparse_repr1.shape)
print(sparse_repr2.shape)

In [None]:
# Reference result, computed on the second (SciFact) sample only.
# compute_cosine_similarity is a project helper from utils; the meaning of the
# trailing True flag is not visible here - check utils.
num_of_pairs, pairs = compute_cosine_similarity(
    sparse_repr2, df_id2, threshold, True
)

In [None]:
# Display the pairs returned by compute_cosine_similarity.
pairs

In [None]:
# Display the pair count returned by compute_cosine_similarity.
num_of_pairs

## Spark 1 worker

In [None]:
# Create (or reuse) the Spark session and keep a handle on its context.
# NOTE(review): no master is configured here, so the number of workers comes
# from the environment default - confirm it matches the "1 worker" heading.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .getOrCreate()
)
sc = spark.sparkContext
spark

In [None]:
def csr_to_sparse_vector(row):
    """Build a Spark ``SparseVector`` from a SciPy CSR row.

    The vector size is the number of columns of ``row`` and its entries are
    the (column index, value) pairs stored in ``row``. Callers in this file
    pass the 1 x n matrix returned by ``getrow``.
    """
    size = row.shape[1]
    entries = list(zip(row.indices, row.data))
    return SparseVector(size, entries)

def sparse_argsort(matrix, idx):
    """Return the stored column indices of row ``idx``, largest value first.

    Only explicitly stored entries of the row are considered; the result is a
    plain Python list.
    """
    row = matrix.getrow(idx)
    # Ascending argsort, then flipped to get decreasing values.
    descending = np.flip(np.argsort(row.data))
    # TODO: check this ordering
    return row.indices[descending].tolist()

def preprocessingForSpark(sparse_repr, ids):
    """Turn a sparse document matrix into the records fed to the Spark job.

    Args:
        sparse_repr: SciPy sparse matrix with one document per row.
        ids: pandas Series of document ids, positionally aligned with the rows.

    Returns:
        A pair ``(res_forMap, d_star)``. ``res_forMap`` holds one
        ``(doc_id, (sparse_vector, term_order))`` tuple per row, where
        ``term_order`` is the row's column indices by decreasing value.
        ``d_star`` is a 1-D array with the per-column maximum of
        ``sparse_repr`` (left un-normalised).
    """
    # Re-index from 0 so ids can be looked up by row position.
    doc_ids = ids.reset_index(drop=True)

    res_forMap = []
    for i in range(sparse_repr.shape[0]):
        vector = csr_to_sparse_vector(sparse_repr.getrow(i))
        term_order = sparse_argsort(sparse_repr, i)
        res_forMap.append((doc_ids[i], (vector, term_order)))

    d_star = sparse_repr.max(axis=0).toarray().reshape(-1)

    return res_forMap, d_star

In [None]:
# Build the Spark input records and the per-term maxima for the second sample.
res_forMap, d_star = preprocessingForSpark(sparse_repr2, df_id2)

In [None]:
# Disabled code kept as a bare string literal; the same two comprehensions
# appear inside preprocessingForSpark.
""" docs_sparse_forSpark = [csr_to_sparse_vector(sparse_repr.getrow(i)) for i in range(sparse_repr.shape[0])]
sorted_index_term_doc=[sparse_argsort(sparse_repr, idx) for idx in range(sparse_repr.shape[0])] """

In [None]:
# Broadcast the per-term maxima (read inside b_d) and distribute the records.
d_star_sc = sc.broadcast(d_star)
rdd_forMap = sc.parallelize(res_forMap)

In [None]:
def b_d(sparse_repr, term_order, d_star=None, min_sim=None):
    """Return the last position of ``term_order`` that stays below the threshold.

    Walks ``term_order`` accumulating ``sparse_repr[t] * d_star[t]`` and stops
    at the first position where the running sum reaches ``min_sim``.

    Args:
        sparse_repr: Indexable by term id (``sparse_repr[t]``), e.g. the
            document's ``SparseVector``.
        term_order: Term ids of the document in the order to be scanned.
        d_star: Indexable of per-term maxima. Defaults to the broadcast
            ``d_star_sc.value``.
        min_sim: Similarity threshold. Defaults to the module-level
            ``threshold``.

    Returns:
        ``i - 1`` where ``i`` is the first position whose cumulative sum is
        ``>= min_sim`` (so ``-1`` if the first term already reaches it);
        the last position if the threshold is never reached; ``0`` for an
        empty ``term_order``.
    """
    if d_star is None:
        d_star = d_star_sc.value
    if min_sim is None:
        min_sim = threshold

    cum_sum = 0
    index = 0
    # Products are computed lazily so nothing past the stopping point is
    # looked up.
    for i, t in enumerate(term_order):
        cum_sum += sparse_repr[t] * d_star[t]
        index = i
        if cum_sum >= min_sim:
            index = i - 1
            break

    return index


def my_map(elem):
    """Emit one record per term of a document that lies past its ``b_d`` bound.

    Args:
        elem: ``(doc_id, (sparse_repr, sorted_index))`` as produced by
            ``preprocessingForSpark``.

    Returns:
        A list of ``(term_id, (doc_id, sparse_repr))`` tuples, one for each
        term whose position in ``sorted_index`` is strictly greater than
        ``b_d(sparse_repr, sorted_index)``.
    """
    doc_id = elem[0]
    sparse_repr = elem[1][0]
    sorted_index = elem[1][1]

    # The bound depends only on the document, so compute it once instead of
    # once per term.
    boundary = b_d(sparse_repr, sorted_index)

    return [
        (t_idx, (doc_id, sparse_repr))
        for i, t_idx in enumerate(sorted_index)
        if i > boundary
    ]

# Flatten the per-document outputs of my_map into (term, (doc_id, vector)) records.
rdd_forReduce = rdd_forMap.flatMap(my_map)

In [None]:
def max_of_intersection(list1, list2):
    """Return the largest value present in both sequences, or 0.

    Both inputs must be sorted in ascending order (as ``SparseVector.indices``
    are). They are scanned from the end, so the first match found is the
    largest common value and the scan stops there.

    Args:
        list1: Ascending sequence supporting ``len`` and integer indexing.
        list2: Ascending sequence supporting ``len`` and integer indexing.

    Returns:
        The largest common value if it is greater than 0; otherwise 0 (also
        when there is no common value or an input is empty).
    """
    i = len(list1) - 1
    j = len(list2) - 1
    while i >= 0 and j >= 0:
        elem1 = list1[i]
        elem2 = list2[j]

        if elem1 == elem2:
            # Results are floored at 0, matching the "no common value" result.
            return elem1 if elem1 > 0 else 0
        if elem1 > elem2:
            i -= 1
        else:
            j -= 1

    return 0


def my_reduce(elem):
    """Score the document pairs grouped under one term.

    Args:
        elem: ``(term_id, values)`` where ``values`` iterates over
            ``(doc_id, sparse_vector)`` pairs that share ``term_id``.

    Returns:
        A list of ``(id1, id2, sim)`` tuples with ``sim >= threshold``. Every
        ordered combination is visited, so a qualifying pair appears in both
        orientations.

    NOTE(review): a pair is only scored when ``term_id`` equals the largest
    term id the two vectors have in common. That filter presumably assumes both
    documents were emitted under that term; with a per-document term order this
    may not hold and pairs could be missed - confirm.
    """
    term = elem[0]
    postings = elem[1]
    similar = []
    for id1, d1 in postings:
        for id2, d2 in postings:
            if id1 == id2:
                continue
            # Score the pair only under its largest common term.
            if term != max_of_intersection(d1.indices, d2.indices):
                continue
            # Plain dot product: the vectors are taken to be already normalised.
            sim = round(d1.dot(d2), 4)
            if sim >= threshold:
                similar.append((id1, id2, sim))

    return similar

# Group the records by term and score the pairs inside each group.
result_pairs = rdd_forReduce.groupByKey().flatMap(my_reduce)

In [None]:
# Run the job and bring the scored pairs back to the driver.
# The name is rebound from the RDD to the collected result.
result_pairs = result_pairs.collect()

In [None]:
# Display the collected (id1, id2, sim) tuples.
result_pairs

In [None]:
# Keep one entry per unordered pair: ids are sorted inside each pair and only
# the first occurrence (with its score) is retained.
unique_pairs = set()
result = []
for d1, d2, score in result_pairs:
    sorted_pair = tuple(sorted((d1, d2)))
    if sorted_pair in unique_pairs:
        continue
    unique_pairs.add(sorted_pair)
    result.append((*sorted_pair, score))

In [None]:
# Display the de-duplicated pairs in sorted order.
sorted(result)

In [None]:
# Number of de-duplicated pairs.
len(result)

In [None]:
# Shut down the Spark session created above.
spark.stop()