In [1]:
from pyspark import SparkConf, SparkContext
import re
from binascii import crc32
import time

### mappers

一些常見的功能單一的 map function：

  - #### mapper_expand_inner
  把結構為 $[word, [docID_1, docID_2......]]$ 的資料轉為 $[(docID_1, [word]), (docID_2, [word])......]$（方便後續將很多個 y 結合進一個 list 中）

  - #### mapper_switch
  交換 pair 的 key 與 value
  
  - #### mapper_length
  回傳 key 與 value list 的長度

In [2]:
def mapper_expand_inner(x):
    return [(doc, [x[0]]) for doc in x[1]]
def mapper_switch(x):
    return (x[1], x[0])
def mapper_length(x):
    return (x[0], len(x[1]))

### mapper 0


讀入整段文章，先以空格切開不同字串，再用正規表達式去除掉無意義的標點符號（在字串最前或最後的標點符號），然後再回傳答案。

In [3]:
def mapper0(line):
    words = []
    for word in line.split(' '):
        Len = len(word)
        if Len >= 1:
            word = re.sub('[^a-zA-Z0-9]$', '', word)
            word = re.sub('^[^a-zA-Z0-9]', '', word)
            words.append(word)
    return words

### mapper 1

input: $(word, idx)$<br>
output: $[(idx, word), (idx, word), (idx, word)]$

由於是 k shingle，將 k 個字串合稱為一個字段，則每個字串會在第 $idx - 2$, $idx - 1$, $idx$ 個字串時被使用到。<br>於是首先顛倒 key value 使字串有編號，而後回傳 k 個以利後續做到 k-shingle。

In [4]:
def mapper1(x):
    pair = []
    for i in range(-k+1, 1):
        if x[1]+i >= 0:
            pair.append((x[1]+i, [x[0]]))
    return pair

### mapper2

input:  $(idx, [k\_words])$<br>
output: $(hash\_value, [docID])$

將 k 個字串以空白分隔後相連成一個長字串，接下來對字串做 encode，既保證 unique 又比較省空間。<br>
為了避免文章末段的字串影響，因此排除掉長度不為 3 的情形。

In [5]:
def mapper2(x, doc_id):
    sep = ' '
    return [(crc32(sep.join(x[1]).encode()), [doc_id])] if len(x[1]) == 3 else []

### mapper3

input:  $(docID, [shingleIDs......])$<br>
output: $[(hashID, docID), value)......]$

以 100 個 hash function 來達到 min-hashing 的目標，在這邊的 hash function 是設計成：<br>
$= (i * (docID+1)) \% P$<br>
其中 i 是 0~100 的變數，idx 是字段編號，P 是自訂的一個質數。

In [36]:
def mapper3(x):
    hash_value = []
    for i in range(100):
        min_val = (i*x[1][0]) % P % N
        for sig_id in x[1]:
            new_min_val = (i*sig_id) % P % N
            if new_min_val < min_val:
                min_val = new_min_val
        hash_value.append(((i, x[0]), min_val))
    return hash_value

### mapper4

input:  $((hashID, docID), minhash))$<br>
output: $((bandID, docID), bucketID))$

因為每個 document 會有 100 個 hash function，為了限縮為 50 個 band，因此用 hash function ID 除以 2 來得到 band 的 ID。<br>
接著對於 row 0 的資料乘以 2，對於 row 1 的資料乘以 5，以此來做 hashing。

In [37]:
def mapper4(x):
    if x[0][0] % 2 == 0:
        return ((round(x[0][0] / 2), x[0][1]), x[1])
    else:
        return ((round(x[0][0] / 2), x[0][1]), x[1] * N)

### mapper5

input:  $((bandID, docID), bucketID))$<br>
output: $((bandID, bucketID), [docID])$

由於要看的是哪些 document 的同一個 band 會 hash 到同一個 bucket 當中，因此把 band 與 hash function 的 ID 當作 key，再把 document 的 ID 作為 list 回傳。

In [8]:
def mapper5(x):
    return ((x[0][0], x[1]), [x[0][1]])

### mapper6

input:  $((bandID, bucketID), [docID_1, docID_2......]))$<br>
output: $[(docID_1, docID_2), (docID_1, docID_3)......]$

用來選出 candidate pair 的 mapper，只要出現在同一個 band 與 bucket 中的 document，就是可能有高相似度的 document。<br>
因此可以忽略 key，只針對 sort 過的 value list，做成由小至大的 pair 後一併回傳。

In [9]:
def mapper6(x):
    cand_pair = []
    len1 = len(x[1])
    arr = sorted(x[1])
    for i in range(len1):
        for j in range(i+1, len1):
            cand_pair.append((arr[i], arr[j]))
    return cand_pair

### mapper7

input:  $(shingleID, [docID_1, docID_2......]))$<br>
output: $[((docID_1, docID_2), 1), ((docID_1, docID_3), 1)......]$

