### L'objectif est creer un pipeline ADF qui :

- Sauvegarde (copy) un fichier CSV contenu dans ADLS 2 vers le dossier /bronze

- Invoque un notebook Databricks

- Le notebook ingére le fichier CSV source

- Transforme les données (nettoyage + filtre)

- Écrit les données finales au format Delta Lake + stockage dans ADLS

✅ Étapes dans Azure Data Factory\
🔹 Étape 1 : Créer les linked services
- 🔗 Azure Data Lake Storage Gen2 (source et destination)
- 🔗 Azure Databricks workspace

🔹 Étape 2 : Créer un pipeline ADF\
Contenu du pipeline :

- 🧱 1. Copy Data (optionnel, ingestion brute) Source : Fichier CSV dans /input/

- Sink :  /bronze/sauvegarde_loto_201902__.csv (pour sauvegarder avant traitement)

![Vue sur Azure Data Factory](https://drive.google.com/uc?export=view&id=1dc9jmBzeX_V1ChfJxeGWR4VL5rjvvwB3)
 




- Ce notebook lit le fichier loto_201902__.csv depuis Azure Data Lake Storage Gen2, filtre les tirages effectués uniquement les samedis entre le 1er janvier 2019 et le 31 décembre 2019, convertit les types de données nécessaires, et sauvegarde les résultats au format Delta Lake dans un chemin de sortie défini. 

- Ce traitement est utilisé dans un pipeline Azure Data Factory pour automatiser la chaîne d'ingestion et de transformation des données Loto.

In [0]:
storage_account_key = dbutils.secrets.get(scope="key_adls", key="storage_key")
# Configuration de l'accès au compte de stockage Azure comptestorage0000000001
spark.conf.set("fs.azure.account.key.comptestorage0000000001.dfs.core.windows.net", storage_account_key)

In [0]:
from pyspark.sql.functions import to_date, col, trim, upper

#Chemin vers le conteneur Azure "test"
 
dbutils.widgets.text("input_path", "abfss://test@comptestorage0000000001.dfs.core.windows.net/input/loto_201902__.csv")
dbutils.widgets.text("output_path", "abfss://test@comptestorage0000000001.dfs.core.windows.net/bronze/Bronze_loto_samedis_2019")

input_path = dbutils.widgets.get("input_path")
output_path = dbutils.widgets.get("output_path")

# Lecture CSV brut


df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", ";")
    .load(input_path)
)

# Nettoyage et filtrage
df_cleaned = (
    df.withColumn("date_de_tirage", to_date("date_de_tirage", "dd/MM/yyyy"))
      .withColumn("date_de_forclusion", to_date("date_de_forclusion", "dd/MM/yyyy"))
      .withColumn("nombre_de_gagnant_au_rang1", col("nombre_de_gagnant_au_rang1").cast("int"))
      .withColumn("jour_de_tirage", trim(upper(col("jour_de_tirage"))))
      .filter(
          (col("date_de_tirage") >= "2019-01-01") &
          (col("date_de_forclusion") < "2020-01-01") &
          (col("jour_de_tirage") == "SAMEDI")
      )
)




In [0]:
# Sauvegarde en format Delta
df_cleaned.write.format("delta").mode("overwrite").save(output_path)