# MDA HW4 LSH

本次作業要實作LSH演算法。我們將分為三個部份來完成演算法。
1. 先製作出包含各個3-gram (3-shingles) 的characteristic matrix
2. 執行min hashing以得出Signature matrix
3. 找出candidate pairs。
4. 計算Jaccard similarities

本次作業使用的是101篇英文文章，我們要以LSH找出最相似的十對文章。

呼叫本次會使用到的package。

In [1]:
import glob
import random
import itertools
from math import ceil
from pyspark import SparkConf, SparkContext

以及呼叫SparkConf

In [2]:
sc.stop()

In [3]:
conf = SparkConf().setMaster("local").setAppName("wordcount") # call sparkconf
conf = SparkConf().set("spark.default.parallelism", 4)
sc = SparkContext(conf=conf) # call sparkcontext

## 一. 讀取檔案並製造Signature matrix

定義一個讀取檔案的函式readfile，其input為一篇文章(字串)，它會將整篇文章依據空格拆開，將文章中的每個單字加入陣列maplist中，並回傳maplist這個list of strings。我沒有去除掉標點符號與換行符號。

In [4]:
def readfile(line):
    wordlist=line.split(" ") # 將文章分開來
    maplist=[] # 回傳陣列
    for item in wordlist:
        maplist.append(item) #將每個單字加入陣列中
    return maplist

以下的程式碼中，dataset是一個陣列，其元素為對應的文章中，所有shingle按照順序形成的排列。universal_set是101篇文章中出現的所有3-shingle所構成的集合。迴圈中，美讀取一個檔案，我們會用readfile先將其打散成單字陣列(即一個list of strings)，再將此篇文章所有3-shingle構成的list加入dataset中，同時也將文章中各個3-shingle加入universal_set中。

In [5]:
path = glob.glob("athletics/*.txt") # glob資料夾中所有txt檔案
dataset = [] # 整個dataset，每個元素為每篇文章的3-shingle
universal_set = set() # all 3-shingle

for files in path:
    with open(files, "r") as f:
        text = f.read()
        split = readfile(text) # 打散成單字陣列
        shingles = [] # 此文章所有3-shingle構成的list
        for i in range(0,len(split)-2):
            shingles.append((split[i], split[i+1], split[i+2]))
            universal_set.add((split[i], split[i+1], split[i+2])) # 將3-shingle加入universal_set中
        dataset.append(shingles) # 加入dataset

接著就可以運算characteristic matrix了。我將characteristic matrix存在bool_mat中。其第$i$行對應到第$i$篇文章，而每一列都對應到一個3-shingle。如果第$i$篇文章包含第$k$個3-shingle，則bool_mat$[i,j]$=0。
建立bool_mat方法是將universal set中所有的3-shingle逐一拿出(順序並不重要)，並與dataset中所有元素(一篇文章中所有的3-shingle)比較。若有，就填入1，反之則填入0。最後，在每一列的後面接上該列所代表的3-gram。

In [6]:
bool_mat = [] # Jaccard matrix
universal_set = list(universal_set) # 比較好操作

for three_gram in universal_set:
    row = [] # Jaccard matrix的每一列
    for i in range(0,len(dataset)):
        if three_gram in dataset[i]: # 若某篇文章包含某個3-gram
            row.append(1) # 填入1
        else:
            row.append(0)
    row.append(three_gram) # 接上該列所代表的3-gram
    bool_mat.append(row)

此時bool_mat已經被建立完成，我們將它存入RDD中。

In [7]:
S = sc.parallelize(bool_mat)

## 二. 利用Map Reduce執行Min-hashing

此時characteristic matrix $S$的結構是無法進行map-reduce操作的。因此我定義一個函數read_map()，將$S$存成key-value$$\bigg[\big((i,j), r\big)\bigg]$$的形式。其中$i$為文件編號(直接對應到txt檔案名稱)，$j$為1到100間的整數。$r$為3-gram經過python內建的hashing後的值。我們之後會用$j$來定義與操作100個不同的hash function。因此每一個$i$,$r$而言，會有$\big((i,1), r\big),\big((i,2), r\big),......,\big((i,100), r\big)$等一百個元素

In [8]:
def read_map(row): # 讀取rdd的元素
    maplist=[]
    for doc_index in range(0,len(row)):
        if row[doc_index]==1: # 代表文件編號doc_index有此row所代表的3-grams
            for hash_num in range(1,101):
                key=(doc_index+1, hash_num)  # key: (文件編號, 執行的hash function編號)
                value=hash(row[-1]) # value: 將3-grams hashing成一個整數
                maplist.append((key,value)) # add to map
    return maplist

使用flatMap可以使輸出變為一維。

In [9]:
S = S.flatMap(read_map) # ((doc#, hash#), hash_value)

In [10]:
S.take(5)

[((34, 1), 3241749993215754241),
 ((34, 2), 3241749993215754241),
 ((34, 3), 3241749993215754241),
 ((34, 4), 3241749993215754241),
 ((34, 5), 3241749993215754241)]

