## **MinHashLSH**

### In local system memory implementation:

In [23]:
import pandas as pd
import numpy as np

In [24]:
class InMemoryMinHashLSH:
    def __init__(self, documents, k=5):
        self.documents = documents
        self.shingles = None
        self.signatures = None
        self.buckets = None
        self.num_bands = 8
        self.k = k
        self.num_perms = 128
    
    def shingling(self, documents=pd.DataFrame([""]), k=5):
        if documents.any().any() == "":
            self.documents = documents

        shingles = set()
        doc_shingles = set()
        for doc in self.documents["text"]:
            for i in range(len(doc) - k + 1):
                shingle = doc[i:i+k]
                shingles.add(shingle)
                doc_shingles.add(shingle)
        shingles = list(shingles)
        
        boolean_vectors = np.full((len(self.documents), len(shingles)), False, dtype=bool)
        for i, doc in enumerate(self.documents["text"]):
            for j, shingle in enumerate(shingles):
                if shingle in doc:
                    boolean_vectors[i, j] = True
        return pd.DataFrame(boolean_vectors, columns=shingles).transpose()
        
    def minhashing(self, shingles_bvs, num_perm=128):
        signatures = []
        for _ in range(0, num_perm):
            hash_funcs = np.random.permutation(shingles_bvs.shape[0])
            signature_row = []
            for j in range(0, shingles_bvs.shape[1]):
                for hash in hash_funcs:
                    if shingles_bvs.iloc[hash, j]:
                        signature_row.append(hash)
                        break
            signatures.append(signature_row)
        return pd.DataFrame(signatures)
    
    def locality_sensitive_hashing(self, signatures, num_bands=8):
        self.num_bands = num_bands
        buckets = {}
        for doc_id in signatures:
            sig = signatures[doc_id]
            for i in range(0, len(sig), self.num_bands):
                band = hash(tuple(sig[i:i+self.num_bands]))
                if band in buckets:
                    buckets[band].add(doc_id)
                else:
                    buckets[band] = {doc_id}
        return buckets

    def run(self, **kwargs):
        if "documents" in kwargs:
            self.documents = kwargs["documents"]
        if "bands" in kwargs:
            self.num_bands = kwargs["bands"]
        if "num_perms" in kwargs:
            self.num_perms = kwargs["num_perms"]
        if "k" in kwargs:
            self.k = kwargs["k"]
        bitvecs = self.shingling(self.documents, self.k)

        # The regular permutation variant of the minhashing algorithm is used
        # assuming that the amount of data process is relatively small to fit in memory
        # of course we'll utilize the row hashing variant in the spark implementation
        self.signatures = self.minhashing(bitvecs, self.num_perms) 
        self.buckets = self.locality_sensitive_hashing(self.signatures, self.num_bands)
        return self.buckets
    
    def __jaccard_similarity(self, a, b):
        return len(a & b) / len(a | b)
    
    def approximateNearestNeighbors(self, key, n):
        n = 1 if n > 1 else n
        sig = self.signatures[key]
        similar_docs = {}
        for i in range(0, len(sig), self.num_bands):
            band_hash = hash(tuple(sig[i:i+self.num_bands]))
            if band_hash in self.buckets:
                for doc_id in self.buckets[band_hash]:
                    if doc_id != key:
                        if doc_id in similar_docs:
                            similar_docs[doc_id] += 1
                        else:
                            similar_docs[doc_id] = 1
        similar_docs = {k: v for k, v in sorted(similar_docs.items(), key=lambda item: item[1], reverse=True)}

        most_similar_docs = []
        for doc_id in similar_docs:
            jac_sim = self.__jaccard_similarity(set(self.signatures[key]), set(self.signatures[doc_id]))
            if jac_sim > n:
                most_similar_docs.append((doc_id, jac_sim))
        return most_similar_docs

In [25]:
test_docs = ["This is a test document", "This document is another test document", "This is a test document","This is a test","This is a document", "Hello word"]
docs_df = pd.DataFrame(test_docs, columns=["text"])
in_memory_lsh = InMemoryMinHashLSH(docs_df)
pd.set_option('display.max_columns', None)

# bool_vecs = in_memory_lsh.shingling()
# sigs = in_memory_lsh.minhashing(bool_vecs, 128)
# buckets = in_memory_lsh.locality_sensitive_hashing(sigs)
buckets = in_memory_lsh.run(k=3, bands=4, num_perms=256, documents=docs_df)
# print(buckets)
in_memory_lsh.approximateNearestNeighbors(0, 0.5)

