# Part 5

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import requests, json, os, sys, time, re
from sklearn.metrics.pairwise import linear_kernel,cosine_similarity
import pandas as pd

### Creating the SparkSession

Since Spark 2.0, `SparkSession` is the unified entry point of a Spark application. It exposes Spark's functionality through fewer constructs: instead of separate `SparkContext`, `HiveContext` and `SQLContext` objects, all of them are now encapsulated in a single `SparkSession`.

https://medium.com/@achilleus/spark-session-10d0d66d1d24

In [2]:
######## SparkSession: the unified entry point of a Spark application ########
# Add `.config(key, value)` calls to the builder for any Spark settings this
# notebook actually needs; no custom settings are required by the cells below.
# NOTE(review): enableHiveSupport() needs a Hive-enabled Spark build, and nothing
# visible in this notebook reads Hive tables -- it may be removable.
spark = (
    SparkSession.builder
    .appName('spark-rec')
    .enableHiveSupport()
    .getOrCreate()
)

In [3]:
# SparkContext underlying the session; the RDD-based pyspark.mllib API used below works on it.
sc = spark.sparkContext

In [4]:
import os
import math
import datetime
import pyspark.sql.functions as sf
from pyspark.sql.functions import desc
from pyspark.sql.window import Window
from pyspark import SparkConf, SparkContext
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql.types import TimestampType
# Project data directory.
# NOTE(review): not referenced by any visible cell -- the ratings CSV below is read
# relative to the current working directory instead. Confirm which is intended.
datasets_path = os.getcwd() + "/RE_data"

#### Same ratings data as Part 4: one header line followed by comma-separated rows. ####
# Columns 1, 2 and 3 of each row become the (user, product, rating) triple that ALS
# consumes; column 0 is skipped (presumably a row index -- TODO confirm).
ratings_raw_data = sc.textFile("df_col_use_app_raings_new.csv")
ratings_raw_data_header = ratings_raw_data.take(1)[0]


def _parse_rating_line(line):
    """Parse one CSV row into a ``(user_id, item_id, rating)`` tuple of ints.

    The rating field is read as a float and truncated to an int (e.g. "4.5" -> 4).
    Fields are split on bare commas, so quoted values containing commas are not
    supported.
    """
    fields = line.split(",")
    user_id = int(fields[1])
    item_id = int(fields[2])
    rating = int(float(fields[3]))
    return user_id, item_id, rating


ratings_data = (
    ratings_raw_data
    .filter(lambda line: line != ratings_raw_data_header)  # drop the header row
    .map(_parse_rating_line)
    .cache()
)

# Roughly 60% / 20% / 20% split into training, validation and testing sets;
# the fixed seed pins the split. rddValidating is not used in the visible cells.
rddTraining, rddValidating, rddTesting = ratings_data.randomSplit([6, 2, 2], seed=1001)

### Modeling by Spark ALS 

* Build the Spark ALS model

In [5]:
# ---- ALS hyper-parameters -------------------------------------------------------------
rank = 10              # number of latent factors per user/item; larger ranks can capture
                       # more structure but cost more computation
numIterations = 10     # number of ALS iterations
alpha = 0.01           # confidence scaling applied to the implicit ratings
regularization = 0.01  # regularization parameter (ALS `lambda_`)
als_seed = 1001        # fixed seed so the factorization is reproducible between runs

############################################################################################
# Build the recommendation model using Alternating Least Squares based on implicit ratings #
############################################################################################

# Pass the named hyper-parameters above so that changing them actually changes the model.
model = ALS.trainImplicit(
    rddTraining,
    rank,
    iterations=numIterations,
    lambda_=regularization,
    alpha=alpha,
    seed=als_seed,
)

# Spot-check two (user_id, item_id) pairs; any user/item ids seen in training work here.
testset = sc.parallelize([(3, 4000), (3, 15700)])
model.predictAll(testset).collect()

[Rating(user=3, product=15700, rating=0.9822181173763089),
 Rating(user=3, product=4000, rating=0.9871549585407864)]

* Get the games' rating predictions for users

In [11]:
# Calculate all predictions
# Score every (user, item) pair of the test split.
# NOTE(review): predictAll is expected to return only pairs whose user and item both
# appeared in training -- confirm, since unseen pairs would silently drop out of the RMSE.
rddTesting_map = rddTesting.map(lambda row: (row[0], row[1]))
predictions = (
    model.predictAll(rddTesting_map)
    .map(lambda rating: ((rating.user, rating.product), rating.rating))
)
predictions.take(5)  # preview five ((user, item), predicted_score) entries
# To inspect every prediction instead: model.predictAll(rddTesting_map).collect()


[((6, 304050), 0.002293541944753008),
 ((0, 377160), 0.01218008741158405),
 ((0, 863550), 0.00662157475132602),
 ((3, 863550), 0.0037752978618249833),
 ((2, 239140), 0.009061042884919879)]

* Join the original (held-out) ratings to our predictions

In [7]:
# Key the held-out ratings by (user, item) and inner-join them with the predictions,
# producing ((user, item), (actual_rating, predicted_score)) records.
actual_by_pair = rddTesting.map(
    lambda row: ((int(row[0]), int(row[1])), float(row[2]))
)
rates_and_preds = actual_by_pair.join(predictions)
rates_and_preds.take(5)

