# CLASSES

In [1]:
# compare two similarity dictionaries and report where they differ
def verify_equality_between_models(d1:dict, d2:dict, n_digit=10):
    """Print a comparison report for two similarity dictionaries.

    Both dictionaries map a key to a list of ``(doc_id, value)`` pairs.
    Two lists are considered equal when, after sorting them and rounding
    every value to ``n_digit`` decimals, they contain the same pairs.

    The report consists of: a line telling whether the two key sets match,
    one line per key whose lists differ, and the total number of such keys.
    A key that is present in only one dictionary is compared against an
    empty list, so it is reported as a mistake instead of raising KeyError.

    :param d1: first dictionary {key: [(doc_id, value), ...]}
    :param d2: second dictionary with the same structure
    :param n_digit: number of decimals kept when comparing the values
    :return: None (the report is printed)
    """
    # control if the keys of the dictionaries are the same
    print(f"Key are equal: {sorted(list(d1.keys())) == sorted(list(d2.keys()))}")
    # keys of d1 first (same reporting order as before), then the keys only d2 has
    all_keys = list(d1.keys()) + [k for k in d2.keys() if k not in d1]
    # count number of mistakes
    m_i = 0
    # control if the value of each key is equal for both models
    for k in all_keys:
        # a missing key behaves like a key without similar documents
        s1 = [(d, round(v, n_digit)) for (d, v) in sorted(d1.get(k, []))]
        s2 = [(d, round(v, n_digit)) for (d, v) in sorted(d2.get(k, []))]
        if s1 != s2:
            # key with different elements in values
            print(k)
            m_i += 1
    print(f"Total mistake: {m_i}")

In [2]:
import numpy as np


class ReadData:
    """Loader for the ``.npy`` corpora used in this notebook."""

    # default location of the datasets; "{}" is replaced by the file name
    __FOLDER = "C:/Users/zanna/PycharmProjects/Assignment 3  LMD/Datasets/{}"

    @staticmethod
    def get(data_name, folder=None):
        """Load a corpus and return only the text of each document.

        The file ``<data_name>.npy`` must contain a pickled dictionary
        ``{doc_id: {"text": ..., ...}}`` (it is unwrapped with ``.item()``).

        :param data_name: file name of the corpus without the ``.npy`` extension
        :param folder: directory that contains the file; when omitted the
            default dataset folder of the class is used
        :return: dictionary {doc_id: text}
        """
        import os

        file_name = data_name + '.npy'
        if folder is None:
            path = ReadData.__FOLDER.format(file_name)
        else:
            path = os.path.join(folder, file_name)
        # NOTE: allow_pickle=True unpickles arbitrary Python objects, so only
        # files coming from a trusted source must be loaded here
        docs = np.load(path, allow_pickle=True).item()
        # extract dictionary with id_doc: only text of docs (without title and so on)
        return {k: d["text"] for (k, d) in docs.items()}

In [3]:
from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np
from scipy.sparse import csr_matrix, dok_matrix, vstack
from tqdm import tqdm

