In [1]:
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark import SQLContext
from pyspark.accumulators import AccumulatorParam
from sklearn.metrics import mean_squared_error
from math import sqrt
import time
# Only one SparkContext may be active per process, so re-running this cell with
# a plain SparkContext(...) call raises ValueError. getOrCreate() returns the
# already-running context when there is one and builds it from `conf` otherwise.
conf = SparkConf()
sc = SparkContext.getOrCreate(conf = conf)

In [2]:
def getBias(user, item):    # Bias btw. movie & user
    """Return the baseline rating estimate for a (user, item) pair.

    The baseline is ``global_avg + user_bias + item_bias``, where each bias is
    the mean training rating of that user / item minus ``global_avg``.

    Reads two module-level names:
      * ``train_pdsDF`` -- its first column is compared with ``user``, its
        second column with ``item``, and ratings come from the ``'Rating'``
        column.
      * ``global_avg`` -- the overall mean rating.

    A user or item with no rows in ``train_pdsDF`` contributes a bias of 0.0,
    so the result degrades towards ``global_avg`` instead of becoming NaN
    (which is what the 0 / 0 mean of an empty selection would yield).
    """
    ratings = train_pdsDF['Rating']
    user_ratings = ratings[train_pdsDF.iloc[:, 0] == user]
    item_ratings = ratings[train_pdsDF.iloc[:, 1] == item]

    # Guard the empty selections explicitly: sum() / 0 would give NaN.
    if len(user_ratings):
        user_bias = user_ratings.sum() / len(user_ratings) - global_avg
    else:
        user_bias = 0.0

    if len(item_ratings):
        item_bias = item_ratings.sum() / len(item_ratings) - global_avg
    else:
        item_bias = 0.0

    return user_bias + item_bias + global_avg
def getMovieID(idx, movie):
    """Return the partner of ``movie`` in the similarity pair stored at ``idx``.

    Looks up row ``idx`` of the module-level ``simPdsDF``: when its
    ``ItemID_1`` equals ``movie`` the ``ItemID_2`` value is returned,
    otherwise the ``ItemID_1`` value is returned.
    """
    first_item = simPdsDF["ItemID_1"][idx]
    if first_item != movie:
        return first_item
    return simPdsDF["ItemID_2"][idx]

In [3]:
def getPredict(user, movie, top_k):
    """Predict the rating ``user`` would give ``movie`` from similar movies.

    Reads the module-level ``simPdsDF`` (columns ``ItemID_1``, ``ItemID_2``,
    ``Similarity``), ``train_dict`` (``(user, item) -> rating``) and
    ``global_avg``, and calls ``getBias`` for the baselines.

    Steps:
      1. Select every similarity pair that involves ``movie``. If there is
         none, a message is printed and ``global_avg`` is returned.
      2. Keep only the pairs whose other movie was rated by ``user``.
      3. Take the ``top_k`` most similar of those and return
         ``baseline(user, movie) + sum(sim * (rating - baseline)) / sum(sim)``.

    If the user rated none of the similar movies (or the retained similarities
    sum to zero) there is nothing to weight, so the plain baseline
    ``getBias(user, movie)`` is returned instead of dividing by zero.

    Parameters
    ----------
    user, movie : identifiers as used in ``train_dict`` / ``simPdsDF``.
    top_k : maximum number of rated neighbours to use; values <= 0 use none.
    """
    # All similarity pairs in which `movie` appears on either side.
    involves_movie = (simPdsDF['ItemID_1'] == movie) | (simPdsDF['ItemID_2'] == movie)
    neighbours = simPdsDF[involves_movie]

    if neighbours.empty:  # New Item in Test.dat
        print('user: ', user, 'movie: ', movie, 'is_Sim_PdsDF.index < 1')
        return global_avg

    # For each pair pick the side that is *not* `movie`, in one vectorised step:
    # keep ItemID_2 where ItemID_1 is `movie`, otherwise take ItemID_1.
    partner_ids = neighbours['ItemID_2'].where(
        neighbours['ItemID_1'] == movie, neighbours['ItemID_1'])

    # (similarity, partner) for the partners this user actually rated,
    # ordered from most to least similar.
    rated = [
        (sim, partner)
        for partner, sim in zip(partner_ids, neighbours['Similarity'])
        if (user, partner) in train_dict
    ]
    rated.sort(key=lambda pair: pair[0], reverse=True)

    baseline = getBias(user, movie)
    weighted_dev = 0.0
    sim_total = 0.0
    # max(..., 0) keeps a non-positive top_k from turning into a tail slice.
    for sim, partner in rated[:max(top_k, 0)]:
        deviation = train_dict[(user, partner)] - getBias(user, partner)
        weighted_dev += sim * deviation
        sim_total += sim

    if sim_total == 0:
        # No usable neighbour: fall back to the bias-only estimate.
        return baseline
    return baseline + weighted_dev / sim_total

