In [1]:
# Environment bootstrap for the notebook.
# NOTE: statement order matters here - findspark.init() is called before the
# pyspark import so that the import below can be resolved (presumably by
# locating the local Spark installation; confirm against the findspark docs).
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session for this notebook (or reuse one that is already
# running); the application is registered under the name 'rec'.
spark = (
    SparkSession.builder
    .appName('rec')
    .getOrCreate()
)

In [3]:
# Load the ratings CSV. The first line is treated as the header and column
# types are inferred from the data rather than declared up front.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('movielens_ratings.csv')
)
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [4]:
# Peek at the first record: print every field of the first Row on its own line.
first_row = df.head(1)[0]
for item in first_row:
    print(item)

2
3.0
0


In [5]:
# Summary statistics (count / mean / stddev / min / max) for every column.
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [6]:
# Hold out roughly 20% of the ratings for evaluation.
# A fixed seed makes the split - and therefore every metric and table computed
# from it further down - repeatable across Restart & Run All. Without it each
# run trains and evaluates on a different random partition.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [7]:
from pyspark.ml.recommendation import ALS

# Explicit-feedback ALS recommender trained on (userId, movieId, rating).
#
# coldStartStrategy='drop' (Spark >= 2.2): with the default strategy ('nan'),
#   any test row whose user or movie did not appear in the training split gets
#   a NaN prediction, and a single NaN makes the RMSE computed downstream NaN
#   as well. Dropping those rows keeps the evaluation meaningful.
# seed: fixes the random initialisation of the latent factors so the fitted
#   model is repeatable for a given training set.
#
# Note: predictions are not clipped, so they can fall outside the observed
# rating range (including negative values).
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=42,
)

# Fit on the training split, then score the held-out split; transform() adds a
# 'prediction' column next to the original columns.
model = als.fit(train_data)
predictions = model.transform(test_data)
predictions.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|      1|   1.0|     7|  2.3825893|
|      1|   1.0|    26|  1.8251883|
|      3|   1.0|     7|  2.2677824|
|      3|   2.0|     8|  2.1100519|
|      3|   1.0|    17|-0.45989278|
|      5|   2.0|     0|  0.9621633|
|      5|   3.0|    16|  1.5061926|
|      4|   2.0|     1|   2.454458|
|      4|   3.0|     2|  2.6311057|
|      4|   1.0|    23| -1.0439783|
|      4|   1.0|    24|  3.0143018|
|      4|   4.0|    26| 0.08363968|
|      2|   3.0|     0|  1.3308035|
|      2|   1.0|     3| 0.19763592|
|      2|   4.0|     8|   2.005714|
|      2|   1.0|    12|-0.28230345|
|      2|   1.0|    19|-0.14066327|
|      2|   1.0|    25|-0.91666746|
|      0|   1.0|     8|   0.826931|
|      0|   3.0|    10| 0.86404705|
+-------+------+------+-----------+
only showing top 20 rows



In [8]:
from pyspark.ml.evaluation import RegressionEvaluator

# Measure how far the predicted ratings are from the true ones on the held-out
# rows, using root-mean-squared error (lower is better).
evaluator = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)
rmse = evaluator.evaluate(predictions)
print('RMSE: {:.3f}'.format(rmse))

RMSE: 1.668


In [9]:
# Take the held-out (movie, user) pairs for user 11 only. The rating column is
# left out so the frame contains just the inputs the model needs.
single_user = (
    test_data
    .where(test_data.userId == 11)
    .select('movieId', 'userId')
)
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     25|    11|
|     36|    11|
|     38|    11|
|     62|    11|
|     71|    11|
|     72|    11|
|     90|    11|
|     94|    11|
|     99|    11|
+-------+------+



In [10]:
# Score user 11's held-out movies and list them from highest to lowest
# predicted rating.
recommendations = model.transform(single_user)
recommendations.sort(recommendations['prediction'].desc()).show()

+-------+------+-------------+
|movieId|userId|   prediction|
+-------+------+-------------+
|     72|    11|    3.2811694|
|     71|    11|   0.74047893|
|     94|    11|    0.7389283|
|     99|    11|    0.5197887|
|     90|    11|   0.47392392|
|     36|    11|-0.0063428283|
|     38|    11|   -1.0103924|
|     62|    11|   -2.2732852|
|     25|    11|    -4.855615|
+-------+------+-------------+