class AllPairsSimilarityMaster:
    """Shared state of the all-pairs similarity implementations.

    The constructor turns the documents into a TF-IDF matrix; subclasses
    provide ``measure_similarity`` and fill ``similarity_map``.
    """

    def __init__(self, docs_list, threshold):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        """
        # similarity cut-off
        self.threshold = threshold
        # filled by the subclasses with (doc_id, tf-idf vector) entries
        self.tfidf_docs = None
        # {doc_id: list of (other doc_id, similarity) over the threshold}
        self.similarity_map = {}
        # document ids, in the iteration order of the input dictionary
        self.keys = list(docs_list.keys())
        # how many documents there are
        self.n_docs = len(self.keys)
        # texts, aligned with self.keys
        texts = docs_list.values()
        # TF-IDF model: learn the vocabulary first, then encode the same texts
        vectorizer = TfidfVectorizer(strip_accents='unicode')
        vectorizer.fit(texts)
        # one sparse row per document
        self.tfidf_computation = vectorizer.transform(texts)

    def measure_similarity(self):
        """Hook for the subclasses; the base implementation does nothing."""
        return None

    def get_similarity_map(self):
        """Return the similarity dictionary."""
        return self.similarity_map

    def get_number_pairs(self):
        """Count the distinct unordered pairs stored in the similarity map."""
        unique_pairs = set()
        for doc_id, neighbours in self.get_similarity_map().items():
            for neighbour in neighbours:
                other_id = neighbour[0]
                # smaller id first, so (a, b) and (b, a) collapse into one pair
                if doc_id < other_id:
                    unique_pairs.add((doc_id, other_id))
                else:
                    unique_pairs.add((other_id, doc_id))
        return len(unique_pairs)

In [4]:
class AllPairsSimilaritySeqVersion(AllPairsSimilarityMaster):
    """Sequential all-pairs similarity with a size-based pruning rule.

    Documents are sorted by decreasing number of non-zero terms. For every
    document the minimum number of non-zero terms another document needs in
    order to possibly exceed the threshold is pre-computed; as soon as a
    candidate has fewer terms, the remaining (even shorter) candidates are
    skipped.
    """

    def __init__(self, docs_list, threshold):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        """
        super().__init__(docs_list, threshold)
        # create a list of pairs (id, tf-idf vector)
        self.tfidf_docs = list(zip(self.keys, self.tfidf_computation))
        # sort the pairs in descending order of the number of non-zero elements
        # (optimization for reducing computational cost)
        self.tfidf_docs = sorted(self.tfidf_docs, reverse=True, key=AllPairsSimilaritySeqVersion.__key_order)
        # minimum number of non-zero elements a vector needs to outdo the threshold with doc i
        self.min_number_to_outdo_threshold = [self.__find_min_dot_prod_value(i) for i in range(self.n_docs)]

    # define order for sorting
    @staticmethod
    def __key_order(d):
        """Return the number of non-zero elements of the vector of a (doc_id, vector) pair."""
        sparse_vector_tfidf = d[1]
        return (sparse_vector_tfidf.nonzero()[1]).shape[0]

    # determine min number of values that are necessary to overtake the threshold
    def __find_min_dot_prod_value(self, n):
        """Return how many non-zero terms a document needs to reach the threshold with doc ``n``.

        A dot product over m shared terms with a unit-norm vector is, by the
        Cauchy-Schwarz inequality, at most the square root of the sum of the m
        largest squared weights of doc ``n``. The partial sum is therefore
        compared with ``threshold ** 2``; comparing it with ``threshold``
        itself would discard pairs that can still exceed the threshold.
        Assumes the rows are L2-normalised, which is TfidfVectorizer's default.

        :param n: position of the document in ``self.tfidf_docs``
        :return: minimum number of non-zero terms required of the other document
        """
        # a non-positive threshold cannot be used to discard anything
        if self.threshold <= 0:
            return 0
        # stored weights in decreasing order (zeros can never help, so they are skipped)
        weights = np.sort(self.tfidf_docs[n][1].data)[::-1]
        target = self.threshold ** 2
        partial_sq_norm = 0
        for count, weight in enumerate(weights, start=1):
            partial_sq_norm += weight ** 2
            if partial_sq_norm >= target:
                return count
        # even the whole vector is not enough: no document can be paired with this one
        return weights.shape[0] + 1

    # measure similarity between pairs of documents
    def measure_similarity(self):
        """Fill ``similarity_map`` with every pair whose similarity exceeds the threshold."""
        # number of non-zero terms of each document, computed once
        nnz_counts = [AllPairsSimilaritySeqVersion.__key_order(d) for d in self.tfidf_docs]
        # the two loops visit only half of the similarity matrix
        for first_n in tqdm(range(self.n_docs), ncols=47):
            first_id, first_vector = self.tfidf_docs[first_n]
            # how many values are necessary to overcome the threshold
            n_min_values = self.min_number_to_outdo_threshold[first_n]
            for second_n in range(first_n + 1, self.n_docs):
                # documents are sorted by size: neither this one nor the following can pass
                if nnz_counts[second_n] < n_min_values:
                    break
                second_id, second_vector = self.tfidf_docs[second_n]
                # dot product of the two rows, extracted from the 1x1 result
                cosine_sim = first_vector.dot(second_vector.transpose()).toarray().flat[0]
                # if similarity is greater than threshold save the pair in both directions
                if cosine_sim > self.threshold:
                    self.similarity_map.setdefault(first_id, []).append((second_id, cosine_sim))
                    self.similarity_map.setdefault(second_id, []).append((first_id, cosine_sim))


