# **Initialization**

In [None]:
# Mount Google Drive at /content/drive; later cells read the Kaggle credentials from it and store the dataset under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Download and extract Spark, then install required dependencies

In [None]:
!wget -q https://www-eu.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!tar xf spark-3.2.1-bin-hadoop2.7.tgz
!rm spark-3.2.1-bin-hadoop2.7.tgz
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 50.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=b6661928aae7927ef708113ea99d141bacdd1edcdf5e28d9b18a6c0a0e840c7c
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


### Download dataset through Kaggle API

In [None]:
%cd /content/drive/MyDrive/Unimi/AMD/
!mkdir ~/.kaggle/
!cp APIs/kaggle.json ~/.kaggle/

import os
import kaggle
import zipfile

from kaggle.api.kaggle_api_extended import KaggleApi


os.environ["DATASET_DIR"] = "/content/drive/MyDrive/Unimi/AMD/dataset"

api = KaggleApi()
api.authenticate()
if not os.path.exists(os.environ["DATASET_DIR"]):
  os.makedirs(os.environ["DATASET_DIR"])

if "Questions.csv" not in os.listdir(os.environ["DATASET_DIR"]):
  if "Questions.csv.zip" not in os.listdir(os.environ["DATASET_DIR"]):
    api.dataset_download_file("stackoverflow/stacksample", file_name="Questions.csv", path=os.environ["DATASET_DIR"])
  with zipfile.ZipFile(os.path.join(os.environ["DATASET_DIR"], "Questions.csv.zip"), 'r') as f:
    f.extractall(os.environ["DATASET_DIR"])
  os.remove(os.path.join(os.environ["DATASET_DIR"], "Questions.csv.zip"))


/content/drive/MyDrive/Unimi/AMD


In [None]:
# Sanity check: list the extracted Spark distribution (SPARK_HOME is set to this path in the next code cell).
%ls /content/spark-3.2.1-bin-hadoop2.7/

