# 📒 Notebook : 05_gold_star_schema

---

## 📝 Objectif

- Construire la couche GOLD du modèle en étoile pour l’analyse BI.
- Générer, à partir des tables Silver, la table de faits `fact_consultation` et toutes les dimensions.
- Stocker toutes les tables Gold prêtes pour dashboards et reporting.

## 📥 Inputs

| Table           | Format     | Nom de Table           | Description                    |
|-----------------|-----------|------------------------|---------------------------------|
| icd_code        | Delta      | silver_icd_code        | Table silver catégorie de maladie          |
| hopital      | Delta      | silver_hopital         | Table silver hopital (concaténation des 4 df)    |
| meds_code        | Delta      | silver_meds_code        | Table silver catégorie de médicament          |
| motifs_code        | Delta      | silver_motifs_code        | Table silver catégorie de motifs d'admission          |

## 📤 Outputs

| Table           | Format     | Nom de Table           | Description                    |
|-----------------|-----------|------------------------|---------------------------------|
| dim_patient      | Delta      | dim_patient         | Table gold dimension patient    |
| dim_maladie       | Delta      | dim_maladie         | Table gold dimension maladie    |
| dim_motif_admission       | Delta      | dim_motif_admission         | Table gold dimension motif admission    |
| dim_medicament       | Delta      | dim_medicament         | Table gold dimension médicament    |
| dim_hopital       | Delta      | dim_hopital         | Table gold dimension hopital    |
| dim_temps      | Delta      | dim_temps         | Table gold dimension temps    |
| fact_consultation       | Delta      | fact_consultation         | Table gold des faits    |



---

## 🗓️ Versioning & Mise à jour

| Version | Date         | Modifications                            |
|---------|--------------|------------------------------------------|
| 1.0     | 2025-05-06   | Création du notebook Gold complet        |

---


In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 1. INIT SPARK & LOAD SILVER ------------------------------------------------
# getOrCreate() reuses the active session when there is one.
spark = SparkSession.builder.appName("StarSchemaHealthcare").getOrCreate()

# Read the four Silver Delta tables that feed the Gold star schema.
silver_df_hopital, silver_df_icd_code, silver_df_meds_code, silver_df_motifs_code = (
    spark.read.format("delta").load(silver_path)
    for silver_path in (
        "Files/silver/hopital",
        "Files/silver/icd_code",
        "Files/silver/meds_code",
        "Files/silver/motifs_code",
    )
)

# Preview the first 10 rows of each Silver table, in load order.
for silver_df in (
    silver_df_hopital,
    silver_df_icd_code,
    silver_df_meds_code,
    silver_df_motifs_code,
):
    display(silver_df.head(10))

StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 3dca46e2-bd08-410e-a7b6-58c093ef710f)

SynapseWidget(Synapse.DataFrame, b154694a-ca0f-4f77-9034-4a6f0475157b)

SynapseWidget(Synapse.DataFrame, de96118b-f38a-4cab-8bb0-411991abe73a)

SynapseWidget(Synapse.DataFrame, 90df38dd-6666-4b6c-8011-19312302ddce)

In [5]:
# Build the dimensions

# DIMENSION PATIENT ------------------------------------------------------
from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Single source of truth for the Gold location (read for SCD0, then appended to).
dim_patient_path = "Files/gold/dim_patient/"

dim_patient = (
    silver_df_hopital
        # Keep only the attributes needed for the dimension and its time key.
        .select('patient_id','dep_name','age', 'gender', 'profile_id','consultation_id','ethnicity','race','lang','religion','maritalstatus','employstatus','insurance_status','disposition','arrivalmode','previousdispo','n_surgeries','n_edvisits','n_admissions','year_fictive','arrivalmonth','arrivalday','arrivalhour_bin')
        .withColumnRenamed("dep_name", "hopital_nom")
        # timestamp_id uses the same expression as dim_temps so both can be joined.
        .withColumn(
            "timestamp_id",
            F.concat_ws('_', F.col('year_fictive'), F.col('arrivalmonth'), F.col('arrivalday'), F.col('arrivalhour_bin'))
        )
        # Drop the raw date parts, then de-duplicate once on the final projection
        # (a de-duplication before the projection would not change the result).
        .select('patient_id','hopital_nom','age', 'gender', 'profile_id','consultation_id','ethnicity','race','lang','religion','maritalstatus','employstatus','insurance_status','disposition','arrivalmode','previousdispo','n_surgeries','n_edvisits','n_admissions','timestamp_id')
        .dropDuplicates()
)

# Preview
display(dim_patient.head(20))


# SCD0

