In [1]:
import findspark
# NOTE(review): presumably this has to run before the `import pyspark` in the
# next cell so that the Spark installation can be located — confirm for the
# target environment.
findspark.init()

In [2]:
import time
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, DateType, FloatType, IntegerType
import string
import html
import itertools
import numpy as np
import random

In [3]:
# Measure how long it takes to bring up Spark.
start_time = time.time()

# Application name shown by Spark, and the HDFS location of the input files.
APP_NAME = 'hw3'
NAMENODE_ADDR = '127.0.0.1:19000'
DIR_PATH = '/hw3'

# Build (or reuse) the session first and take the context from it.
# Constructing `pyspark.SparkContext(...)` directly raises
# "Cannot run multiple SparkContexts at once" when this cell is re-run in a
# live kernel; `getOrCreate()` makes the cell idempotent.
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
sc = spark.sparkContext
print('spark init time: {}s'.format(time.time() - start_time))

spark init time: 3.6000771522521973s


In [4]:
def reuters21578_parser(row, file_id):
    """Extract every ``<BODY>...</BODY>`` section from one SGML document.

    Args:
        row: A 2-item sequence; only ``row[1]`` (the SGML text) is read.
        file_id: Integer used to build document ids.

    Returns:
        A list of ``(doc_id, body_text)`` tuples in order of appearance, where
        ``doc_id = file_id * 10000 + <0-based index of the body in the text>``.
        A ``<BODY>`` without a matching ``</BODY>`` is dropped.
    """
    open_tag, close_tag = '<BODY>', '</BODY>'
    sgml = row[1]
    extracted = []
    cursor = 0
    while True:
        opened_at = sgml.find(open_tag, cursor)
        if opened_at < 0:
            break
        body_begin = opened_at + len(open_tag)
        # Any further '<BODY>' before the closing tag is part of the body.
        body_end = sgml.find(close_tag, body_begin)
        if body_end < 0:
            break
        doc_id = file_id * 10000 + len(extracted)
        extracted.append((doc_id, sgml[body_begin:body_end]))
        cursor = body_end + len(close_tag)
    return extracted


# Number of consecutive archives (reut2-000.sgm, reut2-001.sgm, ...) to load.
REUTERS_FILE_COUNT = 1

text_files = []
for i in range(REUTERS_FILE_COUNT):
    file_url = 'hdfs://{}{}/{}'.format(
        NAMENODE_ADDR, DIR_PATH, 'reut2-{0:0>3}.sgm'.format(i))

    # Read the file as a single (path, content) record. The previous
    # textFile + reduceByKey(a + b) approach concatenated lines with a
    # non-commutative operator, so the line order was not guaranteed across
    # partitions, and it dropped the line breaks (gluing the last word of a
    # line to the first word of the next one).
    #
    # `file_id=i` binds the current loop value: the RDD is evaluated lazily,
    # so a plain reference to `i` would see its final value for every file.
    text_file = sc.wholeTextFiles(file_url).flatMap(
        lambda row, file_id=i: reuters21578_parser(row, file_id))

    text_files.append(text_file)

# One RDD of (doc_id, body_text) covering all loaded files.
news_rdd = sc.union(text_files)

In [5]:
# Translation table mapping a straight apostrophe to a typographic one.
trantab = str.maketrans("'", "’")
# Characters removed from both ends of every token.
strip_chars = ' –…' + string.punctuation


def tokenizer(sentence):
    """Split text into lower-case, purely alphabetic tokens.

    The input is converted with ``str()``, HTML entities are unescaped, and the
    text is split on whitespace. Each piece is lower-cased and stripped of
    ``strip_chars`` at both ends; it is kept only if what remains is alphabetic
    and between 1 and 26 characters long.

    Returns:
        The surviving tokens as a list, in their original order.
    """
    unescaped = html.unescape(str(sentence))
    candidates = (piece.lower().strip(strip_chars) for piece in unescaped.split())
    return [word for word in candidates
            if word.isalpha() and 1 <= len(word) <= 26]


def k_shingle(text, k):
    """Return the set of distinct ``k``-character shingles of ``text``.

    The text is first tokenized and the tokens are re-joined with single
    spaces; shingles are every length-``k`` substring of that joined string.
    The result is an empty set when the joined string is shorter than ``k``.
    """
    normalised = ' '.join(tokenizer(text))
    window_starts = range(len(normalised) - k + 1)
    return {normalised[pos:pos + k] for pos in window_starts}
# (doc_id, body_text) -> (doc_id, set of 5-character shingles)
shingled_news_rdd = news_rdd.map(
    lambda record: (record[0], k_shingle(record[1], 5)))

In [6]:
# Global shingle vocabulary: the union of every document's shingle set.
# It is sorted so that the column order of the encoded matrix is the same on
# every run; the iteration order of a set of strings changes between
# interpreter sessions, which made the saved results non-reproducible.
shingles = sorted(
    shingled_news_rdd.map(lambda record: record[1]).reduce(
        lambda left, right: left.union(right)))
