In [29]:
import hashlib
import random
import numpy as np
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row

# Create (or reuse) a Spark session running locally on all available cores,
# and keep a handle on its SparkContext for building RDDs.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('IPDE')
    .getOrCreate()
)
sc = spark.sparkContext
    

In [30]:
class Shingling:
    """Represent a document as the set of hashes of its character k-shingles.

    Every contiguous substring of length k is hashed with MD5 and the hex
    digest is read as an integer. The document is represented by the plain
    (unordered) Python set of those integers, so repeated shingles collapse
    into a single entry.
    """

    def __init__(self):
        pass

    def shingle(self, document, k = 10):
        """Return the set of hashed k-shingles of ``document``.

        Parameters
        ----------
        document: string
                  The text to shingle.
        k: int, optional
           The shingle size, in characters.

        Returns
        -------
        set of int
            One integer (the MD5 digest) per distinct shingle. The set is
            empty when the document is shorter than k.
        """
        # Remember the most recently shingled document on the instance.
        self.document = document
        last_start = len(document) - k
        return {
            # Hash the shingle and convert the hexadecimal digest to an int.
            int(hashlib.md5(document[start:start + k].encode()).hexdigest(), 16)
            for start in range(last_start + 1)
        }
            

In [31]:
class CompareSets:
    """Compare two sets of integers.

    The only comparison implemented is the Jaccard similarity,
    |A intersect B| / |A union B|.
    """

    def __init__(self):
        pass

    def jaccardSimilarity(self, set1, set2):
        """Return the Jaccard similarity of ``set1`` and ``set2``.

        Parameters
        ----------
        set1, set2: set
            The sets to compare.

        Returns
        -------
        float
            A value in [0, 1]. Two empty sets are treated as identical and
            yield 1.0 (the usual convention) instead of dividing by zero.

        Raises
        ------
        TypeError
            If either argument is not a ``set``.
        """
        # check that the arguments are two sets
        if(not isinstance(set1,set)): raise TypeError("The fisrt argument must be a Set")
        if(not isinstance(set2,set)): raise TypeError("The second argument must be a Set")
        # Both empty: the union is empty too, so the ratio is undefined.
        if not set1 and not set2:
            return 1.0
        shared = len(set1 & set2)
        # |A union B| = |A| + |B| - |A intersect B|; avoids building the union set.
        total = len(set1) + len(set2) - shared
        return shared / total

In [32]:
class CompareSignatures:
    """Compare two signatures (equal-length sequences of integers).

    The similarity is the fraction of positions at which the two signatures
    hold the same value, i.e. the number of agreeing positions divided by the
    signature length. (This is one minus the normalised Hamming distance, not
    the Hamming distance itself.)
    """

    def __init__(self):
        pass

    def signatureSimilarity(self, sign1, sign2):
        """Return the fraction of positions where ``sign1`` and ``sign2`` agree.

        Parameters
        ----------
        sign1, sign2: sequence of int
            The signatures to compare; they must have the same length.

        Returns
        -------
        float
            A value in [0, 1]. Two empty signatures yield 1.0 (they have no
            position at which they differ) instead of dividing by zero.

        Raises
        ------
        ValueError
            If the signatures have different lengths.
        """
        # check that the signatures have the same number of elements
        if len(sign1) != len(sign2):
            raise ValueError("The signatures have different number of elements")
        if len(sign1) == 0:
            return 1.0
        agreeing = sum(1 for left, right in zip(sign1, sign2) if left == right)
        return agreeing / len(sign1)

