In [1]:
import pyspark
# Reuse the running SparkContext when this cell is executed again: creating a
# second context in the same process raises an error, which made the cell
# non-idempotent. A new local context using all cores is created otherwise.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setMaster('local[*]'))

In [2]:
%load_ext autoreload
%autoreload 2

import os
from os.path import join

import numpy as np

from pyspark.sql import SQLContext

In [102]:
# Root directory of the cached data sets. The original location is kept as
# the default, but it can be overridden with the ALTEREGO_DATA_ROOT
# environment variable so the notebook also runs on other machines.
path_root = os.environ.get(
    "ALTEREGO_DATA_ROOT", "/home/tlin/notebooks/data")

# Train / test split of the rating data.
path_pickle_train = join(
    path_root, "cache/two_domain/split_data/train")
path_pickle_test = join(
    path_root, "cache/two_domain/split_data/test")

# Similarity caches produced by earlier pipeline stages.
path_pickle_baseline_sim = join(
    path_root, "cache/two_domain/item_based_sim/base_sim")
path_pickle_extended_sim = join(
    path_root, "cache/two_domain/extend_sim/extendsim")
path_pickle_private_mapped_sim = join(
    path_root, "cache/two_domain/private_mapping/privatemap")
path_pickle_nonprivate_mapped_sim = join(
    path_root, "cache/two_domain/private_mapping/nonprivatemap")

# AlterEgo profiles and the similarities computed on them.
path_pickle_userbased_alterEgo = join(
    path_root, "cache/two_domain/cross_sim/user_based_alterEgo")
path_pickle_itembased_alterEgo = join(
    path_root, "cache/two_domain/cross_sim/item_based_alterEgo")
path_pickle_alterEgo_userbased_sim = join(
    path_root, "cache/two_domain/cross_sim/targetdomain_userbased_sim")
path_pickle_alterEgo_itembased_sim = join(
    path_root, "cache/two_domain/cross_sim/targetdomain_itembased_sim")

# Similarity pairs used by the prediction cells below.
path_pickle_private_policy_userbased_sim = join(
    path_root, "cache/two_domain/policy/policy_userbased_sim")
path_pickle_private_policy_itembased_sim = join(
    path_root, "cache/two_domain/policy/policy_itembased_sim")

In [103]:
# Every RDD below is read from a pickle cache on disk and marked for caching.
# NOTE(review): pickleFile unpickles its input, so only point these paths at
# trusted cache directories.

# Train / test split.
train_dataRDD, test_dataRDD = (
    sc.pickleFile(path).cache()
    for path in (path_pickle_train, path_pickle_test))

# Similarities computed on the alterEgo profiles.
alterEgo_userbased_sim, alterEgo_itembased_sim = (
    sc.pickleFile(path).cache()
    for path in (path_pickle_alterEgo_userbased_sim,
                 path_pickle_alterEgo_itembased_sim))

# Similarity pairs consumed by the prediction cells.
userbased_sim_pair, itembased_sim_pair = (
    sc.pickleFile(path).cache()
    for path in (path_pickle_private_policy_userbased_sim,
                 path_pickle_private_policy_itembased_sim))

In [105]:
# AlterEgo profiles, keyed by item and by user respectively.
item_based_alterEgo, user_based_alterEgo = (
    sc.pickleFile(path).cache()
    for path in (path_pickle_itembased_alterEgo,
                 path_pickle_userbased_alterEgo))

# Collect the profiles to the driver as dicts and broadcast them so that the
# prediction functions can look records up by key inside Spark tasks.
user_based_dict_bd, item_based_dict_bd = (
    sc.broadcast(profiles.collectAsMap())
    for profiles in (user_based_alterEgo, item_based_alterEgo))

# Same treatment for the similarity pairs.
userbased_sim_pair_dict_bd, itembased_sim_pair_dict_bd = (
    sc.broadcast(sim_pairs.collectAsMap())
    for sim_pairs in (userbased_sim_pair, itembased_sim_pair))


