## Assignment 1 - LSH

Implement and apply LSH to identify similar news articles. This method identifies candidate pairs of news articles that are likely to have high Jaccard similarity.

### Exercise 2.1 

### Importing libraries and loading data 

In [1]:
from pyspark import SparkContext
import json
import re
import time 
import random
import itertools

sc = SparkContext.getOrCreate()

# Load the JSON data from the file
sentences = {}
with open('covid_news_small.json', 'rt') as file:
    for line in file:
        data = json.loads(line)  # Load JSON object from line
        
        # Clean and split text into sentences
        text = re.sub(r"[\r\n]+|\W+", " ", data["text"]).strip()  # Remove newlines and non-word characters
        
        sentences[data["tweet_id"]] = text.lower()

24/07/06 08:04:19 WARN Utils: Your hostname, nudibranch-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.55.165 instead (on interface wlp2s0)
24/07/06 08:04:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/06 08:04:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Shingles Generation

Returns a list of shingles with a given length for a given document

In [2]:
k = 3  # Length of each shingle
def generate_shingles(text: str) -> set[int]:
    return {text[i:i+k] for i in range(len(text) - k + 1)}

#### MinHash Signatures Generation

MinHashing aims to estimate the Jaccard similarity between sets. 

**Signature Generation:** Returns list of signatures for sets of shingles extracted from text data.

**Universal Hashing:** Uses random coefficients a and b for universal hashing to compute hash values for each shingle for K hash functions (choosing the minimum hash value). This strategy is chosen to reduce collisions and evenly distribute shingles across hash buckets.

In [3]:
# Constants definition
num_bands = 15 # Number of bands
num_rows = 10 # Number of rows in the signature matrix
p = 2**32 - 1 # A large prime number

def choose_a_b():
    a = random.randint(1, p - 1)
    b = random.randint(0, p - 1)
    return a, b

def universal_hashing(hash_function_index):
    a, b = choose_a_b()
    length_shingles = 1_000_000_000 
    hash_value = (a * int(hash_function_index) + b) % p % length_shingles
    return hash_value

def generate_signature(shingles) -> list[int]:
    num_hashes = num_bands * num_rows  # K hash functions
    sig = [float('inf')] * num_hashes  

    for shingle in shingles:
        for h in range(num_hashes):
            hash_value = universal_hashing(h) 
            if hash_value < sig[h]:
                sig[h] = hash_value
    return sig

#### Bands Creation

returns a list of bands for a given list of signatures
each band is a hash value and this value is different for each band (offset)
this is done to put each row of bands in a bucket without collisions from other rows

In [4]:
def generate_bands(signature):
    band_indices = [i % num_bands for i in signature]
    return band_indices

#### Spark implementation of LSH

Spark’s distributed computing capabilities to handle large-scale data processing in parallel, making it suitable for tasks like LSH where computation can be split across multiple nodes. Each node processes a subset of the data in parallel, ensuring efficient computation of shingles, MinHash signatures, bands, and hash buckets.

In [5]:
# Create an RDD from the sentences
sentences_rdd = sc.parallelize(sentences)

# Invert the signature matrix
def invert(item):
    doc_id, bands = item
    return [(band, doc_id) for band in bands]

start = time.time()
lsh = ( sentences_rdd
        .mapValues(generate_shingles)
        .mapValues(generate_signature)
        .mapValues(generate_bands)
        .flatMap(invert)
        .groupByKey()
        .collect()
        )
print("Execution time: ", time.time() - start)

buckets = { band: list(docs) for band, docs in lsh }


                                                                                

Execution time:  30.30948567390442


In [8]:
print("First 5 elements of buckets:")
for i, (band, docs) in enumerate(buckets.items()):
    if i >= 5:
        break
    print(f"Band: {band}, Docs: {docs}")

First 5 elements of buckets:
Band: nan, Docs: ['1']
Band: nan, Docs: ['1']
Band: nan, Docs: ['1']
Band: nan, Docs: ['1']
Band: nan, Docs: ['1']


### Candidate pairs identification

The number of bands and rows should be parameters. Select a combination that finds as candidates at least 90% of pairs with 85% similarity and less than 5% of pairs with 60% similarity.