In [4]:
class VectorAccumulatorParam(AccumulatorParam):
    """Accumulator parameter that concatenates lists contributed by tasks.

    ``addInPlace`` extends one list with another, so the accumulated value is
    the concatenation of everything that was added.
    """
    def zero(self, value):
        """Return the identity element for list concatenation.

        An empty list is returned regardless of ``value``: a list of zeros
        sized like ``value`` would be concatenated into the result as
        spurious entries whenever the initial value is non-empty.
        """
        return []
    def addInPlace(self, val1, val2):   #val1: list
        """Append the items of ``val2`` to ``val1`` in place and return ``val1``."""
        val1 += val2
        return val1

In [5]:
# to get actual rating
def test_spilt_result(x):  # x[0]:user, x[1]:item, x[2]:rating 
    global ans
    global top_k
    PredRating = getPredict(x[0], x[1], top_k)
    ans += [[x[0], x[1], PredRating]]

In [None]:
from sklearn.model_selection import KFold
import pandas as pd

# ---------------------------------------------------------------------------
# Cross-validated evaluation of the item-based predictor.
#
# The names train_dict, simPdsDF, train_pdsDF, global_avg, top_k and ans are
# module-level variables that getBias / getPredict / test_spilt_result read,
# so they are (re)assigned here under exactly those names for every fold.
#
# NOTE(review): top_k grows by 5 after every fold, so each fold is scored with
# a different k and the printed RMSE values mix fold-to-fold variation with the
# effect of k -- confirm this is the intended experiment.
# ---------------------------------------------------------------------------

# Fixed seed so the shuffled folds are identical on every run.
KFOLD_SEED = 42
kf = KFold(n_splits=5, shuffle=True, random_state=KFOLD_SEED)
real_train_pdsDF = pd.read_csv('train.dat', sep = ",")
# Built once up front: the RDD.toDF() call in the loop relies on a SQL
# context / session having been created for this SparkContext.
sqlCon = SQLContext(sc)
top_k = 51


def _parse_rating_line(line):
    """Convert one 'user,item,rating[,...]' CSV line to (int, int, float).

    The rating is parsed as float for both the train and the test split so
    that values written as e.g. '4.0' do not fail the conversion.
    """
    tokens = line.split(',')
    return (int(tokens[0]), int(tokens[1]), float(tokens[2]))


