In [0]:
%scala
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.functions._
import org.apache.spark.ml.tuning.{ParamGridBuilder,TrainValidationSplit}
import org.apache.spark.ml.evaluation.RegressionEvaluator

In [0]:
# List the files under FileStore/tables/ on DBFS to find the movie-ratings CSV that is loaded below.
dbutils.fs.ls("FileStore/tables/")

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/29103444_bank.csv', name='29103444_bank.csv', size=5834917, modificationTime=1717028119000),
 FileInfo(path='dbfs:/FileStore/tables/Customer_Gender.csv', name='Customer_Gender.csv', size=7628, modificationTime=1717027822000),
 FileInfo(path='dbfs:/FileStore/tables/movie_ratings-1.csv', name='movie_ratings-1.csv', size=1373062, modificationTime=1723149511000),
 FileInfo(path='dbfs:/FileStore/tables/movie_ratings-2.csv', name='movie_ratings-2.csv', size=1373062, modificationTime=1723149764000),
 FileInfo(path='dbfs:/FileStore/tables/movie_ratings.csv', name='movie_ratings.csv', size=1373062, modificationTime=1723149478000)]

In [0]:
%scala
// Load the movie-ratings CSV from DBFS. The first line is treated as a header
// and column types are inferred from the data rather than declared up front.
val df = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("dbfs:/FileStore/tables/movie_ratings.csv")

// Show the inferred column names and types.
df.printSchema()

In [0]:
%scala
// Total number of rows in the loaded ratings DataFrame.
df.count()

In [0]:
%scala
// Fetch the first 10 rows to the driver for a quick look at the data.
df.head(10)

In [0]:
%scala
// Randomly partition the ratings into 75% training / 25% test data.
// The fixed seed keeps the partition the same from run to run.
val trainTestParts = df.randomSplit(Array(0.75, 0.25), seed = 81)
val train = trainTestParts(0)
val test = trainTestParts(1)

In [0]:
%scala
// Number of rows that ended up in the training partition.
train.count()

In [0]:
%scala
// Number of rows that ended up in the test partition.
test.count()

In [0]:
%scala
// ALS (alternating least squares) recommender estimator.
// Column wiring first, then the training hyper-parameters.
val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setMaxIter(10)
  .setRegParam(0.01)
  .setColdStartStrategy("drop") // omit rows the model cannot score instead of emitting NaN predictions

In [0]:
%scala
// Fit the ALS estimator on the training partition to obtain the baseline model.
val als_model = als.fit(train)

In [0]:
%scala
// Score the held-out test partition with the baseline ALS model.
val predictions = als_model.transform(test)

In [0]:
%scala
// Display the first rows of the baseline predictions.
predictions.show()

In [0]:
%scala
// Absolute error per test row: |rating - prediction|.
val absoluteError = abs(col("rating") - col("prediction"))
val errors = predictions.select(absoluteError)

// Display the first rows of the error column.
errors.show()

In [0]:
%scala
// Summary statistics of the absolute errors, computed after removing
// any rows that contain null/NaN values.
errors
  .na
  .drop()
  .describe()
  .show()

In [0]:
%scala
// Metric used to compare candidate models: RMSE between the true rating
// and the predicted rating.
val evaluator = new RegressionEvaluator()
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

// Candidate values for the ALS regularisation parameter.
val paramGrid = new ParamGridBuilder()
  .addGrid(als.regParam, Array(0.01, 0.1, 1.0))
  .build()

// Hold-out tuning: 80% of the supplied data is used for fitting each
// candidate, the remaining 20% for scoring it with the evaluator above.
val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(als)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setTrainRatio(0.8)

// Run the search on the training partition only; the test partition stays unseen.
val model = trainValidationSplit.fit(train)

In [0]:
%scala 
// Score the held-out test partition with the model produced by TrainValidationSplit.
// This redefines `predictions`, replacing the baseline ALS predictions from the earlier cell.
val predictions = model.transform(test)
// Display the first rows of the tuned model's predictions.
predictions.show()

In [0]:
%scala
// RMSE of the tuned model's predictions on the held-out test partition.
val rmse = evaluator.evaluate(predictions)
// Use string interpolation: println("RMSE:", rmse) is auto-tupled by Scala 2
// and prints "(RMSE:,<value>)" instead of a readable line.
println(s"RMSE: $rmse")

In [0]:
%scala
// R² (coefficient of determination) of the predictions against the true ratings.
// The prediction column is set explicitly so this evaluator is configured the
// same way as the RMSE evaluator; "prediction" is also the library default.
val r2_eval = new RegressionEvaluator()
  .setLabelCol("rating")
  .setPredictionCol("prediction")
  .setMetricName("r2")
// Use string interpolation: println("R2 Score:", value) is auto-tupled by Scala 2
// and prints "(R2 Score:,<value>)" instead of a readable line.
println(s"R2 Score: ${r2_eval.evaluate(predictions)}")