shingles_count = len(shingles)

In [7]:
def encode_shingles(row, shingles_count, vocabulary=None):
    """Encode a document's shingles as a 0/1 indicator vector.

    Args:
        row: Container of the document's shingles (membership is tested with
            ``in``, so a set is the efficient choice).
        shingles_count: Length of the returned vector.
        vocabulary: Ordered shingle vocabulary; position ``j`` of the result
            corresponds to ``vocabulary[j]``. Defaults to the module-level
            ``shingles`` list, which keeps existing two-argument calls working.

    Returns:
        A 1-D integer ``numpy`` array of length ``shingles_count`` holding 1
        where the vocabulary shingle occurs in ``row`` and 0 elsewhere.
    """
    if vocabulary is None:
        vocabulary = shingles
    # `np.int` was only an alias of the builtin `int` and no longer exists in
    # current NumPy releases; the builtin gives the same dtype.
    array = np.zeros(shingles_count, dtype=int)
    # zip() with the range stops at shingles_count even if the vocabulary is
    # longer, so indexing never runs past the end of the vector.
    for idx, shingle in zip(range(shingles_count), vocabulary):
        if shingle in row:
            array[idx] = 1
    return array

In [8]:
# (doc_id, shingle set) -> (doc_id, 0/1 vector over the shingle vocabulary)
encoded_news_rdd = shingled_news_rdd.map(
    lambda record: (record[0], encode_shingles(record[1], shingles_count)))

In [9]:
def numpy_array_to_csv_line(x):
    """Format the items of ``x`` as one comma-separated line.

    Each item is rendered with ``'{}'.format``. An empty input yields an empty
    string (the previous implementation raised ``IndexError`` on ``x[-1]``).

    Args:
        x: 1-D array or any iterable of values.

    Returns:
        The values joined by ``,`` with no trailing separator or newline.
    """
    # A single join avoids rebuilding the string once per element.
    return ','.join('{}'.format(value) for value in x)
# Write one CSV line per document (the document id is not written).
task1_lines = encoded_news_rdd.map(
    lambda record: numpy_array_to_csv_line(record[1]))
task1_lines.saveAsTextFile('task1_result_Transpose.csv')

In [10]:
def is_prime(n):
    """Return True if the integer ``n`` is a prime number.

    Values below 2 (0, 1 and negatives) are not prime. The previous version
    reported 0 and 1 as prime and raised ``TypeError`` for negative input.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        # 2 is the only even prime.
        return n == 2
    # Trial division by odd candidates up to the square root of n.
    return all(n % i for i in range(3, int(n**0.5) + 1, 2))


def get_prime_above(m):
    """Return the first value at or after ``m`` for which ``is_prime`` is true.

    ``m`` itself is returned when ``is_prime(m)`` already holds.
    """
    candidate = m
    while True:
        if is_prime(candidate):
            return candidate
        candidate += 1


def one_pass_min_hash(a_arr, b_arr, p, k, n):
    def _one_pass_min_hash(x):
        result = np.zeros(k, dtype=np.uint)
        for i in range(k):
            idxs = np.argwhere(x == 1)
            hash_func = lambda x: ((a_arr[i] * x + b_arr[i]) % p) % n
            hashed = np.vectorize(hash_func)(idxs)
            result[i] = np.min(hashed)
        return result

    return _one_pass_min_hash

In [11]:
# Number of hash functions, i.e. the length of every min-hash signature.
HASH_FUNC_NUM = 100
# Fixed seed so the hash coefficients, and therefore the saved signatures,
# are the same on every run.
MIN_HASH_SEED = 0

# Prime modulus for the hash family h(x) = ((a * x + b) % p) % shingles_count.
p = get_prime_above(shingles_count)

# Draw the coefficients from the field defined by p. The multiplier must not
# be 0, otherwise that hash function maps every index to the same value.
min_hash_rng = np.random.RandomState(MIN_HASH_SEED)
a_arr = min_hash_rng.randint(1, p, size=HASH_FUNC_NUM)
b_arr = min_hash_rng.randint(0, p, size=HASH_FUNC_NUM)

# Build the hasher once instead of once per record.
min_hasher = one_pass_min_hash(a_arr, b_arr, p, HASH_FUNC_NUM, shingles_count)
signature_matrix = encoded_news_rdd.map(lambda x: (x[0], min_hasher(x[1])))
signature_matrix.map(lambda x: numpy_array_to_csv_line(x[1])).saveAsTextFile('task2_result_Transpose.csv')

In [None]:
# NOTE(review): these look like locality-sensitive-hashing banding parameters
# (`band` bands of `r` rows each) — confirm against the code that uses them.
# band * r = 100, which equals HASH_FUNC_NUM.
r = 5
band = 20
similarity_threshold = 0.8