## IMPORTING DATA

In [1]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
conf = SparkConf()
conf = conf.setAppName("PySpark LSH") \
    .set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/bda.lsh") \
    .set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/bda.lsh") \
    .set('spark.jars.packages','org.mongodb.spark:mongo-spark-connector_2.12-3.0.1') \
    .set("spark.local.dir", "C:/tmp") \

In [3]:
sc.stop()
sc = SparkContext(conf=conf)

In [4]:
spark = SparkSession(sc)

df = spark.read.format('com.mongodb.spark.sql.DefaultSource').load()
df.show()


+--------------------+---+------------+----+----+---------------------+--------------------+
|                 _id| id|is_duplicate|qid1|qid2|            question1|           question2|
+--------------------+---+------------+----+----+---------------------+--------------------+
|[6066bd35b5a6a54b...|  0|       false|   1|   2| What is the step ...|What is the step ...|
|[6066bd35b5a6a54b...|  1|       false|   3|   4| What is the story...|What would happen...|
|[6066bd35b5a6a54b...|  2|       false|   5|   6| How can I increas...|How can Internet ...|
|[6066bd35b5a6a54b...|  3|       false|   7|   8| Why am I mentally...|Find the remainde...|
|[6066bd35b5a6a54b...|  4|       false|   9|  10| Which one dissolv...|Which fish would ...|
|[6066bd35b5a6a54b...|  5|        true|  11|  12| Astrology: I am a...|I'm a triple Capr...|
|[6066bd35b5a6a54b...|  6|       false|  13|  14|  Should I buy tiago?|What keeps childe...|
|[6066bd35b5a6a54b...|  7|        true|  15|  16| How can I be a go...

## PREPROCESSING

In [5]:
q1 = df.select(col("qid1").alias("qid"), col("question1").alias("question")).limit(100000)
q2 = df.select(col("qid2").alias("qid"), col("question2").alias("question")).limit(100000)

In [6]:
ques = q1.union(q2).dropDuplicates()

In [7]:
ques = ques.orderBy('qid', ascending=True).cache()

In [8]:
ques.show()

+---+---------------------+
|qid|             question|
+---+---------------------+
|  1| What is the step ...|
|  2| What is the step ...|
|  3| What is the story...|
|  4| What would happen...|
|  5| How can I increas...|
|  6| How can Internet ...|
|  7| Why am I mentally...|
|  8| Find the remainde...|
|  9| Which one dissolv...|
| 10| Which fish would ...|
| 11| Astrology: I am a...|
| 12| I'm a triple Capr...|
| 13|  Should I buy tiago?|
| 14| What keeps childe...|
| 15| How can I be a go...|
| 16| What should I do ...|
| 17|When do you use シ...|
| 18| When do you use "...|
| 19| Motorola (company...|
| 20| How do I hack Mot...|
+---+---------------------+
only showing top 20 rows



In [9]:
qid_to_ques = dict()

for row in ques.collect():
    qid_to_ques[row.qid] = row.question

## GENERATING SHINGLES

In [10]:
def ques_to_shingle(q, k=5):
    
    n = len(q)
    
    shingle_set = []
    
    for i in range(n-k+1):
        shingle_set.append(q[i:i+k])
    
    return shingle_set

In [11]:
create_shingles = udf(ques_to_shingle, ArrayType(StringType()))
ques_shingle = ques.withColumn("shingles",create_shingles(ques.question)).cache()
ques_shingle.show()

+---+---------------------+--------------------+
|qid|             question|            shingles|
+---+---------------------+--------------------+
|  1| What is the step ...|[What , hat i, at...|
|  2| What is the step ...|[What , hat i, at...|
|  3| What is the story...|[What , hat i, at...|
|  4| What would happen...|[What , hat w, at...|
|  5| How can I increas...|[How c, ow ca, w ...|
|  6| How can Internet ...|[How c, ow ca, w ...|
|  7| Why am I mentally...|[Why a, hy am, y ...|
|  8| Find the remainde...|[Find , ind t, nd...|
|  9| Which one dissolv...|[Which, hich , ic...|
| 10| Which fish would ...|[Which, hich , ic...|
| 11| Astrology: I am a...|[Astro, strol, tr...|
| 12| I'm a triple Capr...|[I'm a, 'm a , m ...|
| 13|  Should I buy tiago?|[Shoul, hould, ou...|
| 14| What keeps childe...|[What , hat k, at...|
| 15| How can I be a go...|[How c, ow ca, w ...|
| 16| What should I do ...|[What , hat s, at...|
| 17|When do you use シ...|[When , hen d, en...|
| 18| When do you use

## MAPPING SET-OF-SHINGLES TO SET-OF-INTEGERS

In [12]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="shingles", outputCol="hashed_shingles1",minDF=11,vocabSize=1<<20)

model = cv.fit(ques_shingle)

ques_shingle_hashed = model.transform(ques_shingle).cache()
ques_shingle_hashed.select(["qid","shingles","hashed_shingles1"]).show(truncate=False)


+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
hashed_shingles = udf( (lambda shingle : shingle.indices.tolist()) , ArrayType(IntegerType()))
ques_shingle_hashed = ques_shingle_hashed.withColumn("hashed_shingles",hashed_shingles(ques_shingle_hashed.hashed_shingles1)).cache()
ques_shingle_hashed.show()

+---+---------------------+--------------------+--------------------+--------------------+
|qid|             question|            shingles|    hashed_shingles1|     hashed_shingles|
+---+---------------------+--------------------+--------------------+--------------------+
|  1| What is the step ...|[What , hat i, at...|(71574,[0,1,2,5,6...|[0, 1, 2, 5, 6, 7...|
|  2| What is the step ...|[What , hat i, at...|(71574,[0,1,2,5,6...|[0, 1, 2, 5, 6, 7...|
|  3| What is the story...|[What , hat i, at...|(71574,[0,1,2,5,6...|[0, 1, 2, 5, 6, 7...|
|  4| What would happen...|[What , hat w, at...|(71574,[0,1,14,21...|[0, 1, 14, 21, 57...|
|  5| How can I increas...|[How c, ow ca, w ...|(71574,[0,14,18,2...|[0, 14, 18, 28, 3...|
|  6| How can Internet ...|[How c, ow ca, w ...|(71574,[18,34,36,...|[18, 34, 36, 37, ...|
|  7| Why am I mentally...|[Why a, hy am, y ...|(71574,[18,28,34,...|[18, 28, 34, 36, ...|
|  8| Find the remainde...|[Find , ind t, nd...|(71574,[0,133,135...|[0, 133, 135, 140...|

In [14]:
ques_shingle_hashed = ques_shingle_hashed.select(["qid","hashed_shingles"]).cache()
ques_shingle_hashed.show()

+---+--------------------+
|qid|     hashed_shingles|
+---+--------------------+
|  1|[0, 1, 2, 5, 6, 7...|
|  2|[0, 1, 2, 5, 6, 7...|
|  3|[0, 1, 2, 5, 6, 7...|
|  4|[0, 1, 14, 21, 57...|
|  5|[0, 14, 18, 28, 3...|
|  6|[18, 34, 36, 37, ...|
|  7|[18, 28, 34, 36, ...|
|  8|[0, 133, 135, 140...|
|  9|[4, 43, 63, 64, 1...|
| 10|[21, 43, 63, 64, ...|
| 11|[4, 39, 41, 47, 5...|
| 12|[1, 4, 39, 41, 86...|
| 13|[21, 50, 83, 87, ...|
| 14|[1, 4, 62, 65, 18...|
| 15|[18, 28, 34, 36, ...|
| 16|[1, 21, 50, 72, 7...|
| 17|[17, 66, 89, 90, ...|
| 18|[17, 66, 89, 90, ...|
| 19|[28, 214, 440, 46...|
| 20|[9, 15, 16, 19, 3...|
+---+--------------------+
only showing top 20 rows



## MIN-HASHING TO GENERATE SIGNATURES

In [15]:
num_hash_fns = 100

In [16]:
def hash_func(a,x,n):
    # (a*x+1)%n - family of hash functions
    return (a*x+1)%n

In [17]:
def min_hash(shingle, n=165931):
    
    signature = [float("inf")]*num_hash_fns
    
    for h in range(num_hash_fns):
        for i in shingle:
            hash_val = hash_func(h+1,i,n)
            if hash_val < signature[h]:
                signature[h] = hash_val
    
    return signature

In [18]:
create_signatures = udf(min_hash, ArrayType(IntegerType()))
ques_signatures = ques_shingle_hashed.withColumn("signatures",create_signatures(ques_shingle_hashed.hashed_shingles)).cache()
ques_signatures.show()

+---+--------------------+--------------------+
|qid|     hashed_shingles|          signatures|
+---+--------------------+--------------------+
|  1|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|
|  2|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|
|  3|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|
|  4|[0, 1, 14, 21, 57...|[1, 1, 1, 1, 1, 1...|
|  5|[0, 14, 18, 28, 3...|[1, 1, 1, 1, 1, 1...|
|  6|[18, 34, 36, 37, ...|[19, 37, 55, 73, ...|
|  7|[18, 28, 34, 36, ...|[19, 37, 55, 73, ...|
|  8|[0, 133, 135, 140...|[1, 1, 1, 1, 1, 1...|
|  9|[4, 43, 63, 64, 1...|[5, 9, 13, 17, 21...|
| 10|[21, 43, 63, 64, ...|[22, 43, 64, 85, ...|
| 11|[4, 39, 41, 47, 5...|[5, 9, 13, 17, 21...|
| 12|[1, 4, 39, 41, 86...|[2, 3, 4, 5, 6, 7...|
| 13|[21, 50, 83, 87, ...|[22, 43, 64, 85, ...|
| 14|[1, 4, 62, 65, 18...|[2, 3, 4, 5, 6, 7...|
| 15|[18, 28, 34, 36, ...|[19, 37, 55, 73, ...|
| 16|[1, 21, 50, 72, 7...|[2, 3, 4, 5, 6, 7...|
| 17|[17, 66, 89, 90, ...|[18, 35, 52, 69, ...|
| 18|[17, 66, 89, 90, ...|[18, 35, 52, 6

## DIVIDING INTO BANDS

In [19]:
bands = 10
rows_per_band = num_hash_fns//bands

def LSH_buckets(signature, k = 100000):
    
    hash_buckets = []
    
    for b in range(bands):
        sign_b = 1
        for i in range(b*rows_per_band,(b+1)*rows_per_band):
            if (signature[i] is not None):
                sign_b *= (signature[i]%k)
        
        hash_buckets.append(sign_b%k)
    
    return hash_buckets

In [20]:
create_hash_buckets = udf(LSH_buckets, StructType([ StructField("band"+str(b+1), IntegerType(), False) for b in range(bands)]))
ques_bands = ques_signatures.withColumn("Hash_Bands",create_hash_buckets(ques_signatures.signatures)).cache()
ques_bands.show()

+---+--------------------+--------------------+--------------------+
|qid|     hashed_shingles|          signatures|          Hash_Bands|
+---+--------------------+--------------------+--------------------+
|  1|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  2|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  3|[0, 1, 2, 5, 6, 7...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  4|[0, 1, 14, 21, 57...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  5|[0, 14, 18, 28, 3...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  6|[18, 34, 36, 37, ...|[19, 37, 55, 73, ...|[6975, 31375, 682...|
|  7|[18, 28, 34, 36, ...|[19, 37, 55, 73, ...|[42500, 4125, 312...|
|  8|[0, 133, 135, 140...|[1, 1, 1, 1, 1, 1...|[1, 1, 1, 1, 1, 1...|
|  9|[4, 43, 63, 64, 1...|[5, 9, 13, 17, 21...|[20125, 30925, 69...|
| 10|[21, 43, 63, 64, ...|[22, 43, 64, 85, ...|[10400, 28000, 96...|
| 11|[4, 39, 41, 47, 5...|[5, 9, 13, 17, 21...|[46750, 43535, 98...|
| 12|[1, 4, 39, 41, 86...|[2, 3, 4

In [21]:
# ques_band_buckets = ques_bands.select(["qid","question","signatures"] + ["Hash_Bands.band"+str(b+1) for b in range(bands)] ).cache()
# ques_band_buckets.show()

## Function to generate candidate pairs given a bucket.

In [22]:
def comparison(qid, hashed_shingles, thresh=0.75):
    n = len(hashed_shingles)
    
    qid_all = []

    for i in range(n):
        for j in range(i+1,n):
            q1 = set(hashed_shingles[i])
            q2 = set(hashed_shingles[j])
            
            jac = len(q1&q2)/(1+len(q1|q2))
            if jac>=thresh:
                qid1 = qid[i]
                qid2 = qid[j]
                qid_all.append([qid1,qid2])

    if len(qid_all)==0:
        return None
    else:
        return qid_all

## GENERATING CANDIDATE PAIRS

In [23]:
# for b in range(1,bands+1):

b=1

ques_band_b = ques_bands.groupby("Hash_Bands.band"+str(b)).agg(collect_list("qid"),collect_list("hashed_shingles")).cache()
ques_band_b.show()

+-----+--------------------+-----------------------------+
|band1|   collect_list(qid)|collect_list(hashed_shingles)|
+-----+--------------------+-----------------------------+
|41575|              [2496]|         [[164, 194, 195, ...|
|18800|              [4200]|         [[1184, 1301, 132...|
|78400|[16750, 24169, 43...|         [[6316, 8555, 266...|
| 9376|     [24663, 122288]|         [[625, 669, 788, ...|
|83250|             [25774]|         [[254, 733, 1237,...|
|97186|             [32153]|         [[150, 153, 207, ...|
|18944|             [72579]|         [[55, 158, 190, 2...|
|63087|             [78240]|         [[40, 44, 47, 51,...|
|26425|             [97668]|         [[64, 81, 107, 15...|
|35820|            [108950]|         [[150, 207, 215, ...|
| 3175|            [117696]|         [[266, 293, 332, ...|
|90228|            [141960]|         [[150, 180, 207, ...|
| 4900|            [145204]|         [[44, 47, 51, 146...|
|19200|[17, 18, 26, 61, ...|         [[17, 66, 89, 90,..

In [24]:
candidate_pairs = udf(comparison, ArrayType(ArrayType(IntegerType())))

candidates_band_b = ques_band_b.withColumn("Candidates",candidate_pairs(ques_band_b[1],ques_band_b[2])).cache()
candidates_band_b.show()

+-----+--------------------+-----------------------------+--------------------+
|band1|   collect_list(qid)|collect_list(hashed_shingles)|          Candidates|
+-----+--------------------+-----------------------------+--------------------+
|41575|              [2496]|         [[164, 194, 195, ...|                null|
|18800|              [4200]|         [[1184, 1301, 132...|                null|
|78400|[16750, 24169, 43...|         [[6316, 8555, 266...|    [[93350, 93351]]|
| 9376|     [24663, 122288]|         [[625, 669, 788, ...|                null|
|83250|             [25774]|         [[254, 733, 1237,...|                null|
|97186|             [32153]|         [[150, 153, 207, ...|                null|
|18944|             [72579]|         [[55, 158, 190, 2...|                null|
|63087|             [78240]|         [[40, 44, 47, 51,...|                null|
|26425|             [97668]|         [[64, 81, 107, 15...|                null|
|35820|            [108950]|         [[1

In [26]:
pdf = candidates_band_b.select("Candidates").cache()
pdf.show()

+--------------------+
|          Candidates|
+--------------------+
|                null|
|                null|
|    [[93350, 93351]]|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|                null|
|[[17, 18], [17, 3...|
|  [[162567, 162568]]|
|[[64482, 64483], ...|
|                null|
|                null|
|                null|
|                null|
+--------------------+
only showing top 20 rows



In [29]:
pdf1 = pdf.select(explode(pdf.Candidates).alias("Candidates")).cache()
pdf1.show()

+---------------+
|     Candidates|
+---------------+
| [93350, 93351]|
|       [17, 18]|
|    [17, 38436]|
|    [18, 38436]|
|   [1168, 1169]|
|   [1853, 1854]|
|  [2504, 44486]|
|  [2997, 63635]|
|   [3573, 3574]|
|  [4225, 26881]|
| [4692, 145004]|
|   [4979, 4980]|
|   [7642, 9566]|
| [9747, 103348]|
| [11216, 11217]|
| [14376, 30919]|
| [14376, 44584]|
| [14548, 22949]|
| [15088, 15089]|
|[15088, 109911]|
+---------------+
only showing top 20 rows



In [35]:
final_csv = udf(lambda x:[x[0],x[1]], StructType([ StructField("qid1", IntegerType(), False), StructField("qid2", IntegerType(), False) ]))
pdf2 = pdf1.withColumn("Candidate_Pairs",final_csv(pdf1.Candidates)).cache()
pdf3 = pdf2.select(["Candidate_Pairs.qid1","Candidate_Pairs.qid2"]).cache()
pdf3.show()

+-----+------+
| qid1|  qid2|
+-----+------+
|93350| 93351|
|   17|    18|
|   17| 38436|
|   18| 38436|
| 1168|  1169|
| 1853|  1854|
| 2504| 44486|
| 2997| 63635|
| 3573|  3574|
| 4225| 26881|
| 4692|145004|
| 4979|  4980|
| 7642|  9566|
| 9747|103348|
|11216| 11217|
|14376| 30919|
|14376| 44584|
|14548| 22949|
|15088| 15089|
|15088|109911|
+-----+------+
only showing top 20 rows



In [36]:
pdf3.write.csv('band1.csv')