用來找出所有在某個 shingle 下重複的 document，作法與 mapper6 相似，只是為了統計總數而多加了一個 count value。

In [10]:
def mapper7(x):
    List = []
    len1 = len(x[1])
    arr = sorted(x[1])
    
    for i in range(len1):
        for j in range(i+1, len1):
            List.append(((arr[i], arr[j]), 1))
    return List

### reducer0

最常見的 reducer，將 value 值加總

In [11]:
def reducer0(x, y):
    return x + y

### reducer 1

input: $((hashID, docID), value))$

用來找出最小值的 reducer。

In [12]:
def reducer1(x, y):
    return y if y < x else x

### Setting

Create Spark context，以及設定好固定的變數。<br>
$k$ 為題目規定的 shingle 值。<br>
$doc\_num$ 為文章總數。<br>
$P$ 為 Hash function 用的質數。<br>

In [38]:
sc = SparkContext.getOrCreate()

k = 3
doc_num = 101
P = 29989

### Input && K-Shingling

讀入 $doc\_num$ 篇文章的資料。<br>
以 $shingle\_doc\_map$ 儲存 shingleID 與所有相關 docIDs 的對應表。<br>
以 $shingle\_set$ 儲存 docID 與其擁有的 shingleIDs 的對應表。<br>
以 $N$ 儲存 shingles 總數。

In [43]:
shingle_doc_map = sc.parallelize(list())

for i in range(doc_num):
    each_word_in_doc = sc.textFile("athletics/%03d.txt"%(i+1)).flatMap(mapper0)
    k_words_in_doc = each_word_in_doc.zipWithIndex().flatMap(mapper1).reduceByKey(reducer0)
    k_words_in_doc_with_id = k_words_in_doc.flatMap(lambda x: mapper2(x, i))
    shingle_doc_map = shingle_doc_map.union(k_words_in_doc_with_id)
    
shingle_doc_map = shingle_doc_map.reduceByKey(reducer0).zipWithIndex()
shingle_doc_map = shingle_doc_map.map(lambda x: (x[1], list(set(x[0][1]))))
shingle_set = shingle_doc_map.flatMap(mapper_expand_inner).reduceByKey(reducer0)

N = shingle_doc_map.count()

### Min-Hashing

以 $minhash\_set$ 儲存 100 個 hash function 針對 101 篇文章與 N 個 shingles 計算後的 min hashing 結果。

In [44]:
minhash_set = shingle_set.flatMap(mapper3).reduceByKey(reducer1)

26713


### LSH

針對 minhash 後的結果將它們 hash 到 bucket 之上，儲存為 $bucket\_set$。<br>
將 $bucket\_set$ 中所有曾被 hash 到一起的文章兩兩取出，作為 $sim\_doc\_set$。<br>
最後做 distinct 得出完整的 $candidate\_pair\_set$。

In [45]:
bucket_set = minhash_set.map(mapper4).reduceByKey(reducer0)
sim_doc_set = bucket_set.map(mapper5).reduceByKey(reducer0)
candidate_pair_set = sim_doc_set.flatMap(mapper6).distinct()

673


### Jaccard Distance

首先得出每篇 document 的 shingle 數量，存入 $doc\_len\_set$ 之中。<br>
接著透過 $shingle\_doc\_map$ 得到所有共享同一個字段的 document 的 ID，每共享一個就會出現一次，加總和後以 $join\_set$ 來存放所有文件彼此重複的字段的數量。<br>
接下來，依序使每對 candidate pair 的兩篇文章都透過 join 來得到對應長度；接著再針對整個 pair，透過 join 獲取 pair 重複字段的數量，再以 map 來根據這些資料取得各個 pair 的 jaccard distance。

In [46]:
doc_len_set = shingle_set.map(mapper_length)
join_set = shingle_doc_map.flatMap(mapper7).reduceByKey(reducer0)
ans = candidate_pair_set.join(doc_len_set).map(lambda x: (x[1][0], (x[0], x[1][1]))).join(doc_len_set)
ans = ans.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1] + x[1][1])).join(join_set).map(lambda x: (x[0], x[1][1] / (x[1][0] - x[1][1])))

### Output

將順序倒過來再 sortByKey 後依序輸出即可。

In [47]:
sorted_ans = ans.map(mapper_switch).sortByKey(ascending=False).take(10)
for doc in sorted_ans:
    print("(%03d, %03d) : %.2f %%" % (doc[1][0]+1, doc[1][1]+1, doc[0]*100.0))

(012, 020) : 100.00 %
(052, 084) : 100.00 %
(047, 049) : 75.50 %
(030, 035) : 70.67 %
(049, 088) : 50.98 %
(048, 049) : 48.49 %
(023, 038) : 48.21 %
(014, 040) : 39.74 %
(047, 088) : 38.55 %
(047, 048) : 36.61 %
