In [1]:
# Parts of this code are adapted from the Spark ML feature documentation:
# https://spark.apache.org/docs/2.4.5/ml-features.html

In [2]:
import numpy as np
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Tokenizer,RegexTokenizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, udf

In [3]:
from pyspark.sql.functions import isnan, when, count, col

In [4]:
# Bind `spark` explicitly instead of relying on a global injected by the hosting
# environment. getOrCreate() hands back the already-running session when there is
# one and builds a new one otherwise, so the notebook also runs on a fresh kernel.
spark = SparkSession.builder.getOrCreate()
spark

In [5]:
# SparkContext behind the session; the RDD call used to load the files (wholeTextFiles) lives on it
sc = spark.sparkContext

In [6]:
# Load the cookbook text files: every .txt under the tables folder becomes one record
COOKBOOK_GLOB = 'dbfs:/FileStore/tables/*.txt'
cb_txt = sc.wholeTextFiles(COOKBOOK_GLOB)

In [7]:
# Each record is a (file name, contents) tuple pair.
# The contents are entire files, so trim them to a short prefix on the executors
# before fetching two records; otherwise the cell output is two whole cookbooks.
PREVIEW_CHARS = 200
cb_txt.mapValues(lambda text: text[:PREVIEW_CHARS]).take(2)

In [8]:
# Number of (file name, contents) records that were loaded
cb_txt.count()

In [9]:
# The file name is not needed downstream; keep only the contents half of each pair
cb_txt2 = cb_txt.values()

In [10]:
# Flatten each file onto a single line.
# Line breaks are replaced with a space, not removed: deleting them glues the last
# word of one line to the first word of the next ("flour\nsugar" -> "floursugar"),
# which corrupts the tokens produced later by the tokenizer.
cb_txt2 = cb_txt2.map(lambda r: r.replace('\n', ' '))

In [11]:
# Spot-check the first record. Only the first 200 characters are fetched because
# every record holds the text of an entire file.
cb_txt2.map(lambda text: text[:200]).take(1)

In [12]:
# Wrap every text in a one-field Row so the RDD can be turned into a DataFrame
cb_txt2 = cb_txt2.map(lambda text: Row(text))

In [13]:
# Build a DataFrame from the RDD of Rows (schema is inferred)
cb_txt2 = spark.createDataFrame(cb_txt2)

In [14]:
# Row count after the conversion to a DataFrame; compare with the record count of cb_txt
cb_txt2.count()

In [15]:
# Display the DataFrame before its single column is renamed
cb_txt2.show()

In [16]:
# Give the auto-generated '_1' column a meaningful name
cb_txt2 = cb_txt2.withColumnRenamed(existing='_1', new='cookbook_text')

In [17]:
from pyspark.sql.functions import trim

# Strip leading and trailing white space from the text column, in place
trimmed_text = trim(col("cookbook_text"))
cb_txt2 = cb_txt2.withColumn("cookbook_text", trimmed_text)

In [18]:
# Display the DataFrame again after the rename and trim steps
cb_txt2.show()

In [19]:
# Split cookbook_text into a 'words' array, treating every non-word character as a delimiter
retoken = RegexTokenizer(
    pattern="\\W",
    inputCol="cookbook_text",
    outputCol="words",
)
cb_csv3 = retoken.transform(cb_txt2)

In [20]:
# Display the tokenized frame (cookbook_text plus the new 'words' column)
cb_csv3.show()

In [21]:
# Build 5-word shingles from each token list, then count-vectorize the shingles.
# This is the first step of 3 for MinHash duplicate detection.
#
# The shingle length is set on NGram. CountVectorizer's vocabSize is the maximum
# vocabulary size, not a shingle length: vocabSize = 5 kept only five terms, so
# every document was reduced to a 5-dimensional vector.
SHINGLE_SIZE = 5
shingler = NGram(n=SHINGLE_SIZE, inputCol='words', outputCol='shingles')
cb_shingles = shingler.transform(cb_csv3)

# The CountVectorizer extracts a vocabulary and generates a CountVectorizerModel.
# minDF = 2.0 keeps only shingles that occur in at least two documents; the
# vocabulary cap is left at the library default.
# NOTE(review): because minDF drops shingles unique to one file, distances are
# computed over shared shingles only - consider minDF = 1.0 if a true set-based
# Jaccard distance is wanted.
vec = CountVectorizer(inputCol='shingles', outputCol='vectors', minDF=2.0)
shingles_all = vec.fit(cb_shingles).transform(cb_shingles)

# MinHashLSH raises on a vector without any non-zero entry, so drop documents that
# ended up with none of the vocabulary (for example fewer than SHINGLE_SIZE tokens).
has_features = udf(lambda v: bool(v.numNonzeros() > 0), "boolean")
shingles_cv = shingles_all.filter(has_features(col('vectors')))

In [22]:
# Display the frame with the new 'vectors' column produced by the CountVectorizer
shingles_cv.show()

In [23]:
from pyspark.sql.functions import monotonically_increasing_id

# Attach an 'id' column so each document can be referred to in the similarity join.
# The generated values are unique but not consecutive (some are very large numbers);
# uniqueness is all that is needed here.
row_id = monotonically_increasing_id()
shingles_cv = shingles_cv.withColumn("id", row_id)

In [24]:
# List the generated ids. NOTE(review): 76 is presumably the number of files so that
# every id is shown - confirm against the counts above.
shingles_cv.select('id').show(76)

In [25]:
# Fit MinHash LSH on the 'vectors' column of shingles_cv; the seed is fixed explicitly
min_hash = MinHashLSH(
    seed=12345,
    inputCol="vectors",
    outputCol="Hashes",
)
min_hash2 = min_hash.fit(shingles_cv)

# Apply the fitted model and look at the first row to check the new 'Hashes' column
min_hash2.transform(shingles_cv).head()

In [26]:
# minhash3 exists only to show the column of hashes produced by MinHash.
# It reuses the model already fitted as min_hash2: fitting the same estimator
# (same seed) on the same data again yields the same model, so a second fit is
# wasted work.
minhash3 = min_hash2.transform(shingles_cv)

In [27]:
# Display the transformed frame, including the 'Hashes' column added by MinHash
minhash3.show()

In [28]:
# Row count of the hashed frame (the transform adds a column, not rows)
minhash3.count()

In [29]:
# Now we calculate the Jaccard distance, which measures dissimilarity between two
# sample sets: the lower the distance, the closer two txt files are to being
# duplicates, and a distance of 0 means they are complete duplicates.
# Reference: https://en.wikipedia.org/wiki/Jaccard_index
#
# This is a self-join, so without a filter every document matches itself at
# distance 0 and every real pair is reported twice (A-B and B-A). Keeping only
# rows with idA < idB leaves each distinct pair exactly once.

threshold = 0.6
min_hash2.approxSimilarityJoin(shingles_cv, shingles_cv, threshold, distCol="JaccardDistance")\
    .filter(col("datasetA.id") < col("datasetB.id"))\
    .select(col("datasetA.id").alias("idA"),
        col("datasetB.id").alias("idB"),
        col("JaccardDistance")).show()