[(2, 1.0), (4, 0.6666666666666666), (1, 0.5483870967741935), (3, 0.55)]

In [26]:
wos_5736 = pd.read_csv("WebOfScience-5736.txt", sep="\r\n", header=None, names=["text"])
buckets = in_memory_lsh.run(k=16, bands=8, num_perms=256, documents=wos_5736[:100])
in_memory_lsh.approximateNearestNeighbors(59, 0.5)

  wos_5736 = pd.read_csv("WebOfScience-5736.txt", sep="\r\n", header=None, names=["text"])


[]

### Spark implementation:

In [27]:
from pyspark.sql import functions as F 
from pyspark.sql import window as W
from pyspark.sql.types import ArrayType, IntegerType, StringType

Auxiliary functions:

In [28]:
import time
###############
# Function name: hash_gen
# Input: (Int) the number of "hash functions" to generate (Also happen to be signature length)
# Output: (List) of tuples of 3 containing the hash functions parameters, example [(1, 3, 5), (2, 4, 6), ...]
# Purpose: Generate the hash functions parameters for the minhashing process. This function is part of the minhashing method
###############
def hash_gen(num_hashes=128):
   hashes = []
   for i in range(num_hashes):
       hash = [np.random.randint(1, 1000), np.random.randint(1, 1000), np.random.randint(1, 1000)]
       hashes.append(hash)
   return hashes

###############
# Function name: hash_bands
# Input: (List) of integers, (Int) the number of bands to hash the integers into (Defaulted to 2 if longer than signature length)
# Output: (List) of integers of the hashed bands
# Purpose: Hash the signature into bands for the locality sensitive hashing process. This function is part of the locality sensitive hashing method RDD
###############
def hash_bands(row, num_bands=8):
    arr_size = int(len(row) / num_bands)
    arr_size = 2 if arr_size == 0 else arr_size
    bands = []
    for i in range(0, len(row), arr_size):
        bands.append(hash(tuple(row[i:i+arr_size])))
    return bands

###############
# Function name: minhash_udf
# Input: (List) of squashed tuples containing the row id and the boolean vector, (Int) the signature length
# Output: (List) of lists containing the minhashed values
# Purpose: Minhash the boolean vectors into signatures. This function is part of the minhashing method RDD
###############
def minhash_udf(row, hash_funcs):#, sig_length=128):
    hf_len = int(len(hash_funcs))
    # Generating the "infinite" matrix (can't use np.inf as it's not supported by spark, class error w/e)
    final_ans = [[-2 for _ in range(len(row[0][1]))] for _ in range(hf_len)]
    for row in row:
        row_id = row[0]
        row_vals = row[1]
        for i in range(hf_len):
            # Hashing the row id
            curr_hash = ((hash_funcs[i][0] * row_id) + hash_funcs[i][1]) % hash_funcs[i][2]
            for j in range(len(row_vals)):
                if row_vals[j]:
                    # Minhashing with the "infinite" matrix
                    final_ans[i][j] = curr_hash if final_ans[i][j] == -2 else min(final_ans[i][j], curr_hash)
    return final_ans