接著我們要定義100個不同的hash function $h_j$。產生100個隨機的數對$(a_j, b_j)$，並選取質數$p=86243$(需比相異的3-shingle總數$N$大)。

In [11]:
randomlist = []
for i in range(0,100):
    a = random.randint(1, 5000)
    b = random.randint(1, 5000)
    randomlist.append((a, b)) # 產生亂數數對

In [12]:
big_prime = 86243
N = len(bool_mat)

定義這100個hash function為$$h_j(x)=(a_jx+b_j mod 86243) mod N$$
接著，將RDD內的結構變為$$\bigg[\big((i,j), h_j(r)\big)\bigg]$$

In [13]:
S = S.map(lambda x: (x[0], ((randomlist[x[0][1]-1][0]*x[1]+randomlist[x[0][1]-1][1]) % big_prime) % N))  
# ((doc#, hash#), 3-grams)

flatten2list這個函數可將巢狀結構壓成一維list。hash_reducer利用這個函數所有相同key的value加入一個list中。我們對$S$進行reduce，使之結構變為
$$\bigg[\big((i,j),(h_1(r),h_2(r),...,h_{100}(r)\big)\bigg]$$

In [14]:
def flatten2list(object):
    gather = []
    for item in object:
        if isinstance(item, (list, tuple, set)):
            gather.extend(flatten2list(item))            
        else:
            gather.append(item)
    return gather

In [15]:
def hash_reducer(x,y): # Combine all the hash results
    return flatten2list((x,y))

In [16]:
S = S.reduceByKey(hash_reducer) # ((doc#, hash#), [h(3-grams),h(3-grams),h(3-grams)...])

find_min_hash()會執行minhash，使S的結構變為$$\bigg[\big((i,j),\min_jh_j(r)\big)\bigg]$$

In [17]:
def find_min_hash(x):
    return (x[0],min(x[1]))

In [18]:
S = S.map(find_min_hash) # ((doc#, hash#), min(h(3-grams))]

至此$S$已變為signature matrix了，其理應為一個101(文件數量)$\times$100(hash function數)的矩陣。我們確認一下結構並計算其大小。

In [19]:
S.take(5)
S.count()

10100

## 三. 找出所有Candidate Pairs。

將$S$複製成另一個RDD $B$。我們利用$B$來做bucket hashing，並找出candidate pairs。

In [20]:
B = S

依照hash function的編號(也就是characteristic matrix的row的index)劃分band。$(1,2)$為一個band，$(3,4)$為一個band，......依此類推。經過以下mapping，B的結構變為
$$\bigg[\big((i,k),\min_jh_j(r)\big)\bigg]$$
其中$k$表示第$k$個band。

In [21]:
B = B.map(lambda x: ((x[0][0], ceil(x[0][1]/2)), x[1])) # ((doc#, band#), h(r)) 

接著執行reduce。並將同一個band內的兩個row的value給hash成一個整數。如此一來，B的結構變為$$\bigg[\big((i,k),h(a,b)\big)\bigg]$$其中$a$ ,$b$代表同一band中兩個row的min-hashing值。

In [22]:
def reduce_band_hash(x, y):
    return hash((x,y))

In [23]:
B = B.reduceByKey(reduce_band_hash) # ((doc#, band#), h(a,b)) 

由於第$k$個band中$h(a,b)$相同的文件$i$即為candidate pair，因此我們再進行一次map，改以$(k, h(a,b))$為key，$i$為value。

In [24]:
B = B.map(lambda x: ((x[1], x[0][1]), x[0][0])) # ((h(a,b), band#), doc#) 

再用一次hash_reducer，在同一band中$h(a,b)$相同的文件$i$存在value中。$B$的結構變為$$\bigg[\big((h(a,b),k),[i_1,i_2,...]\big)\bigg]$$

In [25]:
B = B.reduceByKey(hash_reducer)

如果value中只有一個文件$i_1$，那hash_reducer會把其存成int，因此只要刪除掉這些東西，剩下的東西就是candidate pair了。

In [26]:
B = B.filter(lambda x: (isinstance(x[1], int)==False))

In [27]:
B.take(5)

[((3713081631956061156, 21), [49, 47]),
 ((3713075136741109656, 49), [49, 47]),
 ((3713075136767090256, 5), [61, 90]),
 ((3713088127093070856, 37), [61, 92]),
 ((3713029670585022206, 35), [47, 49])]

不過此時$B$的value未必只有2個元素!因此接著這一步不大嚴謹，其實是會造成誤差(相似性被低估)的。為了避免value文件的pair$(i_1,i_2)$與$(i_2,i_1)$被計算成不同的pair，我們一律按照大小排列，並執行以下map，使$B$的結構變為:
$$\bigg[\big(((\min(i_1,i_2,...), \max(i_1,i_2,...)), k\big)\bigg]$$
但由於$(i_1,i_2,...)$未必只有兩個元素，因此此步驟形同排除掉部分的candidate pairs!不過我們就先假設這類的文件很少，不至於造成致命的誤差。

In [28]:
B = B.map(lambda x: ((min(x[1]), max(x[1])), x[0][1]))

In [29]:
B.take(5)

[((47, 49), 21), ((47, 49), 49), ((61, 90), 5), ((61, 92), 37), ((47, 49), 35)]

最後將$B$以hash_reducer()做reduce，並丟棄所有value後collect出來，就得到所有candidate pairs所構成的陣列了。

In [30]:
B = B.reduceByKey(hash_reducer)

In [31]:
cand_pair = B.map(lambda x: x[0]).collect() # 所有candidate pairs

## 四. 計算Jaccard Similarities

得到了所有candidate pairs之後，我們要回到signature matrix中計算這些pair所有的jaccard similarities。先將signature matrix $S$複製到$J$這個rdd中。

In [32]:
J = S

經過一次map，使$J$的結構變為$$\bigg[\big((j,\min_jh_j(r)), i\big)\bigg]$$

In [33]:
J = J.map(lambda x: ((x[0][1], x[1]),x[0][0]))

以hash_reducer執行reduce，這時在同一列中具有相同min-hashing的文件$i$會被聚在一起，接著刪除那些value只有一個整數的元素，就會得到。
$$\bigg[\big((j,\min_jh_j(r)), (i_1, i_2, ...)\big)\bigg]$$

In [34]:
J = J.reduceByKey(hash_reducer)

In [35]:
J = J.filter(lambda x: (isinstance(x[1], int)==False))

對每個剩下的元素，找出$(i_1,i_2,......)$所有可組成大小為2的subset。因此變成了$$\bigg[\big((j,\min_jh_j(r)), [(i_1, i_2),(i_3,i_4), ...]\big)\bigg]$$

In [36]:
def findsubsets(s, n): 
    return list(itertools.combinations(s, n)) 

In [37]:
J = J.map(lambda x: (x[0], findsubsets(set(x[1]), 2)))

至此，其實$J$的大小已經變得非常小了，因此我直接將其collect出來，則其仍維持key-value的結構。

In [38]:
similarty_list = J.collect()

In [39]:
similarty_list

[((40, 14), [(81, 34)]),
 ((44, 6), [(34, 19), (34, 42), (19, 42)]),
 ((48, 38), [(34, 95)]),
 ((80, 6), [(34, 68)]),
 ((96, 2),
  [(64, 34),
   (64, 45),
   (64, 16),
   (64, 54),
   (64, 26),
   (34, 45),
   (34, 16),
   (34, 54),
   (34, 26),
   (45, 16),
   (45, 54),
   (45, 26),
   (16, 54),
   (16, 26),
   (54, 26)]),
 ((53, 105), [(64, 101)]),
 ((9, 25), [(35, 93), (35, 30), (93, 30)]),
 ((13, 97), [(93, 55)]),
 ((25, 17), [(93, 78)]),
 ((39, 47), [(33, 19), (33, 36), (19, 36)]),
 ((51, 15), [(19, 70)]),
 ((63, 95), [(19, 22)]),
 ((25, 41), [(69, 14), (69, 6), (14, 6)]),
 ((57, 49), [(50, 51), (50, 69), (51, 69)]),
 ((65, 9), [(18, 75), (18, 69), (75, 69)]),
 ((20, 10), [(36, 70)]),
 ((28, 2),
  [(70, 40),
   (70, 10),
   (70, 47),
   (70, 49),
   (70, 88),
   (70, 29),
   (40, 10),
   (40, 47),
   (40, 49),
   (40, 88),
   (40, 29),
   (10, 47),
   (10, 49),
   (10, 88),
   (10, 29),
   (47, 49),
   (47, 88),
   (47, 29),
   (49, 88),
   (49, 29),
   (88, 29)]),
 ((84, 10), [(8

最後我們定義一個字典，其key為第三步中所得到的所有candidate pairs$(i_1,i_2)$(目前儲存在cand_pair中)，讓它與similarty_list的所有value一一比較。明顯地，$(i_1,i_2)$在signature matrix的value中出現的次數，即為$i_1$在signature matrix中與$i_2$相等的row的數量。將這個數字除以100，即為jaccard similarity。

In [40]:
sim_count = {} # 新增空字典，key:candidate pair;value:jaccard similarity

for pairs in cand_pair:
    sim_value = 0; # 該candidate pair在signature matrix中相同的row數
    for item in similarty_list:
        if pairs in item[1]: # candidate pair也在similarty_list的value中
            sim_value = sim_value + 1 # 記數+1
    sim_count[pairs]=sim_value/100 # jaccard similarities

In [41]:
sim_count = sorted(sim_count.items(), key=lambda x:x[1], reverse = True) # 將字典依照value由大到小排列

最後按照規定的格式輸出前10筆最相似的文章，就大功告成了。可以發現052.txt與084.txt是相同的文章。

In [42]:
for i in range(0,10):
    print(sim_count[i][0],":",sim_count[i][1])

(52, 84) : 0.98
(12, 20) : 0.94
(47, 49) : 0.19
(64, 98) : 0.05
(33, 49) : 0.03
(34, 75) : 0.02
(38, 47) : 0.02
(47, 93) : 0.02
(16, 61) : 0.01
(34, 99) : 0.01