# 1. Load the existing patient table, or start from an empty frame on the first run.
#    The existence check is explicit: a bare `except:` would also swallow transient
#    read failures, and the append below would then re-insert every patient.
if DeltaTable.isDeltaTable(spark, dim_patient_path):
    existing_dim_patient = spark.read.format("delta").load(dim_patient_path)
else:
    existing_dim_patient = spark.createDataFrame([], dim_patient.schema)

# 2. Keep only the rows whose patient_id is absent from the target table.
# NOTE(review): dim_patient carries consultation_id, so rows for a new consultation
# of an already-stored patient are filtered out here - confirm this is intended.
new_patients = (
    dim_patient.join(
        existing_dim_patient.select("patient_id"),
        on="patient_id",
        how="left_anti"
    )
)

# 3. Append the new rows (SCD0: existing rows are never modified).
new_patients.write.format("delta").mode("append").save(dim_patient_path)

# Preview
display(dim_patient.head(20))


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ea2ad63d-ff2b-4f95-98af-a53e79d31cb0)

SynapseWidget(Synapse.DataFrame, 8921e4e5-094d-470f-898a-b6252bc230f3)

In [9]:
# DIMENSION MALADIE - enriched through the ICD mapping
from pyspark.sql.functions import explode
from delta.tables import DeltaTable

output_path = "Files/gold/dim_maladie/"

# 1. Explode the diagnosis lists: one row per diagnosis code of each consultation.
maladie_exploded = silver_df_hopital.withColumn("diagnosis", explode("diagnosis_list"))

# 2. Distinct (consultation_id, patient_id, diagnosis) triples.
dim_maladie = maladie_exploded.select("consultation_id","patient_id", "diagnosis").distinct()

# 3. Enrich with the columns of the Silver ICD table (left join keeps unmapped codes).
dim_maladie = dim_maladie.join(silver_df_icd_code, on="diagnosis", how="left")

dim_maladie = dim_maladie.withColumnRenamed("diagnosis", "maladie")

# Re-attach every consultation_id: explode() drops rows whose list is null or empty,
# so those consultations come back here with NULL attributes.
consultation = silver_df_hopital.select("consultation_id").distinct()
dim_maladie = consultation.join(dim_maladie, on="consultation_id", how="left")

# 4. Upsert (SCD1): merge when the table exists, plain write on the first run.
#    The key is (consultation_id, maladie), which is the grain built above.
#    Matching on maladie alone lets many source rows hit one target row, which
#    Delta rejects. `<=>` is the null-safe equality, so the NULL-maladie rows of
#    consultations without a diagnosis match instead of being re-inserted.
# NOTE(review): assumes silver_df_icd_code has at most one row per diagnosis;
# otherwise the source still holds duplicate keys - confirm.
if DeltaTable.isDeltaTable(spark, output_path):
    deltaTable = DeltaTable.forPath(spark, output_path)
    (
        deltaTable.alias("target")
        .merge(
            source = dim_maladie.alias("source"),
            condition = (
                "target.consultation_id = source.consultation_id "
                "AND target.maladie <=> source.maladie"
            )
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    dim_maladie.write.format("delta").mode("overwrite").save(output_path)

# Preview
display(dim_maladie.head(20))


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 11, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8bfd386e-c2b0-463a-8c14-8f3df171539d)

In [11]:
# DIMENSION TEMPS
# One row per distinct combination of the selected time attributes; timestamp_id
# is the '_'-joined (year_fictive, arrivalmonth, arrivalday, arrivalhour_bin).
dim_temps = (
    silver_df_hopital
    .select(
        F.concat_ws('_', 'year_fictive', 'arrivalmonth', 'arrivalday', 'arrivalhour_bin').alias('timestamp_id'),
        F.col('year_fictive').alias('year'),
        'month_num',
        F.col('arrivalday').alias('day'),
        F.col('arrivalmonth').alias('month'),
        F.col('weekday_num').alias('weekday'),
        'arrivalhour_bin',
        # Start of the bin = integer before the first '-' of arrivalhour_bin.
        F.split('arrivalhour_bin', '-').getItem(0).cast('int').alias('hour_bin_start'),
        'hour_bin_end',
    )
    .distinct()
)

# Preview
display(dim_temps.head(20))

# Replace the Gold time dimension with the freshly computed one.
dim_temps.write.format("delta").mode("overwrite").save("Files/gold/dim_temps/")


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ec9ccd5c-c9f0-40e5-bc06-3d3b57367ec8)

