In [0]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Reuse the active Spark session if one exists, otherwise start one named "movie-1".
spark = (
    SparkSession.builder
    .appName("movie-1")
    .getOrCreate()
)

# Use 4 shuffle partitions for Spark SQL shuffles in this session.
spark.conf.set("spark.sql.shuffle.partitions", 4)

In [0]:
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# Location of the ratings file (columns: movieId, rating, userId).
path = "/FileStore/tables/movies.csv"

# Read the comma-separated file with a header row, letting Spark infer column types.
df = (
    spark.read
    .format("csv")
    .options(inferSchema=True, header=True, sep=',')
    .load(path)
)

In [0]:
# Preview the loaded ratings.
# NOTE(review): `display` is not defined in this notebook — presumably the
# hosting notebook environment's built-in rich table renderer; confirm.
display(df)

movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


In [0]:
# Data-quality check: show, for every column, how many rows hold a null value.
df.select(
    *(
        count(when(col(name).isNull(), name)).alias(name)
        for name in df.columns
    )
).show()

In [0]:
# Number of ratings each movie received.
df_new = df.groupBy('movieId').count()

# The 10 most-rated movies.
df_new.orderBy(col('count').desc()).show(10)

In [0]:
# Number of ratings each user submitted.
df_new1 = df.groupBy('userId').count()

# The 10 most active users.
df_new1.orderBy(col('count').desc()).show(10)

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator,TrainValidationSplit

In [0]:
# Hold out 20% of the ratings for evaluation. The fixed seed makes the split,
# and therefore every metric computed from it, reproducible across re-runs.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

# ALS collaborative-filtering estimator over (userId, movieId, rating).
# coldStartStrategy="drop" removes rows whose prediction is NaN (users or movies
# not seen during fitting) so the evaluation metrics stay finite.
# An explicit seed pins the random initialisation of the factors.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

In [0]:
# Fit the ALS estimator (default hyper-parameters) on the training split.
model = als.fit(training)

In [0]:
# Score the held-out test split; adds a `prediction` column next to the true `rating`.
prediction = model.transform(test)

In [0]:
# Evaluate the 80/20 model with three error metrics, each comparing the
# `prediction` column against the true `rating` column.

# Root mean squared error.
evaluator1_1 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator1_1.evaluate(prediction)

# Mean squared error.
evaluator1_2 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="mse"
)
mse = evaluator1_2.evaluate(prediction)

# Mean absolute error.
evaluator1_3 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="mae"
)
mae = evaluator1_3.evaluate(prediction)

In [0]:
# Report the three error metrics of the 80/20 model, one per line.
for metric_label, metric_value in (("rmse =", rmse), ("mse =", mse), ("mae =", mae)):
    print(metric_label, metric_value)

In [0]:
# Repeat the experiment with a 70/30 split to see how less training data
# affects the error. The fixed seed keeps the split reproducible across re-runs.
(training1, test1) = df.randomSplit([0.7, 0.3], seed=42)

# Fit the same ALS estimator on the smaller training split and score its test split.
model1 = als.fit(training1)
prediction1 = model1.transform(test1)

In [0]:
# Evaluate the 70/30 model with the same three error metrics, each comparing
# the `prediction` column against the true `rating` column.

# Root mean squared error.
evaluator2_1 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="rmse"
)
rmse1 = evaluator2_1.evaluate(prediction1)

# Mean squared error.
evaluator2_2 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="mse"
)
mse1 = evaluator2_2.evaluate(prediction1)

# Mean absolute error.
evaluator2_3 = RegressionEvaluator(
    labelCol="rating", predictionCol="prediction", metricName="mae"
)
mae1 = evaluator2_3.evaluate(prediction1)

In [0]:
# Report the three error metrics of the 70/30 model, one per line.
for metric_label, metric_value in (("rmse =", rmse1), ("mse =", mse1), ("mae =", mae1)):
    print(metric_label, metric_value)

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# Hyper-parameter grid for ALS: 3 regularisation strengths x 3 ranks = 9 candidates.
parameters = (
    ParamGridBuilder()
    .addGrid(als.regParam, [0.01, 0.05, 0.1])
    .addGrid(als.rank, [10, 50, 100])
    .build()
)

In [0]:
# 3-fold cross-validation over the ALS parameter grid, selecting by RMSE.
# The fixed seed makes the fold assignment, and therefore the selected
# hyper-parameters, reproducible across re-runs.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameters,
    evaluator=evaluator1_1,
    numFolds=3,
    seed=42,
)

In [0]:
# Run the cross-validated grid search on the 80/20 training split.
# NOTE(review): this rebinds `model`, which previously held the result of
# `als.fit(training)`; that earlier model is no longer reachable under this name.
model = cv.fit(training)

In [0]:
# The model that cross-validation selected as best.
best_model = model.bestModel

In [0]:
# The fitted model is reached through its underlying Java object, whose parent
# is the estimator that produced it; read the winning hyper-parameters from there.
best_als_estimator = best_model._java_obj.parent()
print("rank:", best_als_estimator.getRank())
print("RegParam:", best_als_estimator.getRegParam())

In [0]:
# Score the held-out 80/20 test split with the cross-validated model ...
predictions = model.transform(test)
# ... and measure its RMSE with the same evaluator used for model selection.
rmse_f = evaluator1_1.evaluate(predictions)

In [0]:
# Report the test RMSE of the tuned model.
print("rmse =",rmse_f)

In [0]:
# Top 30 movie recommendations for every user, produced by the tuned model.
recommendations = best_model.recommendForAllUsers(30)

In [0]:
# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, rating) triple.
recommendations1 = (
    recommendations
    .withColumn("rec_exp", explode("recommendations"))
    .select(
        'userId',
        col("rec_exp.movieId"),
        col("rec_exp.rating"),
    )
)

In [0]:
# Peek at the flattened recommendations.
recommendations1.show()

In [0]:
# Movies that users 10 and 14 have already rated (movieId column only).
user10movie = df.where('userId = 10').select('movieId')
user14movie = df.where('userId = 14').select('movieId')

In [0]:
# Bring the already-rated movie ids to the driver as plain Python int lists.
user10 = [int(rated.movieId) for rated in user10movie.collect()]
user14 = [int(rated.movieId) for rated in user14movie.collect()]

In [0]:
# Up to 10 recommendations for user 10, excluding movies that user already rated.
(
    recommendations1
    .where('userId = 10')
    .where(~col("movieId").isin(user10))
    .limit(10)
    .show()
)

In [0]:
# Up to 10 recommendations for user 14, excluding movies that user already rated.
# The exclusion list must be user 14's own rated movies (`user14`); filtering
# against `user10` would hide the wrong titles and could re-recommend movies
# user 14 has already seen.
(
    recommendations1
    .filter('userId = 14')
    .filter(~recommendations1.movieId.isin(user14))
    .limit(10)
    .show()
)