In [None]:
from functools import reduce
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, lpad, concat, expr, row_number, when
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

# Stop any Spark session left over from a previous run of this notebook so the
# builder below starts from a clean state.
try:
    spark.stop()
except NameError:
    # First run in this kernel: `spark` has not been defined yet.
    pass
except Exception as exc:  # best-effort cleanup: report it, never block the pipeline
    print(f"⚠️ Impossible d'arrêter la session Spark précédente : {exc}")

# Spark initialisation
spark = SparkSession.builder.appName("Election Data Pipeline").getOrCreate()

# === Configuration ===
# Root directory holding the uploaded CSV files and the medallion layers.
DATA_DIR = "/content"
BRONZE_PATH = f"{DATA_DIR}/bronze"
SILVER_PATH = f"{DATA_DIR}/silver"
GOLD_PATH = f"{DATA_DIR}/gold"
CANDIDAT_PATH = f"{GOLD_PATH}/candidats"

# Create the output directories (no-op when they already exist).
for output_dir in (BRONZE_PATH, SILVER_PATH, GOLD_PATH, CANDIDAT_PATH):
    os.makedirs(output_dir, exist_ok=True)

# === Source files === (upload them to DATA_DIR before running the notebook)
# Each entry is (CSV path, election year, round "T1"/"T2").
RAW_FILES = [
    (f"{DATA_DIR}/resultats-par-niveau-subcom-t1-france-entiere.csv", "2022", "T1"),
    (f"{DATA_DIR}/resultats-par-niveau-subcom-t2-france-entiere-_4_.csv", "2022", "T2"),
    (f"{DATA_DIR}/Presidentiel_2017_1erTour.csv", "2017", "T1"),
    (f"{DATA_DIR}/presidentielle_2017_Tour2.csv", "2017", "T2"),
    (f"{DATA_DIR}/presidentiel_2012_T1.csv", "2012", "T1"),
    (f"{DATA_DIR}/presidentiel_2012_t2.csv", "2012", "T2"),
]

# Fonction nettoyage
def clean_columns(df):
    for col_name in df.columns:
        new_name = col_name.lower().strip().replace(" ", "_").replace("%", "pct").replace("/", "_").replace(".", "")
        df = df.withColumnRenamed(col_name, new_name)
    if "exprimés" in df.columns:
        df = df.withColumnRenamed("exprimés", "exprimes")
    return df


In [None]:
# Bronze layer: ingest each raw CSV as-is, tag it with ingestion metadata and
# persist it as Parquet.
dfs_bronze = []
for file_path, year, tour in RAW_FILES:
    bronze_path = f"{BRONZE_PATH}/election_{year}_{tour}.parquet"
    # NOTE(review): default CSV options (comma separator, UTF-8) are assumed —
    # confirm they match every source file.
    raw_df = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(file_path)
        .withColumn("ingestion_date", current_timestamp())
        .withColumn("annee", lit(year))
        .withColumn("tour", lit(tour))
    )
    raw_df.coalesce(1).write.mode("overwrite").parquet(bronze_path)
    # Hand the *persisted* bronze data to the next stage. Keeping `raw_df`
    # would re-read the CSV on every later action and re-evaluate
    # current_timestamp(), so downstream ingestion_date would differ from the
    # value stored in bronze.
    dfs_bronze.append(spark.read.parquet(bronze_path))

print("✅ Étape 1 : BRONZE terminée")

In [None]:
# Silver layer: normalize column names, drop empty/duplicate rows, derive rates
# and a 5-character commune code, then persist each election separately.
METADATA_COLUMNS = {"ingestion_date", "annee", "tour"}
# (output column, numerator, denominator); a rate is only added when both
# source columns exist in the file.
RATE_SPECS = [
    ("taux_participation", "votants", "inscrits"),
    ("taux_abstention", "abstentions", "inscrits"),
    ("taux_blancs", "blancs", "votants"),
]

