In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, concat_ws, first
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator


In [2]:
# Création Session Spark
spark = (SparkSession.builder
         .appName("ALS Recommendation System")
         .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
         .config("spark.driver.memory", "4g")  # Mémoire pour le driver
         .config("spark.executor.memory", "4g")  # Mémoire pour les executors
         .config("spark.sql.shuffle.partitions", "200")  # Nombre de partitions
         .config("spark.driver.maxResultSize", "2g")  # Taille max des résultats
         .getOrCreate())

In [3]:
movie = spark.read.csv("/data/movie.csv", header=True, inferSchema=True)

movie.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [4]:
rating = spark.read.csv("/data/rating.csv", header=True, inferSchema=True)
rating.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import col, when, count

# Compter les valeurs nulles pour chaque colonne du DataFrame `movie`
for column in movie.columns:
    null_count = movie.select(count(when(col(column).isNull(), 1)).alias("null_count")).collect()[0]["null_count"]
    print(f"Nombre de valeurs manquantes {column} = {null_count}")


Nombre de valeurs manquantes movieId = 0
Nombre de valeurs manquantes title = 0
Nombre de valeurs manquantes genres = 0


In [6]:

# Compter les valeurs nulles pour chaque colonne du DataFrame `rating`
for column in rating.columns:
    null_count = rating.select(count(when(col(column).isNull(), 1)).alias("null_count")).collect()[0]["null_count"]
    print(f"Nombre de valeurs manquantes {column} = {null_count}")



Nombre de valeurs manquantes userId = 0
Nombre de valeurs manquantes movieId = 0
Nombre de valeurs manquantes rating = 0
Nombre de valeurs manquantes timestamp = 0


In [7]:
# Vérifier le nombre de doublons dans `movie`
movie_dupes = movie.count() - movie.dropDuplicates().count()
print(f"Nombre de doublons dans movie = {movie_dupes}")

# Vérifier le nombre de doublons dans `rating`
rating_dupes = rating.count() - rating.dropDuplicates().count()
print(f"Nombre de doublons dans rating = {rating_dupes}")

Nombre de doublons dans movie = 0
Nombre de doublons dans rating = 0


In [8]:
# Vérifier le schéma des colonnes pour `movie`
movie.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [9]:
# Vérifier le schéma des colonnes pour `rating`
rating.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [10]:
from pyspark.ml.recommendation import ALS

# Étape 1 : Préparer les données d'entraînement et de test
# Fractionner les données en train et test
(training, test) = rating.randomSplit([0.8, 0.2], seed=42)

# Étape 2 : Configurer le modèle ALS

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,  # Garantit que les prédictions sont positives
    coldStartStrategy="drop",  # Ignore les prédictions avec données inconnues
    rank=10,  # Nombre de facteurs latents
    maxIter=10  # Nombre d'itérations
)


# Étape 3 : Entraîner le modèle ALS
als_model = als.fit(training)

In [14]:
predictions = als_model.transform(test)

# Configurer l'évaluateur de régression pour RMSE
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)

# Calculer le RMSE
rmse = evaluator.evaluate(predictions)
print(f"Le RMSE du modèle ALS est : {rmse}")


Le RMSE du modèle ALS est : 0.817251011276976


In [15]:
# Configurer l'évaluateur de régression pour RMSE
mae_evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="rating",
    predictionCol="prediction"
)

# Calculer le mae
mae = mae_evaluator.evaluate(predictions)
print(f"Le MAE du modèle ALS est : {mae}")


Le MAE du modèle ALS est : 0.6388887185607606


In [16]:
# Sauvegarder le modèle sur HDFS
als_model.write().overwrite().save("hdfs://namenode:9000/model/als_model")
print("Modèle ALS sauvegardé sur HDFS.")


Modèle ALS sauvegardé sur HDFS.
