In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
# Application name shown in the Spark UI and driver logs.
appName = "Recommender System"
# Reuse the active SparkSession if there is one, otherwise start a new one.
# (The placeholder "spark.some.config.option" setting copied from the Spark
# quick-start docs configured nothing and has been removed.)
spark = (
    SparkSession.builder
    .appName(appName)
    .getOrCreate()
)

In [2]:
# Load the ml-latest-small ratings and movies CSVs; the first row is the header
# and Spark infers each column's type from the data.
csv_options = {"inferSchema": True, "header": True}
ratings = spark.read.csv('ml-latest-small/ratings.csv', **csv_options)
movies = spark.read.csv('ml-latest-small/movies.csv', **csv_options)

In [8]:
# Preview ratings enriched with each movie's title and genres.
ratings_with_titles = ratings.join(movies, on="movieId")
ratings_with_titles.show(n=3)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 3 rows



In [16]:
# Keep only the columns ALS needs: "userId", "movieId" and "rating".
data = ratings.select("userId", "movieId", "rating")
# Split 70% for training and 30% for testing. The fixed seed makes the split
# (and every metric computed downstream) reproducible on Restart & Run All.
splits = data.randomSplit([0.7, 0.3], seed=42)
# ALS trains against "label"; the held-out rating is kept as "trueLabel" so it
# can sit next to the model's "prediction" column after transform().
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
# Count the rows in each split.
train_rows = train.count()
test_rows = test.count()
print("number of training data rows:", train_rows,
      ", number of testing data rows:", test_rows)

number of training data rows: 70584 , number of testing data rows: 30252


In [17]:
# ALS (Alternating Least Squares) matrix factorisation: learns latent factors
# for every userId and movieId from the "label" column of `train`.
# An explicit seed pins the random factor initialisation so results are
# repeatable across runs.
# NOTE: with the default coldStartStrategy ("nan"), users/movies absent from
# `train` receive a NaN prediction at transform time; those rows are handled
# further down before computing RMSE.
als = ALS(maxIter=19, regParam=0.01, userCol="userId",
          itemCol="movieId", ratingCol="label", seed=42)
# Fit the factorisation on the training split.
model = als.fit(train)
print("Training is done!")

Training is done!


In [18]:
# Score every held-out (userId, movieId) pair; this appends a "prediction" column.
prediction = model.transform(dataset=test)
print("testing is done!")

testing is done!


In [19]:
# Attach movie titles so predictions can be compared with true ratings by eye.
predictions_with_titles = prediction.join(movies, on="movieId")
predictions_with_titles.select("userId", "title", "prediction", "trueLabel") \
    .show(n=10, truncate=False)


+------+---------------------------+----------+---------+
|userId|title                      |prediction|trueLabel|
+------+---------------------------+----------+---------+
|597   |Hudsucker Proxy, The (1994)|5.4054284 |2.0      |
|602   |Hudsucker Proxy, The (1994)|3.3373003 |4.0      |
|599   |Hudsucker Proxy, The (1994)|3.0761595 |2.5      |
|474   |Hudsucker Proxy, The (1994)|2.891672  |3.0      |
|500   |Hudsucker Proxy, The (1994)|5.048422  |1.0      |
|217   |Hudsucker Proxy, The (1994)|1.8197318 |2.0      |
|555   |Hudsucker Proxy, The (1994)|3.2830153 |3.0      |
|171   |Hudsucker Proxy, The (1994)|3.8798063 |3.0      |
|312   |Hudsucker Proxy, The (1994)|4.1859612 |4.0      |
|411   |Hudsucker Proxy, The (1994)|3.071249  |4.0      |
+------+---------------------------+----------+---------+
only showing top 10 rows



In [15]:
# Root-mean-square error between the model's "prediction" column and the
# held-out rating. labelCol must name the exact "trueLabel" column created when
# the test split was built (a mismatched spelling such as "truelabel" is not
# reliably resolved by the evaluator's schema check).
evaluator = RegressionEvaluator(labelCol="trueLabel", predictionCol="prediction",
                                metricName="rmse")
# NOTE: `prediction` still contains NaN scores for cold-start users/movies, so
# this RMSE is expected to be NaN; those rows are dropped in the next cell.
rmse = evaluator.evaluate(prediction)
print("Root mean Square Error", rmse)

Root mean Square Error nan


In [20]:

# NaN predictions (users/movies not seen during training) make RMSE undefined.
# Count all scored rows once -- each count() launches a Spark job.
a = prediction.count()
print("number of original data rows: ", a)
# Drop rows whose "prediction" is missing; only that column is checked.
cleanPred = prediction.dropna(how="any", subset=["prediction"])
b = cleanPred.count()
print("number of rows after dropping data with missing value: ", b)
print("number of missing data: ", a-b)

number of original data rows:  30252
number of rows after dropping data with missing value:  28935
number of missing data:  1317


In [None]:
# RMSE restricted to rows that actually received a (non-NaN) prediction.
rmse = evaluator.evaluate(dataset=cleanPred)
print("Root Mean Square Error (RMSE):", rmse)