In [1]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under the application
# name "recommendation" (getOrCreate hands back an already-running
# session when there is one instead of starting a second).
spark = (
    SparkSession.builder
    .appName("recommendation")
    .getOrCreate()
)

In [3]:
# Read the ratings file: the first row supplies the column names and
# Spark infers each column's type from the values.
ratings_reader = spark.read.options(header=True, inferSchema=True)
data = ratings_reader.csv('movielens_ratings.csv')

In [4]:
# Preview the first 20 rows of the ratings table (long cell values truncated).
data.show(n=20, truncate=True)

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [5]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column.
ratings_summary = data.describe()
ratings_summary.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [7]:
# Hold out roughly 20% of the ratings for evaluation.
# randomSplit samples rows at random, so without a seed every
# "Restart & Run All" yields a different split (and therefore a
# different model and RMSE). Pin the seed to make the run repeatable.
training, test = data.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Alternating Least Squares recommender configured on the
# (userId, movieId, rating) columns of the ratings table.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    # A user or movie that lands only in the test split has no learned
    # factors; by default ALS predicts NaN for it, which turns the RMSE
    # into NaN. Dropping those rows keeps the metric meaningful.
    coldStartStrategy='drop',
    # Fix the seed so the random factor initialisation is repeatable.
    seed=42,
)

In [9]:
# Train the ALS model on the training split only.
model = als.fit(dataset=training)

In [10]:
# Score the held-out split; the result carries an added 'prediction' column.
predictions = model.transform(dataset=test)

In [11]:
# Inspect the first 20 predicted ratings next to the actual ratings.
predictions.show(n=20, truncate=True)

+-------+------+------+------------+
|movieId|rating|userId|  prediction|
+-------+------+------+------------+
|      1|   1.0|     4|  0.10545519|
|      1|   1.0|     6|   2.1765072|
|      1|   1.0|    19|   1.6640809|
|      1|   1.0|    20| 0.023291722|
|      1|   3.0|    25|   1.8769858|
|      1|   1.0|    26| -0.58442277|
|      6|   1.0|     4|   2.1058404|
|      6|   1.0|     6|   1.9085337|
|      6|   1.0|    13|   1.8811514|
|      5|   1.0|     6| 0.024649287|
|      4|   2.0|     1|   2.2692835|
|      4|   1.0|     5|  0.48510626|
|      4|   1.0|     9|   1.2786849|
|      4|   1.0|    12|-0.053144917|
|      4|   2.0|    20|   2.6056507|
|      2|   1.0|     3|   1.3905638|
|      2|   1.0|    16|    3.975776|
|      2|   1.0|    19|   2.1156929|
|      2|   1.0|    23|  0.36899206|
|      0|   1.0|     8|   0.6399194|
+-------+------+------+------------+
only showing top 20 rows



In [12]:
# Evaluator that compares the 'prediction' column against the true
# 'rating' column using root-mean-square error.
evaluation = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction',
)

In [13]:
# RMSE of the model's predictions on the held-out ratings.
rmse = evaluation.evaluate(dataset=predictions)

In [14]:
# Report the test-set RMSE (in rating units; ratings in this data span 1.0-5.0).
print(rmse)

1.9026778907875577


In [18]:
# Pick out the held-out (userId, movieId) pairs belonging to user 11,
# leaving the rating column behind so the model can be asked to predict it.
user_11_rows = test.where(test.userId == 11)
single_user = user_11_rows.select('userId', 'movieId')

In [19]:
# List the movies from the test split that user 11 will be scored on.
single_user.show(n=20, truncate=True)

+------+-------+
|userId|movieId|
+------+-------+
|    11|      0|
|    11|     30|
|    11|     39|
|    11|     41|
|    11|     59|
|    11|     66|
|    11|     71|
|    11|     72|
+------+-------+



In [20]:
# Predict user 11's rating for each of those movies, then list them
# from the highest predicted rating to the lowest.
recommendations = model.transform(single_user)
ranked_for_user = recommendations.sort(recommendations['prediction'].desc())
ranked_for_user.show()

+------+-------+-----------+
|userId|movieId| prediction|
+------+-------+-----------+
|    11|     30|   4.537833|
|    11|      0|   3.268625|
|    11|     41|  2.8707871|
|    11|     59|  2.7919974|
|    11|     39|  1.8909669|
|    11|     72|-0.14433062|
|    11|     66|-0.24861374|
|    11|     71|-0.76370305|
+------+-------+-----------+



In [21]:
# Show the ratings user 11 actually gave in the test split, for
# comparison with the predicted ordering.
user_11_actual = test.where(test.userId == 11).select('movieId', 'rating')
user_11_actual.show()

+-------+------+
|movieId|rating|
+-------+------+
|      0|   1.0|
|     30|   5.0|
|     39|   1.0|
|     41|   1.0|
|     59|   1.0|
|     66|   4.0|
|     71|   3.0|
|     72|   3.0|
+-------+------+

