## 🎯 Objectif du notebook

Ce notebook constitue la **première étape** de notre pipeline de traitement de données.  
Son objectif principal est de :
- Lire les données brutes (Bronze) du fichier CSV `global_health.csv` issu de Kaggle,
- Réaliser une première **analyse exploratoire rapide** (schéma, valeurs nulles, statistiques),
- Nettoyer les noms de colonnes et filtrer les lignes incomplètes,
- Sauvegarder les données nettoyées au format Delta dans la **zone Silver**,
- Enregistrer un log technique de l’ingestion dans une table d’audit.


In [16]:
# Lecture du fichier brut depuis la zone Bronze
df_raw = spark.read.option("header", True).option("inferSchema", True).csv("Files/Bronze/global_health.csv")

# Schéma des colonnes
df_raw.printSchema()

# Nombre total de lignes
print("Nombre de lignes :", df_raw.count())

# Statistiques de base
display(df_raw.describe())

# Valeurs nulles par colonne
from pyspark.sql.functions import col, isnan, when, count

df_raw = df_raw.select([
    count(when(col(c).isNull() | isnan(c), c)).alias(c)
    for c in df_raw.columns
])

display(df_raw.limit(10))

StatementMeta(, ed865f0b-697d-4b0c-b607-1c07c543e08f, 18, Finished, Available, Finished)

root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Disease Name: string (nullable = true)
 |-- Disease Category: string (nullable = true)
 |-- Prevalence Rate (%): double (nullable = true)
 |-- Incidence Rate (%): double (nullable = true)
 |-- Mortality Rate (%): double (nullable = true)
 |-- Age Group: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Population Affected: integer (nullable = true)
 |-- Healthcare Access (%): double (nullable = true)
 |-- Doctors per 1000: double (nullable = true)
 |-- Hospital Beds per 1000: double (nullable = true)
 |-- Treatment Type: string (nullable = true)
 |-- Average Treatment Cost (USD): integer (nullable = true)
 |-- Availability of Vaccines/Treatment: string (nullable = true)
 |-- Recovery Rate (%): double (nullable = true)
 |-- DALYs: integer (nullable = true)
 |-- Improvement in 5 Years (%): double (nullable = true)
 |-- Per Capita Income (USD): integer (nullable = true)
 |-- Education I

SynapseWidget(Synapse.DataFrame, a1a28102-0c8b-4d19-8f7a-2408745f14d5)

SynapseWidget(Synapse.DataFrame, 5addf934-943b-4619-a91e-2a442f3586e1)

In [4]:
# 1. Import des bibliothèques nécessaires
from pyspark.sql.functions import col
import re

# 2. Fonction de nettoyage des noms de colonnes
def clean_column_name(name):
    name = name.strip().lower().replace(" ", "_")           # remplace les espaces par des _
    name = re.sub(r'[^\w]', '', name)                       # enlève les caractères non alphanumériques
    return name

# 3. Chargement des données Bronze
df_bronze = spark.read.option("header", True).option("inferSchema", True).csv("Files/Bronze/global_health.csv")

# 4. Nettoyage des colonnes + valeurs nulles
df_clean = df_bronze.dropna(how="any")
df_clean = df_clean.select([col(c).alias(clean_column_name(c)) for c in df_clean.columns])


# 5. Sauvegarde en Silver (format Delta + table dans Lakehouse)
df_clean.write.format("delta").mode("overwrite").save("Files/silver/global_health_clean")
df_clean.write.mode("overwrite").saveAsTable("global_health_silver")


print(" Données brutes nettoyées et sauvegardées dans la zone Silver")


# 6. Affichage en aperçu
display(df_clean.limit(10))


StatementMeta(, ed865f0b-697d-4b0c-b607-1c07c543e08f, 6, Finished, Available, Finished)

 Données brutes nettoyées et sauvegardées dans la zone Silver


SynapseWidget(Synapse.DataFrame, 7380c3ee-8aae-481f-8159-b67fe224da9c)

In [5]:
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.types import *
from datetime import datetime

# Étapes techniques
nb_rows = df_bronze.count()
nb_cols = len(df_bronze.columns)
schema = str(df_bronze.schema)
columns = ", ".join(df_bronze.columns)

# Comptage des valeurs nulles
null_counts = df_bronze.select([col(c).isNull().cast("int").alias(c) for c in df_bronze.columns]).agg(
    *[_sum(col(c)).alias(c) for c in df_bronze.columns]
).toPandas().to_dict(orient="records")[0]

# Horodatage via Python (et non PySpark)
now = datetime.now()

# Log technique
log_data = [(now, "Global_Health.csv", nb_rows, nb_cols, columns, schema, str(null_counts))]
log_schema = StructType([
    StructField("ingestion_time", TimestampType(), True),
    StructField("source_file", StringType(), True),
    StructField("row_count", IntegerType(), True),
    StructField("column_count", IntegerType(), True),
    StructField("columns", StringType(), True),
    StructField("schema", StringType(), True),
    StructField("nulls_by_column", StringType(), True)
])

df_log = spark.createDataFrame(log_data, schema=log_schema)

# Enregistrement
df_log.write.mode("append").saveAsTable("bronze_ingestion_log")

# Affichage
display(df_log)


StatementMeta(, 0cf5c9b6-40aa-4306-a09c-e5ab73446aa6, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 39ffb48b-fe26-4cb5-a0d9-5999232225e7)