In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator,TrainValidationSplit

In [0]:
# Reuse the active SparkSession if one already exists (getOrCreate); otherwise
# build a new one with this application name.
spark = (
    SparkSession.builder
    .appName("Colloborative-Recommendor-System")
    .getOrCreate()
)

In [0]:
path = "/FileStore/tables/movies.csv"

# Load the CSV: the header row supplies column names and Spark infers column
# types (which costs an extra pass over the file).
# NOTE(review): despite the file name, later cells use this frame as a ratings
# table (userId, movieId, rating columns) — confirm the source file.
df = spark.read.options(header=True, inferSchema=True).csv(path)

In [0]:
from collections import Counter  # retained: other cells in this notebook may use Counter

# Most frequently rated movies. The count is done in Spark instead of collecting
# every movieId to the driver, which does not scale with the size of the table.
# Ties in the count are broken by movieId so the printed list is deterministic.
top_movies = (
    df.groupBy('movieId')
    .count()
    .orderBy(col('count').desc(), col('movieId').asc())
    .limit(10)
    .collect()
)
print('Top 10 movies')
print([row['movieId'] for row in top_movies])

In [0]:
# Users with the most ratings. The count is done in Spark rather than pulling
# every userId back to the driver. Ties are broken by userId for a stable result.
top_users = (
    df.groupBy('userId')
    .count()
    .orderBy(col('count').desc(), col('userId').asc())
    .limit(10)
    .collect()
)
print('Top 10 users')
print([row['userId'] for row in top_users])

In [0]:
# ALS estimator shared by every model in this notebook.
# coldStartStrategy="drop" removes rows whose user/movie was unseen during
# training, so evaluation metrics are not NaN.
# A fixed seed makes factor initialisation (and therefore results) reproducible.
als = ALS(
    maxIter=20,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

In [0]:
# Model with 80/20 split
# Train on 80% of the ratings and score the held-out 20%. The seed makes the
# split, and therefore the metrics reported below, reproducible.
(training80, test20) = df.randomSplit([0.8, 0.2], seed=42)
modelA = als.fit(training80)
predictionsA = modelA.transform(test20)
predictionsA.show(1)

In [0]:
# Model with 60/40 split
# Train on 60% of the ratings and score the held-out 40%, with a fixed seed
# so the split is reproducible.
(training60, test40) = df.randomSplit([0.6, 0.4], seed=42)
modelB = als.fit(training60)
predictionsB = modelB.transform(test40)
predictionsB.show(1)

In [0]:
# RMSE evaluator; also reused later as the TrainValidationSplit objective.
evaluatorrmse = RegressionEvaluator(labelCol='rating', predictionCol='prediction', metricName='rmse')

# Report both baselines in order; `rmse` is left holding the 60/40 value.
for label, scored in (("RMSE for Model with 80/20 split:", predictionsA),
                      ("RMSE for Model with 60/40 split:", predictionsB)):
    rmse = evaluatorrmse.evaluate(scored)
    print(label, rmse)

In [0]:
# Mean squared error for the two baseline models.
evaluatormse = RegressionEvaluator(labelCol='rating', predictionCol='prediction', metricName='mse')

# Same order as before: 80/20 first, then 60/40 (which `mse` ends up holding).
for label, scored in (("MSE for Model with 80/20 split:", predictionsA),
                      ("MSE for Model with 60/40 split:", predictionsB)):
    mse = evaluatormse.evaluate(scored)
    print(label, mse)

In [0]:
# Mean absolute error for the two baseline models.
evaluatormae = RegressionEvaluator(labelCol='rating', predictionCol='prediction', metricName='mae')

# 80/20 first, then 60/40; `mae` keeps the last (60/40) value.
for label, scored in (("MAE for Model with 80/20 split:", predictionsA),
                      ("MAE for Model with 60/40 split:", predictionsB)):
    mae = evaluatormae.evaluate(scored)
    print(label, mae)

In [0]:
# Grid over regularisation strength and latent-factor rank (2 x 2 = 4 fits).
parameters = (
    ParamGridBuilder()
    .addGrid(als.regParam, [0.01, 0.05])
    .addGrid(als.rank, [10, 50])
    .build()
)

# Pick the grid point with the best RMSE on an internal validation split.
# The seed fixes that internal split so tuning results are reproducible.
trainvs = TrainValidationSplit(
    estimator=als,
    estimatorParamMaps=parameters,
    evaluator=evaluatorrmse,
    seed=42,
)

In [0]:
# Run the hyperparameter search on the 80% training split; the result is a
# TrainValidationSplitModel wrapping the best ALS model found.
model = trainvs.fit(dataset=training80)

In [0]:
# Score the held-out 20% with the *tuned* model (TrainValidationSplitModel
# transforms with its best model). A separate name keeps predictionsA, the
# untuned 80/20 baseline, intact.
predictions = model.transform(test20)

rmse = evaluatorrmse.evaluate(predictions)

In [0]:
# Peek at a single scored row from the tuned model.
predictions.show(n=1)

In [0]:
# The tuned model was fit on training80 and evaluated on test20, so this is
# the 80/20 split (the previous label wrongly said 60/40).
print("RMSE value for model with 80/20 split after hyperparameter tuning:", rmse)

In [0]:
def recommendmovies(user, n=15, num_candidates=75):
    """Show and return the top-``n`` movie recommendations for one user.

    Reads the notebook globals ``df`` (ratings) and ``model`` (the tuned
    TrainValidationSplitModel, or a plain ALS model). Movies the user has
    already rated are excluded.

    Args:
        user: the userId to recommend for.
        n: number of recommendations to show and return (default 15).
        num_candidates: how many ALS recommendations to request before
            removing already-rated movies (default 75). If the user has rated
            many of these candidates, fewer than ``n`` rows may remain.

    Returns:
        Spark DataFrame with columns userId, movieId, rating (predicted score),
        ordered by predicted rating descending and limited to ``n`` rows.
    """
    user_subset = df.where(df.userId == user)

    # recommendForUserSubset is defined on the ALS model itself, not on the
    # TrainValidationSplitModel wrapper, so unwrap bestModel when present.
    als_model = model.bestModel if hasattr(model, "bestModel") else model
    user_recs = als_model.recommendForUserSubset(user_subset, num_candidates)

    # One row per (user, recommended movie, predicted rating).
    recs = (
        user_recs
        .select("userId", explode("recommendations").alias("recommendation"))
        .select("userId", "recommendation.movieId", "recommendation.rating")
    )

    # Drop movies the user already rated. Sort *after* the join, because a
    # join does not preserve the row order of its input.
    top_recs = (
        recs.join(user_subset, on=["movieId"], how="left_anti")
        .select("userId", "movieId", "rating")
        .orderBy(col("rating").desc())
        .limit(n)
    )
    top_recs.show(n)
    return top_recs

In [0]:
# Recommendations for user 11; the table is printed inside the function.
recommendmovies(user=11);

In [0]:
# Recommendations for user 23; the table is printed inside the function.
recommendmovies(user=23);