In [1]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext("local", "lhs")
import numpy as np

In [2]:
import string
def cleanDocument(document):
    document = document.lower()
    document = document.translate(str.maketrans('','',string.punctuation))
    return document

In [3]:
numberOfDocuments = 10
documents = []
for i in range(numberOfDocuments):
    documents.append(sc.textFile("data/"+str(i)+".txt"))
    documents[i] = documents[i].map(lambda x: cleanDocument(x))

In [4]:
def hashShingling(text):
    val = 0
    mod = 2**32-1
    for c in text:
        val = (val*26 + ord(c))%mod
    return val

In [5]:
shinglingSize = 9

In [6]:
data = []

In [7]:
for d in range(numberOfDocuments):
    document = ""
    for s in documents[d].collect():
        document = document + s
    shinglings = []
    for i in range(shinglingSize-1, len(document)):
            shinglings.append(hashShingling(document[i-shinglingSize+1:i]))
    data.append(shinglings)
data = np.array(data)

In [8]:
#data = np.array([[1000,2000,3000,4000,5000,6000,7000,8000], [1000,2000,3000,4000,5000,6000,7000,8000], [1000,2000,3000,4000], [5000,6000,7000,8000]])

In [9]:
dataRDD = sc.parallelize(data)

In [10]:
dataWithIndex = dataRDD.zipWithIndex().map(lambda x: (x[1], x[0]))
print(dataWithIndex.collect())

