# MovieLens Batch ETL Pipeline

This notebook runs, step by step, the batch ETL that prepares the MovieLens data for training a robust ALS model.

**Steps**  
1. Configuration & reading the data  
2. Quick exploratory analysis  
3. Advanced cleaning  
4. Temporal enrichment  
5. Writing the output  
6. Validation


In [1]:
# Cell 1: Imports & SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, dayofmonth, count, mean, stddev
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("BatchETLPipelineEnhanced") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()



## 1. Reading the raw data

We load the CSV files from HDFS and display a few rows.


In [2]:
movies_raw = spark.read.csv(
    "hdfs://namenode:9000/movielens/raw/movies/movies.csv",
    header=True, inferSchema=True
)
ratings_raw = spark.read.csv(
    "hdfs://namenode:9000/movielens/raw/ratings/ratings.csv",
    header=True, inferSchema=True
)

print(f"🔍 Raw movies  : {movies_raw.count()}")
print(f"🔍 Raw ratings : {ratings_raw.count()}")

movies_raw.show(5, truncate=False)
ratings_raw.show(5, truncate=False)


🔍 Raw movies  : 27278
🔍 Raw ratings : 20000263
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

+------+-------+------+-------------------+
|userId|movieId|rating|timestamp          |
+------+-------+------+------------

## 2. Quick exploratory analysis

We look at the distribution of ratings to spot possible outliers.


In [3]:
stats = ratings_raw.select(
    mean("rating").alias("mean"),
    stddev("rating").alias("stddev")
).first()
mean_rating, stddev_rating = stats["mean"], stats["stddev"]
print(f"Mean={mean_rating:.3f}, StdDev={stddev_rating:.3f}")

ratings_raw.groupBy("rating").count().orderBy("rating").show()


Mean=3.526, StdDev=1.052
+------+-------+
|rating|  count|
+------+-------+
|   0.5| 239125|
|   1.0| 680732|
|   1.5| 279252|
|   2.0|1430997|
|   2.5| 883398|
|   3.0|4291193|
|   3.5|2200156|
|   4.0|5561926|
|   4.5|1534824|
|   5.0|2898660|
+------+-------+
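As a cross-check, the mean and standard deviation can be recomputed from the histogram printed above, along with the global 3σ interval used later in the cleaning step. This is a minimal pure-Python sketch; the variable names are illustrative, and it assumes that Spark's `stddev` is the sample standard deviation (`stddev_samp`).

```python
from math import sqrt

# Rating histogram copied from the output above: rating value -> count
histogram = {
    0.5: 239125, 1.0: 680732, 1.5: 279252, 2.0: 1430997, 2.5: 883398,
    3.0: 4291193, 3.5: 2200156, 4.0: 5561926, 4.5: 1534824, 5.0: 2898660,
}

n = sum(histogram.values())
mean_rating = sum(r * c for r, c in histogram.items()) / n

# Sample variance (divide by n - 1), assumed to match Spark's stddev
variance = sum(c * (r - mean_rating) ** 2 for r, c in histogram.items()) / (n - 1)
stddev_rating = sqrt(variance)

# Global 3-sigma interval around the mean
low = mean_rating - 3 * stddev_rating
high = mean_rating + 3 * stddev_rating

print(f"n={n}, mean={mean_rating:.3f}, stddev={stddev_rating:.3f}")
print(f"3-sigma interval: [{low:.2f}, {high:.2f}]")
```

On these counts the interval comes out at roughly `[0.37, 6.68]`, which contains the whole 0.5 to 5.0 rating scale, so a global 3σ filter on its own would not remove any rating here.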



## 3. Advanced cleaning strategy

To maximize the quality of the ALS model, we:
1. Compute the per-user **z-score** of each rating and exclude every rating with `|z| > 3`.  
2. Then filter out ratings outside the global interval `[μ ± 3σ]`.  
3. Drop users with fewer than 20 net interactions and movies with fewer than 50 net interactions (counted after the outlier filters).  
4. Remove duplicates and nulls.

This two-level outlier detection (global + per user) is intended to make the model more robust to extreme rating behavior.
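The per-user z-score rule can be illustrated on a toy rating history in plain Python. This is a sketch under stated assumptions: the helper `user_z_scores` and the sample history are hypothetical, and the sample standard deviation is used. It also shows the edge case of a user whose ratings are all identical, for whom the z-score is undefined.

```python
from statistics import mean, stdev

def user_z_scores(ratings):
    """Z-score of each rating against this user's own mean and sample stddev.

    Returns None for every rating when the stddev is undefined (fewer than
    two ratings) or zero (all ratings identical).
    """
    if len(ratings) < 2:
        return [None] * len(ratings)
    mu_u = mean(ratings)
    sigma_u = stdev(ratings)
    if sigma_u == 0:
        return [None] * len(ratings)
    return [(r - mu_u) / sigma_u for r in ratings]

# Hypothetical user: nineteen ratings of 4.0 and a single 0.5
history = [4.0] * 19 + [0.5]
z = user_z_scores(history)

# Keep only ratings whose z-score is defined and within [-3, 3]
kept = [r for r, s in zip(history, z) if s is not None and abs(s) <= 3]

print(f"z-score of the 0.5 rating: {z[-1]:.2f}")
print(f"ratings kept: {len(kept)} of {len(history)}")
print(user_z_scores([3.0] * 5))
```

In this toy case the lone 0.5 sits more than four standard deviations below the user's mean and is excluded, while the constant-rating user has no defined z-score at all. How such users are treated is a design decision worth making explicitly.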

In [4]:
from pyspark.sql.functions import avg, stddev

# 1. Global statistics
stats = ratings_raw.select(mean("rating").alias("μ"), stddev("rating").alias("σ")).first()
μ, σ = stats["μ"], stats["σ"]

# 2. Per-user z-score
user_stats = ratings_raw.groupBy("userId") \
    .agg(avg("rating").alias("μ_u"), stddev("rating").alias("σ_u"))

# In non-ANSI mode, a null or zero σ_u (single or constant ratings) gives a
# null z_score, and the filter below then drops those rows.
ratings_z = ratings_raw.join(user_stats, "userId") \
    .withColumn("z_score", (col("rating") - col("μ_u"))/col("σ_u"))

# 3. Per-user z-score filter, then global 3σ filter
clean1 = ratings_z.filter((col("z_score").between(-3,3))) \
    .filter((col("rating") >= μ - 3*σ) & (col("rating") <= μ + 3*σ))

# 4. Count net interactions
# .alias() names the DataFrame, not the column: both counts stay in a column called "count"
user_counts = clean1.groupBy("userId").count().alias("user_count")
movie_counts = clean1.groupBy("movieId").count().alias("movie_count")

# 5. Exclude low-volume users and movies
clean2 = clean1.join(user_counts.filter(col("count")>=20), "userId") \
               .join(movie_counts.filter(col("count")>=50), "movieId")

# 6. Remove nulls and duplicates
ratings_clean = clean2 \
    .dropna(how="any", subset=["userId","movieId","rating","timestamp"]) \
    .dropDuplicates(["userId","movieId","timestamp"]) \
    .cache()

count_after = ratings_clean.count()
print(f"📊 Ratings after cleaning : {count_after}")
ratings_clean.show(3, truncate=False)


📊 Ratings after cleaning : 19730315
+-------+------+------+-------------------+------------------+------------------+-------------------+-----+-----+
|movieId|userId|rating|timestamp          |μ_u               |σ_u               |z_score            |count|count|
+-------+------+------+-------------------+------------------+------------------+-------------------+-----+-----+
|1      |31    |3.0   |2015-02-23 23:18:07|3.3760162601626016|1.5661841004394155|-0.2400843298416226|246  |49638|
|110    |31    |5.0   |2015-02-23 23:17:53|3.3760162601626016|1.5661841004394155|1.036904754288954  |246  |53622|
|260    |31    |5.0   |2015-02-23 23:17:13|3.3760162601626016|1.5661841004394155|1.036904754288954  |246  |54364|
+-------+------+------+-------------------+------------------+------------------+-------------------+-----+-----+
only showing top 3 rows



## 4. Temporal enrichment

Extract the year, month, and day from `timestamp`, converting it to a date type first if necessary.
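The same extraction can be sketched outside Spark with the standard library. The helper `date_parts` is hypothetical; it accepts either a `YYYY-MM-DD HH:MM:SS` string (the format shown in this notebook's outputs) or Unix epoch seconds, which it interprets as UTC. This is an illustration of the "convert if necessary" step, not the notebook's own code.

```python
from datetime import datetime, timezone

def date_parts(ts):
    """Return (year, month, day) from a 'YYYY-MM-DD HH:MM:SS' string or epoch seconds (UTC)."""
    if isinstance(ts, (int, float)):
        # Numeric input: treat as seconds since the Unix epoch, in UTC
        parsed = datetime.fromtimestamp(ts, tz=timezone.utc)
    else:
        # String input: parse the timestamp format seen in the outputs above
        parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    return parsed.year, parsed.month, parsed.day

print(date_parts("2015-02-23 23:18:07"))  # → (2015, 2, 23)
print(date_parts(0))                      # → (1970, 1, 1)
```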


In [5]:
ratings_enriched = (ratings_clean
    .withColumn("year",  year(col("timestamp")))
    .withColumn("month", month(col("timestamp")))
    .withColumn("day",   dayofmonth(col("timestamp")))
)
ratings_enriched.show(5, truncate=False)


+-------+------+------+-------------------+------------------+------------------+-------------------+-----+-----+----+-----+---+
|movieId|userId|rating|timestamp          |μ_u               |σ_u               |z_score            |count|count|year|month|day|
+-------+------+------+-------------------+------------------+------------------+-------------------+-----+-----+----+-----+---+
|1      |31    |3.0   |2015-02-23 23:18:07|3.3760162601626016|1.5661841004394155|-0.2400843298416226|246  |49638|2015|2    |23 |
|110    |31    |5.0   |2015-02-23 23:17:53|3.3760162601626016|1.5661841004394155|1.036904754288954  |246  |53622|2015|2    |23 |
|260    |31    |5.0   |2015-02-23 23:17:13|3.3760162601626016|1.5661841004394155|1.036904754288954  |246  |54364|2015|2    |23 |
|364    |31    |3.0   |2015-02-25 06:13:27|3.3760162601626016|1.5661841004394155|-0.2400843298416226|246  |38946|2015|2    |25 |
|527    |31    |0.5   |2015-02-23 23:19:58|3.3760162601626016|1.5661841004394155|-1.83632068500

## 5. Reducing the number of Parquet partitions

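To avoid generating thousands of small files, we repartition before writing:

- **Movies**: 1 partition  
- **Ratings**: `year,month,day`, but limited to ~10 partitions by coalescing on the date.

Note that the write cell below emits a single CSV file per dataset (`repartition(1)` / `coalesce(1)`) rather than partitioned Parquet.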
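To give a feel for why partitioning by `year,month,day` can produce thousands of small files, the sketch below counts the distinct partition directories over a date range. The range (1995-01-09 to 2015-03-31) and the helper name `count_partitions` are assumptions chosen for illustration, not values taken from this notebook.

```python
from datetime import date, timedelta

def count_partitions(start, end):
    """Count distinct year, (year, month) and (year, month, day) keys in [start, end]."""
    years, months, days = set(), set(), set()
    current = start
    while current <= end:
        years.add(current.year)
        months.add((current.year, current.month))
        days.add((current.year, current.month, current.day))
        current += timedelta(days=1)
    return len(years), len(months), len(days)

# Hypothetical date range, used only for illustration
n_years, n_months, n_days = count_partitions(date(1995, 1, 9), date(2015, 3, 31))
print(f"year: {n_years}, year/month: {n_months}, year/month/day: {n_days}")
```

The day-level count is an upper bound, since it assumes every day in the range has at least one rating. It still shows the order of magnitude: a few dozen directories at year level against several thousand at day level.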


In [6]:
ratings_final = ratings_enriched.select(
    "userId","movieId","rating","timestamp","year","month","day"
)

# Check
print("Final columns:", ratings_final.columns)
ratings_final.show(3, truncate=False)

Final columns: ['userId', 'movieId', 'rating', 'timestamp', 'year', 'month', 'day']
+------+-------+------+-------------------+----+-----+---+
|userId|movieId|rating|timestamp          |year|month|day|
+------+-------+------+-------------------+----+-----+---+
|31    |1      |3.0   |2015-02-23 23:18:07|2015|2    |23 |
|31    |110    |5.0   |2015-02-23 23:17:53|2015|2    |23 |
|31    |260    |5.0   |2015-02-23 23:17:13|2015|2    |23 |
+------+-------+------+-------------------+----+-----+---+
only showing top 3 rows



In [7]:
# Movies as a single CSV file
movies_raw.repartition(1) \
    .write \
    .option("header", True) \
    .mode("overwrite") \
    .csv("hdfs://namenode:9000/movielens/processed/batch/movies_csv")

# Ratings as a single CSV file
ratings_final.coalesce(1) \
    .write \
    .option("header", True) \
    .mode("overwrite") \
    .csv("hdfs://namenode:9000/movielens/processed/batch/ratings_csv")

print("🎉 CSV write complete")


🎉 CSV write complete


## 6. Validating the written output

Quick check of the `count()` values and a preview of the data.
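The count check amounts to simple arithmetic on the numbers printed in this notebook: how many ratings the cleaning step removed, and whether the files read back contain exactly the rows that were written. The helper `summarize_cleaning` is a hypothetical name used only for this sketch.

```python
def summarize_cleaning(raw_count, clean_count, written_count):
    """Return rows removed by cleaning, the removed fraction, and whether the write kept every row."""
    removed = raw_count - clean_count
    return removed, removed / raw_count, written_count == clean_count

# Counts printed in this notebook: raw ratings, ratings after cleaning, ratings read back
removed, fraction, write_ok = summarize_cleaning(20000263, 19730315, 19730315)
print(f"removed={removed} ({fraction:.2%}), write_ok={write_ok}")
```

With these counts, cleaning removes 269,948 ratings (about 1.35% of the raw data), and the count read back matches the cleaned count.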


In [8]:
# Read the movies back from CSV
df_movies = spark.read.csv(
    "hdfs://namenode:9000/movielens/processed/batch/movies_csv",
    header=True, inferSchema=True
)

# Read the ratings back from CSV
df_ratings = spark.read.csv(
    "hdfs://namenode:9000/movielens/processed/batch/ratings_csv",
    header=True, inferSchema=True
)

print(f"✔️ Movies CSV  : {df_movies.count()}")
print(f"✔️ Ratings CSV : {df_ratings.count()}")

df_movies.show(5, truncate=False)
df_ratings.show(5, truncate=False)


✔️ Movies CSV  : 27278
✔️ Ratings CSV : 19730315
+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

+------+-------+------+-------------------+----+-----+---+
|userId|movieId|rating|timestamp          |year|month|day|