In [1]:
import sys
# NOTE(review): machine-specific absolute path. Presumably required so that the
# `assignment3` package imported in later cells can be resolved -- adjust it to
# the local checkout before running.
sys.path.append('/home/sebaq/Documents/GitHub/LWMD_assignments')

## Definition of Map and Reduce functions

In [2]:
def document_map(
        doc_info: tuple[str, int, list[tuple[int, float]]]
) -> list[tuple[str, tuple[str, int, list[tuple[int, float]]]]]:
    """
    Map step: emit one record for every term of the document lying past its threshold.

    :param doc_info: triple describing a single document:
        - doc-id, a string
        - term-threshold, a column index; terms whose id is at or below it are not emitted
        - document vector, a sparse list of (column, value) pairs, one per non-zero entry,
            where each column is a term-id
    :return: list of (key, value) pairs, in the order the terms appear in the vector:
        - key: the term-id rendered as a string
        - value: the triple (doc-id, term-id, document vector); the vector is the very
            same object received as input, it is not copied
    """

    identifier, threshold, vector = doc_info

    emitted: list[tuple[str, tuple[str, int, list[tuple[int, float]]]]] = []

    # walk the non-zero entries; only the column (term-id) matters here
    for column, _weight in vector:
        # OPTIMIZATION 1: only terms whose id is strictly greater than the threshold
        #  are used as keys (columns up to and including the threshold are skipped)
        if column > threshold:
            emitted.append((str(column), (identifier, column, vector)))

    return emitted

In [3]:
def documents_reduce(docs: list[tuple[str, int, list[tuple[int, float]]]]) -> list[tuple[tuple[str, str], float]]:
    """
    Reduce function: score the pairs of documents grouped under the same term-id.

    A pair is scored only inside the group whose key equals the largest term-id the two
    documents have in common (OPTIMIZATION 2); in every other group it is skipped, so
    the same pair is not scored once per shared term.

    :param docs: list of triplets, all aggregated under the same key:
        - doc-id
        - term-id (actually a column index of the vector), i.e. the aggregation key
        - document vector as a sparse list of pairs (column, value); the merge below
          relies on the pairs being sorted by ascending column
    :return: list of tuples, one per scored pair:
        - the first element is the pair of documents represented by their doc-id
        - the second element is the dot-product of the two sparse vectors
          (presumably the cosine similarity, assuming unit-length vectors -- TODO confirm)
        Pairs with no common term are skipped (their dot-product would be 0).
    """

    # list of output pairs
    pairs = []

    # DOC-SIZE HEURISTIC pt. 1 (disabled) - sort items for document length
    # docs = sorted(docs, key=lambda x: len(x[2]), reverse=True)

    # total number of documents
    n_docs = len(docs)

    # loop among all possible pairs
    for i in range(n_docs - 1):

        doc1_id, term_id, doc1 = docs[i]

        # term-ids of the first document: independent of j, so built once per i
        terms_1: set[int] = {t_id1 for t_id1, _ in doc1}
        len_1 = len(doc1)

        for j in range(i + 1, n_docs):

            doc2_id, _, doc2 = docs[j]  # since the operation is an aggregation by key,
            # term_id is expected to be the same

            # DOC-SIZE HEURISTIC pt. 2 (disabled) - skip if too-high length mismatch
            # if len(doc1) / len(doc2) > 1.3:
            #     break

            # ----------------- OPTIMIZATION 2 -----------------

            # largest term-id shared by the two documents (None when nothing is shared)
            max_term = max((t_id2 for t_id2, _ in doc2 if t_id2 in terms_1), default=None)

            # if the maximum common term-id is not the aggregation key, skip the similarity
            # computation: the pair is scored in the group keyed by that maximum term.
            # NOTE(review): this assumes the mapper emitted both documents under that key,
            # i.e. the term-threshold never prunes the maximum common term of a pair that
            # clears the similarity cut -- confirm against the threshold computation.
            if max_term != term_id:
                continue

            # --------------------------------------------------

            # Computing similarity with dot-product

            sim = 0.  # total similarity

            # two pointers over the (sorted) term-ids of the two vectors:
            # on equal term-ids we add the contribution and advance both pointers,
            # otherwise we advance the pointer sitting on the smaller term-id;
            # once either vector is exhausted there are no more common terms
            pos_1, pos_2 = 0, 0
            len_2 = len(doc2)

            while pos_1 < len_1 and pos_2 < len_2:

                term1, value1 = doc1[pos_1]
                term2, value2 = doc2[pos_2]

                if term1 == term2:  # common term-id: add its contribution
                    sim += value1 * value2
                    pos_1 += 1
                    pos_2 += 1
                elif term1 < term2:  # the first one has a smaller term-id
                    pos_1 += 1
                else:  # the second one has a smaller term-id
                    pos_2 += 1

            # we add the pairwise similarity to final output
            pairs.append(((doc1_id, doc2_id), sim))

    return pairs