In [5]:
class AllPairsSimilaritySparkVersionBase(AllPairsSimilarityMaster):
    """Spark implementation of the all-pairs similarity search.

    Every document is emitted once per non-zero term, the records are grouped
    by term and all the documents sharing a term are compared. The two
    per-record steps are the static hooks ``_first_map_flat_func`` and
    ``_second_map_func``; ``measure_similarity`` looks them up on the class of
    the instance, so a subclass that redefines them is actually used.
    """

    def __init__(self, docs_list, threshold, spark_context, compute_tf_idf_docs=True):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        :param spark_context: object whose ``parallelize`` method creates the RDD
        :param compute_tf_idf_docs: when False ``tfidf_docs`` is left unset, for
            subclasses that build their own list
        """
        super().__init__(docs_list, threshold)
        # spark context
        self.spark = spark_context
        # subclasses may skip this step to avoid useless computation
        if compute_tf_idf_docs:
            # create a list of pairs (id, tf-idf vector)
            self.tfidf_docs = list(zip(self.keys, self.tfidf_computation))

    @staticmethod
    # (doc_id, tf_idf vector) -> (term_id, (doc_id, tf_idf vector))
    def _first_map_flat_func(x):
        """Emit one ``(term_id, (doc_id, vector))`` record per non-zero term of the document."""
        return [(term_id, (x[0], x[1])) for term_id in x[1].nonzero()[1]]

    @staticmethod
    # compute the cosine similarity between docs
    def _second_map_func(x, threshold):
        """Compare all the documents that share one term.

        :param x: pair ``(term_id, list of (doc_id, vector))``
        :param threshold: similarity value a pair has to exceed to be kept
        :return: ``(term_id, list of (doc_id_1, doc_id_2, similarity))`` where
            every kept pair appears in both directions
        """
        # values of key
        l_pairs = x[1]
        # returned vector
        cosine_vector = []
        # num docs
        num_docs = len(l_pairs)
        for i_1 in range(num_docs):
            for i_2 in range(i_1 + 1, num_docs):
                # dot product of the two rows, extracted from the 1x1 result
                cosine_sim = l_pairs[i_1][1].dot(l_pairs[i_2][1].transpose())
                cosine_sim = cosine_sim.toarray().flat[0]
                # if similarity is greater than threshold save pair
                if cosine_sim > threshold:
                    # both directions are stored so that, later, every doc_id
                    # can be grouped with all the docs that outdo the threshold
                    cosine_vector.append((l_pairs[i_1][0], l_pairs[i_2][0], cosine_sim))
                    cosine_vector.append((l_pairs[i_2][0], l_pairs[i_1][0], cosine_sim))
        return (x[0], cosine_vector)

    # measure similarity between pairs of documents
    def measure_similarity(self):
        """Run the Spark pipeline and store the result in ``similarity_map``."""
        # Everything used inside the Spark functions is bound to a local name,
        # so the closures shipped to Spark do not reference `self`.
        threshold = self.threshold
        # Resolve the hooks on the concrete class: naming the base class here
        # would silently ignore the versions redefined by subclasses.
        first_map = type(self)._first_map_flat_func
        second_map = type(self)._second_map_func
        # RDD composed of the entries of tfidf_docs
        rdd_tfidf = self.spark.parallelize(self.tfidf_docs)
        # one record per (term, document)
        mapped = rdd_tfidf.flatMap(first_map)
        # group by term and turn the grouped values into lists
        group_by_key = mapped.groupByKey().mapValues(list)
        # compute the similarities of each term group independently
        cosines = group_by_key.map(lambda x: second_map(x, threshold))
        # keep only the lists of (doc_id_1, doc_id_2, similarity)
        cosines_values = cosines.values()
        # flatten the lists so duplicates can be removed
        flat_cosines_values = cosines_values.flatMap(lambda x: x)
        # a pair sharing several terms is produced once per shared term
        cosines_no_duplicates = flat_cosines_values.distinct()
        # create new map (doc_id_1, (doc_id_2, cosine))
        mapped_doc_keys = cosines_no_duplicates.map(lambda x: (x[0], (x[1], x[2])))
        # group by document and turn the grouped values into lists
        group_by_doc_key = mapped_doc_keys.groupByKey().mapValues(list)
        # save the final dictionary
        self.similarity_map = dict(group_by_doc_key.collect())

In [6]:
# WORKS (first version)
class AllPairsSimilaritySparkVersion2(AllPairsSimilaritySparkVersionBase):
    """Spark version that indexes a document only from its boundary term on.

    The term columns are ordered by decreasing mean TF-IDF; for every document
    the boundary is the first column at which the partial dot product with
    ``d_star`` (the per-term maximum over all documents) reaches the threshold.

    NOTE: a later cell of this notebook defines a class with the same name,
    which replaces this one once that cell is executed.
    """

    def __init__(self, docs_list, threshold, spark_context):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        :param spark_context: object whose ``parallelize`` method creates the RDD
        """
        super().__init__(docs_list, threshold, spark_context, compute_tf_idf_docs=False)
        # compute tfidf mean for each term along all docs and flat matrix
        tf_idf_mean_terms = self.tfidf_computation.mean(axis=0).A1
        # sort in decreasing order
        index_decresing_sort = np.argsort(tf_idf_mean_terms)[::-1]
        # order matrix by mean frequency
        self.ordered_tfidf_computation = self.tfidf_computation[::, index_decresing_sort]
        # vector d* (vector containing max value for each term)
        d_star = self.ordered_tfidf_computation.max(axis=0).tocsr(copy=False)
        # number of terms
        n_terms = self.ordered_tfidf_computation.shape[1]
        # list containing boundaries for each doc
        boundary_docs = AllPairsSimilaritySparkVersion2.__find_boundaries(self.ordered_tfidf_computation, d_star, self.n_docs, n_terms, self.threshold)
        # create a list of triples (id, tf-idf vector, boundary)
        self.tfidf_docs = list(zip(self.keys, self.ordered_tfidf_computation, boundary_docs))

    @staticmethod
    # find min value of terms necessary to outdo the threshold
    def __find_boundaries(ordered_tfidf, d_star, n_docs, n_terms, threshold):
        """Return one boundary per document, aligned with the rows of ``ordered_tfidf``.

        The boundary of a document is the first column ``t`` such that the dot
        product with ``d_star`` restricted to columns ``0..t`` is at least
        ``threshold``. A document that never reaches the threshold (an empty
        row, for instance) gets ``n_terms``, i.e. it has no term to index;
        without this entry the list would be shorter than the number of
        documents and the following documents would receive a wrong boundary.
        """
        boundary_docs = []
        for d in range(n_docs):
            for t in range(n_terms):
                # calculate partial cosine
                partial_cosine = ordered_tfidf[d, 0:t+1].dot(d_star[0, 0:t+1].transpose())[0, 0]
                if partial_cosine >= threshold:
                    # first column from which the non-zero terms have to be indexed
                    boundary_docs.append(t)
                    # change doc
                    break
            else:
                # threshold never reached: keep the list aligned with the documents
                boundary_docs.append(n_terms)
        return boundary_docs

    @staticmethod
    # (doc_id, tf_idf vector, boundary) -> (term_id, (doc_id, tf_idf vector))
    def _first_map_flat_func(x):
        """Emit ``(term_id, (doc_id, vector))`` for the non-zero terms at or after the boundary."""
        # only the non-zero columns are inspected instead of probing every
        # column of the sparse row; ids are emitted in increasing order
        term_ids = sorted(int(term_id) for term_id in x[1].nonzero()[1] if term_id >= x[2])
        return [(term_id, (x[0], x[1])) for term_id in term_ids]

