# **Couche Bronze – Ingestion des Données Brutes**
Ce notebook ingère le jeu de données brut *Online Retail II* à partir d’un volume géré dans le Lakehouse Databricks.  
Il utilise Pandas pour lire le fichier Excel, ajoute des métadonnées d’audit, et sauvegarde les données brutes au format Delta sous forme de table `bronze_sales` pour un traitement ultérieur dans la couche Silver.

In [ ]:
# Install openpyxl to enable reading Excel files with pandas
%pip install openpyxl
# Step 1: Importing pandas to read the Excel file
import pandas as pd

# Step 2: definition of the full path to the uploaded Excel file in your volume
file_path = "/Volumes/workspace/default/ecommerce_demo/online_retail_II.xlsx"

# Step 3: Read the Excel file using pandas
pdf = pd.read_excel(file_path)

# Step 4: Show the first 5 rows of data
pdf.head()


## Étape 2 : Conversion en DataFrame Spark

Le fichier Excel brut a été chargé avec succès dans une DataFrame Pandas (`pdf`).  
Il est maintenant converti en DataFrame Spark (`df_raw`) afin de permettre le traitement distribué des données avec Apache Spark.  
Cette transformation prépare le jeu de données pour les opérations ETL en aval et son stockage au format Delta Lake dans la couche Bronze.

In [ ]:
# Convert all columns to string type to avoid mixed-type conversion issues
pdf_clean = pdf.astype(str)

# Now safely convert to Spark DataFrame
df_raw = spark.createDataFrame(pdf_clean)

# Preview the result
df_raw.display()


## Étape 3 : Ajouter une colonne d’audit d’ingestion

Pour assurer la traçabilité des données, une colonne supplémentaire appelée `ingested_at` est ajoutée à l’ensemble de données.  
Cette colonne enregistre la date et l’heure exactes de l’ingestion des données dans l’environnement Databricks.  
Inclure ce type de métadonnées est une bonne pratique standard dans les pipelines de données pour faciliter le débogage, le versionnement et l’historisation.

In [ ]:
from pyspark.sql.functions import current_timestamp

# Add an ingestion timestamp column to the Spark DataFrame
df_bronze = df_raw.withColumn("ingested_at", current_timestamp())

# Display the result with the audit column
df_bronze.display()


## ✏️ Étape 3.1 : Ajuster les noms de colonnes pour la compatibilité Delta

Pour se conformer aux contraintes techniques de Delta Lake, les noms de colonnes contenant des espaces ou des caractères spéciaux ont été ajustés.  
Plus précisément, les espaces sont remplacés par des underscores (ex. `Customer ID` → `Customer_ID`). Cela garantit la compatibilité avec les règles de validation de schéma de Delta Lake.

Il est important de noter qu’aucune valeur des données n’a été modifiée au cours de ce processus.  
L’intégrité des données brutes est préservée tout en les rendant compatibles avec le format Delta.

In [ ]:
# Step: Clean column names by replacing spaces with underscores
df_bronze = df_bronze.toDF(*[col.replace(" ", "_") for col in df_bronze.columns])

# Display to confirm updated column names
df_bronze.display()

## Étape 4 : Sauvegarde sous forme de table Delta Bronze

La DataFrame Spark, désormais enrichie d’une colonne `ingested_at`, est sauvegardée de manière persistante au format Delta dans une table nommée `bronze_sales`.  
Cela marque l’achèvement de la couche Bronze, dans laquelle les données brutes sont capturées telles quelles pour des raisons d’audit et de traçabilité.  
En utilisant Delta Lake, les données bénéficient des fonctionnalités de contrôle de version, de validation de schéma et de Time Travel.

In [ ]:
# Save the final DataFrame as a Delta Table (Bronze layer)
df_bronze.write.format("delta").mode("overwrite").saveAsTable("bronze_sales")
