# CS3319-02 - Coding 2
## Locality Sensitive Hashing

The locality sensitive hashing (LSH) algorithm is efficient in near-duplicate document detection. In this coding, you're given the file *''docs_for_lsh.csv''*, where the documents are processed into set of k-shingles (k = 8, 9, 10). *''docs_for_lsh.csv''* contains 201 columns, where column 'doc_id' represents the unique id of each document, and from column '0' to column '199', each column represents a unique shingle. If a document contains a shingle ordered with **i**, then the corresponding row will have value 1 in column **'i'**, otherwise it's 0. You need to implement the LSH algorithm and ask the problems below.

File path: **../input/docs-for-lsh/docs_for_lsh.csv**

### Your task

Use minhash algoirthm to create signature of each document, and find 'the most similar' documents under Jaccard similarity. 
Parameters you need to determine:
1) Length of signature (number of distinct minhash functions) *n*. Recommanded value: n > 20.

2) Number of bands that divide the signature matrix *b*. Recommanded value: b > n // 10.

In [1]:
!pip install pyspark==3.2.1

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple, http://pypi.douban.com/simple, https://pypi.tuna.tsinghua.edu.cn/simple, http://pypi.mirrors.ustc.edu.cn/simple, http://pypi.hustunique.com


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, array
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType

file_path = '/Users/husky/Downloads/docs_for_lsh.csv'

In [4]:
spark = SparkSession.builder \
    .appName("lsh") \
    .getOrCreate()
    
df = spark.read.csv(file_path, header=True, inferSchema=True) # read quicker than using pandas

                                                                                

In [5]:
def get_shingles(value_list):
    return [index for index, value in enumerate(value_list) if value > 0]

get_shingles_udf = udf(get_shingles, ArrayType(IntegerType()))

df_with_shingles = df.withColumn("values", array(*[col(c) for c in df.columns if c != 'doc_id']))  
df_with_shingles = df_with_shingles.withColumn("shingles", get_shingles_udf("values"))

# change to set
shingles_rdd = df_with_shingles.select("shingles").rdd.map(lambda row: set(row[0]))

# collect to a list
shingles_list = shingles_rdd.collect()

                                                                                

In [6]:
import numpy as np
import os
from collections import defaultdict
from tqdm import tqdm
import random
import heapq

