In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, expr, rank, dense_rank, row_number
from pyspark.sql.window import Window


In [2]:
spark = SparkSession.builder \
    .appName("MovieLens Retrieval and Ranking") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/10 12:02:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/10 12:02:21 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/10 12:02:21 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
ratings = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("ratings.csv")

movies = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("movies.csv")


In [4]:
train_ratings, test_ratings = ratings.randomSplit([0.8, 0.2], seed=42)

In [5]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", nonnegative=True)
model = als.fit(train_ratings)

23/05/10 12:02:34 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/05/10 12:02:34 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [6]:
predictions = model.transform(test_ratings)

In [7]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error: {rmse}")

Root-mean-square error: 0.8755676176270328


In [8]:
window = Window.partitionBy("userId").orderBy(col("prediction").desc())
ranked_predictions = predictions.withColumn("rank", dense_rank().over(window))
ranked_predictions.show()

+------+-------+------+---------+----------+----+
|userId|movieId|rating|timestamp|prediction|rank|
+------+-------+------+---------+----------+----+
|     1|   1089|   5.0|964982951| 5.0975866|   1|
|     1|    940|   5.0|964982176| 4.9025445|   2|
|     1|   1197|   5.0|964981872|  4.828831|   3|
|     1|   2947|   5.0|964982176|  4.684357|   4|
|     1|      6|   4.0|964982224| 4.6345983|   5|
|     1|   2529|   5.0|964982242| 4.6165557|   6|
|     1|   3578|   5.0|964980668|  4.556016|   7|
|     1|   2005|   5.0|964981710| 4.5136485|   8|
|     1|   3703|   5.0|964981909| 4.5113173|   9|
|     1|   1031|   5.0|964982653|  4.395322|  10|
|     1|   2596|   5.0|964981144|  4.395197|  11|
|     1|    596|   5.0|964982838| 4.3634067|  12|
|     1|    943|   4.0|964983614|   4.36233|  13|
|     1|    151|   5.0|964984041| 4.3612294|  14|
|     1|   2000|   4.0|964982211| 4.3560743|  15|
|     1|   2096|   4.0|964982838|  4.311615|  16|
|     1|   1573|   5.0|964982290|  4.290023|  17|


In [9]:
def get_recommendations(user_id, top_n=10):
    user_recommendations = ranked_predictions.filter(col("userId") == user_id).limit(top_n)
    return user_recommendations.join(movies, "movieId").select("userId", "movieId", "title", "rank")

user_id = 133
top_n = 10
recommendations = get_recommendations(user_id, top_n)
recommendations.show(truncate=False)

+------+-------+-----------------------------------------+----+
|userId|movieId|title                                    |rank|
+------+-------+-----------------------------------------+----+
|133   |32     |Twelve Monkeys (a.k.a. 12 Monkeys) (1995)|4   |
|133   |153    |Batman Forever (1995)                    |10  |
|133   |367    |Mask, The (1994)                         |8   |
|133   |377    |Speed (1994)                             |7   |
|133   |454    |Firm, The (1993)                         |9   |
|133   |474    |In the Line of Fire (1993)               |5   |
|133   |524    |Rudy (1993)                              |3   |
|133   |527    |Schindler's List (1993)                  |1   |
|133   |529    |Searching for Bobby Fischer (1993)       |6   |
|133   |593    |Silence of the Lambs, The (1991)         |2   |
+------+-------+-----------------------------------------+----+

