In [8]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType,LongType, IntegerType

In [9]:
# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("MovieRecommendation")
    .getOrCreate()
)

# Explicit column names/types for ratings.dat. For a delimited file the fields
# are applied positionally, so this order must match the file's field order.
movieSchema = StructType(
    [
        StructField("userId", IntegerType(), nullable=True),
        StructField("movieId", IntegerType(), nullable=True),
        StructField("rating", FloatType(), nullable=True),
        StructField("timestamp", LongType(), nullable=True),
    ]
)


In [22]:
# ratings.dat uses "::" as its field separator (multi-character separators
# need Spark 3.0+); movieSchema supplies the column names and types.
ratings = spark.read.csv("./ratings.dat", schema=movieSchema, sep="::")
ratings.show(n=10)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|   5.0|978300760|
|     1|    661|   3.0|978302109|
|     1|    914|   3.0|978301968|
|     1|   3408|   4.0|978300275|
|     1|   2355|   5.0|978824291|
|     1|   1197|   3.0|978302268|
|     1|   1287|   5.0|978302039|
|     1|   2804|   5.0|978300719|
|     1|    594|   4.0|978302268|
|     1|    919|   4.0|978301368|
+------+-------+------+---------+
only showing top 10 rows



In [24]:
# Hold out 40% of the ratings for evaluation. Without an explicit seed,
# randomSplit draws a new random seed on every call, so the split (and the
# error reported below) would change on each Restart & Run All.
SPLIT_SEED = 42
(training, testing) = ratings.randomSplit([0.6, 0.4], seed=SPLIT_SEED)

In [25]:
# Matrix-factorization recommender over (userId, movieId, rating).
# - seed: pins ALS's random factor initialization explicitly instead of relying
#   on the library default, so refits produce the same model.
# - coldStartStrategy="drop": at transform time, rows whose user or movie was
#   not seen in `training` are dropped rather than given NaN predictions,
#   which keeps the RMSE evaluation below well-defined.
ALS_SEED = 42
als = ALS(
    maxIter=8,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=ALS_SEED,
)
model = als.fit(training)

In [26]:
# Score the held-out ratings. Because ALS was configured with
# coldStartStrategy="drop", rows it cannot score are removed here.
predictions = model.transform(testing)

# RMSE between the true `rating` and the model's `prediction` column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)

# Squaring RMSE gives the mean squared error that is reported.
print(f"Mean-Square-Error: {rmse * rmse}")

Mean-Square-Error: 0.8691739828950853