In [4]:
from assignment3.utils import jaccard
from assignment3.io_ import load_evaluation, get_exact_solution_file
from typing import List, Tuple

def compare_with_exact(data_name: str, collected_: List[Tuple[str, str]]) -> float:
    """
    Scores the pairs produced by spark against the stored exact solution
    :param data_name: name of dataset, used to locate the exact-solution file
    :param collected_: pairs of similar docs from spark
    :return: jaccard similarity between the collected pairs and the exact ones
    """

    # locate and load the reference solution for this dataset
    solution_path = get_exact_solution_file(data_name=data_name)
    reference = load_evaluation(path_=solution_path)['pairs']

    # each stored pair is rebuilt as a 2-tuple so it is hashable and comparable with spark's tuples
    reference_pairs = {(first, second) for first, second in reference}

    return jaccard(set(collected_), reference_pairs)


## Evaluation over small example

In [5]:
DATA_NAME = 'small'

In [6]:
SIMILARITY = 0.8

In [7]:
IDF_ORDER = True

### Loading document info

In [8]:
from assignment3.model.documents import DocumentVectors
# use the IDF_ORDER flag set above rather than a hard-coded literal, so the setting has effect
docs_vet = DocumentVectors(data_name=DATA_NAME, idf_order=IDF_ORDER)

Loading vectors... 
Loading mapping... 
Loading inverse mapping... 


In [9]:
docs_vet

small Vector Documents [4735]

In [10]:
docs_info = docs_vet.get_documents_info(similarity=SIMILARITY)

### Spark

In [11]:
# NOTE(review): `sc` is not created anywhere in the visible notebook; presumably the
# kernel (e.g. a pyspark shell) injects a SparkContext under that name -- confirm.
rdd = sc.parallelize(docs_info)

In [12]:
# Pipeline stages:
#   1. flatMap          - key every document by each of its term-ids past the threshold
#   2. combineByKey     - gather the documents sharing a term-id into one list
#   3. flatMapValues    - score the document pairs inside each list
#   4. filter/map/distinct - keep pairs scoring above SIMILARITY, drop key and score, deduplicate
out = (
    rdd
    .flatMap(document_map)
    .combineByKey(
        lambda first: [first],
        lambda bucket, item: bucket + [item],
        lambda left, right: left + right,
    )
    .flatMapValues(documents_reduce)
    .filter(lambda record: record[1][1] > SIMILARITY)
    .map(lambda record: record[1][0])
    .distinct()
)

In [13]:
collected = out.collect()