In [14]:
# DIMENSION HOPITAL
# Display name for the known hospital codes; any other code is kept unchanged.
hopital_id_col = F.col("hopital_id")
hopital_name_expr = (
    F.when(hopital_id_col == "A", "Hopital Alpha")
    .when(hopital_id_col == "B", "Hopital Beta")
    .when(hopital_id_col == "C", "Hopital Gamma")
    .otherwise(hopital_id_col)
)

# dep_name from Silver becomes hopital_id; one row per selected Silver row.
dim_hopital = (
    silver_df_hopital
    .select("consultation_id", "patient_id", F.col("dep_name").alias("hopital_id"))
    .withColumn("hopital_name", hopital_name_expr)
)

# Preview
display(dim_hopital.head(20))

dim_hopital.write.format("delta").mode("overwrite").save("Files/gold/dim_hopital/")

StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a042fe9e-7249-4f17-80c7-19b0bfdb87c7)

In [16]:
# DIMENSION MOTIF ADMISSION, enriched with the motif category mapping
from pyspark.sql.functions import explode

# One row per admission reason of each consultation.
motif_exploded = silver_df_hopital.withColumn("Motif", explode("motif_list"))

# Category lookup, read from the raw CSV (header row, ';' separator).
# NOTE(review): silver_df_motifs_code is loaded at the top of the notebook but is
# not used here - confirm the raw CSV is the intended source.
df_motifs = spark.read.csv('Files/raw/motif_categorie.csv', header=True, sep=';')

# Distinct (consultation, patient, motif) rows, enriched with the lookup columns.
dim_motif_admission = (
    motif_exploded
    .select("consultation_id", "patient_id", "Motif")
    .distinct()
    .join(df_motifs, on="Motif", how="left")
    .withColumnRenamed("Motif", "motif_admission")
)

# Left join from the full consultation list so every consultation_id is present.
consultation = silver_df_hopital.select("consultation_id").distinct()
dim_motif_admission = consultation.join(dim_motif_admission, on="consultation_id", how="left")

# Preview
display(dim_motif_admission.head(20))

dim_motif_admission.write.format("delta").mode("overwrite").save("Files/gold/dim_motif_admission/")


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 18, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4feee4d9-4b32-4cb9-b265-ec8d898a9485)

In [17]:
# DIMENSION MEDICAMENT, enriched with the medication category mapping
from pyspark.sql.functions import explode

# One row per medication code of each consultation.
meds_exploded = silver_df_hopital.withColumn("medicament_code", explode("meds_list"))

# Category lookup, read from the raw CSV (header row, ';' separator).
# NOTE(review): silver_df_meds_code is loaded at the top of the notebook but is
# not used here - confirm the raw CSV is the intended source.
df_meds = spark.read.csv('Files/raw/meds_cat.csv', header=True, sep=";")

# Distinct (consultation, patient, medication) rows, enriched with the lookup columns.
dim_medicament = (
    meds_exploded
    .select("consultation_id", "patient_id", "medicament_code")
    .distinct()
    .join(df_meds, on="medicament_code", how="left")
    .withColumnRenamed("medicament_code", "medicament")
)

# Left join from the full consultation list so every consultation_id is present.
consultation = silver_df_hopital.select("consultation_id").distinct()
dim_medicament = consultation.join(dim_medicament, on="consultation_id", how="left")

# Preview
display(dim_medicament.head(20))

dim_medicament.write.format("delta").mode("overwrite").save("Files/gold/dim_medicament/")

StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 19, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e8d5918b-ee25-40f3-9027-d302d5ef6bb9)

In [18]:
from pyspark.sql import functions as F
from pyspark.sql.functions import explode

df = dim_patient \
    .join(dim_maladie,    "consultation_id", "left") \
    .join(dim_medicament, "consultation_id", "left") \
    .join(dim_motif_admission, "consultation_id", "left") \
    .join(dim_hopital,    "consultation_id", "left") \
    .join(dim_temps,      "timestamp_id", "left")


# 2. Jointure avec TOUTES LES DIMENSIONS avec alias pour chaque !
fact_consultation = df \
    .join(dim_patient.alias("dp"), 'patient_id', 'left') \
    .join(dim_hopital.alias("dh"), df.hopital_nom == F.col("dh.hopital_id"), 'left') \
    .join(dim_maladie.alias("dm"), df.maladie == F.col("dm.maladie"), 'left') \
    .join(dim_medicament.alias("dmed"), df.medicament == F.col("dmed.medicament"), 'left') \
    .join(dim_motif_admission.alias("dma"), df.motif_admission == F.col("dma.motif_admission"), 'left') \
    .join(dim_temps.alias("dt"), df.timestamp_id == F.col("dt.timestamp_id"), 'left') \



