In [1]:
import findspark
import json
import itertools
import sys
import time
import random

findspark.init()
from pyspark import SparkConf, SparkContext

In [2]:
def build_min_hash_func(a, b, p, m):
    def min_hash_func(x):
        return (((a * x + b) % p) % m)

    return min_hash_func

In [3]:
def get_min_hash_functions(num_func, buckets):
    list_a = random.sample(range(50331653, 92233720), num_func)
    list_b = random.sample(range(25165843, 92233720), num_func)
    p = 12582917
    min_hash_func_list = [build_min_hash_func(a, b, p, buckets) for a, b in zip(list_a, list_b)]

    return min_hash_func_list

In [4]:
def check_jaccard_similarity(candidate, business_user_tokens):
    business_set_1 = set(business_user_tokens.get(candidate[0], []))
    business_set_2 = set(business_user_tokens.get(candidate[1], []))
    pair_jac_sim = 0
    if business_set_1 and business_set_2:
        pair_jac_sim = len(business_set_1.intersection(business_set_2)) / len(business_set_1.union(business_set_2))
    return tuple([candidate, pair_jac_sim])

In [5]:
def write_results(results, file_path):
    with open(file_path, 'w') as file:
        for line in results:
            file.write(json.dumps(line) + '\n')
    file.close()

In [6]:
def computeSimilarity(dict1, dict2):
    """
    compute Pearson Correlation Similarity
    :param dict1:
    :param dict2:
    :return: a float number
    """
    co_rated_user = list(set(dict1.keys()) & (set(dict2.keys())))
    val1_list, val2_list = list(), list()
    [(val1_list.append(dict1[user_id]),
      val2_list.append(dict2[user_id])) for user_id in co_rated_user]

    avg1 = sum(val1_list) / len(val1_list)
    avg2 = sum(val2_list) / len(val2_list)

    numerator = sum(map(lambda pair: (pair[0] - avg1) * (pair[1] - avg2), zip(val1_list, val2_list)))

    if numerator == 0:
        return 0
    denominator = math.sqrt(sum(map(lambda val: (val - avg1) ** 2, val1_list))) * \
                  math.sqrt(sum(map(lambda val: (val - avg2) ** 2, val2_list)))
    if denominator == 0:
        return 0

    return numerator / denominator


# Main

In [7]:
conf = SparkConf()
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")
conf.setMaster('local[8]')
conf.setAppName('Assignment_3')
sc = SparkContext.getOrCreate(conf)

In [8]:
reviews_json = sc.textFile("asnlib/publicdata/train_review.json").map(json.loads)

In [9]:
reviews_json.first()

{'review_id': 'pxOrtki0sqXps5hSyLXKpA',
 'user_id': 'OLR4DvqFxCKLOEHqfAxpqQ',
 'business_id': 'zK7sltLeRRioqYwgLiWUIA',
 'stars': 5.0,
 'text': "Second time I've been here. First time was whatever. This time it was actually good. Way better than inn n out. It's the same type of burger that's why I put it up against that. I love that you can get grilled jalapeños. Just wish they came on the burger and not on the side.",
 'date': '2015-12-19 07:35:30'}

In [10]:
user_business_rating_sets = reviews_json.map(lambda x: (x.get('user_id'), x.get('business_id'), x.get('stars'))).distinct()

In [11]:
# create user tokens
user_tokens_dict = user_business_rating_sets\
    .map(lambda x: x[0])\
    .distinct()\
    .sortBy(lambda x: x)\
    .zipWithIndex()\
    .collectAsMap()

inverse_user_tokens_dict = {bid: token for token, bid in user_tokens_dict.items()}

In [13]:
# create business tokens
business_tokens = user_business_rating_sets\
    .map(lambda x: x[1])\
    .distinct()\
    .sortBy(lambda x: x)\
    .zipWithIndex()\
    
business_tokens_dict = business_tokens.collectAsMap()

inverse_business_tokens_dict = {bid: token for token, bid in business_tokens_dict.items()}

In [14]:
min_hash_func_list = get_min_hash_functions(50, len(user_tokens_dict) * 2)

In [15]:
# get user business tokenized maps
user_business_rating_tokenized_sets = user_business_rating_sets\
    .map(lambda x: (user_tokens_dict.get(x[0]), business_tokens_dict.get(x[1]), x[2]))

In [17]:
business_user_tokenized_pairs = user_business_rating_tokenized_sets.map(lambda x: (x[1], x[0]))

In [18]:
# create business user list
business_user_tokenized_map = business_user_tokenized_pairs.groupByKey().mapValues(lambda x: list(set(x))).filter(lambda x: len(x[1])>=3)

In [19]:
business_user_tokenized_map.count()

10118

In [20]:
user_business_tokenized_dict = business_user_tokenized_map.flatMap(lambda x: [(user, x[0]) for user in x[1]]).groupByKey().mapValues(lambda x: list(set(x))).collectAsMap()

In [21]:
business_hashed_values = business_tokens.map(lambda x: (x[1], [min_hash(x[1]) for min_hash in min_hash_func_list]))

In [22]:
signature_matrix_rdd = business_user_tokenized_map\
    .leftOuterJoin(business_hashed_values)\
    .map(lambda x: x[1])\
    .flatMap(lambda user_set: [(x, user_set[1]) for x in user_set[0]])\
    .reduceByKey(lambda a, b: [min(x, y) for x, y in zip(a, b)])

In [23]:
signature_matrix_rdd.first()

(23632,
 [3570,
  3392,
  5657,
  352,
  2278,
  5971,
  1378,
  571,
  792,
  3048,
  3444,
  300,
  4868,
  30,
  1860,
  950,
  2580,
  509,
  2928,
  2345,
  2715,
  3283,
  467,
  3647,
  1537,
  771,
  200,
  2102,
  748,
  2001,
  5,
  758,
  2358,
  2907,
  1591,
  4395,
  26,
  1769,
  717,
  6972,
  20,
  525,
  412,
  552,
  1595,
  2473,
  656,
  2166,
  7949,
  6723])

In [24]:
candidate_pairs = signature_matrix_rdd \
    .flatMap(lambda x: [(tuple([i, tuple(x[1][i:i + 1])]), x[0]) for i in range(0, 50)]) \
    .groupByKey()\
    .map(lambda x: list(x[1]))\
    .filter(lambda val: len(val) > 1) \
    .flatMap(lambda uid_list: [pair for pair in itertools.combinations(uid_list, 2)])

In [25]:
candidate_pairs.count()

48495849

In [26]:
jaccard_similar_users = candidate_pairs\
        .distinct()\
        .map(lambda x: check_jaccard_similarity(x, user_business_tokenized_dict))\
        .filter(lambda x: x[1] >= 0.01)\
#         .map(lambda x: {"b1": inverse_business_tokens_dict[x[0][0]], "b2": inverse_business_tokens_dict[x[0][1]], "sim": x[1]})\
#         .collect()

In [27]:
jaccard_similar_users.first()

((23632, 24958), 0.03225806451612903)

In [28]:
jaccard_similar_users.count()

21491909

In [34]:
user_business_rating_map = user_business_rating_tokenized_sets.map(lambda x: (x[0], (x[1], x[2]))).groupByKey().mapValues(lambda x: list(set(x))).collectAsMap()

In [35]:
len(user_business_rating_map)

26184

In [36]:
pearson_similar_pairs = jaccard_similar_users.map(lambda id_pair: (id_pair, computeSimilarity(user_business_rating_map[id_pair[0]], user_business_rating_map[id_pair[1]]))).filter(lambda kv: kv[1] > 0)

In [None]:
pearson_similar_pairs.count()