In [63]:
!pip install pyspark



In [64]:
def formatTime(t):
    if t < 60:
        return "{:.2f} sec".format(t)
    elif t < (60*60):
        return "{:.2f} min".format(t/60 )
    elif t < (60*60*24):
        return  "{:.2f} hour".format((t/60)/24)

In [65]:
import sys
import random
import time
import binascii
import math
from pyspark import SparkConf, SparkContext
import collections
import numpy as np
from itertools import islice, count
from sympy import isprime, nextprime

# **Running on Spark enviroment**

## **Preparation**

### **Config Spark enviroment**

In [66]:
# Config Spark context
conf = SparkConf().setMaster("local").setAppName("Local Sensitive Hashing")
sc = SparkContext.getOrCreate(conf=conf)

### **Read data**

In [67]:
begin = time.time()
#read data
rdd = sc.textFile("sample_data/data.txt")
doc = rdd.collect()
doc_input = """Đó thực sự là một nghĩa cử cao đẹp của những thiên thần blouse trắng Bệnh viện (BV) Đà Nẵng... Bởi hơn ai hết, là nhân viên y tế, họ hiểu và cảm nhận rất rõ việc ghép tạng là phương pháp điều trị cuối cùng, mang lại cơ hội sống cho những bệnh nhân suy tạng giai đoạn cuối. Họ đã âm thầm kết nối nhau lại thành một cộng đồng những tình nguyện viên hiến mô, tạng.Bác sĩ Nguyễn Văn Đồng (32 tuổi, Khoa Hồi sức tích cực chống độc) cùng với 3 bác sĩ khác: Hiếu, An, Hùng là những thành viên đăng ký đơn ngay trong ngày đầu tiên hoạt động viết đơn tự nguyện. Bác sĩ Đồng cho biết: “Chỉ cần có tạng hiến để ghép thì sự sống của nhiều người không phải dừng lại khi tuổi đời còn quá trẻ. Điều đó thực sự thôi thúc chúng tôi tạo nên một cộng đồng sẵn sàng hiến mô, tạng. Từ việc hiến tặng này, sự sống sẽ được tiếp nối một cách ý nghĩa ở một cơ thể khác”, bác sĩ Đồng tâm sự.Khi mình đặt bút viết đơn tự nguyện hiến tặng bộ phận cơ thể, hơn ai hết mình hiểu rõ sự cần thiết và ý nghĩa của hoạt động nàyBác sĩ Lê Tuấn Anh""" 
doc.insert(0,doc_input)
rdd = sc.parallelize(doc)

In [68]:
ngram = 5
doc_rdd = rdd.map(lambda x: x.lower().replace('.','').replace(',','')).filter(lambda x: x!=" ").filter(lambda x: x!='\n')
doc = doc_rdd.collect()
print("Length of doc: ", len(doc))

Length of doc:  3527


## **Shingling Document**

In [69]:
shinglingBeginTime = time.time()

In [70]:
#Shingling
def shingling(words):
    shingle = set()
    for j in range(len(words)-(ngram-1)):
        # Encode shingle to crc32
        crc = binascii.crc32(words[j:(ngram+j)].encode())
        shingle.add(crc)
    return shingle

In [71]:
# Shingle doc 
shingleDocSetRDD = doc_rdd.map(lambda x : shingling(x))
# get number of shingle
sumOfShingle = sum(shingleDocSetRDD.map(lambda x: len(x)).collect())

shingleDocSet = shingleDocSetRDD.collect()

In [72]:
print("Sum of shingle is ", sumOfShingle)

Sum of shingle is  4718394


In [73]:
shinglingEndTime = time.time()

In [74]:
print("Shingling end in {}".format(formatTime(shinglingEndTime - shinglingBeginTime)))

Shingling end in 11.26 sec


## **MinHashing**

In [75]:
hashingBeginTime = time.time()

In [76]:
k = 100

Hashing referrence from Implement trick from stanford slide page 30


In [77]:
# Get prime number for MinHashing

nextPrime = nextprime(sumOfShingle)

# Generate k random integer to hash shingle
def randomHashFunc(k):
    return random.sample(range(sumOfShingle), k)

# Initialize A and B with k random integer
randomAHashFunc = randomHashFunc(k)
randomBHashFunc = randomHashFunc(k)

In [78]:

# MinHash shingle
def minHashShingle(shingleDoc, a, b):
    minHash = np.Inf
    for x in shingleDoc:
        # hashCode implementation based on Standford slide 
        hashCode = ((a * x + b) % nextPrime) % sumOfShingle
        # if hashCode < minHash then hashCode is minHash of current shingle
        minHash = hashCode if hashCode < minHash else minHash
    return minHash

