In [2]:
import findspark
findspark.init('/home/mysparkub/spark-3.0.0-bin-hadoop2.7')

In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [4]:
spark = SparkSession.builder.appName('recommendation').getOrCreate()

In [6]:
data = spark.read.csv('files/movielens_ratings.csv', inferSchema=True, header=True)

In [7]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [10]:
data.head(2)

[Row(movieId=2, rating=3.0, userId=0), Row(movieId=3, rating=1.0, userId=0)]

In [11]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [12]:
traing_data, test_data = data.randomSplit([0.7, 0.3])

In [20]:
als = ALS(maxIter=5, regParam=0.01,
          userCol='userId', itemCol='movieId', ratingCol='rating')

In [21]:
model = als.fit(traing_data)

In [22]:
preds = model.transform(test_data)

In [24]:
preds.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|     31|   3.0|     7|-0.12558281|
|     85|   1.0|    28|   1.529695|
|     85|   1.0|     4|-0.60657084|
|     85|   4.0|     7|  2.2109587|
|     65|   1.0|    28| -0.5592395|
|     65|   1.0|    22| -0.8178828|
|     65|   1.0|    24|  1.1130672|
|     65|   1.0|     2|  2.7450645|
|     53|   3.0|    13|   1.110419|
|     53|   1.0|     6|  2.2310932|
|     53|   2.0|    19| -1.2846141|
|     53|   1.0|    23|  0.7481507|
|     53|   5.0|    21|  1.9827197|
|     53|   3.0|    14|  3.0363283|
|     78|   1.0|    19|  1.1203623|
|     78|   1.0|    17| 0.59944975|
|     78|   1.0|     4|  0.8138174|
|     78|   1.0|     8| 0.80272454|
|     78|   1.0|    24| 0.43931875|
|     78|   1.0|     2|  1.2526982|
+-------+------+------+-----------+
only showing top 20 rows



In [25]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                               predictionCol = 'prediction')

In [26]:
evaluator.evaluate(preds)

1.786415917969324

In [27]:
single_user = test_data.filter(test_data['userId']==11).select('movieId', 'userId')

In [28]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     10|    11|
|     12|    11|
|     18|    11|
|     22|    11|
|     27|    11|
|     39|    11|
|     40|    11|
|     45|    11|
|     48|    11|
|     59|    11|
|     62|    11|
|     64|    11|
|     67|    11|
|     71|    11|
|     81|    11|
|     90|    11|
+-------+------+



In [29]:
recommendations = model.transform(single_user)
recommendations.orderBy('prediction', ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     40|    11| 4.2707043|
|     39|    11| 3.6143785|
|     22|    11| 2.8721817|
|     18|    11| 2.7971923|
|     81|    11| 2.5131102|
|     10|    11| 2.2560072|
|     71|    11| 2.1320455|
|     27|    11| 2.0035443|
|     59|    11| 1.9092798|
|     90|    11| 1.3019637|
|     67|    11| 1.0935684|
|     48|    11| 0.7256887|
|     64|    11| 0.7231885|
|     45|    11|0.44451886|
|     62|    11|-2.3899038|
|     12|    11|-2.4471366|
+-------+------+----------+

