In [1]:
from pyspark import SparkContext, SparkConf, SparkFiles
import random

# MDA_HW4 Locality-Sensitive Hashing (LSH)

這次的作業要實作Locality-Sensitive Hashing，有三個大步驟，以不同的mapper、reducer function實作：


### 1. Shingling
    會跑for迴圈一一讀入所有的檔案，mapper1~3的操作都是在for迴圈中進行（也就是在同一個檔案內進行操作）。
    
   * <font color=blue size=4> mapper1 </font> 
   
       mapper1會讀進檔案中的句子，並以空格作為字的分隔，再把所有的單字集中在一起。數字和標點符號的部分也會和英文字母節合成為一個單字，並沒有特別分開或去掉。把每一段的字都集中成一個list，作為key-value pair的value，為了以後reduce方便將key都設成同樣的字（"key"）。
       
       
   * <font color=blue size=4> reduce </font>
   
   這次使用的reduce function比較簡潔，因此我沒有特別寫reducer，直接以lambda定義在主要的code中。
   做完mapper1之後，會用reduceByKey把檔案中所有的字都reduce到同一個list中。
   
   
   * <font color=blue size=4> mapper2 </font>
   
   在mapper2中把檔案中的字建成3-shingles，並以空格把字接在一起，最後回傳全部的shingles。
   
   
   * <font color=blue size=4> mapper3 </font> 
   
   mapper3中會把檔案編號和檔案內的shingles放在一起，形成（ 檔案編號，[shingle0, shingle1, ...] ）這樣的key-value pair。把shingle放入的同時，會檢查list中有沒有放過相同的shingle，如果重複的話就不需要放入了。  
       

In [2]:
def mapper1(line):
    newLine = line.split(" ")
    wordList = []
    for word in newLine:
        if word != "":
            wordList.append(("key", [word]))
    return wordList


def mapper2(line):
    shingles = []
    tmp = []
    for i in range(len(line[1])-2):
        tmp = line[1][i:i + 3]
        tmp = " ".join(tmp)
        shingles.append(tmp)
        tmp = []
       
    return shingles

def mapper3(line):
    ShinglesInADoc = []
    for shingle in line:
        if shingle not in ShinglesInADoc:
            ShinglesInADoc.append(shingle)
            
    # (DocNum, [shingle0, shingle1, ... ])
    return DocNum, ShinglesInADoc

In [3]:
AllShingles = []
ShinglesInDocs = []

DocNum = 0

for i in range(1, 102):
    if(i<10):
        path = "00" + str(i) + ".txt"
    elif(i >= 10 and i <= 99):
        path = "0"+ str(i) + ".txt"
    else:
        path = str(i) + ".txt"
    
    file = sc.textFile(path)
    
    # Shingles
    DocNum= i
    shingles = file.flatMap(mapper1).reduceByKey(lambda a, b: a+b).map(mapper2)  
    shingles = shingles.map(mapper3)
    
    # Min-Hashing Prepare
    tmp = shingles.take(1)

    for s in tmp[0][1]:
        if s not in AllShingles:
            AllShingles.append(s)
    ShinglesInDocs.append(tmp[0])
    
shingles = sc.parallelize(ShinglesInDocs)


### 2. Min-hashing

    做完以上的部分之後，會為min hashing的步驟做一些前處理，這些處理也是在for迴圈裡面完成。
    有一個list--AllShingles，用來存放所有的shingles，後面hash時會使用shingles在list中的index來做（用數字來做hash！）。因此在每個檔案做完mapper3後，會把產生的shingles放到AllShingles裡，同時也檢查有沒有相同的已經存在在其中，如果重複的話就不用再放了。
    還有另一個list--ShinglesInDocs，會在迴圈的最後把經由mapper3處理完的、每個檔案的資料append進去，出迴圈後會將它轉為RDD的資料結構，在後面操作時使用。
    
   * <font color=blue size=4> mapper4 </font>
   
   從mapper4後就是在for迴圈外、對於所有檔案做處理的部分。
   在mapper4中會將檔案中的shingles從英文字轉為數字，所使用到的數字就是該shingle在AllShingles中的index，並以相同的結構回傳。
   
   
   * <font color=blue size=4> mapper5 </font>
   
   mapper5實做了min-hasning的主要部分。
   我寫了兩個function用來產生random hash function，TestPrime用來找到用來做mod的質數，RandomCoef用來產生隨機的係數，根據傳進function的參數來產生不同範圍和數量的係數。
   在mapper5就會使用這些係數來構成hash function，算式為$((a*shingle+b) \%p) \%N$，N為總共的shingle數量。總共有100個hash function，會拿所有的shingle去hash並用變數記住目前最小的計算結果，如果新hash出的值比它還小的話，就會更新。
   最後回傳的結構和mapper3、mapper4的結構長得很相似，key為檔案的編號，value為100個min-hashing值。

In [4]:
def RandomCoef(num, size):
    if(num == 1):
        return random.randint(1, size)
    else:
        randomList = []
        while num > 0:
            r = random.randint(1, size)

            # Ensure that each random number is unique.
            while r in randomList:
                r = random.randint(1, size)
            randomList.append(r)
            num-=1
        return randomList
    

