In [1]:
from pyspark.sql.functions import *

In [2]:
# Read the MovieLens ratings from Parquet and cache them, since the
# DataFrame is reused by most of the cells below.
ratings = (
    spark.read
    .parquet("/mnt/spark-data/movielens.parquet")
    .cache()
)

In [3]:
# Sanity check: list the rating rows whose title mentions "Compton",
# printing full column values instead of truncated ones.
(
    ratings
    .where(col("title").contains("Compton"))
    .show(truncate=False)
)

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Hold out 20% of the ratings for evaluation. The split is seeded so that a
# fresh "Run All" produces the same partition and therefore a comparable RMSE.
(training, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Instantiate ALS with its defaults purely to list every available parameter
# (name, description, default/current value). The configured estimator that is
# actually fitted is created in the following cell.
als = ALS()

print(als.explainParams())

In [5]:
# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" discards predictions for users/movies that were not
# seen during fitting, so the evaluation metric does not come out as NaN.
# The seed is set explicitly so the fit is repeatable across notebook runs.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          nonnegative=True, coldStartStrategy="drop", seed=42)
model = als.fit(training)

# Score the held-out split; these predictions feed the RMSE evaluation below.
predictions = model.transform(test)

In [6]:
# Compare the predicted rating with the actual rating on the held-out rows
# and report the root-mean-square error.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = {}".format(rmse))


In [7]:
# Resolve each title fragment to one concrete movie and remember its id.
# Several movies may contain the same fragment, so the matches are ordered by
# movieId and the lowest id is taken: this makes the choice deterministic and
# brings only a single row back to the driver. A fragment with no match fails
# with an explicit message instead of a bare IndexError.
my_movies = []
for title in ["Mr. Nobody", "Inception", "Black Mirror", "Compton", "Fermat"]:
  matches = (
      ratings.filter(col("title").contains(title))
      .select("movieId", "title")
      .distinct()
      .orderBy("movieId")
      .limit(1)
      .collect()
  )
  if not matches:
    raise ValueError("No movie title contains {!r}".format(title))
  movie = matches[0]
  print(str(movie[0]) + ": " + movie[1])
  my_movies.append(movie[0])

In [8]:
# Print the column names and types of `ratings`; the hand-made rows created in
# the next cell are built against this same schema.
ratings.printSchema()

In [9]:
# Give every hand-picked movie a rating of 5.0 on behalf of a synthetic user
# with id 0. Each tuple is matched positionally against ratings.schema.
# NOTE(review): the trailing "" fills the schema's fourth field, presumably the
# title column; confirm against the printSchema() output above.
my_ratings = [(0, movieId, 5.0, "") for movieId in my_movies]

myDF = spark.createDataFrame(my_ratings, schema=ratings.schema)
display(myDF)

In [10]:
# Append the hand-made ratings of user 0 to the full data set. myDF was created
# with ratings.schema, so both DataFrames have the same columns.
all_ratings = ratings.union(myDF)

In [11]:
# Refit ALS on every rating, including the synthetic user 0, so the model
# learns factors for that user. This run uses more iterations (10) than the
# evaluation fit, and an explicit seed so the recommendations are repeatable.
als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          nonnegative=True, coldStartStrategy="drop", seed=42)

model = als.fit(all_ratings)


In [12]:
# Generate the top 20 movie recommendations for the synthetic user (userId 0).
# recommendForUserSubset only needs a DataFrame naming the users of interest,
# so a single-row frame is enough; spark.range(1) supplies that one row instead
# of projecting a literal 0 over every row of `ratings`.
# The result is cached because several later cells read it.
userRecs = model.recommendForUserSubset(
    spark.range(1).select(lit(0).alias("userId")), 20
).cache()

In [13]:
# Show the raw recommendation output for user 0; its "recommendations" column
# is flattened in the next cell.
display(userRecs)

In [14]:
# Turn the "recommendations" array into one row per recommended item,
# exposing each element under the column name "rec".
exploded = userRecs.select(
    explode(col("recommendations")).alias("rec")
)
display(exploded)

In [15]:
# Keep only the movieId field of each recommendation.
mymovies = exploded.select(
    col("rec")["movieId"].alias("movieId")
)
display(mymovies)

In [16]:
# Lookup table of unique (movieId, title) pairs taken from the ratings data.
titles = (
    ratings
    .select("movieId", "title")
    .distinct()
)

In [17]:
# Translate the recommended movie ids into titles. After the join the only
# column that is not a movieId is "title", so selecting it is the same as
# dropping both movieId columns.
display(
    mymovies
    .join(titles, mymovies["movieId"] == titles["movieId"])
    .select("title")
)