class LSH:
    def __init__(self, n_signature, n_bands, n_shingles):
        self.n_signature = n_signature # number of hash function
        self.n_bands = n_bands
        self.band_hash_buckets = [defaultdict(list) for _ in range(n_bands)]
        self.signature_matrix = None
        self.permutations = [np.random.permutation(np.arange(n_shingles)) for _ in range(self.n_signature)]
        self.large_prime = 1000033  # big prime
        self.max_shingle_id = 199   
        self.hash_functions = self.generate_hash_functions(n_signature)
        self.candidates = dict()

    def generate_hash_functions(self, n):
        hash_functions = []
        for _ in range(n):
            a = random.randint(1, self.large_prime - 1)
            b = random.randint(0, self.large_prime - 1)
            hash_function = lambda x, a=a, b=b: ((a * x + b) % self.large_prime) & self.max_shingle_id
            hash_functions.append(hash_function)
        return hash_functions

    def minhash_2(self, shingles):
        # hash function 2
        signature = np.full(self.n_signature, np.inf, dtype=np.int32)
        for shingle in shingles:
            for i, hash_function in enumerate(self.hash_functions):
                hash_value = hash_function(shingle)
                if hash_value < signature[i]:
                    signature[i] = hash_value
        return signature
        
    def minhash(self, shingles): 
        # use permutation as hash function 
        signature = np.full(self.n_signature, np.inf, dtype=np.int32)

        shingles_array = np.array(list(shingles), dtype=np.int32)

        for i, permutation in enumerate(self.permutations):
            min_permuted_index = np.min(permutation[shingles_array])
            signature[i] = min_permuted_index

        return signature
    
    def build_signature_matrix(self, shingles, re_build = False, hash_function = "permutation"):
        
        hash_function = self.minhash if hash_function == "permutation" else self.minhash_2
        
        if os.path.exists("signature_matrix_{}.npy".format(self.n_signature)) and (not re_build):
            self.signature_matrix = np.load("signature_matrix_{}.npy".format(self.n_signature))
        else:
            n_docs = len(shingles)
            self.signature_matrix = np.zeros((self.n_signature, n_docs), dtype=np.int32)
            for i, doc_shingles in tqdm(enumerate(shingles), desc = "Building Signature Matrix...", total = len(shingles)):

                self.signature_matrix[:, i] = hash_function(doc_shingles)

            print("signature matrix built!")
            np.save("signature_matrix_{}".format(self.n_signature), self.signature_matrix)
            
    def lsh(self):
        n_rows = self.n_signature // self.n_bands
        for i in range(self.n_bands):
            start = i * n_rows
            end = start + n_rows
            band = self.signature_matrix[start:end, :]
            for j in range(band.shape[1]):
                band_hash = hash(tuple(band[:, j]))
                self.band_hash_buckets[i][band_hash].append(j)
    
    def query(self, query_signature):
        if hash(tuple(query_signature)) in self.candidates.keys():
            return self.candidates[hash(tuple(query_signature))]
        else:
            candidates = set()
            n_rows = self.n_signature // self.n_bands
            for i in range(self.n_bands):
                start = i * n_rows
                end = start + n_rows
                band_hash = hash(tuple(query_signature[start:end]))
                if band_hash in self.band_hash_buckets[i]:
                    candidates.update(self.band_hash_buckets[i][band_hash])
            self.candidates[hash(tuple(query_signature))] = candidates
            return candidates

    def find_similar_docs(self, shingles, threshold, re_build = False):
        self.build_signature_matrix(shingles, re_build = re_build)
        self.lsh()
        section_num = 10
        step = len(shingles) // section_num
        similar_docs = []
        for section in range(section_num):
            del similar_docs
            similar_docs = []
            section_shingles = shingles[section: section + step]
            for i, doc_shingles in tqdm(enumerate(section_shingles), desc="section {}, Calculating...".format(section), total = len(section_shingles)):
                query_signature = self.signature_matrix[:, i]
                candidates = self.query(query_signature)
                for j in candidates:
                    if j != i:
                        jaccard = len((doc_shingles) & (shingles[j])) / len((doc_shingles) | (shingles[j]))
                        if jaccard >= threshold:
                            similar_docs.append((i, j, jaccard))
            np.save("similar_docs_{}_{}.npy".format(self.n_signature, section), similar_docs)
    
    def find_most_similar_docs(self, shingles, threshold, doc_pair_num=10,  early_stop = False, rebuild = False):
        self.build_signature_matrix(shingles, re_build = rebuild)
        self.lsh()

        # small heap
        top_similar_docs_heap = []
        
        cnt = 0

        for i, doc_shingles in tqdm(enumerate(shingles), desc="Calculating...", total=len(shingles)):
            query_signature = self.signature_matrix[:, i]
            candidates = self.query(query_signature)
            for j in candidates:
                if j != i:
                    jaccard = len(doc_shingles & shingles[j]) / len(doc_shingles | shingles[j])
                    if jaccard >= threshold:
                        if len(top_similar_docs_heap) < doc_pair_num:
                            heapq.heappush(top_similar_docs_heap, (jaccard, i, j))
                        elif jaccard > top_similar_docs_heap[0][0]:
                            heapq.heapreplace(top_similar_docs_heap, (jaccard, i, j))
                    if jaccard >= 1:
                        cnt += 1
            print("doc {} finished".format(i))
            # print("current top similar docs: ", top_similar_docs_heap)
            if (cnt >= doc_pair_num) and early_stop:
                return top_similar_docs_heap
        
        # reverse
        top_similar_docs = heapq.nlargest(doc_pair_num, top_similar_docs_heap)

        np.save("similar_docs_top-{}.npy".format(doc_pair_num), top_similar_docs)
        return top_similar_docs
    
    def find_simlar_docs_to_index(self, shingles, index, threshold, re_build = False):
        self.build_signature_matrix(shingles, re_build = re_build)
        self.lsh()
        similar_docs = []
        query_signature = self.signature_matrix[:, index]
        candidates = self.query(query_signature)
        print(len(candidates))
        for j in candidates:
            if j != index:
                jaccard = len(shingles[index] & shingles[j]) / len(shingles[index] | shingles[j])
                if jaccard >= threshold:
                    similar_docs.append((index, j, jaccard))
        return similar_docs


In [7]:
n_signature = 60
n_bands = n_signature // 10 + 1
threshold = 0.8
n_shingles = 200 # get the total number of shingles, which should be 200
lsh = LSH(n_signature, n_bands, n_shingles)

# find top 10 similar docs

In [None]:
ten_most_similar_docs = lsh.find_most_similar_docs(shingles_list, threshold, doc_pair_num=10, early_stop = True, rebuild = True)

  multiarray.copyto(a, fill_value, casting='unsafe')
Building Signature Matrix...: 100%|██████████| 1000000/1000000 [01:27<00:00, 11371.40it/s]


signature matrix built!


Calculating...:   0%|          | 57/1000000 [00:00<26:01, 640.46it/s]

