In [1]:
import findspark

In [2]:
# Locate the Spark installation. Prefer the SPARK_HOME environment variable so
# the notebook runs on other machines; fall back to the original author's path.
import os

SPARK_HOME = os.environ.get('SPARK_HOME', '/home/lee/spark-2.4.5-bin-hadoop2.7/')
findspark.init(SPARK_HOME)

In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession

In [5]:
# Reuse an existing Spark session if one is running, otherwise start one named 'rec'.
spark = (
    SparkSession.builder
    .appName('rec')
    .getOrCreate()
)

In [6]:
from pyspark.ml.recommendation import ALS

In [7]:
# The ratings file appears to use '::' between fields. Spark 2.4's CSV reader only
# accepts a single-character separator, so splitting on ':' produces an empty
# (null) column between every pair of real fields (_c1, _c3, _c5 below).
# header=None leaves the option unset, i.e. Spark's default of "no header row",
# which is why the columns come out as _c0 ... _c6.
data = spark.read.csv(
    'ratings.csv',
    sep=":",
    header=None,
    inferSchema=True,
)

In [8]:
# Preview the raw parsed columns (first 20 rows, long values truncated).
data.show(n=20, truncate=True)

+---+----+---+----+---+----+---------+
|_c0| _c1|_c2| _c3|_c4| _c5|      _c6|
+---+----+---+----+---+----+---------+
|  1|null|122|null|5.0|null|838985046|
|  1|null|185|null|5.0|null|838983525|
|  1|null|231|null|5.0|null|838983392|
|  1|null|292|null|5.0|null|838983421|
|  1|null|316|null|5.0|null|838983392|
|  1|null|329|null|5.0|null|838983392|
|  1|null|355|null|5.0|null|838984474|
|  1|null|356|null|5.0|null|838983653|
|  1|null|362|null|5.0|null|838984885|
|  1|null|364|null|5.0|null|838983707|
|  1|null|370|null|5.0|null|838984596|
|  1|null|377|null|5.0|null|838983834|
|  1|null|420|null|5.0|null|838983834|
|  1|null|466|null|5.0|null|838984679|
|  1|null|480|null|5.0|null|838983653|
|  1|null|520|null|5.0|null|838984679|
|  1|null|539|null|5.0|null|838984068|
|  1|null|586|null|5.0|null|838984068|
|  1|null|588|null|5.0|null|838983339|
|  1|null|589|null|5.0|null|838983778|
+---+----+---+----+---+----+---------+
only showing top 20 rows



In [9]:
# Inspect the types inferred by inferSchema=True; the null-only separator columns come out as strings.
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: integer (nullable = true)



In [10]:
# Preview only: hide the empty separator columns and _c6 (presumably a Unix
# timestamp). The result is displayed, not stored, so `data` is unchanged here.
data.drop(*['_c1', '_c3', '_c5', '_c6']).show()

+---+---+---+
|_c0|_c2|_c4|
+---+---+---+
|  1|122|5.0|
|  1|185|5.0|
|  1|231|5.0|
|  1|292|5.0|
|  1|316|5.0|
|  1|329|5.0|
|  1|355|5.0|
|  1|356|5.0|
|  1|362|5.0|
|  1|364|5.0|
|  1|370|5.0|
|  1|377|5.0|
|  1|420|5.0|
|  1|466|5.0|
|  1|480|5.0|
|  1|520|5.0|
|  1|539|5.0|
|  1|586|5.0|
|  1|588|5.0|
|  1|589|5.0|
+---+---+---+
only showing top 20 rows



In [11]:
from pyspark.sql.functions import col

# Keep only the three meaningful fields and give them readable names.
# NOTE(review): this rebinds `data`, so re-running this cell on its own fails
# (`_c0` no longer exists); re-run from the CSV load cell instead.
data = data.selectExpr(
    '_c0 AS UserID',
    '_c2 AS MovieID',
    '_c4 AS rating',
)

In [12]:
# Confirm the renamed (UserID, MovieID, rating) layout.
data.show(n=20, truncate=True)

+------+-------+------+
|UserID|MovieID|rating|
+------+-------+------+
|     1|    122|   5.0|
|     1|    185|   5.0|
|     1|    231|   5.0|
|     1|    292|   5.0|
|     1|    316|   5.0|
|     1|    329|   5.0|
|     1|    355|   5.0|
|     1|    356|   5.0|
|     1|    362|   5.0|
|     1|    364|   5.0|
|     1|    370|   5.0|
|     1|    377|   5.0|
|     1|    420|   5.0|
|     1|    466|   5.0|
|     1|    480|   5.0|
|     1|    520|   5.0|
|     1|    539|   5.0|
|     1|    586|   5.0|
|     1|    588|   5.0|
|     1|    589|   5.0|
+------+-------+------+
only showing top 20 rows



In [13]:
# 75/25 train/test split. A fixed seed makes the split, and therefore the
# model, predictions and RMSE below, reproducible across kernel restarts.
train, test = data.randomSplit([0.75, 0.25], seed=42)

In [14]:
# Collaborative-filtering model (alternating least squares).
# - regParam=0.01: L2 regularisation strength (tuning choice; not cross-validated here).
# - coldStartStrategy="drop": rows whose user or movie never appeared in `train`
#   would get a NaN prediction; dropping them keeps the RMSE below from being NaN.
# - seed: fixes the random initialisation of the factor matrices so retraining
#   yields the same model.
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='UserID',
    itemCol='MovieID',
    ratingCol='rating',
    coldStartStrategy="drop",
    seed=42,
)

In [15]:
# Learn user and movie latent factors from the training ratings.
model = als.fit(dataset=train)

In [16]:
# Score every held-out (user, movie) pair; adds a `prediction` column.
predictions = model.transform(dataset=test)

In [17]:
# Side-by-side view of actual vs. predicted ratings.
predictions.show(n=20, truncate=True)

+------+-------+------+----------+
|UserID|MovieID|rating|prediction|
+------+-------+------+----------+
| 45622|    148|   2.0|  2.829324|
| 38139|    148|   2.0|  2.217082|
|  5764|    148|   1.0|  2.861872|
|  2062|    148|   4.0| 3.5125773|
| 68259|    148|   3.0| 2.7314262|
| 68414|    148|   4.0| 3.1425054|
| 10518|    148|   3.0|  2.608514|
| 44369|    148|   3.0| 2.7075639|
|  5676|    148|   1.0|  4.059069|
| 12863|    148|   3.0|  2.489973|
| 25556|    148|   4.0| 2.6623514|
| 52552|    148|   3.0| 2.1763194|
|  8594|    148|   2.0| 1.6334934|
| 66167|    148|   2.5| 2.7152464|
| 68303|    148|   2.0|    2.5351|
| 63070|    148|   4.0| 3.2351332|
| 69998|    148|   2.0| 2.3257787|
| 14134|    148|   3.0| 2.2266705|
| 24983|    148|   2.0| 3.0914733|
| 45283|    148|   5.0|  3.174144|
+------+-------+------+----------+
only showing top 20 rows



In [18]:
from pyspark.ml.evaluation import RegressionEvaluator
# Root-mean-squared error between the true `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)

In [19]:
# Evaluate on the test-set predictions (rows dropped by coldStartStrategy are excluded).
rmse = evaluator.evaluate(dataset=predictions)

In [20]:
# Display the test-set RMSE (in rating units).
rmse

0.8172570989944358

Now let's inspect the model's predictions for a single user (UserID 1), using that user's rows from the held-out test set.

In [23]:
# The (UserID, MovieID) pairs that user 1 rated in the held-out test set. The
# rating column is dropped so the model only sees the pairs it must score.
# Uses the exact column name 'UserID'; the previous 'userID' only worked because
# Spark resolves column names case-insensitively by default.
user = test.filter(test['UserID'] == 1).select(['UserID', 'MovieID'])

In [24]:
# Predict user 1's rating for each of those test-set movies.
recommendation = model.transform(dataset=user)

In [25]:
# Highest predicted rating first.
recommendation.sort(recommendation['prediction'].desc()).show()

+------+-------+----------+
|UserID|MovieID|prediction|
+------+-------+----------+
|     1|    589| 4.8272343|
|     1|    594| 4.7399898|
|     1|    355| 4.5227833|
|     1|    329| 4.2197657|
+------+-------+----------+



In [26]:
# Show user 1's actual test-set ratings for comparison with the predictions above.
# Deliberately not assigned: .show() returns None, and assigning it to `user`
# would overwrite the DataFrame built earlier, breaking a re-run of the transform cell.
test.filter(test['UserID'] == 1).show()

+------+-------+------+
|UserID|MovieID|rating|
+------+-------+------+
|     1|    329|   5.0|
|     1|    355|   5.0|
|     1|    589|   5.0|
|     1|    594|   5.0|
+------+-------+------+



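As the tables above show, the predicted ratings for user 1 (about 4.2–4.8) are reasonably close to the actual ratings (5.0 for all four movies), though the model under-predicts slightly. Note that this scores movies the user had already rated in the test set, so it is a sanity check of prediction quality rather than a list of new recommendations.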