## 1. Initialisation de Spark

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session used by the rest of the notebook.
# Executor and driver memory are set to 2g each, and shuffles use
# 8 partitions.
spark = (
    SparkSession.builder
    .appName("ALSModelTraining")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

# Only surface ERROR-level Spark log messages in the cell outputs.
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/04 16:44:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 2. Entraînement du modèle

In [2]:
from pyspark.ml.recommendation import ALS

# ALS collaborative-filtering estimator over (userId, movieId, rating).
# - rank / regParam / maxIter: model size, regularisation and iteration count.
# - coldStartStrategy="drop": rows whose prediction would be NaN are dropped
#   from the transform output, so they do not poison the evaluation metric.
# - seed: fixed so the factor initialisation (and therefore the reported
#   metric and the saved model) is reproducible across kernel restarts.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=10,
    regParam=0.1,
    maxIter=10,
    coldStartStrategy="drop",
    seed=42,
)

In [3]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Tunable sampling parameters for this cell.
MAX_RATINGS_PER_USER = 10  # upper bound on ratings kept for each user
SAMPLING_SEED = 42         # seed for the random per-user ordering

# Load the raw ratings CSV from HDFS (header row present, column types inferred).
ratings = spark.read.csv(
    "hdfs://namenode:9000/user/movielens/raw/rating.csv",
    header=True,
    inferSchema=True,
)

# Per-user window with a seeded random ordering. Without a seed, every Spark
# action that re-evaluates this plan would draw a different sample.
window = Window.partitionBy("userId").orderBy(F.rand(seed=SAMPLING_SEED))

# Keep at most MAX_RATINGS_PER_USER ratings per user.
# The result is cached so that the later actions in the notebook (counts,
# train/test split, model fitting) reuse the sample instead of recomputing it;
# a recomputed sample could otherwise differ between the training and test sets.
ratings_sampled = (
    ratings
    .withColumn("row_num", F.row_number().over(window))
    .filter(F.col("row_num") <= MAX_RATINGS_PER_USER)
    .drop("row_num")
    .cache()
)

# Sanity check: schema and a few sampled rows.
ratings_sampled.printSchema()
ratings_sampled.show(5)


                                                                                

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)





+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|    12|    110|   4.0|1997-03-22 20:54:22|
|    12|    733|   4.0|1997-03-22 20:49:34|
|    12|    608|   5.0|1997-03-22 20:49:34|
|    12|    260|   4.0|1997-03-22 20:50:25|
|    12|    480|   3.0|1997-03-22 20:54:22|
+------+-------+------+-------------------+
only showing top 5 rows



                                                                                

In [4]:
# Count the sampled ratings per user and display the largest counts first
# (show() prints the first 20 rows by default).
(
    ratings_sampled
    .groupBy("userId")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

[Stage 7:>                                                          (0 + 8) / 8]

+------+-----+
|userId|count|
+------+-----+
|     8|   10|
|     2|   10|
|     5|   10|
|     4|   10|
|     1|   10|
|    22|   10|
|    10|   10|
|    28|   10|
|    12|   10|
|    32|   10|
|    31|   10|
|    36|   10|
|     3|   10|
|    45|   10|
|    49|   10|
|    55|   10|
|    25|   10|
|    59|   10|
|    57|   10|
|    60|   10|
+------+-----+
only showing top 20 rows



                                                                                

In [5]:
# Random 80% / 20% split of the sampled ratings, with a fixed seed.
training, test = ratings_sampled.randomSplit(weights=[0.8, 0.2], seed=42)

In [6]:
# Distinct users and movies that appear in the training split.
known_users = training.select("userId").distinct()
known_items = training.select("movieId").distinct()

# Keep only the test rows whose user AND movie both occur in the training
# split (two successive inner joins on the distinct id sets).
test = (
    test
    .join(known_users, on="userId", how="inner")
    .join(known_items, on="movieId", how="inner")
)

In [7]:
# Report the size of both splits (each count() triggers a Spark job).
print(f"training count = {training.count()}, test count = {test.count()}")

# Fit the ALS estimator on the training split.
model = als.fit(dataset=training)

                                                                                

training count = 1108458, test count = 275791


                                                                                

## 3. Évaluation du modèle

In [8]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the "rating" label with the "prediction" column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

# Score the filtered test split with the fitted model, then compute the metric.
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse:.4f}")

[Stage 178:>                                                        (0 + 8) / 8]

Root-mean-square error = 1.1350


                                                                                

## 4. Sauvegarde du modèle dans HDFS

In [9]:
# Persist the fitted model to HDFS, replacing any model already saved at
# this path.
(
    model
    .write()
    .overwrite()
    .save("hdfs://namenode:9000/model/model_als")
)

                                                                                