for train, test in kf.split(real_train_pdsDF):
    start_time = time.time()
    #------------------------------------------------- Split train into train_split / test_split
    train_spilt_pdsDF = real_train_pdsDF.iloc[train, :]
    test_spilt_pdsDF = real_train_pdsDF.iloc[test, :]
    train_spilt_pdsDF.to_csv('train_split.dat', sep = ",", index = False)
    test_spilt_pdsDF.to_csv('test_spilt.dat', sep = ",", index = False)

    #------------------------------------------------- Read test_split file to rdd
    test_spilt_lines = sc.textFile("test_spilt.dat")
    test_header = test_spilt_lines.first()
    # The header is bound through a default argument: the lambda runs lazily,
    # possibly after a shared variable would have been rebound.
    test_spilt_rdd = test_spilt_lines.filter(
        lambda line, hdr=test_header: line != hdr).map(_parse_rating_line)
    # (user, item) -> actual rating, used to look up the ground truth later
    test_spilt_dict = {
        (user, item): rating for user, item, rating in test_spilt_rdd.collect()
    }

    #------------------------------------------------- Read train_split file to rdd
    train_lines = sc.textFile("train_split.dat")
    train_header = train_lines.first()
    train_rdd = train_lines.filter(
        lambda line, hdr=train_header: line != hdr).map(_parse_rating_line)
    # Build Train Data Dict. with Format [(user, item)] = rating
    # for later check if the specific movie is rated
    train_dict = {
        (user, item): rating for user, item, rating in train_rdd.collect()
    }

    # ----------------------------------------------------------- Build simMat&simPdsDF
    utilityMatrix = CoordinateMatrix(train_rdd)
    # Similarity Btw. Items
    simMat = utilityMatrix.toRowMatrix().columnSimilarities()
    # Convert simMat to Pandas format, naming the columns directly
    sparkDF = simMat.entries.map(
        lambda entry: (entry.i, entry.j, entry.value)
    ).toDF(['ItemID_1', 'ItemID_2', 'Similarity'])
    simPdsDF = sparkDF.toPandas()
    # make the column dtypes explicit
    simPdsDF = simPdsDF.astype(
        {'ItemID_1': int, 'ItemID_2': int, 'Similarity': float})

    # ------------------------------------------------------ Used by getBias to calculate bias
    train_pdsDF = pd.read_csv('train_split.dat', sep = ",")
    train_pdsDF = train_pdsDF.drop("Timestamp", axis=1)
    # overall mean rating
    global_sum = train_pdsDF['Rating'].sum()
    global_avg = global_sum/train_pdsDF.shape[0]

    # ------------------------------------------------------ Get predicted rating of the test split
    ans = sc.accumulator([], VectorAccumulatorParam())
    test_spilt_rdd.foreach(test_spilt_result)
    # rows of [user, item, predicted rating]
    ans = np.array(ans.value)
    y_actual = [test_spilt_dict[pair] for pair in zip(ans[:, 0], ans[:, 1])]
    y_predict = ans[:, 2]

    #-------------------------------------------------------- Calculate RMSE
    rms = sqrt(mean_squared_error(y_actual, y_predict))
    print("k: %s, RMSE: %s" % (top_k, rms))
    print("--- %s seconds ---" % (time.time() - start_time))
    top_k += 5

In [None]:
'''k: 51, RMSE: 0.9337310790949384
--- 1351.5924348831177 seconds ---
k: 56, RMSE: 0.9385368190877761
--- 1357.0452268123627 seconds ---
k: 61, RMSE: 0.9276684827030836
--- 1403.6028318405151 seconds ---'''

In [None]:
'''
k: 30, RMSE: 0.9346116908876994
--- 957.2602119445801 seconds ---
k: 40, RMSE: 0.9262036293252163
--- 1151.4687297344208 seconds ---
k: 50, RMSE: 0.9257627541958798
--- 1305.7629108428955 seconds ---
k: 60, RMSE: 0.923157020767258
--- 1527.7708637714386 seconds ---
k: 70, RMSE: 0.9354237971076579
--- 1763.168536901474 seconds ---'''

In [None]:
'''k: 4, RMSE: 0.9761079319227415
--- 460.2000787258148 seconds ---
k: 6, RMSE: 0.9552355889053802
--- 479.1455092430115 seconds ---
k: 8, RMSE: 0.9319697215787797
--- 537.9487290382385 seconds ---'''