In [29]:
class SparkMinHashLSH:
    def __init__(self, documents, k=5):
        self.documents = documents
        self.bool_vecs = None
        self.signatures = None
        self.buckets = None
        self.num_bands = 8
        self.k = k
        self.num_perms = 128
    
    ###############
    # Function name: shingling
    # Input: kwargs: (DataFrame) documents=, (Int) k=
    # Output: (DataFrame) of the shingled documents represented as boolean vectors
    #         Sample output:
    #           +-------------+---------+---------+---------+---------+---------+---------+---------+---------+
    #           |shingles     |1        |2        |3        |4        |5        |6        |7        |8        |
    #           +-------------+---------+---------+---------+---------+---------+---------+---------+---------+
    #           |this         |True     |True     |True     |True     |True     |True     |True     |True     |
    #           |is a         |True     |False    |False    |True     |False    |True     |False    |True     |
    #           |test         |False    |True     |False    |True     |False    |True     |True     |True     |
    #           |document     |True     |False    |True     |False    |True     |True     |True     |False    |
    #           +-------------+---------+---------+---------+---------+---------+---------+---------+---------+
    # Purpose: Shingle the documents into k size shingles. This function is part of the minhashing method RDD
    ###############
    def shingling(self, **kwargs):
        self.documents = kwargs["documents"] if "documents" in kwargs else self.documents
        
        # High k values would result in document skipping if the document is smaller than the shingle size
        shingle_size = 5 if "k" not in kwargs else kwargs["k"]

        # The monotonically increasing ID function only generates random unique IDs
        # Thus requiring us to use the row_number function instead
        # The following code is essentially creating an ordered "window"  from 1, the "lit"(literal) function acts the same way as passing an arg to a function 
        id_doc_df = self.documents.withColumn("docID", F.row_number().over(W.Window.orderBy(F.lit(1))))
        shingles = (id_doc_df.rdd.map(lambda x: (x[0],x[1])) 
                   .map(lambda x: list(set([(x[1], x[0][i:i+shingle_size]) for i in range(len(x[0]) - shingle_size + 1)]))) # Shingling with k size shingle
                   .flatMap(lambda x: x) # DF formatting, from shape [(docID, shingle), ...] to (docID, shingle), aka unknown data shape to 2 columns
                   .toDF(["docID", "shingles"])
                    )
                    
        ret_bv = (shingles.groupBy("shingles")
                .pivot("docID") # Rotating the DF based on docIDs
                .agg(F.lit(True)) # Create a true column and aggregate on each present intersected shingles corresponding to the docID
                .fillna(False)) # Fill the NaNs left by the aggregation process with False
        self.bool_vecs = ret_bv
        return ret_bv

    ###############
    # Function name: minhashing
    # Input: kwargs: (DataFrame) bool_vecs=, (Int) sig_length=
    # Output: (DataFrame) of the minhashed signatures
    #         Sample output:
    #         +----+----+----+----+----+----+----+----+
    #         |1   |2   |3   |4   |5   |6   |7   |8   |
    #         +----+----+----+----+----+----+----+----+
    #         |1   |2   |3   |4   |5   |6   |7   |8   |
    #         |2   |3   |4   |5   |6   |7   |8   |9   |
    #         +----+----+----+----+----+----+----+----+
    # Purpose: Minhash the boolean vectors into signatures (A form of embedding), used for the LSH process. 
    ###############
    def minhashing(self, **kwargs):
        if "bool_vecs" not in kwargs:
            raise ValueError("Boolean Vectors not provided")
        bool_vecs = kwargs["bool_vecs"]
        sig_length = 128 if "sig_length" not in kwargs else kwargs["sig_length"]
        
        # Col numbering nothing exciting
        bool_vecs = bool_vecs.withColumn("id", F.row_number().over(W.Window.orderBy(F.lit(1)))) 

        # Grouping the bool vecs into lists for processing
        zipped_bv = bool_vecs.select("id", F.array([F.col(x) for x in bool_vecs.columns if x != "id" and x != "shingles"]).alias("vals")) 

        # Defining the return type so it doesn't become string cuz spark doesnt know how to process lol   
        typed_minhash_udf = F.udf(minhash_udf, ArrayType(ArrayType(IntegerType()))) 
        
        # Generating the hash functions and transforming it into a spark udf friendly format for parameter passing
        # This has to be done separately as spark reconstructs the table from scratch everytime show() is called 
        # or anything that requires the table to be "materialized", side effect of that is random signatures every time show is called
        hash_funcs = hash_gen(sig_length)
        transformed_hash_funcs = F.array([F.array(F.lit(x[0]), F.lit(x[1]), F.lit(x[2])) for x in hash_funcs])
        
        # Black magic, jk, continue reading at the above auxiliary udf func named "minhash_udf"
        sigs = zipped_bv.agg(typed_minhash_udf(F.collect_list(F.struct(F.col("id"), F.col("vals"))), transformed_hash_funcs).alias("sigs")) 

        # Convert it to a more "expected" and "familiar" format of signatures per col, and renaming the columns
        sig = (sigs
                .select(F.explode("sigs"))
                .rdd
                .flatMap(lambda x: x)
                .toDF()
                )
        # Column renaming 
        ret_sig = sig.select([F.col(x).alias(f"{x.strip('_')}") for x in sig.columns])
        self.signatures = ret_sig
        return ret_sig
    
    ###############
    # Function name: locality_sensitive_hashing
    # Input: kwargs: (DataFrame) sigs=, (Int) num_bands=
    # Output: (DataFrame) of the hashed buckets
    #         Sample output:
    #         +------------------+----------+
    #         |buckets           |doc_ids   |
    #         +------------------+----------+
    #         |19248921412094    |[1, 2, 3] |
    #         |46948691412095    |   [2, 4] |
    #         |69420911123456    |      [3] |
    #         +------------------+----------+
    # Purpose: Hash the signatures into buckets, which then can be used to find similar documents, by rehashing the query document and finding the intersecting buckets
    ###############
    def locality_sensitive_hashing(self, **kwargs):
        if "sigs" not in kwargs:
            raise ValueError("Signatures not provided")
        
        num_bands = 10 if "num_bands" not in kwargs else kwargs["num_bands"]
        sigs = kwargs["sigs"]

        # Defining the aggregate expression, basically squashing the columns into lists
        squash_sigs = [F.collect_list(F.col(x)).alias(x) for x in sigs.columns] 
        squashed_sigs = (sigs
                        .agg(*squash_sigs)
                        .select(F.explode(F.array([F.array(F.col(x)) for x in sigs.columns]))
                       .alias("sigs"))
                       .withColumn("doc_id", F.row_number().over(W.Window.orderBy(F.lit(1)))))
        bands_list = squashed_sigs.rdd.map(lambda x: (x[1],hash_bands(x[0][0], num_bands)))
        ret_buckets =   (bands_list
                   .toDF(["doc_id", "bands"])
                   .select("doc_id", F.explode("bands").alias("buckets"))
                   .groupBy("buckets")
                   .agg(F.collect_set("doc_id").alias("doc_ids")))
        self.buckets = ret_buckets
        return ret_buckets
    
    ###############
    # Function name: run
    # Input: kwargs: (DataFrame) documents=, (Int) k=, (Int) bands=, (Int) num_perms=
    # Output: (DataFrame) of the hashed buckets, output of the locality sensitive hashing function
    # Purpose: The main function to run the entire process of minhashing and locality sensitive hashing
    ###############
    def run(self, **kwargs):
        if "documents" in kwargs:
            documents = kwargs["documents"]
        if "bands" in kwargs:
            self.num_bands = kwargs["bands"]
        if "num_perms" in kwargs:
            self.num_perms = kwargs["num_perms"]
        if "k" in kwargs:
            self.k = kwargs["k"]
        t_start = time.time()
        bool_vecs = self.shingling(documents=documents, k=self.k)
        t_stop = time.time()
        print(f"Shingling took {t_stop - t_start} seconds")

        t_start = time.time()
        signatures = self.minhashing(bool_vecs=bool_vecs, sig_length=self.num_perms)
        t_stop = time.time()
        print(f"Minhashing took {t_stop - t_start} seconds")

        self.signatures = signatures
        t_start = time.time()
        buckets = self.locality_sensitive_hashing(sigs=signatures, num_bands=self.num_bands)
        t_stop = time.time()
        print(f"Locality Sensitive Hashing took {t_stop - t_start} seconds")
        return buckets

    ###############
    # Function name: jaccard_similarity
    # Input: (String) first_key, (String) second_key
    # Output: (Float) of the jaccard similarity between the two documents
    # Purpose: Calculate the jaccard similarity between two documents columns
    ###############
    def jaccard_similarity(self, first_key, second_key):
        sig_table = self.signatures
        union = sig_table.select(f"{first_key}").union(sig_table.select(f"{second_key}")).distinct().count()
        intersect = sig_table.select(f"{first_key}").intersect(sig_table.select(f"{second_key}")).distinct().count()
        return intersect / union
    

    def approximateNearestNeighbors(self, key, n,):
        
        num_bands = int(self.num_bands)
        selected_sigs = (self.signatures
            .select(f"{key}")
            .agg(F.collect_list(f"{key}"))
            .rdd.map(lambda x: hash_bands(x[0], num_bands))
            .flatMap(lambda x: x)
            .map(lambda x: (x, ))
            .toDF(["buckets"])
        )

        similar_docs = buckets.join(selected_sigs, "buckets")
        doc_id = similar_docs.select(F.explode("doc_ids").alias("doc_id")).distinct().collect()
        similar_docs = []

        # TODO: Implement threading for parallel processing
        for doc in doc_id:
            jac_sim = self.jaccard_similarity(key, doc.doc_id)
            if jac_sim > n:
                similar_docs.append((doc.doc_id, jac_sim))
        
        return similar_docs 