def TestPrime(num):
    for i in range(2, num):
        if(num%i == 0):
            return False
        else:
            if(i == num-1): 
                return True
            else: 
                continue

In [5]:
def mapper4(line):
    indexList = []
    for shingle in line[1]:
        index = AllShingles.index(shingle)
        indexList.append(index)
    return line[0], indexList


up = 1
N = len(AllShingles)

while not TestPrime(N + up):
    up+=1
    
p = N + up
a = RandomCoef(100, 300)
b = RandomCoef(100, 300)


def mapper5(line):
    value = N
    valueOpt = [N]*100 
    for i in range(100):
        for shingle in line[1]:
            value = ((a[i] * shingle + b[i]) % p) % N
            if(value < valueOpt[i]):
                valueOpt[i] = value
                
    # (index, [mh0, mh1, mh2, ...])
    return line[0], valueOpt

In [6]:
# Min-Hashing
shingles = shingles.map(mapper4)
shingles_MH = shingles.map(mapper5)

### 3. Locality-sensity hashing
    接著是LSH的部分。
    首先創建一個新的list--hashMatrix用來存所有檔案做完minhashing的值（也就是做完mapper5的資料，value部分），最後計算similarity時會使用。
   
   * <font color=blue size=4> mapper6 </font>
   
   在mapper6中會將資料分成50個band，每個band有兩個row，來做hash。一樣使用RandomCoef來產生係數，hash function的算式為$(a*row0 ＋b*row1 + c) \% k$，k為自訂的bucket數量。最後會把資料建構成key為（band編號，算出來的bucket數），value為檔案編號構成的list的資料結構。
   
做完mapper6之後會做一次reduceByKey，把相同key的資料reduce在一起。使用的reducer一樣比較簡潔，也是使用lambda function。
   * <font color=blue size=4> mapper7 </font>
   
   要成為Candidate pair的條件是他們要至少hash到同樣的bucket中至少一個band，在mapper7中會做篩選。篩選的方法是檢查value中檔案的數量，如果只有一個的話，就只會是一個空的list。

In [7]:
# LSH
hashMatrix = [0] * 101
tmp = shingles_MH.take(101)
for doc in tmp:
    hashMatrix[doc[0]-1] = doc[1]

In [8]:
a1 = RandomCoef(1, 10)
b1 = RandomCoef(1, 10)
c1 = RandomCoef(1, 10)

def mapper6(line):
    band = 50
    row = 2
    tmp = []
    for i in range(band):
        bucket = (a1 * line[1][i*row] + b1 * line[1][i*row+1] + c1) % 31
        tmp.append(((i+1, bucket), [line[0]]))
        
    # [((band, bucket), doc), ...]
    return tmp  

# if there is only one doc, ignore it
def mapper7(line):
    tmp = []
    if len(line[1]) != 1:
        tmp.append((line))
    
    return tmp

In [9]:
bands = shingles_MH.flatMap(mapper6).reduceByKey(lambda a, b: a+b).flatMap(mapper7)

   * <font color=blue size=4> mapper8 </font>
    
    接著是計算similarity的部分。
    兩個document一組計算similarity，用剛剛建好的hashMatrix找，如果能被同一個hash function hash到同樣的值，就將count加一。最後similarity的計算方式就是count除以總has function數（100）。回傳的資料結構為（（檔案編號0, 檔案編號1）, similarity）。
    
    
因為相同的檔案pair可能有重複出現好幾次，我們只要保留一次就好，因此再用一個reduceByKey，reducer function同樣為lambda funciton，回傳a就好。

   * <font color=blue size=4> mapper9 </font>
   
    最後將資料調整成（ similarity, （檔案編號0, 檔案編號1））的結構，以方便後面以similarity排序，找出最大的前十個。

In [10]:
def mapper8(line):
    tmp = []
    cnt = 0
    for i in range(len(line[1])):
        for j in range(len(line[1])):
            if i < j:
                for idx in range(100):
                    vi = hashMatrix[line[1][i]-1][idx]
                    vj = hashMatrix[line[1][j]-1][idx]
                    if vi == vj: 
                        cnt+=1
                tmp.append([(line[1][i], line[1][j]), cnt/100])
                cnt = 0
    return tmp

def mapper9(line):
    return (line[1], line[0])

In [11]:
# Similarity
Sim = bands.flatMap(mapper8).reduceByKey(lambda a, b: a)
Result = Sim.map(mapper9).sortByKey(ascending= False)

# Output
outfile = open('Outputfile.txt','w')
outfile.write("Output:\n")

print("Output:\n")
Output = Result.take(10)
for line in Output:
    print(line[1], ":", line[0])
    outfile.write(str(line[1]) + ": " + str(line[0]) + "\n")


Output:

(52, 84) : 1.0
(12, 20) : 1.0
(30, 35) : 0.8
(47, 49) : 0.72
(23, 38) : 0.53
(49, 88) : 0.53
(48, 49) : 0.48
(14, 40) : 0.42
(47, 88) : 0.39
(47, 48) : 0.34


In [None]:
sc.stop()