In [1]:
# Reference tutorial this notebook follows:
# https://medium.com/analytics-vidhya/movie-recommendation-with-collaborative-filtering-in-pyspark-8385dccecfca

import findspark

# findspark.init() locates the Spark installation AND puts pyspark on sys.path.
# findspark.find() merely returns the Spark home path, so the pyspark imports
# below would fail on machines where pyspark is not already importable.
findspark.init()

import pyspark
from pyspark.sql.session import SparkSession

# getOrCreate() reuses a session/context that is already running, so this cell
# can be re-executed without raising "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.appName("myAppName").getOrCreate()

# Keep the low-level context available under its usual name.
sc = spark.sparkContext

In [3]:
# Load the MovieLens ratings file: the first row is the header, and column
# types are inferred from the data.
ratings = spark.read.csv(
    "recommendation/ml-20m/ratings.csv",
    header=True,
    inferSchema=True,
)
ratings.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
+------+-------+------+----------+
only showing top 5 rows



In [4]:
# Configure the ALS estimator and split the ratings into train and test sets.
from pyspark.ml.recommendation import ALS

# A fixed seed makes both the random split and the ALS factor initialization
# repeatable, so the metric and recommendations below can be reproduced.
SEED = 42

als = ALS(
    maxIter=10,
    regParam=0.5,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    # Drop rows whose prediction is NaN (users/movies unseen during training),
    # otherwise the evaluation metric itself would come out as NaN.
    coldStartStrategy="drop",
    seed=SEED,
)

# 80% of the ratings for training, 20% held out for evaluation.
train, test = ratings.randomSplit([0.8, 0.2], seed=SEED)

In [5]:
# Train the recommender and score the held-out ratings.

# Fit the ALS estimator on the training split.
alsModel = als.fit(train)

# Score the test split; the result carries the test columns plus `prediction`.
prediction = alsModel.transform(test)

# Preview the first ten scored rows.
prediction.show(n=10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 87301|    148|   2.0| 974945135|  2.159832|
| 22884|    148|   3.0| 944947868|  2.433311|
|  5585|    148|   3.0| 833940677| 2.9189413|
| 36445|    148|   4.5|1419358555|  2.321209|
| 46146|    148|   2.0| 839629075| 1.9508841|
| 46944|    148|   2.0| 839965214|  2.567991|
| 60334|    148|   4.0| 832478061| 2.5743947|
| 46380|    148|   4.0| 828462479|   2.71672|
|108140|    148|   1.0| 840355078| 2.3267968|
|101628|    148|   1.0| 835452658| 2.1210291|
+------+-------+------+----------+----------+
only showing top 10 rows



In [6]:
# Measure how close the predictions are to the held-out ratings using
# mean squared error.
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="mse",
)

mse = evaluator.evaluate(prediction)

print(mse)


0.9947253748961725


In [7]:
# Ask the fitted model for the top 3 movie recommendations for every user.
recommended_movie_df = alsModel.recommendForAllUsers(3)

# Display ten users with their recommendation lists, without truncating the
# recommendations column.
recommended_movie_df.show(10, truncate=False)

+------+---------------------------------------------------------------+
|userId|recommendations                                                |
+------+---------------------------------------------------------------+
|148   |[{126219, 6.3913794}, {125599, 5.6114106}, {121029, 5.513494}] |
|463   |[{126219, 6.871015}, {125599, 6.0323896}, {121029, 5.9271183}] |
|471   |[{126219, 6.0756793}, {125599, 5.334306}, {121029, 5.2412148}] |
|496   |[{126219, 7.090933}, {125599, 6.2257957}, {121029, 6.1169543}] |
|833   |[{126219, 6.602971}, {125599, 5.7967944}, {121029, 5.696224}]  |
|1088  |[{126219, 5.7822914}, {125599, 5.0763297}, {121029, 4.9883485}]|
|1238  |[{126219, 6.3560243}, {125599, 5.5805483}, {121029, 5.4828215}]|
|1342  |[{126219, 7.2717366}, {125599, 6.384574}, {121029, 6.273003}]  |
|1580  |[{126219, 3.3146727}, {125599, 2.9101472}, {121029, 2.859762}] |
|1591  |[{126219, 6.707773}, {125599, 5.889341}, {121029, 5.786321}]   |
+------+-------------------------------------------

In [None]:
# github access copy