# 4. Sélection propre : on référence CHAQUE ALIAS explicitement
#fact_consultation = fact_consultation.select(
    #F.col("consultation_id"),                    # identifiant visite
    #F.col("patient_id"),                  # identifiant patient
    #F.col("hopital_id").alias("hopital_id"),
    #F.col("disposition_id"),
    #F.col("arrival_id").alias("arrival_mode_id"),
    #F.col("sortie_precedente_id").alias("previous_dispo_id"),
    #F.col("timestamp_id").alias("date_id"),        # identifiant temps
    #F.col('n_surgeries').alias('nb_chirurgies'),
    #F.col('n_edvisits').alias('nb_visited_urgences'),
    #F.col('n_admissions').alias('nb_admissions'),
#)

#table de faits SANS AMBIGUITE et compatible BI !
display(fact_consultation.head(50))


#Exporter la table de faits
#fact_consultation.write.format("delta").mode("overwrite").save("Files/gold/fact_consultation/")


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 20, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 055bf61e-3425-436d-b0ee-2d9e7ea04206)

In [25]:
from pyspark.sql import functions as F

def _collect_per_consultation(dim_df, value_col, list_col):
    """Group dim_df by consultation_id and collect the distinct value_col values as list_col."""
    return dim_df.groupBy("consultation_id").agg(F.collect_set(value_col).alias(list_col))


def _full_join_on_consultation(left_df, right_df):
    """Full outer join of two per-consultation frames on consultation_id."""
    return left_df.join(right_df, on="consultation_id", how="full")


# One row per consultation_id for each multi-valued dimension.
maladies_grouped = _collect_per_consultation(dim_maladie, "maladie", "maladies")
motifs_grouped = _collect_per_consultation(dim_motif_admission, "motif_admission", "motifs_admission")
medicaments_grouped = _collect_per_consultation(dim_medicament, "medicament", "medicaments")

from functools import reduce
from pyspark.sql import DataFrame

# Fold the three frames together, left to right, with full outer joins.
dfs = [maladies_grouped, motifs_grouped, medicaments_grouped]
consultation_agg = reduce(_full_join_on_consultation, dfs)

display(consultation_agg.limit(10))



StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, f3dbb469-8aff-459d-837b-e97068d62dfd)

In [19]:
from pyspark.sql import functions as F

# Number of dim_maladie rows per consultation_id, largest first.
lines_per_consult = (
    dim_maladie
    .groupBy("consultation_id")
    .count()
    .withColumnRenamed("count", "nb_lignes")
    .sort(F.col("nb_lignes").desc())
)

# Print the top 20.
lines_per_consult.show(n=20)


StatementMeta(, c958734e-6346-44f9-ad3c-c8faaf3c28cf, 21, Finished, Available, Finished)

+---------------+---------+
|consultation_id|nb_lignes|
+---------------+---------+
|          64747|       75|
|         131838|       68|
|         299317|       68|
|         232762|       68|
|         418149|       68|
|         323309|       45|
|         339766|       45|
|         223069|       45|
|         339951|       44|
|         418661|       43|
|         234706|       42|
|         217367|       42|
|         171816|       41|
|         391086|       41|
|         506929|       41|
|         326635|       39|
|         344337|       39|
|         461010|       39|
|           7088|       39|
|         460859|       39|
+---------------+---------+
only showing top 20 rows



In [None]:
# Create the warehouse folder for GitHub: every Gold table is written as Parquet.
warehouse_exports = [
    (dim_patient, "Files/warehouse/dim_patient.parquet"),
    (dim_maladie, "Files/warehouse/dim_maladie.parquet"),
    (dim_medicament, "Files/warehouse/dim_medicament.parquet"),
    (dim_motif_admission, "Files/warehouse/dim_motif_admission.parquet"),
    (dim_hopital, "Files/warehouse/dim_hopital.parquet"),
    (dim_temps, "Files/warehouse/dim_temps.parquet"),
    (fact_consultation, "Files/warehouse/fact_consultation.parquet"),
]

# Same order as listed above; each target is overwritten if it already exists.
for export_df, export_path in warehouse_exports:
    export_df.write.mode("overwrite").parquet(export_path)

StatementMeta(, 4b587929-a8d7-46da-bb13-2cf24d38b0ff, -1, Cancelled, , Cancelled)

In [None]:
# Print the schema of every Gold table, dimensions first and the fact table last.
for gold_df in (
    dim_patient,
    dim_maladie,
    dim_medicament,
    dim_motif_admission,
    dim_hopital,
    dim_temps,
    fact_consultation,
):
    gold_df.printSchema()

StatementMeta(, 4b587929-a8d7-46da-bb13-2cf24d38b0ff, -1, Cancelled, , Cancelled)