doc 0 finished
doc 1 finished
doc 2 finished
doc 3 finished
doc 4 finished
doc 5 finished
doc 6 finished
doc 7 finished
doc 8 finished
doc 9 finished
doc 10 finished
doc 11 finished
doc 12 finished
doc 13 finished
doc 14 finished
doc 15 finished
doc 16 finished
doc 17 finished
doc 18 finished
doc 19 finished
doc 20 finished
doc 21 finished
doc 22 finished
doc 23 finished
doc 24 finished
doc 25 finished
doc 26 finished
doc 27 finished
doc 28 finished
doc 29 finished
doc 30 finished
doc 31 finished
doc 32 finished
doc 33 finished
doc 34 finished
doc 35 finished
doc 36 finished
doc 37 finished
doc 38 finished
doc 39 finished
doc 40 finished
doc 41 finished
doc 42 finished
doc 43 finished
doc 44 finished
doc 45 finished
doc 46 finished
doc 47 finished
doc 48 finished
doc 49 finished
doc 50 finished
doc 51 finished
doc 52 finished
doc 53 finished
doc 54 finished
doc 55 finished
doc 56 finished
doc 57 finished





In [None]:
ten_most_similar_docs

[(1.0, 10, 24190),
 (1.0, 24, 42234),
 (1.0, 48, 871),
 (1.0, 28, 13709),
 (1.0, 34, 54799),
 (1.0, 57, 88325),
 (1.0, 56, 18490),
 (1.0, 43, 74180),
 (1.0, 39, 30204),
 (1.0, 53, 77481)]

Problem: For document 0 (the one with id '0'), list the **30** most similar document ids (except document 0 itself). You can verify your results with the [sklearn.metrics.jaccard_score()](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.jaccard_score.html) function.

Tips: You can adjust your parameters to hash the documents with similarity *s > 0.8* into the same bucket.

# Find top 30 docs similar to doc 0

In [8]:
# similar to doc 0
doc_0 = lsh.find_simlar_docs_to_index(shingles_list, index = 0, threshold = threshold, re_build=False)

Building Signature Matrix...:   0%|          | 0/1000000 [00:00<?, ?it/s]

  multiarray.copyto(a, fill_value, casting='unsafe')
Building Signature Matrix...: 100%|██████████| 1000000/1000000 [02:16<00:00, 7301.08it/s]


signature matrix built!
2638


In [9]:
similar_to_doc_0 = [pair[1] for pair in doc_0]

query_signature = lsh.signature_matrix[:, 0]

candidates = lsh.candidates[hash(tuple(query_signature))]

print("Found {} documents similar to doc 0".format(len(similar_to_doc_0)))
print(similar_to_doc_0)

print("there are {} candidates".format(len(candidates)))
print(len(candidates))

Found 30 documents similar to doc 0
[99370, 91300, 1331, 58694, 67032, 58852, 84306, 26980, 2575, 84520, 52076, 68730, 69724, 28910, 20854, 62080, 46220, 39310, 72156, 39784, 48131, 23585, 40298, 81289, 81379, 81480, 89825, 89833, 32681, 73681]
there are 2638 candidates
2638


# Compare with ground truth

In [10]:
# ground truth, shingles_list[0] is the index = 0 document

result = []

for i in tqdm(range(1, len(shingles_list))):
    jaccard = len(shingles_list[0] & shingles_list[i]) / len(shingles_list[0] | shingles_list[i])
    if jaccard >= threshold:
        result.append((i, jaccard))
        
        

100%|██████████| 999999/999999 [00:01<00:00, 892009.64it/s]


In [11]:
result.sort(key=lambda x: x[1], reverse=True)

gt = [pair[0] for pair in result]

print("there are total {} documents similar to doc 0".format(len(gt)))

print(gt)

there are total 30 documents similar to doc 0
[32681, 89833, 91300, 20854, 28910, 39310, 40298, 62080, 81379, 81480, 84306, 89825, 1331, 2575, 23585, 26980, 39784, 46220, 48131, 52076, 58694, 58852, 67032, 68730, 69724, 72156, 73681, 81289, 84520, 99370]


In [12]:
# calculate the precision/recall between doc_0 and gt

def precision_recall(result, gt):
    tp = 0
    fp = 0
    fn = 0
    for doc in result:
        if doc in gt:
            tp += 1
        else:
            fp += 1
            
    for doc in gt:
        if doc not in result:
            fn += 1
            
    precision = tp / (tp + fp)
    
    recall = tp / (tp + fn)
    
    return precision, recall
    

p, r = precision_recall(similar_to_doc_0, gt)

print("precision: ", p)

print("recall: ", r)

precision:  1.0
recall:  1.0


In [13]:
# calculate the precision/recall between candidates and gt

p,r = precision_recall(candidates, gt)

print("precision: ", p)

print("recall: ", r)

precision:  0.011372251705837756
recall:  1.0