[((0, 30), (3.0, 0.014650328729938544)),
 ((0, 550), (5.0, 0.0009064751557851247)),
 ((0, 1046930), (2.0, 0.009567729278558423)),
 ((0, 304930), (1.0, 0.009429824281231292)),
 ((0, 319630), (5.0, 0.003907018228550996))]

### Results 

* Show the final top-10 recommendations for the 8 users with IDs 0–7

In [8]:
# Top-N recommendations for user ids 0 .. N_USERS-1.
N_USERS = 8
TOP_N = 10

Spark_rec_list = [model.recommendProducts(user_id, TOP_N) for user_id in range(N_USERS)]

# One row per user, one column per recommendation slot (highest score first);
# each cell holds a Rating(user, product, rating).
Spark_rec_df = pd.DataFrame(Spark_rec_list)

Spark_rec_df

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
0,"(0, 10, 1.0039087062032928)","(0, 230410, 1.0039087062032928)","(0, 261550, 1.0038017860126884)","(0, 582010, 1.0038017860126884)","(0, 380, 0.9980827438971605)","(0, 271590, 0.9962936001300994)","(0, 500, 0.9953937103977362)","(0, 238960, 0.9943937963381518)","(0, 363970, 0.9913195564075354)","(0, 578080, 0.988802977030889)"
1,"(1, 236390, 1.0038617174240998)","(1, 204360, 1.0037555089268277)","(1, 255710, 1.0036472918089963)","(1, 304050, 1.0034244887120733)","(1, 578080, 0.9962739393215608)","(1, 1046930, 0.9950699678783315)","(1, 218620, 0.994146490995629)","(1, 359550, 0.9935279808236084)","(1, 431960, 0.9901998237446543)","(1, 8930, 0.9872762943548623)"
2,"(2, 224260, 0.9946745247706099)","(2, 1085660, 0.9931579340739607)","(2, 578080, 0.9913788598717663)","(2, 292030, 0.9913296231678457)","(2, 72850, 0.9910815389303043)","(2, 377160, 0.9901411078493634)","(2, 431960, 0.98852639885961)","(2, 271590, 0.9881245951924617)","(2, 304930, 0.9865989121198795)","(2, 730, 0.9856853239819501)"
3,"(3, 219740, 1.0029898414280827)","(3, 278360, 1.0026350436237617)","(3, 130, 1.0026350436237617)","(3, 291480, 1.002512130039145)","(3, 242760, 0.9952071278394812)","(3, 96000, 0.9932086439659975)","(3, 105600, 0.9927665024928113)","(3, 550, 0.9894840753503809)","(3, 4000, 0.9871549585407864)","(3, 322330, 0.9869614878340032)"
4,"(4, 555570, 1.0009839761103627)","(4, 227300, 1.0009839761103627)","(4, 320, 1.0009839761103627)","(4, 381210, 1.0009839761103627)","(4, 240, 1.0009839761103627)","(4, 370910, 1.0009839761103627)","(4, 863550, 1.000858795614192)","(4, 80, 1.0007312083088022)","(4, 221380, 1.0007312083088022)","(4, 420, 1.0007312083088022)"
5,"(5, 220, 1.00298678581594)","(5, 377160, 0.999708016665643)","(5, 208090, 0.9967649747167598)","(5, 96000, 0.9955572785842058)","(5, 252950, 0.9930115206426398)","(5, 10180, 0.9930106394379453)","(5, 15700, 0.9908895730642852)","(5, 620, 0.990360269364651)","(5, 304930, 0.9896408710770234)","(5, 219640, 0.9887003311043836)"
6,"(6, 1085660, 1.007019322171078)","(6, 291550, 1.0041729885754034)","(6, 813820, 1.0040535013092233)","(6, 700330, 1.0040535013092233)","(6, 444090, 1.003807533332838)","(6, 438100, 1.0036809633789172)","(6, 218620, 0.9943763961199261)","(6, 359550, 0.9939160312756354)","(6, 252950, 0.9929075057109256)","(6, 433850, 0.9926072326444403)"
7,"(7, 413150, 1.0009940705518565)","(7, 44350, 0.9946134113220667)","(7, 30, 0.9946123776024681)","(7, 1046930, 0.9934343981392315)","(7, 500, 0.9911487857465932)","(7, 363970, 0.988797131719461)","(7, 377160, 0.9881315434470435)","(7, 105600, 0.9876446819601835)","(7, 252490, 0.9854567715013329)","(7, 219640, 0.985388703241106)"


### Evaluations 

* Measure our recommendation results using RMSE (root-mean-square error)

In [9]:
# Root-mean-square error between the held-out ratings and the model's scores.
# NOTE(review): trainImplicit produces preference scores (roughly 0..1 in the outputs
# above) while the actual values are explicit 1..5 ratings, so this RMSE mostly reflects
# that scale mismatch rather than recommendation quality; consider a ranking metric.
squared_errors = rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1]) ** 2)
error = math.sqrt(squared_errors.mean())
print('For testing data the RMSE is %s' % (error))


For testing data the RMSE is 3.6044256643640327
