In [1]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
movie = sc.textFile("./ml-latest-small/movies.csv")
movie_header = movie.take(1)[0]
movies = movie.filter(lambda l : l != movie_header).map(lambda l : l.split(",")).map(lambda l : (int(l[0]), str(l[1])))
rdf = spark.read.csv("./ml-latest-small/ratings.csv", header = True, inferSchema = True)

In [3]:
rdf.show(10)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
+------+-------+------+---------+
only showing top 10 rows



In [4]:
(rtrain, rtest) = rdf.randomSplit([0.8, 0.2])

In [5]:
rmodel = ALS(maxIter = 5, regParam = 0.01, userCol = "userId", itemCol = "movieId", ratingCol = "rating", coldStartStrategy = "drop").fit(rtrain)

In [6]:
pred = rmodel.transform(rtest)

In [7]:
pred.show(10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   436|    471|   3.0| 833530187|  4.058058|
|   216|    471|   3.0| 975212641| 2.4973757|
|   411|    471|   4.0| 835532928| 3.1790078|
|   608|    471|   1.5|1117161794| 2.6571565|
|   426|    471|   5.0|1451081135| 4.7889247|
|   260|    471|   4.5|1109409455|  1.997573|
|   373|    471|   5.0| 846830388| 4.3667307|
|   609|    833|   3.0| 847221080|  1.463094|
|   492|    833|   4.0| 863976674| 1.0984309|
|   608|    833|   0.5|1117506344|  1.060861|
+------+-------+------+----------+----------+
only showing top 10 rows



In [8]:
rmse = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction").evaluate(pred)
mae = RegressionEvaluator(metricName = "mae", labelCol = "rating", predictionCol = "prediction").evaluate(pred)

In [9]:
rmse

1.0892575269578453

In [10]:
mae

0.8150389481860125

In [11]:
user_rec = rmodel.recommendForAllUsers(10)

In [12]:
recommendations = user_rec.rdd.filter(lambda l : l[0] == 1).map(lambda l : l[1]).map(lambda l : [i[0] for i in l]).collect()[0]

In [13]:
m = dict(movies.collect())

In [14]:
rlist = []
for i in recommendations:
    rlist.append(m[i])

In [15]:
rlist

['Hamlet (1996)',
 '"Life Less Ordinary',
 'Jesus Camp (2006)',
 'Show Me Love (Fucking Åmål) (1998)',
 '"Neon Genesis Evangelion: The End of Evangelion (Shin seiki Evangelion Gekijô-ban: Air/Magokoro wo',
 'His Girl Friday (1940)',
 'Romeo and Juliet (1968)',
 'Atlantic City (1980)',
 'Key Largo (1948)',
 '"Holy Mountain']