In [24]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the Spark session that every later cell reads data through.
spark = (
    SparkSession.builder
    .appName("Recommender System Example")
    .getOrCreate()
)

In [3]:
# All MovieLens CSVs live in one directory and share the same read options,
# so they are loaded through one helper instead of repeating path and flags.
MOVIE_LENS_DIR = '../../data/movie-lens'


def load_movie_lens_csv(name):
    """Load ``<MOVIE_LENS_DIR>/<name>.csv`` into a Spark DataFrame.

    The first line is used as the header and column types are inferred from
    the data (``inferSchema=True``), exactly as the per-file reads did.
    """
    path = '{}/{}.csv'.format(MOVIE_LENS_DIR, name)
    return spark.read.load(path, format='csv', header=True, inferSchema=True)


ratings = load_movie_lens_csv('ratings')
tags = load_movie_lens_csv('tags')
movies = load_movie_lens_csv('movies')
links = load_movie_lens_csv('links')

In [4]:
ratings.show(n=5)  # preview the first rows of the ratings table

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [5]:
ratings.printSchema()  # show the column types inferred by inferSchema=True at load time

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [6]:
movies.show(n=5)  # preview the first rows of the movies table

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [7]:
tags.show(n=5)  # preview the first rows of the tags table

+------+-------+--------------------+----------+
|userId|movieId|                 tag| timestamp|
+------+-------+--------------------+----------+
|    15|    339|sandra 'boring' b...|1138537770|
|    15|   1955|             dentist|1193435061|
|    15|   7478|            Cambodia|1170560997|
|    15|  32892|             Russian|1170626366|
|    15|  34162|         forgettable|1141391765|
+------+-------+--------------------+----------+
only showing top 5 rows



In [8]:
links.show(n=5)  # preview the first rows of the links table

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [9]:
# Split ratings 80/20 into train/test. Without an explicit seed randomSplit
# picks a random one on every run, so the split (and the RMSE and
# recommendations derived from it) would change on each Restart & Run All.
training, test = ratings.randomSplit([0.8, 0.2], seed=42)

In [10]:
from pyspark.ml.linalg import SparseVector
from pyspark.sql import functions as sf

In [11]:
# Single-row aggregate holding the largest and smallest movieId in ratings.
# Unpacking a 1-tuple asserts the aggregate produced exactly one row.
# NOTE(review): the resulting bounds are not used by any later cell here.
(min_max_row,) = ratings.agg(
    sf.max(ratings.movieId),
    sf.min(ratings.movieId),
).collect()

In [12]:
# Read the bounds by Spark's auto-generated aggregate column names.
min_id = min_max_row['min(movieId)']
max_id = min_max_row['max(movieId)']

In [13]:
from pyspark.ml.recommendation import ALS

In [14]:
# Alternating-least-squares matrix factorisation over (userId, movieId, rating).
# coldStartStrategy="drop" discards prediction rows that come out NaN (ids with
# no learned factors), so the RMSE evaluation below stays a finite number.
# seed fixes the random factor initialisation so repeated runs give the same model.
# NOTE(review): maxIter/regParam are untuned magic numbers and the test RMSE
# reported below is high; consider choosing them with a CrossValidator grid.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

In [15]:
model = als.fit(dataset=training)  # learn user/movie factors from the training split only

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

In [17]:
# Score the held-out split, then compare predicted against observed ratings.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1308710219880738


In [18]:
predictions.show(n=5)  # side-by-side view of observed rating and model prediction

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|   232|    463|   4.0|955089443| 4.2189507|
|    30|    463|   4.0|945277405| 3.1286047|
|    85|    471|   3.0|837512312|  4.214663|
|   588|    471|   3.0|842298526| 3.2913766|
|   126|    471|   5.0|833287141|  4.227268|
+------+-------+------+---------+----------+
only showing top 5 rows



In [19]:
# Top 10 movies recommended to each user.
userRecs = model.recommendForAllUsers(numItems=10)
# Top 10 users recommended for each movie.
movieRecs = model.recommendForAllItems(numUsers=10)

In [20]:
userRecs.show(n=5)  # each row: a userId and its array of (movieId, rating) pairs

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[65188, 6.798457...|
|   463|[[501, 5.5781693]...|
|   496|[[1711, 8.327146]...|
|   148|[[6001, 6.2305584...|
|   540|[[8197, 8.511471]...|
+------+--------------------+
only showing top 5 rows



In [21]:
movieRecs.show(n=5)  # each row: a movieId and its array of (userId, rating) pairs

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[114, 5.613813],...|
|   5300|[[337, 9.532257],...|
|   6620|[[337, 9.550964],...|
|   7340|[[71, 7.4129243],...|
|  32460|[[410, 7.6847305]...|
+-------+--------------------+
only showing top 5 rows



In [22]:
# Recommend 10 movies to each of three distinct users taken from ratings.
# NOTE(review): limit() after distinct() imposes no ordering, so which three
# users are chosen may vary between runs; add orderBy() if that matters.
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(dataset=users, numItems=10)

In [23]:
userSubsetRecs.show(n=20)  # n=20 is DataFrame.show's default row count

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[65188, 6.798457...|
|   463|[[501, 5.5781693]...|
|   148|[[6001, 6.2305584...|
+------+--------------------+