In [106]:
    def get_info(dataRDD):
        """Summarise the ratings attached to every key of an RDD.

        Args:
            dataRDD: either a userRDD or an itemRDD.
                userRDD: (uid, (iid, rating, rating time)*)
                itemRDD: (iid, (uid, rating, rating time)*)
        Returns:
            The result of mapping `dataRDD` to one record per key:
                (uid, (average, norm2, count))* or
                (iid, (average, norm2, count))*
            where average is the mean rating, norm2 the Euclidean norm of
            the ratings and count the number of rating records.
        """
        def summarise(record):
            """turn (key, ratings) into (key, (average, norm2, count))."""
            key, ratings = record
            # The rating value is the second field of every record.
            scores = [entry[1] for entry in ratings]
            mean_score = 1.0 * np.average(scores)
            l2_norm = np.sqrt(np.sum([score ** 2 for score in scores]))
            return key, (mean_score, l2_norm, len(ratings))

        return dataRDD.map(summarise)


# Per-user and per-item rating statistics: (average, norm2, count) per key.
user_info, item_info = (
    get_info(profiles)
    for profiles in (user_based_alterEgo, item_based_alterEgo))

# Collect the statistics to the driver as dicts and broadcast them for use
# inside the prediction functions.
user_info_bd, item_info_bd = (
    sc.broadcast(info.collectAsMap())
    for info in (user_info, item_info))

In [108]:
# NOTE(review): this rebinds each of the three broadcasts to itself, so it
# changes nothing (it only fails if one of the names is undefined). It looks
# like a leftover from experimenting with aliases -- confirm and remove.
(user_based_dict_bd,
 userbased_sim_pair_dict_bd,
 user_info_bd) = (user_based_dict_bd,
                  userbased_sim_pair_dict_bd,
                  user_info_bd)

In [113]:
    def bound_rating(rating):
        """Round a predicted rating and clamp it to the range [0, 5].

            The value is first shifted by 0.5 and truncated to an integer;
            a result below 0 becomes 0 and a result above 5 becomes 5.
            The bounded value is returned as a float.
        """
        rounded = int(rating + 0.5)
        if rounded > 5:
            rounded = 5
        if rounded < 0:
            rounded = 0
        return 1.0 * rounded

    def user_based_prediction(line, rating_bd, sim_bd, user_bd):
        """Predict the ratings of one user's test items (user-based CF).

        A prediction is the target user's average rating plus the
        similarity-weighted average of the neighbours' mean-centred ratings
        of the same item. Each neighbour's rating is centred on that
        neighbour's own average (read from `user_bd`). When no neighbour
        rated the item, or all matching similarities are zero, the target
        user's average is used as the prediction.

        Args:
            line: (uid, pairs)
                where pairs in the format of (iid, rating, time)*
            rating_bd: broadcast of {uid: [(iid, rating, time)*]}
            sim_bd: broadcast of {uid: [(neighbor uid, sim)*]}
            user_bd: broadcast of {uid: (average, norm, count)}
        Returns:
            (uid, [(iid, real rating, predicted rating)*]), where the
            predicted rating has been passed through `bound_rating`.
        Raises:
            KeyError: if `uid` is missing from `sim_bd` or `user_bd`, if a
                neighbour is missing from `rating_bd`, or if a neighbour
                that rated the item is missing from `user_bd`.
        """
        def prediction(pair, neighbors_info, target_average):
            """predict the rating of a single test record.
            Args:
                pair: (iid, rating, time)
                neighbors_info: (neighbor uid, sim, rating records)*
                target_average: average rating of the current uid.
            """
            iid, real_rating = pair[0], pair[1]
            weighted_sum = 0.0
            weight_total = 0.0
            for neighbor_uid, sim, ratings in neighbors_info:
                for rating in ratings:
                    # Item ids are compared for equality; a substring test
                    # would also match ids that merely contain `iid`.
                    if rating[0] != iid:
                        continue
                    neighbor_average = user_bd.value[neighbor_uid][0]
                    weighted_sum += sim * (rating[1] - neighbor_average)
                    weight_total += abs(sim)

            if weight_total > 0:
                predicted_rating = target_average + weighted_sum / weight_total
            else:
                # No usable neighbour: fall back to the user's average
                # instead of dividing by zero.
                predicted_rating = target_average
            return iid, real_rating, bound_rating(predicted_rating)

        uid, pairs = line
        average_uid_rating = user_bd.value[uid][0]
        uid_allneighbor_info = [
            (u[0], u[1], rating_bd.value[u[0]]) for u in sim_bd.value[uid]]
        return uid, [
            prediction(pair, uid_allneighbor_info, average_uid_rating)
            for pair in pairs]
    
