In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start a Spark session for this notebook (getOrCreate reuses one that is
# already running in the kernel instead of creating a second).
spark = (
    SparkSession.builder
    .appName("Recommender System Example")
    .getOrCreate()
)

In [4]:
# All four MovieLens tables live in the same directory and are read with the
# same options, so route them through one helper instead of four copy-pasted
# calls (change the location or options in exactly one place).
DATA_DIR = '../../data/movie-lens'


def load_movielens_csv(name):
    """Read ``<DATA_DIR>/<name>.csv`` into a Spark DataFrame.

    The first line of the file is used as the header and Spark infers the
    column types from the data (``inferSchema=True``).

    :param name: base file name without the ``.csv`` extension,
        e.g. ``'ratings'``.
    :return: the loaded ``pyspark.sql.DataFrame``.
    """
    path = '{}/{}.csv'.format(DATA_DIR, name)
    return spark.read.load(path, format='csv', header=True, inferSchema=True)


ratings = load_movielens_csv('ratings')
tags = load_movielens_csv('tags')
movies = load_movielens_csv('movies')
links = load_movielens_csv('links')

In [10]:
# Peek at the first five ratings, then display the object's type (a Spark
# DataFrame, not a pandas one).
ratings.show(n=5)
type(ratings)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



pyspark.sql.dataframe.DataFrame

In [6]:
# Print the column names and types Spark inferred when the CSV was loaded
# with inferSchema=True.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
# First five rows of the movie catalogue (id, title, pipe-separated genres).
movies.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [8]:
# First five user-supplied tags.
tags.show(n=5)

+------+-------+--------------------+----------+
|userId|movieId|                 tag| timestamp|
+------+-------+--------------------+----------+
|    15|    339|sandra 'boring' b...|1138537770|
|    15|   1955|             dentist|1193435061|
|    15|   7478|            Cambodia|1170560997|
|    15|  32892|             Russian|1170626366|
|    15|  34162|         forgettable|1141391765|
+------+-------+--------------------+----------+
only showing top 5 rows



In [9]:
# First five rows of the movieId -> external id mapping table.
links.show(n=5)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
|      4|114885| 31357|
|      5|113041| 11862|
+-------+------+------+
only showing top 5 rows



In [11]:
# Hold out 20% of the ratings for evaluation. Without an explicit seed,
# DataFrame.randomSplit draws a fresh random seed on every call, so the split
# (and therefore the RMSE reported later) would change on every re-run.
training, test = ratings.randomSplit([0.8, 0.2], seed=42)

In [12]:
from pyspark.ml.linalg import SparseVector
from pyspark.sql import functions as sf

In [15]:
# Aggregate the whole ratings table into a single Row with the largest and
# smallest movieId. An ungrouped agg always produces exactly one row.
id_bounds = ratings.agg(sf.max(ratings.movieId), sf.min(ratings.movieId))
min_max_row = id_bounds.first()
print(min_max_row)
print(type(min_max_row))

Row(max(movieId)=163949, min(movieId)=1)
<class 'pyspark.sql.types.Row'>


In [16]:
# Pull the bounds out of the Row by the column names Spark generated for the
# aggregate expressions.
min_id = min_max_row['min(movieId)']
max_id = min_max_row['max(movieId)']

In [17]:
from pyspark.ml.recommendation import ALS

In [18]:
# Explicit-feedback ALS matrix factorisation over (userId, movieId, rating).
# coldStartStrategy="drop" removes prediction rows for users/movies that were
# not seen during training; otherwise they would get NaN predictions and the
# RMSE would come out as NaN.
# seed=42 pins the random factor initialisation. PySpark's default seed is
# derived from hash() of the class name, which changes between Python
# sessions unless PYTHONHASHSEED is fixed, so an unseeded model (and its
# RMSE) is not reproducible across kernel restarts.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)

In [19]:
# Train the factorisation model on the training split.
model = als.fit(dataset=training)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

In [21]:
# Score the held-out ratings and report the root-mean-square error between
# the true `rating` column and the model's `prediction` column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = {}".format(rmse))

Root-mean-square error = 1.121062065373399


In [23]:
# A few test rows with the model's predicted rating alongside the true one.
predictions.show(n=5)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   232|    463|   4.0| 955089443| 3.7941897|
|   242|    463|   4.0| 956685706|  3.739812|
|   126|    471|   5.0| 833287141| 3.6285453|
|   460|    471|   5.0|1072836030| 3.6248734|
|   602|    471|   3.0| 842357922| 4.0422745|
+------+-------+------+----------+----------+
only showing top 5 rows



In [25]:
# Top-10 movies for every user, and top-10 users for every movie.
userRecs = model.recommendForAllUsers(numItems=10)
movieRecs = model.recommendForAllItems(numUsers=10)

In [26]:
# Each row: a userId and an array of (movieId, score) recommendations.
userRecs.show(n=5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[1940, 6.2453218...|
|   463|[[1824, 5.4961762...|
|   496|[[1940, 7.8468585...|
|   148|[[1939, 5.372315]...|
|   540|[[5114, 10.293946...|
+------+--------------------+
only showing top 5 rows



In [27]:
# Each row: a movieId and an array of (userId, score) recommendations.
movieRecs.show(n=5)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[145, 5.082226],...|
|   5300|[[366, 7.936352],...|
|   6620|[[395, 7.5966344]...|
|  54190|[[395, 18.805351]...|
|    471|[[366, 6.4629617]...|
+-------+--------------------+
only showing top 5 rows



In [28]:
# Top-10 movie recommendations for a small sample of users: three distinct
# userIds taken from the ratings table (which three is not guaranteed).
users = (
    ratings
    .select(als.getUserCol())
    .distinct()
    .limit(3)
)
userSubsetRecs = model.recommendForUserSubset(dataset=users, numItems=10)

In [29]:
# The subset has at most three users, so the default 20-row display shows all of them.
userSubsetRecs.show(n=20)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[1940, 6.2453218...|
|   463|[[1824, 5.4961762...|
|   148|[[1939, 5.372315]...|
+------+--------------------+