# Make signature
def signatureGenerator(shingleDoc):
    signature = []
  
  # Make k signature length
    for i in range(0, k):
        minHash = minHashShingle(shingleDoc, randomAHashFunc[i], randomBHashFunc[i])
        signature.append(minHash)
    return signature

In [79]:
signatures = shingleDocSetRDD.map(lambda x: signatureGenerator(x)).collect()
numDocs = len(shingleDocSet)

In [80]:
signaturesMatrix = np.array(signatures)

In [81]:
hashingEndTime = time.time()

In [82]:
print("MinHashing end in {}".format(formatTime(hashingEndTime - hashingBeginTime)))

MinHashing end in 2.26 min


## **LSH**

In [83]:
LSHBeginTime = time.time()

In [84]:
b = 20
r = 5
# s = (1/b)**(1/r)
s = 0.8
neighborsOfPair = 5
bucket = []

In [85]:
def jaccardSim(A,B):
    return sum(A==B) / len(A)

def divideBands(x, signaturesMatrix):
    return signaturesMatrix.T[x:x+r]

In [86]:
# Devide bands
rdd3 = sc.parallelize(list(range(0,k,r)))
bands = rdd3.map(lambda x: divideBands(x, signaturesMatrix))

In [87]:
bands.count()

20

In [88]:
# Generate Pair
pairs = [(0,i) for i in range(1,len(shingleDocSet))]

In [89]:
# Insert pair to bukect if sim(pair) reach threshold s
def getBuckets(band):
    res = []
    for pair in pairs:
        js = jaccardSim(band.T[pair[0]], band.T[pair[1]])
        if js > s:
            res.append((pair, 1))
    return res

In [90]:
buckets = sc.parallelize(bands.map(lambda x: getBuckets(x)).reduce(lambda x,y: x+y))

In [91]:
findCandidatePairs = buckets.reduceByKey(lambda x,y: x+y)
findCandidatePairs.sortBy(lambda x: x[1], False)
candidatePairs = findCandidatePairs.collect()

In [92]:
LSHEndTime = time.time()

In [93]:
print("LSH end in {}".format(formatTime(LSHEndTime - LSHBeginTime)))

LSH end in 1.79 sec


In [94]:
print("candidate pairs: ", candidatePairs)

candidate pairs:  [((0, 2233), 20)]


In [95]:
doc_input

'Đó thực sự là một nghĩa cử cao đẹp của những thiên thần blouse trắng Bệnh viện (BV) Đà Nẵng... Bởi hơn ai hết, là nhân viên y tế, họ hiểu và cảm nhận rất rõ việc ghép tạng là phương pháp điều trị cuối cùng, mang lại cơ hội sống cho những bệnh nhân suy tạng giai đoạn cuối. Họ đã âm thầm kết nối nhau lại thành một cộng đồng những tình nguyện viên hiến mô, tạng.Bác sĩ Nguyễn Văn Đồng (32 tuổi, Khoa Hồi sức tích cực chống độc) cùng với 3 bác sĩ khác: Hiếu, An, Hùng là những thành viên đăng ký đơn ngay trong ngày đầu tiên hoạt động viết đơn tự nguyện. Bác sĩ Đồng cho biết: “Chỉ cần có tạng hiến để ghép thì sự sống của nhiều người không phải dừng lại khi tuổi đời còn quá trẻ. Điều đó thực sự thôi thúc chúng tôi tạo nên một cộng đồng sẵn sàng hiến mô, tạng. Từ việc hiến tặng này, sự sống sẽ được tiếp nối một cách ý nghĩa ở một cơ thể khác”, bác sĩ Đồng tâm sự.Khi mình đặt bút viết đơn tự nguyện hiến tặng bộ phận cơ thể, hơn ai hết mình hiểu rõ sự cần thiết và ý nghĩa của hoạt động nàyBác sĩ 

In [96]:
for i in candidatePairs:
    print("doc[0] is similarity to doc[{}]\n".format(i[0][1]))
    print(doc[i[0][1]])

doc[0] is similarity to doc[2233]

đó thực sự là một nghĩa cử cao đẹp của những thiên thần blouse trắng bệnh viện (bv) đà nẵng bởi hơn ai hết là nhân viên y tế họ hiểu và cảm nhận rất rõ việc ghép tạng là phương pháp điều trị cuối cùng mang lại cơ hội sống cho những bệnh nhân suy tạng giai đoạn cuối họ đã âm thầm kết nối nhau lại thành một cộng đồng những tình nguyện viên hiến mô tạngbác sĩ nguyễn văn đồng (32 tuổi khoa hồi sức tích cực chống độc) cùng với 3 bác sĩ khác: hiếu an hùng là những thành viên đăng ký đơn ngay trong ngày đầu tiên hoạt động viết đơn tự nguyện bác sĩ đồng cho biết: “chỉ cần có tạng hiến để ghép thì sự sống của nhiều người không phải dừng lại khi tuổi đời còn quá trẻ điều đó thực sự thôi thúc chúng tôi tạo nên một cộng đồng sẵn sàng hiến mô tạng từ việc hiến tặng này sự sống sẽ được tiếp nối một cách ý nghĩa ở một cơ thể khác” bác sĩ đồng tâm sựkhi mình đặt bút viết đơn tự nguyện hiến tặng bộ phận cơ thể hơn ai hết mình hiểu rõ sự cần thiết và ý nghĩa của hoạt đ