# Only users that have an entry in the user-based similarity broadcast can be
# predicted (user_based_prediction indexes sim_bd by uid), so filter first.
# The keys are read from `userbased_sim_pair_dict_bd`, the broadcast that is
# passed as `sim_bd` below; the previously referenced `sim_pair_dict_bd` is
# not defined anywhere in this notebook.
sim_pair_dict_keys = set(userbased_sim_pair_dict_bd.value.keys())
result = test_dataRDD.filter(
    lambda line: line[0] in sim_pair_dict_keys).map(
    lambda line: user_based_prediction(
        line, user_based_dict_bd,
        userbased_sim_pair_dict_bd, user_info_bd))
# Display one predicted user, like the item-based cell does.
result.take(1)

[('A1TCIVMGCPMV6R',
  [('T:B003T2YI8G', 2.0, 2.0),
   ('T:B00004CY4P', 2.0, 2.0),
   ('T:B00004WZJJ', 4.0, 2.0),
   ('T:B00005LC8T', 4.0, 2.0),
   ('T:B000006FKR', 2.0, 2.0),
   ('T:B000A6UN3I', 2.0, 2.0),
   ('T:B00004U8ND', 4.0, 2.0),
   ('T:B00004CJ7D', 2.0, 2.0),
   ('T:630100843X', 2.0, 2.0),
   ('T:B00005JL1V', 2.0, 2.0),
   ('T:B00004RM5O', 2.0, 2.0),
   ('T:B000006G1C', 4.0, 2.0),
   ('T:B00005LW40', 4.0, 2.0),
   ('T:B00004CL0L', 4.0, 2.0),
   ('T:B00004CKAR', 2.0, 2.0),
   ('T:B00004TXKD', 4.0, 2.0)])]

