In [1]:
import findspark
findspark.init()

import pyspark
findspark.find()

from pyspark import SparkContext
sc = SparkContext()

## ItembasedRs

In [7]:
from __future__ import division
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS, Rating
import time
import json
import sys
from math import sqrt

In [8]:
from pyspark.sql.types import *


In [9]:
def mapDict(vals):
        return_dict = {}
        for val in vals:
            return_dict[val[0]] = val[1]
        return return_dict

def findAvg(vals):
        '''Find average value for each row'''
        count = 0
        tot = 0
        for item, rating in vals.items():
            tot += rating
            count += 1
        average = float(tot) / count
        
        for item, rating in vals.items():
            vals[item] = rating - average
        vals['row_avg'] = average
        return vals

In [10]:
train = sc.textFile('../Dataset/train.csv')
test = sc.textFile('../Dataset/test.csv')

In [11]:
train.take(5)

['user_id,movie_id,rating,unix_timestamp',
 '1,1,5,874965758',
 '1,2,3,876893171',
 '1,3,4,878542960',
 '1,4,3,876893119']

In [12]:
train_header = train.first()
train_filtered = train.filter(lambda row: row != train_header)

In [13]:
train_header

'user_id,movie_id,rating,unix_timestamp'

In [14]:
test_header = test.first()
test_filtered = test.filter(lambda row: row != test_header)

In [15]:
train_rdd = train_filtered.map(lambda x: x.split(',')).map(lambda line_split: (line_split[1], (line_split[0], float(line_split[2]))))

In [16]:
train_rdd.take(5)

[('1', ('1', 5.0)),
 ('2', ('1', 3.0)),
 ('3', ('1', 4.0)),
 ('4', ('1', 3.0)),
 ('5', ('1', 3.0))]

In [17]:
test_rdd = test_filtered.map(lambda x: x.split(',')).map(lambda line_split: (line_split[1], line_split[0], float(line_split[2])))

In [18]:
training_group = train_rdd.groupByKey()
training_group.take(5)

[('1', <pyspark.resultiterable.ResultIterable at 0x24a3d3a7610>),
 ('4', <pyspark.resultiterable.ResultIterable at 0x24a3d3a7b50>),
 ('8', <pyspark.resultiterable.ResultIterable at 0x24a3d3a7fa0>),
 ('9', <pyspark.resultiterable.ResultIterable at 0x24a3d3a7f10>),
 ('10', <pyspark.resultiterable.ResultIterable at 0x24a3d3a7a90>)]

In [19]:
training_group = train_rdd.groupByKey().mapValues(list)
training_group_dict = training_group.mapValues(mapDict)
training_group_dict_avg = training_group_dict.mapValues(findAvg)

In [20]:
training_group_dict_one = training_group_dict_avg.map(lambda x: (1, x))
training_one_reduce = training_group_dict_one.groupByKey().mapValues(list).map(lambda x: x[1])

In [21]:
training_one_reduce.collect()

[[('1',
   {'1': 1.1403061224489797,
    '2': 0.14030612244897966,
    '6': 0.14030612244897966,
    '10': 0.14030612244897966,
    '13': -0.8596938775510203,
    '15': -2.8596938775510203,
    '16': 1.1403061224489797,
    '18': 1.1403061224489797,
    '20': -0.8596938775510203,
    '21': 1.1403061224489797,
    '23': 1.1403061224489797,
    '25': 1.1403061224489797,
    '26': -0.8596938775510203,
    '38': 1.1403061224489797,
    '41': 0.14030612244897966,
    '42': 1.1403061224489797,
    '43': 1.1403061224489797,
    '44': 0.14030612244897966,
    '45': 1.1403061224489797,
    '49': -1.8596938775510203,
    '54': 0.14030612244897966,
    '56': 0.14030612244897966,
    '57': 1.1403061224489797,
    '58': 1.1403061224489797,
    '59': -1.8596938775510203,
    '62': -1.8596938775510203,
    '63': -0.8596938775510203,
    '64': 0.14030612244897966,
    '65': -0.8596938775510203,
    '70': 0.14030612244897966,
    '72': 0.14030612244897966,
    '73': -1.8596938775510203,
    '75': 0.140

In [22]:
training_data_compile = training_one_reduce.collect()
training_data_compile = training_data_compile[0]
training_compile_dict = {}

In [23]:
for i in range(len(training_data_compile)):
        training_compile_dict[training_data_compile[i][0]] = training_data_compile[i][1]

In [24]:
item_to_user_pre = train_rdd.map(lambda x: (x[1][0], x[0])).groupByKey().mapValues(list)
item_to_user_compile = item_to_user_pre.collect()
item_to_user_dict = {}

In [25]:
for i in range(len(item_to_user_compile)):
        item_to_user_dict[item_to_user_compile[i][0]] = set(item_to_user_compile[i][1])

In [26]:
test_data = test_rdd.collect()
RMSE_tmp = 0
tmp_result = []
pearson_threshold = 0.3
random_pred = 0
upper_limit = 150
lower_limit = 15

In [28]:
start_time_item = time.time()
for test in test_data:
        '''Get all the rows corresponding to cur user and item of test dataset'''
        cur_item, cur_user = test[0], test[1]
        filtered_train = {}
        if cur_item not in training_compile_dict or cur_user not in item_to_user_dict:
            '''If it's an unseen business id, assign some random prediction'''
            prediction = 2.5
            random_pred += 1
        else:
            '''We want to attach row corresponding to current item'''
            filtered_train[cur_item] = training_compile_dict[cur_item]
            cur_item_info = filtered_train[cur_item]
            '''Get a list of user_id who contains the current item'''
            row_set = item_to_user_dict[cur_user]
            for row in row_set:
                if len(training_compile_dict[row]) > lower_limit and len(training_compile_dict[row]) < upper_limit:
                    filtered_train[row] = training_compile_dict[row]
                
            '''Compute Pearson for each row and add to the final result if Pearson
            passes the threshold value'''
            predict_num = 0
            predict_den = 0
            for item, user_list in filtered_train.items():
                if item != cur_item:
                    num = 0
                    den1 = 0
                    den2 = 0
                    for user, rating in user_list.items():
                        if user in cur_item_info and user != cur_user and user != 'row_avg':
                            num += rating * cur_item_info[user]
                            den1 += rating**2
                            den2 += (cur_item_info[user])**2
                    denom = sqrt(den1) * sqrt(den2)
                    if num == 0 or denom == 0:
                        pearson = 0
                    else:
                        pearson = float(num) / denom
                    if pearson > pearson_threshold:
                        predict_num += (filtered_train[item][cur_user] + filtered_train[item]['row_avg']) * pearson
                        predict_den += abs(pearson)
            if predict_num == 0 or predict_den == 0:
                prediction = cur_item_info['row_avg']
            else:
                prediction = float(predict_num) / predict_den
                prediction = (prediction + cur_item_info['row_avg']) / 2.0 
        '''Save the results which consists of user_id, business_id, ground truth and predicted'''
        tmp_result.append(((test[1], test[0], test[2]), prediction))
        '''Compile results for final MSE computation'''
        RMSE_tmp += (test[2] - prediction)**2
end_time_item = time.time() - start_time_item

In [29]:
RMSE = sqrt(RMSE_tmp / len(test_data))
print('Time need', end_time_item)
print('RMSE',RMSE)

Time need 13.952967882156372
RMSE 1.4336449386433354