In [97]:
end = time.time()

In [98]:
print("Time running: {} sec".format(formatTime(end-begin)))

Time running: 2.51 min sec


# **Running on normal enviroment**

In [99]:
from functools import reduce

In [100]:
nor_begin = time.time()

## **Shingling**

In [101]:
nor_shinglingBeginTime = time.time()

In [102]:
nor_shingleDocSet = []
for d in doc:
    nor_shingleDocSet.append(shingling(d))

In [103]:
len(nor_shingleDocSet)

3527

In [104]:
nor_sumOfShingle = sum([len(x) for x in nor_shingleDocSet])
print("Sum of shingle is ", nor_sumOfShingle)

Sum of shingle is  4718394


In [105]:
nor_shinglingEndTime = time.time()

In [106]:
print("Shingling end in {}".format(formatTime(nor_shinglingEndTime - nor_shinglingBeginTime)))

Shingling end in 5.25 sec


## **MinHashing**

In [107]:
nor_hashingBeginTime = time.time()

In [108]:
nextPrime = nextprime(sumOfShingle)

randomAHashFunc = randomHashFunc(k)
randomBHashFunc = randomHashFunc(k)

In [109]:
nor_signatures = [signatureGenerator(x) for x in nor_shingleDocSet]
numDocs = len(nor_shingleDocSet)

In [110]:
nor_signaturesMatrix = np.array(nor_signatures)

In [111]:
nor_hashingEndTime = time.time()

In [112]:
print("MinHashing end in {}".format(formatTime(nor_hashingEndTime - nor_hashingBeginTime)))

MinHashing end in 2.42 min


## **LSH**

In [113]:
nor_LSHBeginTime = time.time()

In [114]:
# Devide bands
counter = list(range(0,k,r))
nor_bands = [divideBands(x, nor_signaturesMatrix) for x in counter]

In [115]:
# Generate Pair
pairs = [(0,i) for i in range(1,len(nor_shingleDocSet))]

In [116]:
buckets = [getBuckets(x) for x in nor_bands]
buckets = reduce(lambda x,y: x+y, buckets)
buckets = reduce(lambda x,y : (x[0],x[1]+y[1]) if x[0]==y[0] else x+y, buckets)

In [117]:
nor_LSHEndTime = time.time()

In [118]:
print("LSH end in {}".format(formatTime(nor_LSHEndTime - nor_LSHBeginTime)))

LSH end in 1.22 sec


In [119]:
for i in candidatePairs:
    print("doc[0] is similarity to doc[{}]\n".format(i[0][1]))
    print(doc[i[0][1]])

doc[0] is similarity to doc[2233]

đó thực sự là một nghĩa cử cao đẹp của những thiên thần blouse trắng bệnh viện (bv) đà nẵng bởi hơn ai hết là nhân viên y tế họ hiểu và cảm nhận rất rõ việc ghép tạng là phương pháp điều trị cuối cùng mang lại cơ hội sống cho những bệnh nhân suy tạng giai đoạn cuối họ đã âm thầm kết nối nhau lại thành một cộng đồng những tình nguyện viên hiến mô tạngbác sĩ nguyễn văn đồng (32 tuổi khoa hồi sức tích cực chống độc) cùng với 3 bác sĩ khác: hiếu an hùng là những thành viên đăng ký đơn ngay trong ngày đầu tiên hoạt động viết đơn tự nguyện bác sĩ đồng cho biết: “chỉ cần có tạng hiến để ghép thì sự sống của nhiều người không phải dừng lại khi tuổi đời còn quá trẻ điều đó thực sự thôi thúc chúng tôi tạo nên một cộng đồng sẵn sàng hiến mô tạng từ việc hiến tặng này sự sống sẽ được tiếp nối một cách ý nghĩa ở một cơ thể khác” bác sĩ đồng tâm sựkhi mình đặt bút viết đơn tự nguyện hiến tặng bộ phận cơ thể hơn ai hết mình hiểu rõ sự cần thiết và ý nghĩa của hoạt đ

In [120]:
nor_end = time.time()

In [121]:
print("Time running: {} sec".format(formatTime(nor_end-nor_begin)))

Time running: 2.53 min sec