[(0, [4165096782, 3847052731, 79622680, 2788544303, 2748935685, 1593350548, 1280398990, 2901655764, 478326072, 54620676, 1554460787, 143504058, 4234150193, 1221713064, 540268048, 128327973, 1970278777, 738504341, 2524288372, 1673820506, 3705126946, 1061803005, 2887938029, 121664876, 3666331479, 3638295676, 573239425, 2522367865, 4292521819, 2739375333, 1928812744, 3407535719, 746303017, 3526395226, 3252618838, 429400062, 2708790312, 2427426027, 2659587588, 932845265, 1412891959, 3426316242, 983074307, 3761148596, 3434906295, 4126572392, 419744520, 3500047678, 3942393191, 1516173110, 2400690225, 1254664247, 1064489754, 2957793517, 1937907633, 3644002833, 2599480679, 1795739778, 621218815, 3769831994, 2366850812, 2126933535, 516560761, 1048722604, 1631307214, 2141511162, 3815735688, 1475743769, 2517590049, 1535876469, 2580468500, 717391777, 2523180706, 602719722, 3288855583, 2746366006, 735759651, 2452926435, 4113377439, 2709097791, 4061753347, 3577235627, 2364105048, 303449724, 21027159

In [11]:
class VectorWrapper():
    def __init__(self, vector):
        self.vector = vector
        
class Hasher():
    def __init__(self, signatures):
        self.signatures = signatures
        self.coefficient = np.random.randint(2**32-1, size=self.signatures)
        self.bias = np.random.randint(2**32-1, size=self.signatures)
        self.mod = np.ones(self.signatures)*2**32-1#np.random.randint(10000, size=self.signatures)
        
    def hashValue(self, value, signature):
        return (value*self.coefficient[signature]+self.bias[signature])%self.mod[signature]
    
    def hashVector(self, vector, signature):
        return np.vectorize(self.hashValue)(vector, signature)
    
    def minHashVector(self, signature, vectorWrapper):
        return np.amin(self.hashVector(vectorWrapper.vector, signature))
    
    def generateSignatures(self, vector):
        return np.vectorize(self.minHashVector)(np.arange(self.signatures), VectorWrapper(vector))

In [12]:
signatures = 100 #number of hash functions
numberOfBands = 25 #b
t = (1/numberOfBands)**(numberOfBands/signatures)
print(t)

0.4472135954999579


In [13]:
hasher = Hasher(signatures)

In [14]:
minHash = dataWithIndex.map(lambda x: (x[0], hasher.generateSignatures(x[1])))
print(minHash.collect())

[(0, array([ 8520183.,  5426993.,   862646.,  4546819.,  1477041.,  3187734.,
       15409346.,  3225521.,  4277691.,  6302474.,  8355753.,  2241306.,
       12775066.,  1034670., 11660482.,  1113907., 20455342.,  8045992.,
         827551.,  3301827.,  5839855.,   612295., 12198591.,  5581217.,
        4333145.,  2142524.,  3984686.,  1277077.,  4701242.,  7099747.,
        3785003.,  3231807., 14674707.,  3652712.,    95433., 13950895.,
         239046.,   927317.,  5776914.,  2436723.,  5145097.,  2416829.,
         865264.,   774722.,   204841.,  9233639.,  5514152.,  1823676.,
       12814259.,  4838195.,  1680691.,  1319257.,  2418703.,  3544895.,
        6497298.,  7745660.,  2963073.,  7881281.,  5510350.,    69882.,
        1056132.,  8448499.,  9044993.,  2600167.,  4877783.,  6404214.,
        6967658.,  6650029.,  6693774., 35507667.,  3436857.,  8146302.,
        4403270., 10125855.,  5544161., 15284439.,  7746833.,  9218808.,
        1946938.,   244674.,  1510270.,  22050

In [15]:
#to improve
def simpleHash(vector):
    return np.sum(vector)%2**32-1

In [16]:
bands0 = minHash.flatMap(lambda x: np.arange(numberOfBands))
bands1 = minHash.flatMap(lambda x: np.ones(numberOfBands, dtype=int)*x[0])
bands2 = minHash.flatMap(lambda x: np.array(np.split(x[1], numberOfBands)))
bands2Hashed = bands2.map(lambda x: simpleHash(x))
bands12Hashed = bands1.zip(bands2Hashed)
bands = bands0.zip(bands12Hashed)
print(bands.collect()) #(bandId, (documentId, hashOverTheBand))

[(0, (0, 19356640.0)), (1, (0, 23299641.0)), (2, (0, 21177223.0)), (3, (0, 26584124.0)), (4, (0, 32630711.0)), (5, (0, 24231957.0)), (6, (0, 11737431.0)), (7, (0, 18817798.0)), (8, (0, 32373746.0)), (9, (0, 9379999.0)), (10, (0, 9201911.0)), (11, (0, 16776307.0)), (12, (0, 20652401.0)), (13, (0, 20206555.0)), (14, (0, 16424585.0)), (15, (0, 21149790.0)), (16, (0, 24899683.0)), (17, (0, 53784599.0)), (18, (0, 35357724.0)), (19, (0, 19157252.0)), (20, (0, 17807248.0)), (21, (0, 10769867.0)), (22, (0, 23225020.0)), (23, (0, 34019053.0)), (24, (0, 12176113.0)), (0, (1, 5632396.0)), (1, (1, 12154430.0)), (2, (1, 6126665.0)), (3, (1, 8019521.0)), (4, (1, 14623481.0)), (5, (1, 27839822.0)), (6, (1, 12179882.0)), (7, (1, 15691299.0)), (8, (1, 7245265.0)), (9, (1, 7068625.0)), (10, (1, 1588403.0)), (11, (1, 3616209.0)), (12, (1, 2794468.0)), (13, (1, 5678417.0)), (14, (1, 13229252.0)), (15, (1, 16379406.0)), (16, (1, 8645043.0)), (17, (1, 13068392.0)), (18, (1, 18512505.0)), (19, (1, 28130115.0

In [17]:
def generateCandidates(vector):
    candidates = []
    for x in vector:
        for y in vector:
            if x[0] < y[0] and x[1] == y[1]:
                candidates.append((x[0], y[0]))
    return candidates

In [18]:
bandsInGroup = bands.groupByKey()

In [19]:
candidates = bandsInGroup.flatMap(lambda x: generateCandidates(x[1])).map(lambda x: (x[0]*len(data)+x[1], x)).values().distinct()
print(candidates.collect())

[(8, 9)]


In [20]:
print(bandsInGroup.map(lambda x : (x[0], list(x[1]))).collect())

[(0, [(0, 19356640.0), (1, 5632396.0), (2, 7578343.0), (3, 19502899.0), (4, 4476671.0), (5, 12004772.0), (6, 6441003.0), (7, 9567692.0), (8, 16804002.0), (9, 21077512.0)]), (1, [(0, 23299641.0), (1, 12154430.0), (2, 22683267.0), (3, 23165596.0), (4, 22729508.0), (5, 10418089.0), (6, 12438072.0), (7, 46780243.0), (8, 25000421.0), (9, 70631918.0)]), (2, [(0, 21177223.0), (1, 6126665.0), (2, 21861028.0), (3, 22940297.0), (4, 12359827.0), (5, 14787865.0), (6, 8360777.0), (7, 21723904.0), (8, 28735295.0), (9, 53101553.0)]), (3, [(0, 26584124.0), (1, 8019521.0), (2, 33484671.0), (3, 17060771.0), (4, 21461604.0), (5, 38902682.0), (6, 20840790.0), (7, 55066582.0), (8, 31121255.0), (9, 33788113.0)]), (4, [(0, 32630711.0), (1, 14623481.0), (2, 17782913.0), (3, 16991793.0), (4, 21920735.0), (5, 18381226.0), (6, 8704272.0), (7, 18213838.0), (8, 18074954.0), (9, 39943743.0)]), (5, [(0, 24231957.0), (1, 27839822.0), (2, 12761255.0), (3, 10058172.0), (4, 11306598.0), (5, 19190077.0), (6, 5827698.0), 

In [21]:
for couple in candidates.collect():
    first = minHash.filter(lambda x: x[0]==couple[0]).collect()[0][1]
    second = minHash.filter(lambda x: x[0]==couple[1]).collect()[0][1]
    comparison = np.dstack((first, second))[0]
    print("Similarity between "+str(couple[0])+" and "+str(couple[1])+" is "+str(len(list(filter(lambda x: x[0] == x[1], comparison)))/len(comparison)))

Similarity between 8 and 9 is 0.56
