In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

In [2]:
# Obtain the SparkContext explicitly. A bare `sc` global only exists when the
# kernel was started through the `pyspark` launcher; on a plain Jupyter kernel
# it is undefined and this cell would fail on Restart & Run All.
# getOrCreate() reuses an already-running session/context if there is one.
sc = SparkSession.builder.appName("Spark").getOrCreate().sparkContext

# SQLContext is deprecated since Spark 3.0 (SparkSession supersedes it); it is
# kept only so that any later reference to `sqlContext` continues to work.
sqlContext = SQLContext(sc)
print(sc.version)

3.2.3




In [3]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists; otherwise start one named "Spark".
spark = (
    SparkSession.builder
    .appName("Spark")
    .getOrCreate()
)

# Load the ratings CSV, taking column names from the first row. No schema
# inference is requested, so columns are loaded as strings.
data = spark.read.option("header", True).csv("./movieRating.csv")
data.show(5)

+-----------+------+-------+------+
|TrainDataID|UserID|MovieID|Rating|
+-----------+------+-------+------+
|          1|   796|   1193|     5|
|          2|   796|    661|     3|
|          3|   796|    914|     3|
|          4|   796|   3408|     4|
|          5|   796|   2355|     5|
+-----------+------+-------+------+
only showing top 5 rows



In [8]:
from pyspark.sql.types import NumericType

# Cast the columns consumed by ALS from their loaded string form: integer
# user/movie IDs and a double-precision rating. TrainDataID is left untouched.
# NOTE(review): NumericType is not referenced in this cell.
for column_name, spark_type in (("UserID", "int"),
                                ("MovieID", "int"),
                                ("Rating", "double")):
    data = data.withColumn(column_name, data[column_name].cast(spark_type))

data.dtypes

[('TrainDataID', 'string'),
 ('UserID', 'int'),
 ('MovieID', 'int'),
 ('Rating', 'double')]

In [9]:
# Train/test split (80/20). randomSplit picks a random seed when none is given,
# so the split -- and every metric computed downstream -- would change on each
# re-run of the notebook. Seed 0 matches the seed passed to ALS for training.
train, test = data.randomSplit([0.8, 0.2], seed=0)
print("Train data: ", train.count())
print("Test data: ", test.count())

Train data:  720037
Test data:  179836


In [10]:
from pyspark.ml.recommendation import ALS

# Matrix-factorisation recommender over (UserID, MovieID) -> Rating.
# coldStartStrategy="drop" removes rows whose user or movie was not seen during
# training. With the default ("nan"), those rows get a NaN prediction, which
# would show up in previews and make RegressionEvaluator return NaN.
# The estimator and the fitted model are kept under separate names so the
# cell's intent is clear; `model` is the fitted ALSModel, as before.
als = ALS(userCol="UserID", itemCol="MovieID", ratingCol="Rating",
          rank=5, maxIter=10, seed=0, coldStartStrategy="drop")
model = als.fit(train)

In [11]:
# Preview the first 5 rows of test-set predictions alongside the true ratings.
model.transform(test).show(n=5)

+-----------+------+-------+------+----------+
|TrainDataID|UserID|MovieID|Rating|prediction|
+-----------+------+-------+------+----------+
|     100008|   358|   2686|   4.0| 3.7846448|
|     100009|   358|   2688|   3.0| 3.2758799|
|     100012|   358|    293|   4.0| 3.8001728|
|     100016|   358|    457|   4.0|  4.010044|
|     100018|   358|    608|   4.0|  3.740312|
+-----------+------+-------+------+----------+
only showing top 5 rows



In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

# Mean absolute error between the true "Rating" column and the model's "prediction".
evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="Rating",
    predictionCol="prediction",
)

train_pred = model.transform(train)
# Drop rows containing nulls/NaNs before scoring the test split. Under ALS's
# default cold-start strategy ("nan"), users or movies absent from training
# receive NaN predictions.
test_pred = model.transform(test).na.drop()

for caption, scored in (("MAE on train data: ", train_pred),
                        ("MAE on test data: ", test_pred)):
    print(caption, evaluator.evaluate(scored))

MAE on train data:  0.6753235556392392
MAE on test data:  0.7031716085804948
