In [1]:
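# Staging and warehouse load notebook (PySpark).
# Reads the raw CSV files under Files/raw, writes the stg_* tables, then builds the dw_dim_* and dw_fait_inscriptions tables.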


StatementMeta(, 86f10469-4da0-427b-9917-860050d96b37, 3, Finished, Available, Finished)

In [8]:
# Raw read of the CSV sources: header row, ';' separator, UTF-8 encoding.
# No schema is supplied here; typing is done downstream where needed.
def _read_raw_csv(path):
    """Return the DataFrame Spark produces for the CSV file at `path`."""
    reader = spark.read
    reader = reader.option("header", "true")
    reader = reader.option("sep", ";")
    reader = reader.option("encoding", "UTF-8")
    return reader.csv(path)


df_danseurs = _read_raw_csv("Files/raw/danseurs.csv")
df_inscriptions = _read_raw_csv("Files/raw/inscriptions.csv")
df_tarifs = _read_raw_csv("Files/raw/tarifs.csv")
df_formules = _read_raw_csv("Files/raw/formule.csv")
df_durees = _read_raw_csv("Files/raw/duree.csv")
df_danses = _read_raw_csv("Files/raw/danse.csv")

# Column-name clean-up for Delta Lake compatibility
def clean_columns(df):
    """Return `df` with every column renamed to a normalised name.

    A name is stripped of surrounding whitespace and lower-cased; then each
    space becomes an underscore and the letters é, è, ê, à lose their accent.
    Columns are renamed one at a time with `withColumnRenamed`.
    """
    substitutions = str.maketrans({" ": "_", "é": "e", "è": "e", "ê": "e", "à": "a"})
    for original_name in df.columns:
        normalised = original_name.strip().lower().translate(substitutions)
        df = df.withColumnRenamed(original_name, normalised)
    return df

# Normalise the column names of every raw frame
df_danseurs, df_inscriptions, df_tarifs, df_formules, df_durees, df_danses = [
    clean_columns(_raw_frame)
    for _raw_frame in (df_danseurs, df_inscriptions, df_tarifs, df_formules, df_durees, df_danses)
]

# Persist each frame to its staging table, replacing both data and schema
_staging_targets = [
    ("stg_danseurs", df_danseurs),
    ("stg_inscriptions", df_inscriptions),
    ("stg_tarifs", df_tarifs),
    ("stg_formules", df_formules),
    ("stg_durees", df_durees),
    ("stg_danses", df_danses),
]
for _table_name, _frame in _staging_targets:
    _frame.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(_table_name)


StatementMeta(, 69d68aa0-dbe6-4b8d-8ad7-13c008e1c206, 10, Finished, Available, Finished)

In [9]:
from pyspark.sql.functions import col, when, expr

# Load the staging tables
df_inscriptions = spark.table("stg_inscriptions")
df_tarifs = spark.table("stg_tarifs")
df_formules = spark.table("stg_formules")
df_durees = spark.table("stg_durees")
df_danses = spark.table("stg_danses")
df_danseurs = spark.table("stg_danseurs")

# Publish the raw dimensions (straight copies of staging)
df_danses.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_dim_danse")
df_formules.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_dim_formule")
df_durees.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_dim_duree")

# dw_dim_danseur is rewritten further down in this notebook with versioning
# columns (date_debut, date_fin, version, is_actif). Overwriting it here on
# every run would throw away those columns and all closed versions, so the
# raw copy is only used to create the table the first time.
if not spark.catalog.tableExists("dw_dim_danseur"):
    df_danseurs.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_dim_danseur")

# Enrich each tariff with its formula, duration and dance attributes.
# All three look-ups are left joins, so tariffs without a match are kept.
df_tarifs_enrichis = (
    df_tarifs
    .join(df_formules, on="codeformule", how="left")
    .join(df_durees, on=(df_tarifs["codeduree"] == df_durees["code_duree"]), how="left")
    .join(df_danses, on="codedanse", how="left")
)

# Persist the enriched tariffs as the tariff dimension
(
    df_tarifs_enrichis.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dw_dim_tarif")
)

# Attach the enriched tariff attributes to every registration (left join:
# registrations without a matching tariff are kept with NULL attributes)
df = df_inscriptions.join(df_tarifs_enrichis, "idtarif", "left")

# Type the columns used in the calculations below.
# NOTE(review): these casts return NULL for values Spark cannot parse (e.g.
# comma decimal separators or non-ISO dates) — confirm the CSV formats.
df = (
    df.withColumn("nbcourslimite", col("nbcourslimite").cast("int"))
      .withColumn("nb_semaines", col("nb_semaines").cast("int"))
      .withColumn("tarif", col("tarif").cast("double"))
      .withColumn("dateinscription", col("dateinscription").cast("date"))
)

# Column *names* were de-accented upstream but the *values* were not, so the
# label may read "volonte" or "volonté"; accept both spellings (the upper-case
# É is listed explicitly because (?i) only folds ASCII letters).
est_volonte = col("libelleformule").rlike("(?i)volont[eéÉ]")

# Formula type: "Carnet" when the label says so, "Hebdo" for everything else
# (including NULL labels)
df = df.withColumn(
    "typeformule",
    when(col("libelleformule").rlike("(?i)carnet"), "Carnet").otherwise("Hebdo")
)

# Number of sessions bought:
#   - Carnet      -> the course limit of the booklet
#   - "a volonte" -> nb_semaines * 5 (5 is the per-week figure hard-coded by
#                    the original author — TODO confirm the business rule)
#   - otherwise   -> nb_semaines * weekly course limit
df = df.withColumn(
    "nbseances",
    when(col("typeformule") == "Carnet", col("nbcourslimite"))
    .when(est_volonte, col("nb_semaines") * 5)
    .otherwise(col("nb_semaines") * col("nbcourslimite"))
)

# Revenue, calendar columns and price per session.
# The division is guarded so a zero session count gives NULL instead of
# raising when spark.sql.ansi.enabled is on (NULL is also what the unguarded
# division returns with ANSI off).
df = (
    df.withColumn("ca", col("tarif"))
      .withColumn("mois", expr("month(dateinscription)"))
      .withColumn("annee", expr("year(dateinscription)"))
      .withColumn(
          "tarifparseance",
          when(col("nbseances") != 0, (col("tarif") / col("nbseances")).cast("double"))
      )
)

# Persist the fact table
df.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_fait_inscriptions")


StatementMeta(, 69d68aa0-dbe6-4b8d-8ad7-13c008e1c206, 11, Finished, Available, Finished)

In [11]:
from pyspark.sql.functions import col, expr, sequence, explode, to_date, date_format, dayofweek

# Inclusive bounds of the calendar
start_date, end_date = "2024-08-01", "2025-07-31"

# One row per day between the two bounds
df_dates = spark.sql(f"""
    SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) as date
""")

# Derive every calendar attribute from the `date` column in a single projection
df_dates = df_dates.select(
    col("date"),
    expr("day(date)").alias("jour"),
    expr("month(date)").alias("mois"),
    expr("year(date)").alias("annee"),
    expr("weekofyear(date)").alias("semaine"),
    date_format(col("date"), "EEEE").alias("jour_semaine"),
    date_format(col("date"), "MMMM").alias("mois_nom"),
    expr("quarter(date)").alias("trimestre"),
    date_format(col("date"), "yyyy-MM").alias("annee_mois"),
    dayofweek(col("date")).alias("jour_semaine_num"),
    # dayofweek values 1 and 7 are the ones flagged as weekend
    expr("case when dayofweek(date) in (1, 7) then true else false end").alias("est_weekend"),
)

# Persist the date dimension
(
    df_dates.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dw_dim_date")
)


StatementMeta(, 69d68aa0-dbe6-4b8d-8ad7-13c008e1c206, 13, Finished, Available, Finished)

In [17]:
from pyspark.sql.functions import col, lit, current_timestamp, max as spark_max
from pyspark.sql.utils import AnalysisException

# Read the dancers file (header row, ';' separator, UTF-8); column names are cleaned below
_danseurs_reader = (
    spark.read
    .option("header", "true")
    .option("sep", ";")
    .option("encoding", "UTF-8")
)
df_staging = _danseurs_reader.csv("Files/raw/danseurs.csv")

def clean_columns(df):
    """Rename every column of `df` to its normalised form and return the result.

    Normalisation: strip surrounding whitespace, lower-case, replace spaces
    with underscores and replace é/è/ê/à by their unaccented letter.
    """
    def normalise(raw_name):
        cleaned = raw_name.strip().lower()
        for before, after in ((" ", "_"), ("é", "e"), ("è", "e"), ("ê", "e"), ("à", "a")):
            cleaned = cleaned.replace(before, after)
        return cleaned

    renamed = df
    for source_name in df.columns:
        renamed = renamed.withColumnRenamed(source_name, normalise(source_name))
    return renamed

# Slowly-changing-dimension (versioned) refresh of dw_dim_danseur.
#
# Result written at the end:
#   closed versions already in the table
# + active rows that are not superseded (unchanged, or absent from the CSV)
# + previously active rows that changed, now closed (date_fin set, is_actif False)
# + one new active row per changed or brand-new key
from pyspark.sql.functions import coalesce  # local to this cell: defaults the version of new keys

df_staging = clean_columns(df_staging)

# Versioning columns for the incoming rows
df_staging_clean = (
    df_staging
    .withColumn("date_debut", current_timestamp())
    .withColumn("date_fin", lit(None).cast("timestamp"))
    .withColumn("version", lit(1))
    .withColumn("is_actif", lit(True))
)

# Load the current dimension, or start from an empty frame if it does not exist
try:
    df_existing = spark.table("dw_dim_danseur")
except AnalysisException:
    df_existing = spark.createDataFrame([], df_staging_clean.schema)

# A table that has no versioning columns yet holds only current rows: treat
# them as active version 1. (Filling these with NULL would make every row
# neither active nor closed, and the final union would drop them all.)
_scd_defaults = [
    ("date_debut", current_timestamp()),
    ("date_fin", lit(None).cast("timestamp")),
    ("version", lit(1)),
    ("is_actif", lit(True)),
]
for col_name, _default in _scd_defaults:
    if col_name not in df_existing.columns:
        df_existing = df_existing.withColumn(col_name, _default)

# Business key and tracked attributes
cle = "noclient"
champs_suivis = ["adresse", "cp", "ville", "pub"]

# Split the existing table; the second filter is the exact complement of the
# first so that a row with a NULL flag is kept as history rather than lost.
df_active = df_existing.filter(col("is_actif") == True)
df_history = df_existing.filter("NOT (is_actif <=> true)")

# Match incoming rows to the active version of the same key. An explicit
# equality keeps both sides' columns addressable as new.* / old.*
df_joined = df_staging_clean.alias("new").join(
    df_active.alias("old"),
    on=col(f"new.{cle}") == col(f"old.{cle}"),
    how="left"
)

# Null-safe change test: `<=>` treats NULL as a comparable value, so a field
# going from NULL to a value (or back) counts as a change and the condition
# never evaluates to NULL.
condition_diff = " OR ".join([f"NOT (new.{c} <=> old.{c})" for c in champs_suivis])

# Rows needing a new active version: keys unknown to the dimension, or known
# keys whose tracked attributes differ
df_upserts = df_joined.filter(f"old.{cle} IS NULL OR ({condition_diff})")
df_changed = df_upserts.select("new.*")

# Keys whose currently active row is superseded (excludes brand-new keys)
df_changed_keys = (
    df_upserts.filter(f"old.{cle} IS NOT NULL")
    .select(col(f"new.{cle}").alias(cle))
    .distinct()
)

# Active rows left untouched. This also keeps keys missing from the CSV;
# NOTE(review): confirm whether such rows should instead be closed.
df_unchanged = df_active.join(df_changed_keys, cle, "left_anti")

# Close the superseded versions
df_closed_old = (
    df_active.join(df_changed_keys, cle, "inner")
    .withColumn("date_fin", current_timestamp())
    .withColumn("is_actif", lit(False))
)

# New versions: next number after the highest existing one, 1 for new keys
df_new_versions = (
    df_changed.join(
        df_existing.groupBy(cle).agg(spark_max("version").alias("max_version")),
        cle, "left"
    )
    .withColumn("version", coalesce(col("max_version"), lit(0)) + 1)
    .drop("max_version")
)

# Assemble the refreshed dimension
df_final = (
    df_history
    .unionByName(df_unchanged, allowMissingColumns=True)
    .unionByName(df_closed_old, allowMissingColumns=True)
    .unionByName(df_new_versions, allowMissingColumns=True)
)

# Persist
df_final.write.option("overwriteSchema", "true").mode("overwrite").saveAsTable("dw_dim_danseur")

StatementMeta(, 69d68aa0-dbe6-4b8d-8ad7-13c008e1c206, 19, Finished, Available, Finished)

In [18]:
from pyspark.sql.functions import col, lit

# Load the source data from the staging table
_formules_source = "stg_formules"
df_formules = spark.table(_formules_source)

# Column-name clean-up (same rules as for the other tables)
def clean_columns(df):
    """Return `df` after renaming each column to a normalised name.

    Per column: trim surrounding whitespace, lower-case, turn spaces into
    underscores, then map é/è/ê to "e" and à to "a".
    """
    accent_free = {"é": "e", "è": "e", "ê": "e", "à": "a"}
    result = df
    for current in list(df.columns):
        target = current.strip().lower().replace(" ", "_")
        target = "".join(accent_free.get(ch, ch) for ch in target)
        result = result.withColumnRenamed(current, target)
    return result

df_formules = clean_columns(df_formules)

# SCD type 1: the dimension is simply replaced by the current staging content
(
    df_formules.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dw_dim_formule")
)


StatementMeta(, 69d68aa0-dbe6-4b8d-8ad7-13c008e1c206, 20, Finished, Available, Finished)