In [33]:
class MinHashing:
    """Build a minHash signature of length k from a set of integers.

    The class holds k hash functions of the form
    ``h_i(x) = (a[i] * x + b[i]) % c[i]``. For each of them the signature
    stores the *element* of the input set whose hash value is the smallest.
    """

    def __init__(self, a = None, b = None, c = None, k = 5):
        """
        Parameters
        ----------
        a, b, c: sequence of int, optional
            Coefficients of the hash functions; entry ``i`` of each sequence
            defines ``h_i``. They are copied, so the caller's sequences are
            never modified. When omitted, the lists start empty and are meant
            to be filled with ``randomize()``.
        k: int, optional
            Number of hash functions, i.e. the signature length.
        """
        # `is None` instead of `== None`: the latter misbehaves with objects
        # that overload equality. The copy keeps randomize() from appending
        # to a list owned by the caller.
        self.a = [] if a is None else list(a)
        self.b = [] if b is None else list(b)
        self.c = [] if c is None else list(c)
        self.k = k

    def randomize(self):
        """Append k freshly drawn (a, b, c) triples to the coefficient lists.

        Triples already present are kept in place, and ``signature`` only ever
        uses indices ``0 .. k-1``.
        """
        for _ in range(self.k):
            self.a.append(random.randint(0,sys.maxsize))
            self.b.append(random.randint(0,sys.maxsize))
            # c is a modulus: drawing 0 would make h() divide by zero.
            self.c.append(random.randint(1,sys.maxsize))

    def h(self, number, index):
        """Hash ``number`` with the hash function number ``index``."""
        return (self.a[index]*number+self.b[index])%self.c[index]

    def minHash(self, set1, index = 1):
        """Return the element of ``set1`` with the smallest ``h(elem, index)``.

        Parameters
        ----------
        set1: set of int
            The set of hashed shingles.
        index: int, optional
            Which (a, b, c) triple to use for ``h``.

        Returns
        -------
        int
            The minimising element, or 0 when ``set1`` is empty. On ties the
            element met first while iterating the set wins.

        Raises
        ------
        TypeError
            If ``set1`` is not a ``set``.
        """
        if(not isinstance(set1,set)): raise TypeError("The fisrt argument must be a Set")
        # min() evaluates h once per element and has no artificial upper
        # bound, so it also works when hash values reach sys.maxsize or more.
        return min(set1, key=lambda elem: self.h(elem, index), default=0)

    def signature(self, set1):
        """Return the signature of ``set1``: one minHash per hash function, in order."""
        return [self.minHash(set1, i) for i in range(self.k)]

In [34]:
class LSH:
    """Locality-sensitive hashing of minHash signatures with the banding technique.

    A signature is cut into ``b`` bands of ``r`` consecutive rows. Each band is
    hashed, and two signatures become a candidate pair as soon as they hash to
    the same value in at least one band.
    """

    def __init__(self, b = 20, r = 5):
        """
        Parameters
        ----------
        b: int, optional
            Number of bands.
        r: int, optional
            Number of signature rows per band.
        """
        # Approximate similarity threshold implied by the banding: (1/b)^(1/r).
        self.t = (1/b)**(1/r)
        self.b = b
        self.r = r

    def h(self, intList):
        """Hash a sequence of integers to a value in ``[0, p)``.

        Computes ``sum(a_i * x_i) % p`` where ``a_0 = 5`` and
        ``a_(i+1) = a_i ** 2``. An empty sequence hashes to 0.
        """
        # p is a prime modulus, a is the (fixed) starting multiplier.
        p = 119983
        a = 5
        total = 0
        for number in intList:
            # Reducing modulo p at every step gives the same result as one
            # final reduction but keeps `a` from growing without bound.
            total = (total + a * number) % p
            a = (a * a) % p
        return total

    def lsh(self, signList):
        """Return the candidate pairs among the signatures in ``signList``.

        Parameters
        ----------
        signList: sequence of sequences of int
            One signature per document; a signature is expected to hold
            ``b * r`` values.

        Returns
        -------
        list of [int, int]
            Index pairs ``[i, j]`` with ``i < j`` whose signatures collide in
            at least one band, each pair listed once, in ascending order.
        """
        pairs = set()
        for band in range(self.b):
            first_row = band * self.r
            # hash value of this band -> indices of the signatures producing it
            buckets = {}
            for col, sign in enumerate(signList):
                rows = sign[first_row:first_row + self.r]
                # A band lying past the end of a short signature has no rows.
                # It would hash to 0 for every signature and turn every pair
                # into a candidate, so it is ignored.
                if len(rows) == 0:
                    continue
                buckets.setdefault(self.h(rows), []).append(col)
            for cols in buckets.values():
                # cols is in ascending order, so every emitted pair has i < j.
                for pos in range(len(cols)):
                    for other in range(pos + 1, len(cols)):
                        pairs.add((cols[pos], cols[other]))
        return [[i, j] for i, j in sorted(pairs)]