In [30]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("minHashLSH").getOrCreate()
sc = spark.sparkContext

In [31]:
data = [("This is a test document",), ("This document is another test document",), ("This is a test document",), ("Hello wordello ",), ("Word Hello world",), ("hello", )]
df = spark.createDataFrame(data, ["text"])
spark_lsh = SparkMinHashLSH(df)

# Increase the number of bands and perms to decrease the sensitivity of the LSH, effectively reducing the number of false positives
shingle_length = 2
signature_length = 128
num_bands = 8

In [32]:
bool_vecs = spark_lsh.shingling(documents=df, k=shingle_length)
bool_vecs.show(10)

+--------+-----+-----+-----+-----+-----+-----+
|shingles|    1|    2|    3|    4|    5|    6|
+--------+-----+-----+-----+-----+-----+-----+
|      ld|false|false|false|false| true|false|
|       H|false|false|false|false| true|false|
|      en| true| true| true|false|false|false|
|       a| true| true| true|false|false|false|
|      nt| true| true| true|false|false|false|
|      t | true| true| true|false|false|false|
|      st| true| true| true|false|false|false|
|      oc| true| true| true|false|false|false|
|      a | true|false| true|false|false|false|
|      rl|false|false|false|false| true|false|
+--------+-----+-----+-----+-----+-----+-----+
only showing top 10 rows