In [7]:
def get_candidate_pairs(buckets):
    candidate_pairs = []

    for band, group in buckets:
        doc_ids = list(group)
        pairs_in_band = list(itertools.combinations(doc_ids, 2))
        candidate_pairs.extend(pairs_in_band)

    return candidate_pairs

# Extract candidate pairs from buckets
candidate_pairs = get_candidate_pairs(buckets)
print(f"Total candidates: {len(candidate_pairs)}")

print("Candidate pairs:")
for i,pair in enumerate(candidate_pairs):
    print(pair)
    if i > 5:
        break

TypeError: cannot unpack non-iterable float object

### Exercise 2.2

Implement a function that, given a news article, returns all other news articles that are at least 85% similar. You should make use of a pre-processed set of candidate pairs, obtained by LSH, and return only the ones that have Jaccard similarity–considering the shingles–above 85%.

### Jaccard Similarity

Jaccard Similarity is a measure of how similar two sets are.
The Jaccard Similarity of two sets is the ratio of the size of the intersection of the sets to the size of the union of the sets.

In [None]:
def jaccard_similarity(set1, set2):
    return len(set1.intersection(set2)) / len(set1.union(set2))

def retrieve_similar_articles(article_id, candidate_pairs, shingles_dict):
    """
    Retrieve articles similar to the given article_id from candidate_pairs.
    Returns a list of similar article IDs.
    """
    similar_articles = []
    for pair in candidate_pairs:
        if article_id in pair:
            other_article_id = pair[0] if pair[0] != article_id else pair[1]
            similarity = jaccard_similarity(set(shingles_dict[article_id]), set(shingles_dict[other_article_id]))
            if similarity >= 0.85:
                similar_articles.append((other_article_id, similarity))
    return similar_articles

# choose random document
doc_id = random.choice(list(sentences.keys()))
doc_text = sentences[doc_id]
shingles = generate_shingles(doc_text)
signature = generate_signature(shingles)
bands = generate_bands(signature)
doc_buckets = [str(bucket) for bucket in bands]


print(f"Similar articles to document {doc_id} with similarity >= 0.85:")
for i, (doc_id, similarity) in enumerate(workflow):
    print(f"{doc_id}: {similarity:.4f}")
    if i > 5:
        break

### Exercise 2.3 

Using a sample of the dataset, evaluate the LSH method by calculating the Jaccard similarities
and obtaining the percentage of false positives and false negatives.

In [None]:
def evaluate_lsh(candidate_pairs, shingles_dict, sample_size=100):
    """
    Evaluate LSH method by calculating Jaccard similarities and obtaining percentages of false positives and false negatives.
    Returns average false positives and false negatives over multiple samples.
    """
    false_positives = 0
    false_negatives = 0
    
    # Sample random articles for evaluation
    sample_articles = random.sample(list(shingles_dict.keys()), sample_size)
    
    for article_id in sample_articles:
        article_shingles = set(shingles_dict[article_id])
        for pair in candidate_pairs:
            if article_id in pair:
                other_article_id = pair[0] if pair[0] != article_id else pair[1]
                other_article_shingles = set(shingles_dict[other_article_id])
                similarity = jaccard_similarity(article_shingles, other_article_shingles)
                
                # Assuming ground truth or manually checking for false positives and negatives
                if similarity < 0.85 and pair in candidate_pairs:
                    false_positives += 1
                elif similarity >= 0.85 and pair not in candidate_pairs:
                    false_negatives += 1
    
    # Calculate percentages
    total_pairs = len(candidate_pairs)
    total_samples = sample_size * len(candidate_pairs)
    percent_false_positives = (false_positives / total_samples) * 100
    percent_false_negatives = (false_negatives / total_samples) * 100
    
    return percent_false_positives, percent_false_negatives

# Example usage
num_samples = 10
total_false_positives = 0
total_false_negatives = 0

for _ in range(num_samples):
    fp, fn = evaluate_lsh()
    total_false_positives += fp
    total_false_negatives += fn

average_false_positives = total_false_positives / num_samples
average_false_negatives = total_false_negatives / num_samples

print(f"Average False Positives: {average_false_positives:.2f}%")
print(f"Average False Negatives: {average_false_negatives:.2f}%")