In [160]:
# Toy corpus: documents 0 and 1 are identical, 2/3 and 4/5 differ only by a short suffix.
doc1 = "aaabbccccbbaaaa"
doc2 = "aaabbccccbbaaaa"
doc3 = "sfdfdbcfdsfdwwwwww"
doc4 = "sfdfdbcfdsfdwww"
doc5 = "eeevvvvttyyyyeeevvvvttyyyyq"
doc6 = "eeevvvvttyyyyeeevvvvttyyyy"
docs = sc.parallelize([doc1,doc2,doc3,doc4,doc5,doc6])

shingleSize = 5
band = 20
r = 5
threshold = (1/band)**(1/r)
jaccardThreshold = 0.8
seed = 42
shingling = Shingling()
compareSets = CompareSets()
compareSignatures = CompareSignatures()
# Seed the RNG so that the random hash functions, and with them the candidate
# pairs, are the same on every run of the notebook.
random.seed(seed)
# The signature length (100) matches band * r, so every LSH band is complete.
minHashing = MinHashing(k=100)
minHashing.randomize()
lsh = LSH(band,r)

# 1) create an RDD containing the set of shingles of each document
#    (cached because it is used both for the signatures and for the final Jaccard check)
shingles = docs.map(lambda x : shingling.shingle(x,shingleSize) ).cache()
# NOTE: RDD transformations are lazy; the work actually happens at the first collect()
print("log: finished to create the shingles for all the documents")

# 2) create the signatures of the previous sets of shingles
signatures = shingles.map(lambda x : minHashing.signature(x) )
print("log: finished to create the signatures")

# 3) using LSH find the candidatePairs
signaturesList = signatures.collect()
candidatePairs = lsh.lsh(signaturesList)
print("candidate pairs {}".format(candidatePairs))

# 4) keep only the candidate pairs whose signatures agree on more than (1/band)**(1/r) of their positions
candidateRdd = sc.parallelize(candidatePairs)
checkedRdd = candidateRdd.filter(
    lambda pair: compareSignatures.signatureSimilarity(signaturesList[pair[0]], signaturesList[pair[1]]) > threshold)
print("checked with signatures pairs {}".format(checkedRdd.collect()))

# 5) for the checked pairs, compute the Jaccard similarity of the shingle sets (once per pair)
#    and drop the pairs that are not above the defined jaccard threshold (Optional)
shinglesList = shingles.collect()
shinglesCheckedRdd = checkedRdd \
    .map(lambda pair: [pair, compareSets.jaccardSimilarity(shinglesList[pair[0]], shinglesList[pair[1]])]) \
    .filter(lambda scored: scored[1] > jaccardThreshold)
print("checked with shingles pairs {}".format(shinglesCheckedRdd.collect()))

# print the results (descending order of similarity):
shinglesChecked = shinglesCheckedRdd.sortBy(lambda x : x[1], ascending=False).collect()
if not shinglesChecked:
    print("all the documents are different!")
else:
    for pair in shinglesChecked:
        print("document {} and document {} are {} similar (jaccardSimilarity)".format(pair[0][0],pair[0][1],pair[1]))




log: finished to create the shingles for all the documents
log: finished to create the signatures
candidate pairs [[0, 1], [2, 3], [4, 5]]
checked with signatures pairs [[0, 1], [2, 3], [4, 5]]
checked with shingles pairs [[[0, 1], 1.0], [[2, 3], 0.8461538461538461], [[4, 5], 0.9285714285714286]]
document 0 and document 1 are 1.0 similar (jaccardSimilarity)
document 4 and document 5 are 0.9285714285714286 similar (jaccardSimilarity)
document 2 and document 3 are 0.8461538461538461 similar (jaccardSimilarity)