[0m[01;34mbin[0m/   [01;34mdata[0m/      [01;34mjars[0m/        LICENSE    NOTICE   [01;34mR[0m/         RELEASE  [01;34myarn[0m/
[01;34mconf[0m/  [01;34mexamples[0m/  [01;34mkubernetes[0m/  [01;34mlicenses[0m/  [01;34mpython[0m/  README.md  [01;34msbin[0m/


### Set environment paths and create a Spark session

In [None]:
%cd ~/
import findspark
import pyspark
from pyspark.sql import SparkSession

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"


findspark.init(findspark.find())
spark = SparkSession.builder.master("local").appName("Colab").getOrCreate()
spark

/root


Next, we set the CSV reader options needed to parse the file correctly (header row, quoted fields that may span multiple lines), print the inferred schema, and then draw a sample of the dataset.

In [None]:
# CSV options: header row; fields quoted with '"', where a '"' inside a field is escaped
# by another '"'; multiline lets quoted fields (question bodies) span several lines.
# Read Questions.csv explicitly: pointing at the directory would ingest every file in it.
df_reader = spark.read \
  .option("escape", '"') \
  .option("header", True) \
  .option("multiline", True) \
  .option("quote", '"') \
  .csv(os.path.join(os.environ["DATASET_DIR"], "Questions.csv"))
df_reader.printSchema()   # print schema

root
 |-- Id: string (nullable = true)
 |-- OwnerUserId: string (nullable = true)
 |-- CreationDate: string (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Body: string (nullable = true)



In [None]:
# Keep only the columns used downstream and draw a reproducible ~1% sample.
df = (df_reader
      .select('Id', 'Body', 'Title')
      .sample(fraction=1e-2, seed=1))
sampled_count = df.count()
print("Sampled records: ", sampled_count)

Sampled records:  12611


# **Preprocessing**

## Spark Structured APIs for preprocessing

Now we preprocess the text to bring it into a suitable format. Stack Overflow questions may contain both natural-language text and code in possibly any programming language, so we separate the two parts. This way, documents are compared on their natural-language text only, which should be more reliable.
Here, regular expressions and Spark's Structured APIs are crucial, since the computation needs to be efficient and distributed.

In [None]:
from pyspark.sql.functions import col, regexp_extract, regexp_replace

# Stack Overflow wraps code in <code>...</code>; [\S\s]*? matches lazily across newlines.
# Raw string: '\S' and '\s' in a plain string literal are invalid Python escapes.
CODE_REGEX = r'<code>[\S\s]*?</code>'

df.show(n=15, truncate=False)

# Body_code keeps only the FIRST code snippet (regexp_extract returns a single match).
df_filt = df.withColumn('Body_code', regexp_extract('Body', CODE_REGEX, 0))

# Body keeps the natural-language text: every code snippet is removed.
df_filt = (df_filt
           .withColumn('Body_cleaned', regexp_replace('Body', CODE_REGEX, ''))
           .drop('Body')
           .withColumnRenamed('Body_cleaned', 'Body'))
df_filt.show(n=15, truncate=False)


+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import lower

englishStopwords = StopWordsRemover.loadDefaultStopWords('english')

# Replace simple HTML tags (<p>, </p>, ...) and every character that is not a letter with a space.
# Inside a character class '|' and '^' are literal characters, so the class lists only the
# letters to keep (written as '[^a-z|^A-Z|...]' it would also keep '|' and '^').
df_filt2 = df_filt.withColumn('BodyOut',
                              regexp_replace('Body', r'<[a-z]*>|<[\/a-z]*>|[^a-zA-Zàòèéù]', ' '))
df_filt2 = df_filt2.drop('Body')

# (\bw1\b)|(\bw2\b)|...|(\b[a-z]{1}\b): every stopword as a whole word, plus any single-letter word.
stopwords_regex = '|'.join(r'(\b' + word + r'\b)' for word in englishStopwords) + r'|(\b[a-z]{1}\b)'

# Lower-case first so that the stopword list and [a-z] match every word.
df_filt2 = df_filt2.withColumn('BodyOut', lower(col('BodyOut')))
df_filt2 = df_filt2.withColumn('BodyOut', regexp_replace('BodyOut', stopwords_regex, ''))
df_filt2 = df_filt2.drop('Body_code')
df_filt2.select('Id', 'BodyOut').show(n=10, truncate=False)


+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Id   |BodyOut                                                                                                                                                                                                                                                                                                                  

# LSH for document similarity

Here we implement the actual LSH algorithm: we compute the k-shingles and then the MinHash signatures of the documents. Finally, we collect the candidate pairs whose Jaccard similarity is expected to exceed the chosen threshold.

## Miller-Rabin primality test

We define some helper functions for the Miller-Rabin primality test. The test is needed later on, because we want the least prime number greater than the greatest hash of the shingles: with a prime modulus $p$, every hash function $h(x) = (ax + b) \bmod p$ with $a \not\equiv 0 \pmod p$ is a permutation of the shingle IDs.
The following implementation is taken from: https://www.geeksforgeeks.org/primality-test-set-3-miller-rabin/

In [None]:
# Python3 program Miller-Rabin primality test
import random

def power(x, y, p):
    """Return (x ** y) % p using square-and-multiply exponentiation.

    Args:
        x: base.
        y: non-negative exponent.
        p: modulus.

    Returns:
        (x ** y) % p; note that y == 0 yields 1 even when p == 1.
    """
    result = 1
    base = x % p
    exponent = y
    while exponent > 0:
        # Multiply in the current base when the lowest exponent bit is set.
        if exponent & 1:
            result = (result * base) % p
        exponent >>= 1
        base = (base * base) % p
    return result

def miillerTest(d, n):
    """Run one Miller-Rabin round with a random base.

    Args:
        d: odd number such that d * 2**r == n - 1 for some r >= 1.
        n: number under test; isPrime only calls this with n >= 5.

    Returns:
        False if the chosen base proves n composite,
        True if n is probably prime.
    """
    # Pick a random base a uniformly in [2, n-2] (non-empty because n >= 5).
    a = random.randint(2, n - 2)

    # Compute a^d % n; the built-in three-argument pow does modular exponentiation.
    x = pow(a, d, n)

    if x == 1 or x == n - 1:
        return True

    # Keep squaring x until d reaches n-1; stop early when
    # (i) x^2 % n == 1 (composite) or (ii) x^2 % n == n-1 (probably prime).
    while d != n - 1:
        x = (x * x) % n
        d *= 2

        if x == 1:
            return False
        if x == n - 1:
            return True

    # No square reached n-1: a is a witness that n is composite.
    return False

def isPrime(n, k):
    """Probabilistic Miller-Rabin primality test.

    Args:
        n: integer to test.
        k: number of random rounds; more rounds mean higher accuracy.

    Returns:
        False if n is composite (or < 2), True if n is probably prime.
    """
    # Small values are decided directly (miillerTest needs n >= 5).
    if n <= 1 or n == 4:
        return False
    if n <= 3:
        return True

    # Strip the factors of two: n - 1 == d * 2**r with d odd.
    d = n - 1
    while d % 2 == 0:
        d //= 2

    # Probably prime only if every one of the k rounds passes (stops at the first failure).
    return all(miillerTest(d, n) for _ in range(k))



## K-Shingle extraction and minhashing

We now extract word-level k-shingles, with k set to 6. Then each shingle is pre-hashed into an integer ID, so that an arbitrary number of hash functions can be applied to those IDs.

In [None]:
import numpy as np
import re
from pyspark.sql.functions import collect_set, explode, lit, udf
from pyspark.sql.types import ArrayType, IntegerType, LongType, StringType


K = 6  # shingle length: number of consecutive words joined into one shingle


@udf(returnType=ArrayType(elementType=StringType()))
def generate_shingles(bodyCol):
    """Return the word-level K-shingles of a cleaned, lower-cased body.

    Each shingle is K consecutive words joined by a single space. Empty tokens
    produced by leading/trailing separators are discarded, so the first word
    and the last shingle are always kept. A null body, or one with fewer than
    K words, yields an empty list.
    """
    if bodyCol is None:
        return []
    words = [w for w in re.split(r'[^a-z]+', bodyCol) if w]
    # len(words) - K + 1 windows of length K (none when len(words) < K).
    return [' '.join(words[i:i + K]) for i in range(len(words) - K + 1)]



@udf(returnType=IntegerType())
def apply_hash(shingle_id, a, b, n):
    """Apply the hash h(x) = (a*x + b) mod n to one shingle ID.

    a, b and n arrive as literal columns and are coerced to int.
    """
    coeff, offset, modulus = int(a), int(b), int(n)
    return (coeff * shingle_id + offset) % modulus


# FNV-style string hash: XOR-then-multiply order (as in FNV-1a) with non-standard constants.
@udf(returnType=ArrayType(elementType=IntegerType()))
def pre_hash(shingles, n):
    """Map every shingle string to an integer ID in [0, n).

    Starting from 1283, each character code is XOR-ed into the state, then the
    state is multiplied by 233 and reduced modulo n.

    Args:
        shingles: list of shingle strings of one document.
        n: modulus bounding the IDs.

    Returns:
        List of IDs, one per shingle, in input order.
    """
    def _fnv_like(text):
        state = 1283
        for ch in text:
            state = ((state ^ ord(ch)) * 233) % n
        return state

    return [_fnv_like(s) for s in shingles]


# Word-level shingles per document; the cleaned body is not needed afterwards.
df_filt2 = df_filt2.withColumn('Shingles', generate_shingles(col('BodyOut')))
df_filt2 = df_filt2.drop('BodyOut')

# Modulus for the pre-hash and the MinHash functions: the least prime strictly greater than
# the number of distinct shingles. Sizing it by the number of documents would squeeze far
# more distinct shingles into too few IDs.
n = df_filt2.select(explode('Shingles').alias('Shingle')).distinct().count()
p = n + 1
while not isPrime(p, 20):
    p += 1
print("Found prime number: ", p)

# Convert each shingle to an integer ID in [0, p), then emit one row per (document, shingle ID).
df_filt2 = df_filt2.withColumn('ShingleHashes', pre_hash(col('Shingles'), lit(p)))
df_filt2 = df_filt2.drop('Shingles')
df_filt2 = df_filt2.withColumn('ShingleID', explode(df_filt2.ShingleHashes))
df_filt2 = df_filt2.drop('ShingleHashes')
df_filt2.printSchema()

# MinHash family h_i(x) = (a_i * x + b_i) mod p. With p prime and 1 <= a_i < p, each h_i is a
# permutation of [0, p). A seeded generator makes the signatures reproducible.
NUM_HASHES = 12
HASH_SEED = 1
rng = np.random.default_rng(HASH_SEED)
a = rng.integers(1, p, size=NUM_HASHES).tolist()
b = rng.integers(0, p, size=NUM_HASHES).tolist()

for idx, (a_i, b_i) in enumerate(zip(a, b)):
    df_filt2 = df_filt2.withColumn(f"H{idx}", apply_hash(col('ShingleID'), lit(a_i), lit(b_i), lit(p)))

df_filt2.drop('Title').show(n=20, truncate=False)


Found prime number:  12611
root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ShingleID: integer (nullable = true)

+-----+---------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|Id   |ShingleID|H0   |H1   |H2   |H3   |H4   |H5   |H6   |H7   |H8   |H9   |H10  |H11  |
+-----+---------+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|10230|9235     |3008 |515  |7279 |5691 |2302 |507  |5666 |4984 |6581 |6568 |10827|11524|
|10230|9221     |2602 |151  |6943 |5453 |2050 |143  |5428 |4900 |6399 |6386 |10715|11258|
|10230|10256    |7395 |1839 |6561 |10437|8069 |1831 |10412|11110|7243 |7230 |6384 |5701 |
|10230|4707     |10417|8897 |12106|4381 |9075 |8889 |4356 |3038 |10772|10759|12436|1158 |
|10230|7174     |6294 |9984 |8259 |8487 |3037 |9976 |8462 |5229 |5010 |4997 |6950 |10198|
|10230|9817     |7275 |3036 |8636 |2974 |167  |3028 |2949 |8476 |1536 |1523 |2872 |9971 |
|10230|923      |1569 |11401|9567 |3108 |401

## Banding technique and candidate pairs retrieval

We apply the banding technique to retrieve the candidate pairs, i.e. documents whose signatures agree on at least one whole band and whose similarity is therefore likely to exceed the threshold.

In [None]:
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import min, size
from pyspark.sql.types import StructField, StructType

BANDS = 3

# Rows (hash values) per band; every hash column must belong to exactly one band.
COLS = len(a) // BANDS
assert COLS * BANDS == len(a), "the number of hash functions must be divisible by BANDS"


def get_candidates_naive(mh):
    """Naive candidate search: self-join documents whose band values are all equal.

    Args:
        mh: DataFrame whose first column is 'Id' and whose remaining columns are
            the signature values of one band.

    Returns:
        DataFrame with columns 'Id' and 'Candidates' (set of other Ids with an
        identical band).
    """
    # Alias both sides: joining a DataFrame with a frame derived from itself makes
    # mh[c] and tmp[c] resolve to the same attribute, so the join condition is
    # ambiguous (Spark may reject it or treat it as always true).
    left = mh.alias('l')
    right = mh.alias('r')
    cond = col('l.Id') != col('r.Id')
    for c in mh.columns[1:]:
        cond = cond & col(f'l.{c}').eqNullSafe(col(f'r.{c}'))
    return (left.join(right, cond)
            .groupBy(col('l.Id').alias('Id'))
            .agg(collect_set(col('r.Id')).alias('Candidates')))


def hash_band(band):
  """Returns a dict containing {hash: docID}.

  `band` is a row/tuple (docID, v1, ..., vr): the document id followed by the
  signature values of one band. Two documents get the same key only when all
  their band values match (up to 64-bit hash collisions), so grouping by key
  yields the LSH buckets.
  """
  # Exact tuple hash. A weighted sum such as 7*h + v/5 maps many distinct bands to
  # the same value (e.g. (0, 7, 0, 0) and (1, 0, 0, 0)). Python's hash of ints and of
  # tuples of ints is not randomized per process, so keys agree across Spark workers.
  return {hash(tuple(int(v) for v in band[1:])): band[0]}
    
    
# Signature input: document id plus the hash columns H0..H{len(a)-1}.
sig_mat = df_filt2.drop('ShingleID', 'Title')
sig_mat = sig_mat.withColumn('Id', sig_mat.Id.cast(IntegerType()))

# MinHash signature: for every hash function keep the minimum over the document's shingles
# (min is pyspark.sql.functions.min). A single groupBy replaces one window pass per hash
# column plus dropDuplicates, and yields the columns Id, minH0, ..., minH{len(a)-1}.
sig_mat = sig_mat.groupBy('Id').agg(*[min(f"H{i}").alias(f"minH{i}") for i in range(len(a))])

band_schema = StructType([StructField("BandHash", LongType(), False),
                          StructField("Id", IntegerType(), False)])

# LSH banding: documents whose signatures agree on a whole band share a bucket.
# Buckets are built from the RDD directly, without collecting rows to the driver.
# band_idx (not b) so the global hash coefficients `b` are not overwritten.
band_candidates = []
for band_idx in range(BANDS):
    start = 1 + band_idx * COLS          # column 0 is 'Id'
    band_cols = ['Id'] + sig_mat.columns[start:start + COLS]
    band_rdd = sig_mat.select(band_cols).rdd.flatMap(lambda row: hash_band(row).items())
    candidates = (spark.createDataFrame(band_rdd, schema=band_schema)
                  .groupBy("BandHash")
                  .agg(collect_set("Id").alias("Candidates"))
                  .where(size("Candidates") > 1))
    candidates.show(10, False)
    band_candidates.append(candidates)


+--------+------------------------------+
|BandHash|Candidates                    |
+--------+------------------------------+
|40996   |[3470900, 18143860]           |
|41234   |[36236790, 4114410]           |
|41258   |[39834770, 27559540, 14966210]|
|41427   |[15973440, 19453430]          |
|41446   |[34140240, 12754120]          |
|41510   |[18571260, 24636520]          |
|41516   |[14574660, 2826710]           |
|41592   |[25440940, 21442980]          |
|41614   |[15376570, 25795200]          |
|41691   |[19534080, 23101180]          |
+--------+------------------------------+
only showing top 10 rows

+--------+-----------------------------+
|BandHash|Candidates                   |
+--------+-----------------------------+
|41301   |[23833670, 5146070]          |
|41425   |[11974600, 32418970]         |
|41446   |[28416130, 17993010]         |
|41501   |[9235790, 27872540, 38834420]|
|41638   |[15569960, 3898740]          |
|41692   |[39127810, 7510620]          |
|41726   |[120071

## Visualizing some results

Some pairs retrieved using 12 hash functions split into $b = 3$ bands of $r = 4$ rows each. The similarity threshold is approximately $(1/b)^{1/r} = (1/3)^{1/4} \approx 0.76$.

In [None]:
### Some candidate pairs retrieved with 3 bands and 12 hash functions
candidate_pairs = [(2065890, 8785860), (1416300, 19645720)]
for first_id, second_id in candidate_pairs:
    pair_filter = (col('Id') == first_id) | (col('Id') == second_id)
    df_filt.where(pair_filter).select('Id', 'Title', 'Body').show(n=3, truncate=False)


+-------+---------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Id     |Title                                  |Body                                                                                                              

Some pairs retrieved using 50 hash functions split into $b = 5$ bands of $r = 10$ rows each. The similarity threshold is approximately $(1/b)^{1/r} = (1/5)^{1/10} \approx 0.85$.

In [None]:
### Some results retrieved with 5 bands and 50 hashes

for pair in [(3650800, 4677690), (4008990, 23859170)]:
    pair_rows = df_filt.where((col('Id') == pair[0]) | (col('Id') == pair[1]))
    pair_rows.select('Id', 'Title', 'Body').show(n=3, truncate=False)

+-------+----------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------