In [1]:
# Import Required Libraries
from pyspark.sql import SparkSession   # Establish Spark Session
from pyspark.ml.recommendation import ALS  # Recommendation Algorithm to use
from pyspark.ml.evaluation import RegressionEvaluator  # Evaluate Model

In [2]:
# Reuse the active Spark session if there is one; otherwise create a session
# registered under the application name 'Recommendation'.
session_builder = SparkSession.builder.appName('Recommendation')
spark = session_builder.getOrCreate()

In [3]:
# Load the MovieLens ratings CSV. The first row supplies the column names
# (header) and the column types are inferred from the data (inferSchema).
# Both options are passed as real booleans rather than the strings "True".
# NOTE(review): this is an absolute, machine-specific path; consider making it
# configurable before sharing the notebook.
df = spark.read.csv(
    "/home/pabhijit/data/movielens_ratings.csv",
    inferSchema=True,
    header=True,
)

In [4]:
# Summary statistics (count, mean, stddev, min, max) for every column
ratings_summary = df.describe()
ratings_summary.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [5]:
# Preview the first 20 rows of the ratings data (20 is show()'s default)
df.show(n=20)

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [6]:
# Hold out roughly 20% of the ratings for evaluation. randomSplit is random,
# so a fixed seed keeps the split (and every metric derived from it) stable
# when the notebook is rerun on the same data.
training, test = df.randomSplit([0.8, 0.2], seed=42)

In [7]:
# Configure the ALS collaborative-filtering estimator.
# - coldStartStrategy="drop": a random split can leave users or movies in the
#   test set that never appear in training; ALS predicts NaN for those rows,
#   which would turn the RMSE into NaN. "drop" removes such rows from the
#   transform output instead.
# - seed: fixes the random factor initialisation so the fitted model is
#   reproducible across reruns.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

In [8]:
# Fit the ALS estimator on the training split
model = als.fit(dataset=training)

In [9]:
# Score the held-out split; transform() appends a 'prediction' column
predictions = model.transform(dataset=test)

In [10]:
# Compare actual ratings with predicted ratings for the first 20 test rows
predictions.show(n=20)

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     31|   1.0|    13|0.72061175|
|     31|   3.0|     8|0.41344836|
|     31|   1.0|    24| 3.1905134|
|     85|   1.0|    26| 5.6590185|
|     85|   1.0|    15| 2.4205744|
|     85|   1.0|     4| 1.3469886|
|     85|   1.0|    25|  4.796378|
|     85|   1.0|     2|-3.7027206|
|     65|   2.0|     3|0.78865254|
|     65|   2.0|    15| 0.7771948|
|     65|   5.0|    23| 2.6956856|
|     65|   1.0|     2|0.35914445|
|     53|   1.0|     9| 2.6459434|
|     53|   1.0|    25| 2.2707365|
|     78|   1.0|    28| 1.6759197|
|     78|   1.0|    27| 1.2931199|
|     78|   1.0|     1|0.85148066|
|     34|   1.0|    19|  0.832001|
|     34|   3.0|    25|0.87624526|
|     34|   1.0|    14| 0.8074608|
+-------+------+------+----------+
only showing top 20 rows



In [11]:
# RMSE between the true 'rating' column and the model's 'prediction' column
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [12]:
# Root-mean-squared error of the predictions on the held-out split
rmse = evaluator.evaluate(dataset=predictions)

In [15]:
# Report the metric: label on the first line, value on the second
print('RMSE', rmse, sep='\n')

RMSE
2.2778209427636478


In [16]:
# Keep only the (userId, movieId) pairs of user 11 from the held-out split
is_user_11 = test['userId'] == 11
single_user = test.where(is_user_11).select('userId', 'movieId')

In [17]:
# List the movies that userId 11 has in the test split
single_user.show(n=20)

+------+-------+
|userId|movieId|
+------+-------+
|    11|     12|
|    11|     21|
|    11|     39|
|    11|     40|
|    11|     48|
|    11|     72|
|    11|     89|
|    11|     99|
+------+-------+



In [18]:
# Predict a rating for each of the selected user's test-split movies
recommendation = model.transform(dataset=single_user)

In [20]:
# Show the user's movies ranked from highest to lowest predicted rating
ranked_recommendation = recommendation.sort(recommendation['prediction'].desc())
ranked_recommendation.show()

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    11|     48| 2.3940814|
|    11|     40|   1.95193|
|    11|     99| 1.2514138|
|    11|     72|0.93912214|
|    11|     89|-0.5327917|
|    11|     21| -0.586107|
|    11|     12| -1.772366|
|    11|     39|-4.2725763|
+------+-------+----------+