In [33]:
sigs = spark_lsh.minhashing(bool_vecs=bool_vecs, sig_length=signature_length)
sigs.show(10)

+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
+---+---+---+---+---+---+
|  1| 21|  1| 11|  2| 11|
|  7|  7|  7|120| 12|210|
|  1|  1|  1| 13| 13| 17|
|140|140|140|240|120|280|
|  5|  5|  5|  2|  2|  2|
| 13| 20| 13| 88|  6| 81|
|  8|  8|  8|  0|  0|  0|
|136|136|136|154|130|421|
|  1|  2|  1| 25| 24| 73|
| 97| 97| 97| 27| 35|105|
+---+---+---+---+---+---+
only showing top 10 rows



In [34]:
buckets = spark_lsh.locality_sensitive_hashing(sigs=sigs, num_bands=num_bands)
buckets.show(10)

+--------------------+-------+
|             buckets|doc_ids|
+--------------------+-------+
| 1368473407760555883| [1, 3]|
|-1091538363657981302|    [5]|
|-2359805777066111308|    [6]|
|  789066639884840564|    [4]|
| 5391101189165332098| [1, 3]|
| 2975859297736561103|    [4]|
|  528502270807337801| [1, 3]|
| 2906227291019145655|    [6]|
| 7728498595413907917|    [2]|
|  472232209436246182|    [5]|
+--------------------+-------+
only showing top 10 rows



In [35]:
doc_id = 1
similarity_percentage = 0.6
spark_lsh.approximateNearestNeighbors(doc_id, similarity_percentage)

[(1, 1.0), (3, 1.0)]

In [36]:
buckets = spark_lsh.run(documents=df,k=2, bands=8, num_perms=16)
buckets.show(10)

Shingling took 9.954345226287842 seconds
Minhashing took 2.7686731815338135 seconds
Locality Sensitive Hashing took 2.3187873363494873 seconds
+--------------------+---------+
|             buckets|  doc_ids|
+--------------------+---------+
| 3883364387212965600|[1, 2, 3]|
|-8907760414718763540|   [1, 3]|
| 1599942983381215927|   [1, 3]|
| 3244284212049768524|   [5, 4]|
|-8645571619998295187|      [6]|
| 5646003302091366357|      [2]|
| 8792398846230568615|      [6]|
|  735482836909611368|      [5]|
| 5763571626872289010|   [1, 3]|
| 8389048192121911274|[1, 2, 3]|
+--------------------+---------+
only showing top 10 rows



In [37]:
df_web_of_science_5736 = spark.read.text("WebOfScience-5736.txt", lineSep="\r\n")
df_web_of_science_5736.show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [38]:
wos_bucket = spark_lsh.run(documents=df_web_of_science_5736.limit(100), k=20, bands=8, num_perms=32)
wos_bucket.show(10)

Shingling took 2.2737371921539307 seconds
Minhashing took 18.19093418121338 seconds
Locality Sensitive Hashing took 16.542454481124878 seconds
+--------------------+--------------------+
|             buckets|             doc_ids|
+--------------------+--------------------+
| 8483113381247461651|                [82]|
| 7571275679603839956|    [85, 53, 58, 76]|
|-8725076169812727400|[66, 45, 37, 74, ...|
|  626009420018547013|[30, 82, 38, 74, ...|
| -445724793277990459|[81, 34, 31, 38, ...|
| 6917473940299867721|[15, 66, 60, 1, 1...|
|-2534679458921155598|                 [7]|
|-4630518706558724794|            [66, 27]|
| 3228296493935762207|     [7, 58, 94, 92]|
|-2433500303205739586|     [63, 53, 6, 69]|
+--------------------+--------------------+
only showing top 10 rows



In [39]:
spark_lsh.approximateNearestNeighbors(60, 0.3)

[]