In [1]:
import pandas as pd
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
# Start a single-core local Spark context and wrap it in a SQL session.
# getOrCreate() hands back an already-running context instead of raising,
# so this cell can be re-run without restarting the kernel.
conf = SparkConf().setAppName('sparkApp').setMaster('local')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
print(sc)

# Smoke test: cube a tiny RDD through Spark and check the collected result.
numeric_val = sc.parallelize([1, 2, 3, 4])
cubes = numeric_val.map(lambda x: x * x * x).collect()
assert cubes == [1, 8, 27, 64], cubes

# The context is deliberately left running: the original bare `sc.stop`
# (no call parentheses) never stopped anything, and the cells that read the
# CSV files need a live session.
import mllib

<SparkContext master=local appName=sparkApp>


In [2]:
#import module
from pyspark.ml.recommendation import ALS

# Obtain the SparkSession for the recommender; getOrCreate() returns the
# currently active session when one already exists.
appName = "Recommender system in Spark"
spark = (
    SparkSession.builder
    .appName(appName)
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [3]:
# Load both CSV files into DataFrames: the first row supplies the column
# names and Spark infers each column's type from the data.
ratings = spark.read.csv(path='./dataset/ratings.csv', header=True, inferSchema=True)
movies = spark.read.csv(path='./dataset/movies.csv', header=True, inferSchema=True)
# Preview the two tables joined on their shared "movieId" column.
ratings.join(movies, on="movieId").show(n=3)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|     31|     1|   2.5|1260759144|Dangerous Minds (...|               Drama|
|   1029|     1|   3.0|1260759179|        Dumbo (1941)|Animation|Childre...|
|   1061|     1|   3.0|1260759182|     Sleepers (1996)|            Thriller|
+-------+------+------+----------+--------------------+--------------------+
only showing top 3 rows



In [4]:
# Keep only the three columns the recommender needs: user, movie, and rating.
# NOTE: the names are spelled "userID"/"movieID" here although the CSV header
# reads "userId"/"movieId"; the spelling is kept because the ALS cell refers
# to the columns with this same spelling.
data = ratings.select("userID", "movieID", "rating")
# Split the data 70% / 30% into training and testing sets. The seed is fixed
# so that re-running the notebook on the same input yields the same split
# instead of a fresh random one each time.
splits = data.randomSplit([0.7, 0.3], seed=42)
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
# Report the size of each split.
train_rows = train.count()
test_rows = test.count()
print ("number of training data rows:", train_rows, 
       ", number of testing data rows:", test_rows)
type(data)

number of training data rows: 70078 , number of testing data rows: 29926


pyspark.sql.dataframe.DataFrame

In [5]:
# Configure the ALS (Alternating Least Squares) recommender: which columns
# hold the user, the item and the rating, plus iteration count and
# regularization strength.
als = ALS(
    userCol="userID",
    itemCol="movieID",
    ratingCol="label",
    maxIter=19,
    regParam=0.01,
)
# Fit the recommender on the training split.
model = als.fit(train)
print("Training is done!")

Training is done!


In [6]:
# Score the held-out test rows with the trained model; the result carries a
# "prediction" column next to the original "trueLabel".
prediction = model.transform(dataset=test)
print("testing is done!")

testing is done!


In [7]:
# Attach movie titles to the predictions, then show predicted vs. actual
# ratings for the first ten rows without truncating long titles.
titled_predictions = prediction.join(movies, on="movieId")
titled_predictions.select(
    "userId", "title", "prediction", "trueLabel"
).show(n=10, truncate=False)

+------+---------------------------+----------+---------+
|userId|title                      |prediction|trueLabel|
+------+---------------------------+----------+---------+
|452   |Guilty as Sin (1993)       |2.2897446 |2.0      |
|380   |Guilty as Sin (1993)       |2.180541  |3.0      |
|534   |Guilty as Sin (1993)       |3.2426836 |4.0      |
|548   |Hudsucker Proxy, The (1994)|3.597429  |4.0      |
|285   |Hudsucker Proxy, The (1994)|4.126465  |5.0      |
|292   |Hudsucker Proxy, The (1994)|3.208682  |3.5      |
|306   |Hudsucker Proxy, The (1994)|3.6638885 |3.0      |
|452   |Hudsucker Proxy, The (1994)|2.607748  |3.0      |
|92    |Hudsucker Proxy, The (1994)|5.8184705 |4.0      |
|607   |Hudsucker Proxy, The (1994)|3.6398888 |4.0      |
+------+---------------------------+----------+---------+
only showing top 10 rows



In [8]:
#import RegressionEvaluator since we also want to calculate RMSE (Root Mean Square Error)
from pyspark.ml.evaluation import RegressionEvaluator

# Compare "prediction" against "trueLabel" using RMSE.
# NOTE(review): the recorded run printed nan here, presumably because some
# rows have no prediction — confirm.
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="trueLabel", predictionCol="prediction")
rmse = evaluator.evaluate(dataset=prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): nan


In [9]:
# Count the predictions once (the original ran an extra count() whose result
# was discarded, triggering a redundant Spark job).
total_rows = prediction.count()
print("number of original data rows: ", total_rows)
# Drop every row whose "prediction" value is missing.
cleanPred = prediction.dropna(how="any", subset=["prediction"])
valid_rows = cleanPred.count()
print("number of rows after dropping data with missing value: ", valid_rows)
print("number of missing data: ", total_rows - valid_rows)

number of original data rows:  29926
number of rows after dropping data with missing value:  28733
number of missing data:  1193


In [None]:
# Re-evaluate RMSE on the predictions that have no missing values.
rmse = evaluator.evaluate(dataset=cleanPred)
print ("Root Mean Square Error (RMSE):", rmse)