In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('recommender').getOrCreate()

In [3]:
spark

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator

In [12]:
from pyspark.ml.recommendation import ALS

In [6]:
data = spark.read.csv('ratings.csv',inferSchema=True,header=True)

In [7]:
data.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [9]:
data.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [10]:
training, test = data.randomSplit([0.8,0.2])

In [15]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId',itemCol='movieId',ratingCol = 'rating' )

In [17]:
model = als.fit(training)

In [19]:
preds = model.transform(test)

In [20]:
preds.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   191|    148|   5.0| 829760897|       NaN|
|   436|    471|   3.0| 833530187| 3.8209853|
|   409|    471|   3.0| 967912821|  4.915378|
|   372|    471|   3.0| 874415126| 1.3267151|
|   603|    471|   4.0| 954482443| 2.9638994|
|   218|    471|   4.0|1111624874|0.77381396|
|   500|    471|   1.0|1005528017| 2.7432816|
|   387|    471|   3.0|1139047519| 3.1823983|
|   610|    471|   4.0|1479544381| 3.4578323|
|   555|    471|   3.0| 978746933| 3.3763144|
|   176|    471|   5.0| 840109075|0.88349676|
|   599|    833|   1.5|1519330029| 2.1927679|
|   132|   1088|   4.0|1329984080| 3.1832604|
|    64|   1088|   4.0|1161559902| 3.2349393|
|   489|   1088|   4.5|1332775009| 2.9172792|
|   381|   1088|   3.5|1168664508|  2.993562|
|    10|   1088|   3.0|1455619275|0.28925198|
|    68|   1088|   3.5|1158534614|  4.237457|
|   525|   1088|   4.5|1476478367|

In [21]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')

In [23]:
rmse = evaluator.evaluate(preds)

In [24]:
print('RMSE')
print(rmse)

RMSE
nan


In [32]:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])

In [33]:
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      6|    11|
|    110|    11|
|    368|    11|
|    377|    11|
|    480|    11|
|    593|    11|
|    733|    11|
|   1518|    11|
|   1604|    11|
|   1721|    11|
|   2028|    11|
+-------+------+



In [34]:
recommendations = model.transform(single_user)

In [35]:
recommendations.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    733|    11|  6.166466|
|   1518|    11| 5.1923084|
|      6|    11| 5.1093063|
|    593|    11| 4.4319224|
|    480|    11| 4.2295084|
|    377|    11|  4.204706|
|   2028|    11|  4.049731|
|    110|    11| 3.8632648|
|    368|    11|  3.475604|
|   1721|    11|  3.399053|
|   1604|    11|  2.350225|
+-------+------+----------+