23/05/25 20:13:36 WARN TaskSetManager: Stage 0 contains a task of very large size (2112 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [14]:
collected

[('x11t9pbv', 'cgx90xnt'),
 ('l9cms7ux', 'ck21e69s'),
 ('1czjl0hz', 'uqr00nzd'),
 ('ny59r4qe', '4tt0vnr4'),
 ('7krf1yxz', '9ofqelrm'),
 ('1czjl0hz', '5vpzgzvz'),
 ('76uk9tj5', 'qxo82jkv')]

In [15]:
compare_with_exact(data_name=DATA_NAME, collected_=collected)

1.0

## Evaluation over medium example

In [16]:
DATA_NAME = 'medium'

In [17]:
SIMILARITY = 0.85

In [18]:
IDF_ORDER = True

### Loading document info

In [19]:
from assignment3.model.documents import DocumentVectors
# use the IDF_ORDER flag set above rather than a hard-coded literal, so the setting has effect
docs_vet = DocumentVectors(data_name=DATA_NAME, idf_order=IDF_ORDER)

Loading vectors... 
Loading mapping... 
Loading inverse mapping... 


In [20]:
docs_vet

medium Vector Documents [9374]

In [21]:
docs_info = docs_vet.get_documents_info(similarity=SIMILARITY)

### Spark

In [22]:
rdd = sc.parallelize(docs_info)

In [23]:
# Pipeline stages:
#   1. flatMap          - key every document by each of its term-ids past the threshold
#   2. combineByKey     - gather the documents sharing a term-id into one list
#   3. flatMapValues    - score the document pairs inside each list
#   4. filter/map/distinct - keep pairs scoring above SIMILARITY, drop key and score, deduplicate
out = (
    rdd
    .flatMap(document_map)
    .combineByKey(
        lambda first: [first],
        lambda bucket, item: bucket + [item],
        lambda left, right: left + right,
    )
    .flatMapValues(documents_reduce)
    .filter(lambda record: record[1][1] > SIMILARITY)
    .map(lambda record: record[1][0])
    .distinct()
)

In [24]:
collected = out.collect()

23/05/25 20:18:57 WARN TaskSetManager: Stage 3 contains a task of very large size (4070 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [25]:
collected

[('1czjl0hz', 'uqr00nzd'),
 ('l9cms7ux', 'ck21e69s'),
 ('99ev94d2', 'xzuc51jz'),
 ('g1ve8zld', '58l59lbu'),
 ('3klmji5d', 'uunhkrjd'),
 ('77apiqq2', 'k7ah2zdc'),
 ('hmdikgf5', '0mi0n2zn'),
 ('3klmji5d', 'v1d0rqbi'),
 ('3klmji5d', 'cnwg4dnn'),
 ('3klmji5d', '2vsh18ia'),
 ('3klmji5d', 'kdx4hlr1'),
 ('3klmji5d', 'hm8tvkt3'),
 ('3klmji5d', 'hkbfbv3h'),
 ('v1d0rqbi', '0mi0n2zn'),
 ('uunhkrjd', 'hm8tvkt3'),
 ('uunhkrjd', 'hkbfbv3h'),
 ('cd5yoh0l', 'r8xcl0iu'),
 ('hmdikgf5', 'v1d0rqbi'),
 ('hmdikgf5', 'cnwg4dnn'),
 ('hmdikgf5', '2vsh18ia'),
 ('hmdikgf5', 'kdx4hlr1'),
 ('hmdikgf5', 'hm8tvkt3'),
 ('hmdikgf5', 'hkbfbv3h'),
 ('3klmji5d', '0mi0n2zn'),
 ('v1d0rqbi', 'cnwg4dnn'),
 ('v1d0rqbi', '2vsh18ia'),
 ('v1d0rqbi', 'kdx4hlr1'),
 ('v1d0rqbi', 'hm8tvkt3'),
 ('v1d0rqbi', 'hkbfbv3h'),
 ('0mi0n2zn', 'uunhkrjd'),
 ('cnwg4dnn', '2vsh18ia'),
 ('cnwg4dnn', 'kdx4hlr1'),
 ('cnwg4dnn', 'hm8tvkt3'),
 ('cnwg4dnn', 'hkbfbv3h'),
 ('2vsh18ia', 'kdx4hlr1'),
 ('2vsh18ia', 'hm8tvkt3'),
 ('2vsh18ia', 'hkbfbv3h'),
 

In [26]:
compare_with_exact(data_name=DATA_NAME, collected_=collected)

1.0

## Evaluation over large example

In [27]:
DATA_NAME = 'large'

In [28]:
SIMILARITY = 0.9

In [29]:
IDF_ORDER = True

### Loading document info

In [30]:
from assignment3.model.documents import DocumentVectors
# use the IDF_ORDER flag set above rather than a hard-coded literal, so the setting has effect
docs_vet = DocumentVectors(data_name=DATA_NAME, idf_order=IDF_ORDER)

Loading vectors... 
Loading mapping... 
Loading inverse mapping... 


In [31]:
docs_vet

large Vector Documents [13641]

In [32]:
docs_info = docs_vet.get_documents_info(similarity=SIMILARITY)

### Spark

In [33]:
rdd = sc.parallelize(docs_info)

In [34]:
# Pipeline stages:
#   1. flatMap          - key every document by each of its term-ids past the threshold
#   2. combineByKey     - gather the documents sharing a term-id into one list
#   3. flatMapValues    - score the document pairs inside each list
#   4. filter/map/distinct - keep pairs scoring above SIMILARITY, drop key and score, deduplicate
out = (
    rdd
    .flatMap(document_map)
    .combineByKey(
        lambda first: [first],
        lambda bucket, item: bucket + [item],
        lambda left, right: left + right,
    )
    .flatMapValues(documents_reduce)
    .filter(lambda record: record[1][1] > SIMILARITY)
    .map(lambda record: record[1][0])
    .distinct()
)

In [35]:
collected = out.collect()

23/05/25 20:33:05 WARN TaskSetManager: Stage 6 contains a task of very large size (6088 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [36]:
collected

[('92t646ti', 'rsltf8wi'),
 ('tmq8viu0', '9xgwjvsv'),
 ('ntbuwf8i', '0q2ky9gd'),
 ('2dx6lsl3', 'j2xn6260'),
 ('wtgbdwi1', 'hjdzbdai'),
 ('hlyw0evy', '6xa46shb'),
 ('amy5h733', 'rsgtfis3'),
 ('2qiy1glv', '3xx4zm4i'),
 ('55gr63md', '762aqyaa'),
 ('ljbtsn49', 'za18znu4'),
 ('hha2sctb', '3i7wgrck'),
 ('6nh9lfkk', 'pketymav'),
 ('2j2uieil', 'ib8nkjoo'),
 ('k1lq547f', '544q6hkq'),
 ('zjrkfe9b', 'lc5nhtzw'),
 ('ckoktq1g', '7gqvbbw7'),
 ('hmjksfzn', '5y88qbty'),
 ('vdddhcuq', 'qd9ps3zb'),
 ('g1ve8zld', '58l59lbu'),
 ('xiep0udu', '9hlx36e9'),
 ('q2zsd4qr', 'ipreyvi4'),
 ('idq12eck', 'grljzlyl'),
 ('3klmji5d', 'uunhkrjd'),
 ('joxp94rl', 'h7ffv0xb'),
 ('jms7hrmp', 'egbkl2v2'),
 ('l8q7d14h', 'nwo7jka1'),
 ('77apiqq2', 'k7ah2zdc'),
 ('tefkx87y', 'psjh0pij'),
 ('ljbtsn49', 'g8e5sqh7'),
 ('xu3gfwpu', 'nbh9qw5h'),
 ('n4vs4j5x', '2j2uieil'),
 ('n4vs4j5x', 'ib8nkjoo'),
 ('k1lq547f', 'p5ns0xnq'),
 ('544q6hkq', 'p5ns0xnq'),
 ('8f22ukjc', 'wnt5j5da'),
 ('ckoktq1g', 'dl6szur0'),
 ('c41smve5', 'iuo9s4qb'),
 

In [37]:
compare_with_exact(data_name=DATA_NAME, collected_=collected)

1.0