dfs_silver = []
for df in dfs_bronze:
    df = clean_columns(df)

    # The metadata columns added in BRONZE are never null, so they are
    # excluded. Otherwise `how="all"` can never match and empty CSV rows
    # survive. (`or None` falls back to all columns if no data column exists.)
    data_cols = [c for c in df.columns if c not in METADATA_COLUMNS] or None
    df = df.dropna(how="all", subset=data_cols).dropDuplicates(data_cols)

    for rate_col, numerator, denominator in RATE_SPECS:
        if numerator in df.columns and denominator in df.columns:
            # A zero denominator yields null instead of failing under ANSI mode.
            df = df.withColumn(
                rate_col,
                when(col(denominator) != 0, col(numerator) / col(denominator)),
            )

    if "code_du_département" in df.columns and "code_de_la_commune" in df.columns:
        # NOTE(review): lpad also truncates, so a 3-character département code
        # keeps only its first 2 characters — confirm this is intended.
        df = df.withColumn(
            "code_insee",
            concat(
                lpad(col("code_du_département").cast(StringType()), 2, "0"),
                lpad(col("code_de_la_commune").cast(StringType()), 3, "0"),
            ),
        )

    # annee/tour are constant per file: fetch both in a single job.
    meta = df.select("annee", "tour").first()
    if meta is None:
        print("⚠️ Jeu de données vide après nettoyage : écriture SILVER ignorée")
        continue
    year, tour = meta["annee"], meta["tour"]
    df.coalesce(1).write.mode("overwrite").parquet(f"{SILVER_PATH}/election_{year}_{tour}.parquet")
    dfs_silver.append(df)

print("✅ Étape 2 : SILVER terminée")

In [None]:
# Gold layer: one table stacking every election on the columns below. Files
# lacking some of them get nulls there (allowMissingColumns=True).
# NOTE(review): unionByName matches columns by name only; if inferSchema gave a
# column different types across files, check the resulting type.
gold_columns = [
    "code_du_département", "libellé_du_département", "inscrits", "abstentions",
    "votants", "exprimes", "annee", "tour", "taux_participation",
    "taux_abstention", "taux_blancs", "code_insee",
]

gold_parts = [
    df.select(*[c for c in gold_columns if c in df.columns])
    for df in dfs_silver
]
if not gold_parts:
    raise ValueError("Aucune donnée SILVER disponible pour construire la couche GOLD")
gold_df = reduce(
    lambda left, right: left.unionByName(right, allowMissingColumns=True),
    gold_parts,
)

gold_df.coalesce(1).write.mode("overwrite").parquet(f"{GOLD_PATH}/elections_unifiees.parquet")
print("✅ Étape 3 : GOLD terminée")
gold_df.show(10)

In [None]:
# Per-candidate results, built only from the silver frames that expose the
# candidate columns. Output columns a file lacks are left out of its selection
# and come back as nulls in the union.
CANDIDATE_REQUIRED_COLUMNS = ["nom", "prénom", "voix", "exprimes"]
CANDIDATE_OUTPUT_COLUMNS = [
    "code_insee", "annee", "tour", "nom_complet", "voix", "pct_voix_exprimes",
    "taux_participation", "taux_abstention", "taux_blancs",
]

candidat_dfs = []
for df in dfs_silver:
    if not all(c in df.columns for c in CANDIDATE_REQUIRED_COLUMNS):
        continue
    # NOTE(review): a single nom/prénom/voix column set is read per row; if a
    # row lists several candidates, only this first set is captured — confirm.
    enriched = (
        df.withColumn("nom_complet", concat(col("prénom"), lit(" "), col("nom")))
        # null instead of a division error when exprimes is 0
        .withColumn(
            "pct_voix_exprimes",
            when(col("exprimes") != 0, col("voix") / col("exprimes")),
        )
    )
    candidat_dfs.append(
        enriched.select(*[c for c in CANDIDATE_OUTPUT_COLUMNS if c in enriched.columns])
    )

if candidat_dfs:
    candidats_all = reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        candidat_dfs,
    )
    candidats_all.coalesce(1).write.mode("overwrite").parquet(f"{CANDIDAT_PATH}/resultats_par_candidat.parquet")
    print("✅ Étape 4 : CANDIDATS terminée")
    candidats_all.show(10)
else:
    print("⚠️ Étape 4 : aucun fichier SILVER ne contient les colonnes candidat, étape ignorée")

print("🎉 Pipeline complet terminé avec succès")