In [1]:
import pyspark
import hashlib
import re

conf = pyspark.SparkConf().setMaster('local').setAppName('lsh')
sc = pyspark.SparkContext(conf=conf)

22/01/04 21:43:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Report
## readData
用 `wholeTextFiles` 把所有檔案讀入，再將每個txt檔的內容，一個字一個字檢查是不是含有特殊符號，因為題目要求3-shingles，所以把三個字concat在一起，為了節省空間，我使用`hashlib.sha256`轉成32bits，最後把資料存成`(file_id, [shingle1, shingle2,...])`，接著將資料轉成類似矩陣的結構`[(shingle1, [file_id]), (shingle2, [file_id])]`，最後的資料結構是`[(1, [file_id1, file_id2]), (2, [file_id2, file_id3]),...]`


In [2]:
def readData(x):
    file_id = int(x[0][-7:-4])
    content = x[1]
    res = []
    for l in content.splitlines():

        words = l.split(' ')
        new_words = []
        for i in words:
            i = re.sub('[^a-zA-Z0-9]$', '', i)
            i = re.sub('^[^a-zA-Z0-9]', '', i)
            new_words.append(i)
        for i in range(len(new_words)-2):
            shingles = new_words[i] + new_words[i+1] + new_words[i+2]
            shingles = int.from_bytes(hashlib.sha256(shingles.encode('utf8')).digest()[:4], 'little')
            res.append(shingles)

    return (file_id, res)


original_data = sc.wholeTextFiles('./athletics/*.txt').map(readData)
hash_matrix = original_data.flatMapValues(lambda x: x).map(lambda x: (x[1], [x[0]])).reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[0], ascending=True)
hash_matrix = hash_matrix.zipWithIndex().map(lambda x: (x[1], x[0][1]))

## min_hash
這裡實作題目所要求產生100個hash function，並且將資料轉成`[((row, file_id, hash_function_id), hash_value)]`，最後的lambda function 更新最小的hash value

In [3]:
def min_hash(x):
    l = []
    res = []
    for i in range(100):
        hash_func = i * x[0] % 22111 % keys_conut
        l.append(hash_func)
    for i in x[1]:
        for idx, val in enumerate(l):
            res.append(((x[0], i, idx), val))
    return res
    
keys_conut = hash_matrix.keys().count()
signature_matrix = hash_matrix.flatMap(min_hash).reduceByKey(lambda x, y: x if x < y else y)



## gen_candidate_pair
這裡實作相同bucket的文章找出類似的文章

## cal_sim
計算Jaccard Similarity，找出union跟intersection的數量

## hash_bucket
因為題目要求50個band，所以將原本100個hash function直接除二，當作band的id，最後再把資料做hashing

In [4]:
def gen_candidate_pair(x):
    res = []
    for i in x[1]:
        for j in x[1]:
            if i != j:
                if i < j:
                    res.append((i, j))
                else:
                    res.append((j, i))
    return list(set(res))

def cal_sim(x):
    union = set(x[1][0]).union(set(x[1][1]))
    inter = set(x[1][0]).intersection(set(x[1][1]))
    
    return (x[0], len(inter) / len(union))


def hash_bucket(x):
    if x[0][0] % 2 == 0:
        return ((x[0][0] // 2, x[0][1]), x[1])
    else:
        return ((x[0][0] // 2, x[0][1]), x[1] * keys_conut)

lsh_matrix = signature_matrix.map(hash_bucket).reduceByKey(lambda x, y: x + y)
lsh_matrix = lsh_matrix.map(lambda x: ((x[0][0], x[1]), [x[0][1]])).reduceByKey(lambda x, y: x + y)
candidate_pairs = lsh_matrix.flatMap(gen_candidate_pair).distinct()


## Final Result
這裡將candidate pairs 與原本的資料join，並將結果的key改到後面，再做一次join把結果排列成`((file_id1, file_id2), ([shingle1, shingle2], [shingle2]))`，最後算出相似度後用降冪排列就得到答案了

In [5]:
sim_pairs = candidate_pairs.join(original_data).map(lambda x: (x[1][0], (x[0], x[1][1]))).join(
    original_data).map(lambda x: ((x[1][0][0], x[0]), (x[1][0][1], x[1][1])))
sim_pairs = sim_pairs.map(cal_sim).sortBy(lambda x: x[1], ascending=False)

ans = sim_pairs.take(10)
for i in ans:
    print('(%03d, %03d): %.2f %s' % (i[0][0], i[0][1], i[1]*100, '%'))

sc.stop()



(052, 084): 100.00 %
(012, 020): 100.00 %
(047, 049): 75.74 %
(030, 035): 71.00 %
(049, 088): 51.47 %
(048, 049): 48.58 %
(023, 038): 48.18 %
(014, 040): 39.97 %
(047, 088): 39.02 %
(047, 048): 36.79 %
