In [35]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [36]:
spark = SparkSession.builder \
.appName('proiect_tbd') \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "4g") \
.getOrCreate()

In [37]:
folder_path = "./dataset/datasets/netflix-inc/netflix-prize-data/versions/2"

In [38]:
df = spark.read.parquet(f"{folder_path}/data.parquet")

In [39]:
df.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- date: date (nullable = true)



In [40]:
df = df.drop('date')

In [41]:
df = df.withColumn("customer_id", df["customer_id"].cast("int"))
df = df.withColumn("movie_id", df["movie_id"].cast("int"))
df = df.withColumn("rating", df["rating"].cast("float"))

In [42]:
users_sample = df.select("customer_id").distinct().sample(fraction=0.05, seed=42)

df_sample = df.join(users_sample, "customer_id")

train, test = df_sample.randomSplit([0.8, 0.2], seed=42)

train.cache()
test.cache()

DataFrame[customer_id: int, movie_id: int, rating: float]

In [44]:
als = ALS(
userCol="customer_id",
itemCol="movie_id",
ratingCol="rating",
coldStartStrategy="drop"
)

In [45]:
param_grid = ParamGridBuilder() \
.addGrid(als.rank, [10, 100]) \
.addGrid(als.regParam, [.1]) \
.addGrid(als.maxIter, [10]) \
.build()

In [46]:
evaluator = RegressionEvaluator(
metricName="rmse",
labelCol="rating",
predictionCol="prediction")

In [47]:
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=2, parallelism = 6)

In [48]:
cv_model = cv.fit(train)

In [49]:
best_model = cv_model.bestModel
print(f"Rank = {best_model._java_obj.parent().getRank()}")
print(f"MaxIter = {best_model._java_obj.parent().getMaxIter()}")
print(f"RegParam = {best_model._java_obj.parent().getRegParam()}")

Rank = 10
MaxIter = 10
RegParam = 0.1


In [50]:
prediction = best_model.transform(test)
rmse = evaluator.evaluate(prediction)
print(f'RMSE = {rmse}')

RMSE = 0.8802686067343294


In [51]:
best_model.userFactors.show()
best_model.itemFactors.show()

+-----+--------------------+
|   id|            features|
+-----+--------------------+
| 1420|[0.2062382, -0.62...|
| 3040|[0.051690463, -1....|
| 3720|[0.03454953, -1.0...|
| 4700|[0.09872484, -0.9...|
| 5170|[0.25299394, -1.4...|
| 5570|[0.83103746, -0.6...|
| 5650|[-0.025405096, -0...|
| 5900|[0.1723902, -0.94...|
| 6360|[0.3888164, -1.06...|
| 6440|[-0.04673678, -0....|
| 8160|[-0.40817812, -1....|
| 9340|[-0.21737503, -0....|
|10150|[0.16012405, -1.1...|
|10590|[0.19862531, -0.3...|
|10630|[0.39672393, -0.7...|
|10800|[0.050946362, -1....|
|11760|[0.10006092, -0.7...|
|13070|[0.091259085, -0....|
|15630|[0.19547783, -0.8...|
|17140|[0.26336434, -1.3...|
+-----+--------------------+
only showing top 20 rows
+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.31707168, -0.5...|
| 20|[0.7878597, -0.59...|
| 30|[0.12952156, -0.7...|
| 40|[0.12555532, -0.9...|
| 50|[0.45601627, -0.8...|
| 60|[-0.36440235, -0....|
| 70|[0.73408884, -0.5...|
| 80|[0.5