In [9]:
# SECOND VERSION
class AllPairsSimilaritySparkVersion2(AllPairsSimilaritySparkVersionBase):
    """Spark version that indexes a document only from its boundary term on.

    The term columns are ordered by decreasing mean TF-IDF and the column
    indices of every row are sorted; for every document the boundary is the
    first non-zero column at which the partial dot product with ``d_star``
    (the per-term maximum over all documents) reaches the threshold.

    NOTE: an earlier cell of this notebook defines a class with the same
    name; this definition replaces it once this cell is executed.
    """

    def __init__(self, docs_list, threshold, spark_context):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        :param spark_context: object whose ``parallelize`` method creates the RDD
        """
        super().__init__(docs_list, threshold, spark_context, compute_tf_idf_docs=False)
        # compute tfidf mean for each term along all docs and flat matrix
        tf_idf_mean_terms = self.tfidf_computation.mean(axis=0).A1
        # sort in decreasing order
        index_decresing_sort = np.argsort(tf_idf_mean_terms)[::-1]
        # order matrix by mean frequency
        self.ordered_tfidf_computation = self.tfidf_computation[::, index_decresing_sort]
        # adjust indices due to previous sorting
        self.__adjust_indices()
        # vector d* (vector containing max value for each term)
        d_star = self.ordered_tfidf_computation.max(axis=0).tocsr(copy=False)
        # number of terms
        n_terms = self.ordered_tfidf_computation.shape[1]
        # list containing boundaries for each doc
        boundary_docs = AllPairsSimilaritySparkVersion2.__find_boundaries(self.ordered_tfidf_computation, d_star, self.n_docs, n_terms, self.threshold)
        # create a list of triples (id, tf-idf vector, boundary)
        self.tfidf_docs = list(zip(self.keys, self.ordered_tfidf_computation, boundary_docs))

    # adjust indices after sorted by frequency
    def __adjust_indices(self):
        """Make the column indices of every row increasing.

        After the columns have been permuted, the indices stored inside a row
        are no longer ordered, while the boundary search and the binary search
        below need them sorted. The matrix is fixed in one pass instead of
        densifying and re-stacking it row by row.
        """
        ordered = self.ordered_tfidf_computation.tocsr(copy=True)
        # drop explicitly stored zeros, as a rebuild from dense rows would do
        ordered.eliminate_zeros()
        ordered.sort_indices()
        self.ordered_tfidf_computation = ordered

    @staticmethod
    # find min value of terms necessary to outdo the threshold
    def __find_boundaries(ordered_tfidf, d_star, n_docs, n_terms, threshold):
        """Return one boundary per document, aligned with the rows of ``ordered_tfidf``.

        The boundary of a document is its first non-zero column ``t`` such that
        the dot product with ``d_star`` restricted to columns ``0..t`` is at
        least ``threshold``. A document that never reaches the threshold (an
        empty row, for instance) gets ``n_terms``, i.e. it has no term to
        index; without this entry the list would be shorter than the number of
        documents and the following documents would receive a wrong boundary.
        """
        boundary_docs = []
        for d in range(n_docs):
            # watch only indices with values different from zero
            for t in ordered_tfidf[d, ::].indices:
                # calculate partial cosine
                partial_cosine = ordered_tfidf[d, 0:t+1].dot(d_star[0, 0:t+1].transpose())[0, 0]
                if partial_cosine >= threshold:
                    # first column from which the non-zero terms have to be indexed
                    boundary_docs.append(t)
                    # change doc
                    break
            else:
                # threshold never reached: keep the list aligned with the documents
                boundary_docs.append(n_terms)
        return boundary_docs

    @staticmethod
    # find pos of val in vector
    def __binary_search(vector, f, t, val):
        """Return the position of ``val`` in the sorted ``vector[f..t]``.

        When ``val`` is absent, the position of the first greater element is
        returned (``t + 1`` if there is none). An empty range returns ``f``.
        """
        while f <= t:
            mid = (f + t) // 2
            if vector[mid] < val:
                f = mid + 1
            elif vector[mid] > val:
                t = mid - 1
            else:
                return mid
        return f

    @staticmethod
    # (doc_id, tf_idf vector, boundary) -> (term_id, (doc_id, tf_idf vector))
    def _first_map_flat_func(x):
        """Emit ``(term_id, (doc_id, vector))`` for the non-zero terms at or after the boundary."""
        # list of terms not equal zero
        terms_list = x[1].indices
        # find initial point in terms_list to watch
        start_point = AllPairsSimilaritySparkVersion2.__binary_search(terms_list, 0, terms_list.shape[0]-1, x[2])
        # populate vector
        return [(terms_list[pos], (x[0], x[1])) for pos in range(start_point, terms_list.shape[0])]

In [94]:
class AllPairsSimilaritySparkVersion3(AllPairsSimilaritySparkVersion2):
    """Variant whose second map step compares a pair only in one term group.

    Two documents that share several terms meet in several term groups; the
    similarity is computed only in the group of their largest shared term.
    """

    def __init__(self, docs_list, threshold, spark_context):
        """
        :param docs_list: dictionary {doc_id: text of the document}
        :param threshold: similarity value a pair has to exceed to be kept
        :param spark_context: object whose ``parallelize`` method creates the RDD
        """
        super().__init__(docs_list, threshold, spark_context)

    @staticmethod
    # control if this term is the last one shared by d1 and d2
    def __term_is_max(t, tfidf_d1, tfidf_d2):
        """Tell whether ``t`` is the largest term index present in both vectors.

        Requiring ``t`` to be the largest index of each vector on its own would
        reject every pair whose documents end with different terms, so the
        maximum is taken over the terms the two vectors have in common.
        """
        shared_terms = np.intersect1d(tfidf_d1.indices, tfidf_d2.indices)
        return shared_terms.size > 0 and shared_terms.max() == t

    @staticmethod
    # compute the cosine similarity between docs
    def _second_map_func(x, threshold):
        """Compare the documents of one term group.

        :param x: pair ``(term_id, list of (doc_id, vector))``
        :param threshold: similarity value a pair has to exceed to be kept
        :return: ``(term_id, list of (doc_id_1, doc_id_2, similarity))`` where
            every kept pair appears in both directions
        """
        l_pairs = x[1]
        cosine_vector = []
        num_docs = len(l_pairs)
        for i_1 in range(num_docs):
            for i_2 in range(i_1 + 1, num_docs):
                # the pair is handled only in the group of its largest shared term
                if AllPairsSimilaritySparkVersion3.__term_is_max(x[0], l_pairs[i_1][1], l_pairs[i_2][1]):
                    # dot product of the two rows, extracted from the 1x1 result
                    cosine_sim = l_pairs[i_1][1].dot(l_pairs[i_2][1].transpose())
                    cosine_sim = cosine_sim.toarray().flat[0]
                    # if similarity is greater than threshold save pair
                    if cosine_sim > threshold:
                        # both directions are stored so that, later, every doc_id
                        # can be grouped with all the docs that outdo the threshold
                        cosine_vector.append((l_pairs[i_1][0], l_pairs[i_2][0], cosine_sim))
                        cosine_vector.append((l_pairs[i_2][0], l_pairs[i_1][0], cosine_sim))
        return (x[0], cosine_vector)

## READ DATA

In [5]:
# load the whole corpus as {doc_id: text}
docs = ReadData.get('corpus_scifact')
# small working set: the first 50 documents, in the order of the dictionary
toy_docs = {doc_id: docs[doc_id] for doc_id in list(docs)[:50]}

## RUN SEQUENTIAL ALGORITHM

In [6]:
similarity_pair_seq = AllPairsSimilaritySeqVersion(toy_docs, 0.15)
similarity_pair_seq.measure_similarity()

100%|██████████| 50/50 [00:00<00:00, 72.69it/s]


In [7]:
similarity_pair_seq.get_number_pairs()

39

In [77]:
similarity_pair_seq.similarity_map

{'159469': [('70115', 0.15156955084744542)],
 '70115': [('159469', 0.15156955084744542),
  ('152245', 0.1649078306048879),
  ('54440', 0.20368075274043673),
  ('97884', 0.18071215646446034),
  ('236204', 0.16444437595807393),
  ('103007', 0.15280919245841593)],
 '152245': [('54440', 0.15085087369416136),
  ('236204', 0.1576662509161589),
  ('70115', 0.1649078306048879),
  ('293661', 0.15397538589377446)],
 '54440': [('152245', 0.15085087369416136),
  ('253672', 0.15509394136401228),
  ('236204', 0.17953733699712568),
  ('70115', 0.20368075274043673)],
 '236204': [('152245', 0.1576662509161589),
  ('54440', 0.17953733699712568),
  ('143251', 0.1538813579074872),
  ('97884', 0.1697628310906025),
  ('19238', 0.18481970162326697),
  ('72159', 0.16962964712036308),
  ('70115', 0.16444437595807393),
  ('123859', 0.15825126568124032),
  ('7912', 0.1910752299774629),
  ('279052', 0.2163670011545827)],
 '293661': [('152245', 0.15397538589377446)],
 '188911': [('5836', 0.15309493178251343),
  ('

## RUN SPARK ALGORITHM BASE

In [9]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Spark web user interface: http://localhost:1111
spark = (
    SparkSession.builder
    .master("local[*]")                                # local cluster using all cores
    .appName("Assignment 3")                           # name of the application
    .config("spark.driver.bindAddress", "localhost")   # name of the host
    .config("spark.ui.port", "1111")                   # specific port of the user interface
    .getOrCreate()                                     # get the existing session or create a new one
)

# SparkContext: used to communicate with the cluster and to create RDDs,
# accumulators and broadcast variables
sc = spark.sparkContext

In [176]:
similarity_pair_spark_base = AllPairsSimilaritySparkVersionBase(toy_docs, .15, sc)

In [None]:
similarity_pair_spark_base.measure_similarity()

In [104]:
similarity_pair_spark_base.similarity_map

{'5836': [('188911', 0.15309493178251343)],
 '188911': [('5836', 0.15309493178251343),
  ('164985', 0.1547685118768981),
  ('266641', 0.15934402512329895)],
 '18670': [('279052', 0.1543503825550648)],
 '279052': [('18670', 0.1543503825550648),
  ('236204', 0.2163670011545827),
  ('36474', 0.2087707577734651)],
 '54440': [('70115', 0.20368075274043673),
  ('152245', 0.15085087369416136),
  ('236204', 0.17953733699712568),
  ('253672', 0.15509394136401228)],
 '70115': [('54440', 0.20368075274043673),
  ('97884', 0.18071215646446034),
  ('152245', 0.1649078306048879),
  ('159469', 0.15156955084744542),
  ('236204', 0.16444437595807393),
  ('103007', 0.15280919245841593)],
 '152245': [('54440', 0.15085087369416136),
  ('70115', 0.1649078306048879),
  ('236204', 0.1576662509161589),
  ('293661', 0.15397538589377446)],
 '236204': [('54440', 0.17953733699712568),
  ('70115', 0.16444437595807393),
  ('72159', 0.16962964712036308),
  ('97884', 0.1697628310906025),
  ('143251', 0.153881357907487

In [8]:
# stop all
sc.stop()

## RUN SPARK ALGORITHM VERSION 2

In [13]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Spark web user interface: http://localhost:1111
spark = (
    SparkSession.builder
    .master("local[*]")                                # local cluster using all cores
    .appName("Assignment 3")                           # name of the application
    .config("spark.driver.bindAddress", "localhost")   # name of the host
    .config("spark.ui.port", "1111")                   # specific port of the user interface
    .getOrCreate()                                     # get the existing session or create a new one
)

# SparkContext: used to communicate with the cluster and to create RDDs,
# accumulators and broadcast variables
sc = spark.sparkContext

In [14]:
similarity_pair_spark_2 = AllPairsSimilaritySparkVersion2(toy_docs, .15, sc)

In [15]:
similarity_pair_spark_2.measure_similarity()

In [85]:
similarity_pair_spark_2.similarity_map

{'7912': [('19238', 0.19188221057375543), ('236204', 0.19107522997746287)],
 '152245': [('54440', 0.15085087369416136),
  ('236204', 0.15766625091615893),
  ('70115', 0.16490783060488787),
  ('293661', 0.15397538589377446)],
 '243694': [('106301', 0.16580764276601717), ('92499', 0.30137567345351907)],
 '236204': [('7912', 0.19107522997746287),
  ('152245', 0.15766625091615893),
  ('54440', 0.17953733699712565),
  ('123859', 0.15825126568124032),
  ('19238', 0.18481970162326694),
  ('72159', 0.16962964712036305),
  ('279052', 0.2163670011545828),
  ('97884', 0.1697628310906025),
  ('70115', 0.16444437595807393),
  ('143251', 0.1538813579074872)],
 '54440': [('152245', 0.15085087369416136),
  ('236204', 0.17953733699712565),
  ('70115', 0.20368075274043676),
  ('253672', 0.15509394136401233)],
 '106301': [('243694', 0.16580764276601717)],
 '123859': [('236204', 0.15825126568124032), ('97884', 0.15779396212498972)],
 '285794': [('92308', 0.19733285226955793),
  ('263364', 0.16889205937435

In [16]:
verify_equality_between_models(similarity_pair_seq.similarity_map, similarity_pair_spark_2.similarity_map)

Key are equal: True
Total mistake: 0


In [166]:
sc.stop()

## RUN SPARK ALGORITHM VERSION 3

In [None]:
import findspark
findspark.init()
import pyspark

sc = pyspark.SparkContext(appName="Hei")

In [95]:
similarity_pair_spark_3 = AllPairsSimilaritySparkVersion3(toy_docs, .15, sc)

In [96]:
similarity_pair_spark_3.measure_similarity()

In [97]:
similarity_pair_spark_3.similarity_map

{'7912': [('19238', 0.19188221057375543), ('236204', 0.19107522997746287)],
 '152245': [('54440', 0.15085087369416136),
  ('236204', 0.15766625091615893),
  ('70115', 0.16490783060488787),
  ('293661', 0.15397538589377446)],
 '243694': [('106301', 0.16580764276601717), ('92499', 0.30137567345351907)],
 '236204': [('7912', 0.19107522997746287),
  ('152245', 0.15766625091615893),
  ('54440', 0.17953733699712565),
  ('123859', 0.15825126568124032),
  ('19238', 0.18481970162326694),
  ('72159', 0.16962964712036305),
  ('279052', 0.2163670011545828),
  ('97884', 0.1697628310906025),
  ('70115', 0.16444437595807393),
  ('143251', 0.1538813579074872)],
 '54440': [('152245', 0.15085087369416136),
  ('236204', 0.17953733699712565),
  ('70115', 0.20368075274043676),
  ('253672', 0.15509394136401233)],
 '106301': [('243694', 0.16580764276601717)],
 '123859': [('236204', 0.15825126568124032), ('97884', 0.15779396212498972)],
 '285794': [('92308', 0.19733285226955793),
  ('263364', 0.16889205937435

In [179]:
sc.stop()

# PYSPARK EXPERIMENTS

In [2]:
import findspark
findspark.init()
import pyspark

sc = pyspark.SparkContext(appName="Hei")
data = [(i, (1, 2)) for i in range(10)]
myrdd = sc.parallelize(data)

In [3]:
data = [(i, [1, 1]) for i in range(10)] + [(i, [2, 2]) for i in range(10)] + [(i, [3, 3]) for i in range(10)]
myrdd = sc.parallelize(data)

In [44]:
data

[(0, [1, 1]),
 (1, [1, 1]),
 (2, [1, 1]),
 (3, [1, 1]),
 (4, [1, 1]),
 (5, [1, 1]),
 (6, [1, 1]),
 (7, [1, 1]),
 (8, [1, 1]),
 (9, [1, 1]),
 (0, [2, 2]),
 (1, [2, 2]),
 (2, [2, 2]),
 (3, [2, 2]),
 (4, [2, 2]),
 (5, [2, 2]),
 (6, [2, 2]),
 (7, [2, 2]),
 (8, [2, 2]),
 (9, [2, 2]),
 (0, [3, 3]),
 (1, [3, 3]),
 (2, [3, 3]),
 (3, [3, 3]),
 (4, [3, 3]),
 (5, [3, 3]),
 (6, [3, 3]),
 (7, [3, 3]),
 (8, [3, 3]),
 (9, [3, 3])]

In [45]:
m = myrdd.map(lambda x: x)

In [46]:
r = m.reduceByKey(lambda x,y: x + y)

In [38]:
r.collect()

[(0, [1, 1, 2, 2, 3, 3]),
 (8, [1, 1, 2, 2, 3, 3]),
 (1, [1, 1, 2, 2, 3, 3]),
 (9, [1, 1, 2, 2, 3, 3]),
 (2, [1, 1, 2, 2, 3, 3]),
 (3, [1, 1, 2, 2, 3, 3]),
 (4, [1, 1, 2, 2, 3, 3]),
 (5, [1, 1, 2, 2, 3, 3]),
 (6, [1, 1, 2, 2, 3, 3]),
 (7, [1, 1, 2, 2, 3, 3])]

In [4]:
data = [(i, 1) for i in range(10)] + [(i, 2) for i in range(10)] + [(i, 3) for i in range(10)]
myrdd = sc.parallelize(data)



m = myrdd.map(lambda x: x)
print(m.collect())
r = m.groupByKey()


[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1), (7, 1), (8, 1), (9, 1), (0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 2), (6, 2), (7, 2), (8, 2), (9, 2), (0, 3), (1, 3), (2, 3), (3, 3), (4, 3), (5, 3), (6, 3), (7, 3), (8, 3), (9, 3)]


In [64]:
r.mapValues(list).collect()

[(0, [1, 2, 3]),
 (8, [1, 2, 3]),
 (1, [1, 2, 3]),
 (9, [1, 2, 3]),
 (2, [1, 2, 3]),
 (3, [1, 2, 3]),
 (4, [1, 2, 3]),
 (5, [1, 2, 3]),
 (6, [1, 2, 3]),
 (7, [1, 2, 3])]

In [30]:
def prova(x):
    """Return, as a tuple and in their original order, the elements of `x` greater than 1."""
    return tuple(item for item in x if item > 1)

# x is the vector for each key
mm = r.mapValues(prova)

In [31]:
mm.collect()

[(0, (2, 3)),
 (8, (2, 3)),
 (1, (2, 3)),
 (9, (2, 3)),
 (2, (2, 3)),
 (3, (2, 3)),
 (4, (2, 3)),
 (5, (2, 3)),
 (6, (2, 3)),
 (7, (2, 3))]

In [32]:
v = mm.values()

In [33]:
v.collect()

[(2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3),
 (2, 3)]

In [34]:
rrr = v.distinct()

In [35]:
rrr.collect()

[(2, 3)]

In [56]:
# stop all
sc.stop()

(3, 2, 3, 1)

# --------------------------------------------------

In [275]:
# compute mean and flat matrix
mean_terms = similarity_pair_seq.tfidf_computation.mean(axis=0).A1

In [276]:
# sort in decreasing order
index_decresing_sort = np.argsort(mean_terms)[::-1]

In [277]:
# order matrix by frequency
ordered_tfidf = similarity_pair_seq.tfidf_computation[::, index_decresing_sort]

In [329]:
for i in range(ordered_tfidf.shape[0]):
    ordered_tfidf[0, ::] = csr_matrix(ordered_tfidf[0, ::].toarray())
    break

In [347]:
# vector d*
d_star = ordered_tfidf.max(axis=0)

In [358]:
ordered_tfidf[0, 0:100+1].dot(d_star[0, 0:100+1].transpose())[0, 0]

0.16662577404252926

In [357]:
d_star = d_star.tocsr(copy=False)
type(d_star)

scipy.sparse._csr.csr_matrix

In [302]:
n_terms = ordered_tfidf.shape[1]
n_docs = ordered_tfidf.shape[0]

In [None]:
# Scratch version of the boundary search: for every document the term columns
# are scanned from the left until the partial dot product with d_star reaches
# the threshold.
# NOTE(review): `threshold` is not assigned at top level in any cell visible
# here — it has to be defined before this cell can run.
boundary_docs = []
for d in range(n_docs):
    for t in range(n_terms):
        # dot product restricted to the first t+1 columns
        partial_cosine = ordered_tfidf[d, 0:t+1].dot(d_star[0, 0:t+1].transpose())[0, 0]
        if partial_cosine >= threshold:
            # stores the column before the one that reached the threshold
            # (a document that never reaches it adds no entry to the list)
            boundary_docs.append(t-1)
            # change doc
            break
            
        

scipy.sparse._coo.coo_matrix

In [305]:
ordered_tfidf[1, 0:1000].dot(ordered_tfidf[1, 0:1000].transpose())[0, 0]

0.8218331623316228

In [335]:
ordered_tfidf[0, ::].shape[1]

2733

In [330]:
ordered_tfidf[0, ::].indices

array([ 961,    8,  204,   17, 1395, 2610,   21, 2615,  531, 1286, 1211,
          4,  635,  225,  255,    0,    6,  226,  191,  992, 1300,  309,
        386,  423,  325, 2598,  837,  472,  640, 2608,  808,  702, 2609,
        240, 1128,   25,    1, 2622,  705,  787, 2603,  703,  517,  266,
        203, 2624,  741,  435, 2625,  452, 1341,   36,  993, 2627,  410,
        552,    3,  832,   46,  420,  106, 1274, 2607,  144,  507, 2621,
        688,  182,  114, 2599, 2602,  202, 1152,   40,  253, 2606,  373,
         41, 2619, 2613,  500, 1446, 1082, 2601,  704,  766,  584,   51,
       2618, 2617,    9,  709,   76, 1153,   32, 1297,  727,   12, 2628,
       1445,  475,  641,  991,    2,   64, 2623, 1609, 2626, 2616, 2620,
       1444,  697, 2614, 2612, 2611,  227, 2605, 2600, 2604, 1275])

In [331]:
similarity_pair_seq.tfidf_computation[0, ::].indices

array([2713, 2710, 2698, 2691, 2689, 2683, 2681, 2672, 2653, 2641, 2543,
       2534, 2523, 2522, 2514, 2491, 2490, 2489, 2480, 2477, 2393, 2305,
       2287, 2260, 2234, 2222, 2170, 2159, 2116, 2031, 2019, 1948, 1939,
       1918, 1758, 1737, 1731, 1681, 1605, 1603, 1550, 1547, 1508, 1506,
       1496, 1481, 1464, 1452, 1440, 1430, 1426, 1335, 1328, 1303, 1269,
       1253, 1241, 1217, 1187, 1154, 1153, 1102, 1073, 1043, 1041,  980,
        979,  829,  820,  758,  750,  749,  740,  729,  728,  699,  686,
        670,  622,  612,  536,  522,  521,  510,  458,  456,  419,  412,
        410,  403,  402,  394,  388,  375,  307,  302,  300,  292,  273,
        271,  265,  261,  236,  229,  224,  208,  177,  116,   69,   58,
         50,   47,   45,   33,   29,   19,   18,   12,    6,    2])

In [272]:
l = []
for i in range(10):
    l.append(np.random.randint(low=0, high=10000, size=10))
    l[-1] = l[-1] / np.sqrt(sum([i**2 for i in l[-1]]))
    
d_star = np.matrix(l).max(axis=0)

In [273]:
for l_ in l:
    print(l_.dot(d_star.transpose()))

[[1.47813969]]
[[1.39534562]]
[[1.44659433]]
[[1.43452986]]
[[1.51445932]]
[[1.35510844]]
[[1.47826521]]
[[1.47416507]]
[[1.33269231]]
[[1.52399446]]


In [None]:
aa = []

In [431]:
aa = csr_matrix([1, 2, 3])

In [434]:
aa.indices = np.array([1, 1, 1])
aa.nonzero()

(array([0, 0, 0]), array([1, 1, 1]))

In [35]:
o = similarity_pair_spark_3.ordered_tfidf_computation

In [36]:
o

<50x2733 sparse matrix of type '<class 'numpy.float64'>'
	with 5825 stored elements in Compressed Sparse Row format>

In [61]:
final = dok_matrix((0, o[0,::].shape[1]))
for i in range(similarity_pair_spark_3.n_docs):
    change = csr_matrix(o[i, ::].toarray())
    final = vstack((final, change))
final = final.tocsr()

In [88]:
final[0, ::].indices

array([   0,    1,    2,    3,    4,    6,    8,    9,   12,   17,   21,
         25,   32,   36,   40,   41,   46,   51,   64,   76,  106,  114,
        144,  182,  191,  202,  203,  204,  225,  226,  227,  240,  253,
        255,  266,  309,  325,  373,  386,  410,  420,  423,  435,  452,
        472,  475,  500,  507,  517,  531,  552,  584,  635,  640,  641,
        688,  697,  702,  703,  704,  705,  709,  727,  741,  766,  787,
        808,  832,  837,  961,  991,  992,  993, 1082, 1128, 1152, 1153,
       1211, 1274, 1275, 1286, 1297, 1300, 1341, 1395, 1444, 1445, 1446,
       1609, 2598, 2599, 2600, 2601, 2602, 2603, 2604, 2605, 2606, 2607,
       2608, 2609, 2610, 2611, 2612, 2613, 2614, 2615, 2616, 2617, 2618,
       2619, 2620, 2621, 2622, 2623, 2624, 2625, 2626, 2627, 2628])

In [65]:
final[2, ::].dot(final[4, ::].transpose())[0,0]

0.19188221057375543

In [64]:
similarity_pair_spark_3.keys

['4983',
 '5836',
 '7912',
 '18670',
 '19238',
 '33370',
 '36474',
 '54440',
 '70115',
 '70490',
 '72159',
 '79447',
 '87758',
 '92308',
 '92499',
 '97884',
 '102662',
 '103007',
 '104130',
 '106301',
 '116792',
 '118568',
 '120626',
 '123859',
 '140874',
 '143251',
 '152245',
 '153744',
 '159469',
 '164189',
 '164985',
 '169264',
 '175735',
 '188911',
 '195352',
 '202259',
 '207972',
 '213017',
 '219475',
 '226488',
 '236204',
 '238409',
 '243694',
 '253672',
 '263364',
 '266641',
 '275294',
 '279052',
 '285794',
 '293661']

In [47]:
csr_matrix(o[0, ::].toarray())

<1x2733 sparse matrix of type '<class 'numpy.float64'>'
	with 120 stored elements in Compressed Sparse Row format>

In [None]:
dok_matrix((100, 100))
vstack((m1, m2))

In [108]:
i = final[0, ::].indices

i

array([   0,    1,    2,    3,    4,    6,    8,    9,   12,   17,   21,
         25,   32,   36,   40,   41,   46,   51,   64,   76,  106,  114,
        144,  182,  191,  202,  203,  204,  225,  226,  227,  240,  253,
        255,  266,  309,  325,  373,  386,  410,  420,  423,  435,  452,
        472,  475,  500,  507,  517,  531,  552,  584,  635,  640,  641,
        688,  697,  702,  703,  704,  705,  709,  727,  741,  766,  787,
        808,  832,  837,  961,  991,  992,  993, 1082, 1128, 1152, 1153,
       1211, 1274, 1275, 1286, 1297, 1300, 1341, 1395, 1444, 1445, 1446,
       1609, 2598, 2599, 2600, 2601, 2602, 2603, 2604, 2605, 2606, 2607,
       2608, 2609, 2610, 2611, 2612, 2613, 2614, 2615, 2616, 2617, 2618,
       2619, 2620, 2621, 2622, 2623, 2624, 2625, 2626, 2627, 2628])

In [146]:
def binary_search(vector, f, t, val):
    """Find the position of `val` in the sorted sequence `vector[f..t]`.

    :param vector: sequence sorted in increasing order
    :param f: first position of the range (inclusive)
    :param t: last position of the range (inclusive)
    :param val: value to look for
    :return: the position of `val` when it is present; otherwise the position
        of the first greater element (`t + 1` if there is none). An empty
        range (`f > t`) returns `f`.
    """
    # iterative form: no call to an undefined helper and no recursion into an
    # inverted range when `val` is absent
    while f <= t:
        mid = (f + t) // 2
        if vector[mid] < val:
            f = mid + 1
        elif vector[mid] > val:
            t = mid - 1
        else:
            return mid
    # nearest value
    return f

In [149]:
binary_search(i, 0, i.shape[0]-1, 2628)

a
a
a
a
a


119

In [150]:
i[119]

2628

# --------------------------------------------------