## Spark ML Movie Recommendation (Explicit)

- dataset: https://grouplens.org/datasets/movielens/100k/

In [1]:
from pyspark.conf import SparkConf
from pyspark import StorageLevel

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Start (or reuse) a Spark session with master "local" and app name "Spark ML".
spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark ML")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext and set its log level to INFO.
sc = spark.sparkContext
sc.setLogLevel("INFO")

In [3]:
# Load the ratings CSV (first line is the header, column types are inferred)
# and cache it because several later cells read from it.
df = (
    spark.read
    .csv("../dataset/ml-ratings.csv", header=True, inferSchema=True)
    .cache()
)
df.show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
|     1|    151|   4.0|
|     1|    223|   4.0|
|     1|    253|   4.0|
|     1|    260|   4.0|
+------+-------+------+
only showing top 10 rows



In [4]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|           100000|            100000|            100000|
|   mean|         362.8304|         8572.4658|          3.507605|
| stddev|196.8029033568026|19056.086005583176|1.0629280136183334|
|    min|                1|                 1|               0.5|
|    max|              702|            128594|               5.0|
+-------+-----------------+------------------+------------------+



## Alternating Least Squares

![](../dataset/alsd.png)

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [6]:
# Hold out 20% of the ratings for the final evaluation. The fixed seeds keep
# the split and the ALS factor initialisation the same across kernel restarts,
# so the numbers printed below can be reproduced with Restart & Run All.
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

# coldStartStrategy="drop" discards predictions for users/movies that were not
# seen during fitting (they would otherwise be NaN and poison the RMSE).
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, seed=42)

In [7]:
# Hyperparameter grid: 2 ranks x 2 iteration caps x 2 regularisation
# strengths = 8 candidate settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 14])
    .addGrid(als.maxIter, [18, 20])
    .addGrid(als.regParam, [0.17, 0.19])
    .build()
)

# Evaluator: RMSE between the "rating" label and the "prediction" column.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [8]:
# Hyperparameter search using a single train/validation split (this is NOT
# k-fold cross-validation): each setting in paramGrid is fitted and scored by
# `evaluator` on one held-out slice of the data passed to fit().
tvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    seed=42)  # fixed seed so the internal train/validation split is reproducible

In [9]:
# Run the hyperparameter search on the training split; `bestModel` is the
# model the tuner selected.
model = tvs.fit(train)
bestModel = model.bestModel

In [10]:
# Score the selected model on the held-out test split.
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)

# Recover the winning hyperparameters through the public tuning API instead of
# reaching into the private `_java_obj` handle: validationMetrics[i] is the
# score obtained with getEstimatorParamMaps()[i].
# `sorted` is used on purpose: the wildcard import of pyspark.sql.functions
# may shadow the builtins `min` / `max` with Spark column functions.
val_metrics = model.validationMetrics
best_metric = sorted(val_metrics, reverse=evaluator.isLargerBetter())[0]
best_param_map = model.getEstimatorParamMaps()[val_metrics.index(best_metric)]
best_params = {param.name: value for param, value in best_param_map.items()}

# Evaluation results
print("RMSE : {}".format(str(rmse)))
print("Best rank : {}".format(bestModel.rank))
print("Best maxIter : {}".format(best_params["maxIter"]))
print("Best regParam : {}".format(best_params["regParam"]))

RMSE : 0.8976724722515121
Best rank : 12
Best maxIter : 20
Best regParam : 0.17


In [11]:
# Peek at five test-set predictions, ordered by user and then by true rating.
(
    predictions
    .sort("userId", "rating")
    .show(5)
)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   1370|   3.0| 3.4125395|
|     1|   1750|   3.5| 2.4918456|
|     1|    924|   3.5| 3.4901712|
|     1|   6755|   3.5| 3.5711613|
|     1|   1208|   3.5| 3.7742562|
+------+-------+------+----------+
only showing top 5 rows



In [12]:
# Top-10 recommendations for every user. Each row has a userId and a
# `recommendations` array whose elements carry a movieId and a predicted rating.
recs = bestModel.recommendForAllUsers(10)

In [13]:
# UDF that pairs two parallel arrays element by element, returning an array
# of (movieId, rating) structs.
zip_ = udf(
    lambda movie_ids, ratings: [pair for pair in zip(movie_ids, ratings)],
    returnType=ArrayType(
        StructType([
            StructField("movieId", IntegerType()),
            StructField("rating", DoubleType()),
        ])
    ),
)

In [14]:
# Recommendation for a specific user
def get_recs_for_user(recs, userId):
    """Return one user's recommendations as flat (userId, movieId, rating) rows.

    Parameters
    ----------
    recs : DataFrame
        Frame with a ``userId`` column and a ``recommendations`` array column
        whose elements are structs with ``movieId`` and ``rating`` fields.
    userId
        Id of the user to keep; compared for equality with the ``userId`` column.

    Returns
    -------
    DataFrame
        One row per recommended movie with columns ``userId``, ``movieId`` and
        ``rating`` (double), in the same order as the ``recommendations`` array.
    """
    # Explode the array of structs directly; there is no need to split it into
    # two parallel arrays and zip them back together in a Python UDF.
    return (
        recs
        .filter(col("userId") == userId)
        .select("userId", explode("recommendations").alias("rec"))
        .select(
            "userId",
            col("rec.movieId").alias("movieId"),
            # Cast keeps the output column a double, as callers saw before.
            col("rec.rating").cast("double").alias("rating"),
        )
    )

In [15]:
# Bind the single-user result to its own name: overwriting `recs` would drop
# the all-users frame and make this cell fail when it is run a second time.
# The id is passed as an integer to match the integer userId column.
user_recs = get_recs_for_user(recs, 11)
user_recs.show()

+------+-------+------------------+
|userId|movieId|            rating|
+------+-------+------------------+
|    11|    449| 5.564600467681885|
|    11|  31545| 5.513560771942139|
|    11|  26413| 5.472157001495361|
|    11|   5004| 5.369581699371338|
|    11|   7560| 5.316649436950684|
|    11|  48326| 5.310154438018799|
|    11|  91529|5.1881818771362305|
|    11|    751| 5.152843475341797|
|    11|  66934| 5.130558967590332|
|    11|   4798| 5.120045185089111|
+------+-------+------------------+



In [16]:
# Stop the Spark session now that the notebook is finished.
spark.stop()