In [114]:
    def item_based_prediction(line, rating_bd, sim_bd, item_bd,
                              decay_rate=0.1):
        """Predict the ratings of one user's test items (item-based CF).

        For every test item two predictions are produced: a plain
        similarity-weighted one and one where every neighbour rating is
        additionally weighted by an exponential time decay. Both start from
        the item's average rating and add the weighted average of the
        user's mean-centred ratings of the neighbouring items. When the user
        rated none of the neighbours, or all matching similarities are zero,
        both predictions fall back to the item's average.

        Args:
            line: (uid, pairs),
                where pairs: in the format of (iid, rating, time)*
            rating_bd: broadcast of {iid: [(uid, rating, time)*]}
            sim_bd: broadcast of {iid: [(neighbor iid, sim)*]}
            item_bd: broadcast of {iid: (average, norm, length)}
            decay_rate: rate of the exponential time decay (default 0.1).
        Returns:
            (uid, results) with one entry per test pair: either
            (iid, real rating, prediction without decay,
             prediction with decay), both predictions passed through
            `bound_rating`, or an empty tuple when `iid` has no entry in
            `sim_bd`.
        Raises:
            KeyError: if a test item with neighbours is missing from
                `item_bd`, if a neighbour is missing from `rating_bd`, or
                if a neighbour rated by the user is missing from `item_bd`.
        """
        def rank_by_time(entries):
            """Replace the time of every entry by its chronological rank.

                Entries are sorted by time in ascending order; the oldest
                entry gets rank 1 and entries with equal time share a rank,
                so a more recent rating always has a larger rank.
            Args:
                entries: (sim * rating, |sim|, time)*
            """
            ordered = sorted(entries, key=lambda entry: entry[2])
            ranked = []
            rank = 0
            for index, entry in enumerate(ordered):
                if index == 0 or entry[2] != ordered[index - 1][2]:
                    rank += 1
                ranked.append((entry[0], entry[1], rank))
            return ranked

        def decayed_average(entries):
            """weighted average of the entries with time decay applied.
            Args:
                entries: (sim * rating, |sim|, time)*
            """
            ranked = rank_by_time(entries)
            # "Now" is one step after the most recent rating.
            current_time = max(entry[2] for entry in ranked) + 1
            numerator = 0.0
            denominator = 0.0
            for weighted, weight, rank in ranked:
                decay = np.exp(- decay_rate * (current_time - rank))
                numerator += weighted * decay
                denominator += weight * decay
            return numerator / denominator

        def prediction(uid, pair):
            """predict a single test record, with and without decay."""
            iid, real_rating = pair[0], pair[1]
            # Membership is tested on the dict itself (no key list scan).
            if iid not in sim_bd.value:
                return ()
            iid_neighbors = [
                (i[0], i[1], rating_bd.value[i[0]]) for i in sim_bd.value[iid]]
            average_iid_rating = item_bd.value[iid][0]

            # (sim * deviation, |sim|, time) for every rating the user gave
            # to a neighbouring item. User ids are compared for equality; a
            # substring test would also match ids that merely contain `uid`.
            entries = []
            for neighbor_iid, sim, records in iid_neighbors:
                for record in records:
                    if record[0] != uid:
                        continue
                    deviation = record[1] - item_bd.value[neighbor_iid][0]
                    entries.append((sim * deviation, abs(sim), record[2]))

            weight_total = sum(entry[1] for entry in entries)
            if weight_total > 0:
                predicted_rating_no_decay = average_iid_rating + sum(
                    entry[0] for entry in entries) / weight_total
                predicted_rating_decay = \
                    average_iid_rating + decayed_average(entries)
            else:
                # Nothing usable: fall back to the item's average instead of
                # dividing by zero.
                predicted_rating_no_decay = average_iid_rating
                predicted_rating_decay = average_iid_rating
            return iid, real_rating, \
                bound_rating(predicted_rating_no_decay), \
                bound_rating(predicted_rating_decay)

        uid, pairs = line
        return uid, [prediction(uid, pair) for pair in pairs]
    
# Run the item-based predictor on every test user. The result holds, per
# user, (iid, real rating, prediction without decay, prediction with decay)
# entries, or an empty tuple for items without similarity information.
test = test_dataRDD.map(
    lambda record: item_based_prediction(
        record,
        rating_bd=item_based_dict_bd,
        sim_bd=itembased_sim_pair_dict_bd,
        item_bd=item_info_bd))
# Show the predictions of one user.
test.take(1)

[('A1TCIVMGCPMV6R',
  [('T:B003T2YI8G', 2.0, 5.0, 5.0),
   ('T:B00004CY4P', 2.0, 4.0, 4.0),
   ('T:B00004WZJJ', 4.0, 5.0, 5.0),
   ('T:B00005LC8T', 4.0, 5.0, 5.0),
   ('T:B000006FKR', 2.0, 4.0, 4.0),
   ('T:B000A6UN3I', 2.0, 4.0, 4.0),
   ('T:B00004U8ND', 4.0, 5.0, 5.0),
   ('T:B00004CJ7D', 2.0, 4.0, 4.0),
   ('T:630100843X', 2.0, 5.0, 5.0),
   ('T:B00005JL1V', 2.0, 4.0, 4.0),
   ('T:B00004RM5O', 2.0, 4.0, 4.0),
   ('T:B000006G1C', 4.0, 5.0, 5.0),
   ('T:B00005LW40', 4.0, 5.0, 5.0),
   ('T:B00004CL0L', 4.0, 5.0, 5.0),
   ('T:B00004CKAR', 2.0, 5.0, 5.0),
   ('T:B00004TXKD', 4.0, 5.0, 5.0)])]