# GOLD LAYER - Mod√®le Dimensionnel (Star Schema)

**Flux ETL** : Bronze (brut) ‚Üí Silver (nettoy√©) ‚Üí **Gold (m√©tier)**

**Auteurs** : Nejma MOUALHI | Brieuc OLIVIERI | Nicolas TAING

---

## Objectifs

### Cr√©er un mod√®le en √©toile (Star Schema) optimis√© :
1. **5 Dimensions** : Temps, Patient, Diagnostic, Professionnel, √âtablissement
2. **4 Tables de Faits** : Consultations, Hospitalisations, D√©c√®s, Satisfaction
3. **Partitionnement** temporel (ann√©e/mois)
4. **Optimisations** Spark SQL

## Architecture du mod√®le
Exemple FAIT_CONSULTATION
```
         dim_temps
             |
dim_patient --+-- FAIT_CONSULTATION -- dim_professionnel
             |
        dim_diagnostic
             |
      dim_etablissement
```

In [1]:
# Imports
import subprocess
subprocess.run(["rm", "-rf", "/home/jovyan/data/gold/*"], shell=True)
print("Imports loaded successfully")
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import time
from pyspark.sql import functions as F


Imports loaded successfully


In [2]:
# Configuration Spark avec optimisations avanc√©es
spark = SparkSession.builder \
    .appName("CHU_Gold_Star_Schema") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10485760") \
    .getOrCreate()

print(f"Spark {spark.version} started with optimizations")

# Paths via environment (dynamic, no hardcoding)
import os
DATA_BASE = os.getenv('DATA_BASE', '/home/jovyan/data')
SILVER_BASE = f"{DATA_BASE}/silver"
GOLD_OUTPUT = f"{DATA_BASE}/gold"
BRONZE_BASE = f"{DATA_BASE}/bronze"

print(f"Source: {SILVER_BASE} (SILVER - cleaned data)")
print(f"Destination: {GOLD_OUTPUT}")


Spark 3.5.0 started with optimizations
Source: /home/jovyan/data/silver (SILVER - cleaned data)
Destination: /home/jovyan/data/gold


---

## ETAPE 1 : Cr√©ation des Dimensions

Les dimensions contiennent les attributs descriptifs pour l'analyse.

In [3]:
# Utility: safe delete of output directory before overwrite (handles locked folders)
import os, shutil
def rm_rf(path: str):
    try:
        if os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.exists(path):
            os.remove(path)
    except Exception as e:
        print(f'Warning: could not pre-delete {path}: {e}')


In [6]:
# 1.1 DIMENSION TEMPS (g√©n√©r√©e, pas depuis Silver)
print("="*80)
print("DIMENSION: dim_temps")
print("="*80)

# Cr√©er une dimension temps compl√®te (2013-2025)
start_date = datetime(2013, 1, 1)
end_date = datetime(2025, 12, 31)
dates = []

current = start_date
while current <= end_date:
    dates.append((
        current.strftime("%Y%m%d"),     # id_temps
        current,                          # date
        current.year,                     # annee
        current.month,                    # mois
        (current.month - 1) // 3 + 1,     # trimestre
        current.strftime("%A"),           # jour_semaine
        current.strftime("%B"),           # nom_mois
        current.weekday() >= 5,           # est_weekend
        current.weekday()                 # numero_jour_semaine
    ))
    current += timedelta(days=1)

schema_temps = StructType([
    StructField("id_temps", StringType(), False),
    StructField("date_complete", DateType(), False),
    StructField("annee", IntegerType(), False),
    StructField("mois", IntegerType(), False),
    StructField("trimestre", IntegerType(), False),
    StructField("jour_semaine", StringType(), True),
    StructField("nom_mois", StringType(), True),
    StructField("est_weekend", BooleanType(), True),
    StructField("numero_jour_semaine", IntegerType(), True)
])

dim_temps = spark.createDataFrame(dates, schema=schema_temps)

print(f"{dim_temps.count():,} days created (2013-2025)")
dim_temps.show(5)

# Sauvegarde SANS partitionnement pour compatibilit√© Hive/Superset
dim_temps.write \
    .mode("overwrite") \
    .parquet(f"{GOLD_OUTPUT}/dim_temps")

print(f"Saved to: {GOLD_OUTPUT}/dim_temps (no partitioning)")
print(f"Note: No partitioning for Hive/Superset compatibility")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


DIMENSION: dim_temps


Py4JError: SparkSession does not exist in the JVM

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [None]:
# 1.2 DIMENSION PATIENT (depuis SILVER)
print("\n" + "="*80)
print("üî∑ DIMENSION: dim_patient")
print("="*80)

# Charger depuis SILVER (donn√©es d√©j√† anonymis√©es et nettoy√©es)
df_patient_silver = spark.read.parquet(f"{SILVER_BASE}/patient")

# S√©lection pour dimension (d√©j√† propre depuis Silver)
dim_patient = df_patient_silver.select(
    F.col("id_patient"),
    F.col("nom_hash"),         # D√©j√† anonymis√© en Silver
    F.col("prenom_hash"),      # D√©j√† anonymis√© en Silver
    F.col("sexe"),
    F.col("age"),
    F.col("date_naissance"),   # D√©j√† format√© en Silver
    F.col("ville"),
    F.col("code_postal"),
    F.col("pays"),
    F.col("groupe_sanguin")
)

print(f" {dim_patient.count():,} patients")
dim_patient.show(5, truncate=False)

# Sauvegarde
dim_patient.write \
    .mode("overwrite") \
    .parquet(f"{GOLD_OUTPUT}/dim_patient")

print(f" Sauvegard√©: {GOLD_OUTPUT}/dim_patient")

In [None]:
# 1.3 DIMENSION DIAGNOSTIC (depuis SILVER)
print("\n" + "="*80)
print("üî∑ DIMENSION: dim_diagnostic")
print("="*80)

df_diagnostic_silver = spark.read.parquet(f"{SILVER_BASE}/diagnostic")

# V√©rifier les colonnes disponibles
print(f"Colonnes disponibles: {df_diagnostic_silver.columns}")

dim_diagnostic = df_diagnostic_silver.select(
    F.col("Code_diag").alias("code_diag"),
    F.col("Diagnostic").alias("libelle"),
    # Ajout cat√©gorie CIM-10 (premi√®re lettre du code)
    F.col("Code_diag").substr(1, 1).alias("categorie")
).dropDuplicates(["code_diag"])

print(f" {dim_diagnostic.count():,} diagnostics")
dim_diagnostic.show(5, truncate=False)

dim_diagnostic.write \
    .mode("overwrite") \
    .parquet(f"{GOLD_OUTPUT}/dim_diagnostic")

print(f" Sauvegard√©: {GOLD_OUTPUT}/dim_diagnostic")

In [None]:
# 1.4 DIMENSION PROFESSIONNEL (depuis SILVER)
print("\n" + "="*80)
print("üî∑ DIMENSION: dim_professionnel")
print("="*80)

df_prof_silver = spark.read.parquet(f"{SILVER_BASE}/professionnel_de_sante")

# V√©rifier colonnes
print(f"Colonnes disponibles: {df_prof_silver.columns}")

dim_professionnel = df_prof_silver.select(
    F.col("Identifiant").alias("id_prof"),
    F.col("Nom").alias("nom"),
    F.col("Prenom").alias("prenom"),
    F.col("Code_specialite").alias("code_specialite")
).dropDuplicates(["id_prof"])

# Jointure avec sp√©cialit√©s (depuis Silver)
df_spec_silver = spark.read.parquet(f"{SILVER_BASE}/specialites")
print(f"Colonnes sp√©cialit√©s: {df_spec_silver.columns}")

dim_professionnel = dim_professionnel.join(
    df_spec_silver.select(
        F.col("Code_specialite"),
        F.col("Specialite").alias("nom_specialite")
    ),
    on="code_specialite",
    how="left"
)

print(f" {dim_professionnel.count():,} professionnels")
dim_professionnel.show(5, truncate=False)

dim_professionnel.write \
    .mode("overwrite") \
    .parquet(f"{GOLD_OUTPUT}/dim_professionnel")

print(f" Sauvegard√©: {GOLD_OUTPUT}/dim_professionnel")

In [None]:
# 1.5 DIMENSION ETABLISSEMENT (depuis SILVER + enrichi avec DEPARTEMENTS)
print("\n" + "="*80)
print("üî∑ DIMENSION: dim_etablissement (enrichie avec r√©gions/d√©partements)")
print("="*80)

df_etab_silver = spark.read.parquet(f"{SILVER_BASE}/etablissement_sante")

# Lire d√©partements depuis Bronze
df_dept = spark.read.parquet("/home/jovyan/data/bronze/csv/departements")
print(f" D√©partements charg√©s : {df_dept.count()} d√©partements")

# Cr√©er dimension de base
from pyspark.sql.functions import substring

dim_etablissement = df_etab_silver.select(
    F.col("finess_site").alias("finess"),
    F.col("siret_site").alias("siret"),
    F.col("raison_sociale").alias("nom"),
    F.col("commune").alias("ville"),
    F.col("code_postal"),
    F.col("telephone"),
    F.col("email"),
    # Extraire code d√©partement (2 premiers chiffres du code postal)
    F.substring(F.col("code_postal"), 1, 2).alias("code_departement")
).filter(
    F.col("finess").isNotNull()
).dropDuplicates(["finess"])

# Enrichir avec r√©gion/d√©partement
dim_etablissement = dim_etablissement.join(
    df_dept.select(
        F.col("num_departement"),
        F.col("libelle_departement"),
        F.col("libelle_region"),
        F.col("abv_region")
    ),
    dim_etablissement["code_departement"] == df_dept["num_departement"],
    "left"
)

print(f" {dim_etablissement.count():,} √©tablissements (enrichis avec r√©gions)")
dim_etablissement.show(5, truncate=False)

dim_etablissement.write \
    .mode("overwrite") \
    .parquet(f"{GOLD_OUTPUT}/dim_etablissement")

print(f" Sauvegard√©: {GOLD_OUTPUT}/dim_etablissement")
print(f"   - Colonnes g√©ographiques ajout√©es: libelle_departement, libelle_region, abv_region")

---

## √âTAPE 2 : Cr√©ation des Tables de Faits

Les faits contiennent les mesures et les cl√©s √©trang√®res vers les dimensions.

In [None]:
# 2.1 FAIT CONSULTATION (depuis SILVER)
print("\n" + "="*80)
print(" FAIT: fait_consultation")
print("="*80)

df_consultation_silver = spark.read.parquet(f"{SILVER_BASE}/consultation")

# V√©rification des colonnes disponibles
print(" Colonnes disponibles dans consultation Silver:")
for col_name in df_consultation_silver.columns:
    print(f"  - {col_name}")
print()

# Transformation pour fait avec les colonnes correctes (selon l'erreur pr√©c√©dente)
fait_consultation = df_consultation_silver.select(
    F.col("id_consultation"),
    F.col("id_patient"),
    F.col("id_professionnel").alias("id_prof"),
    F.col("id_diagnostic").alias("code_diag"),
    F.col("id_mutuelle"),  # Correction: "id_mutuelle" au lieu de "id_salle"
    
    # Cl√© temporelle
    F.date_format(F.col("date_consultation"), "yyyyMMdd").alias("id_temps"),
    F.col("date_consultation"),
    
    # Dimensions temporelles (pour partitionnement)
    F.col("annee"),
    F.col("mois"),
    F.col("jour"),
    
    # Mesures disponibles (selon donn√©es Silver)
    F.col("heure_debut"),
    F.col("heure_fin"),
    F.col("motif")  # Correction: "motif" au lieu de "notes"
)

print(f" {fait_consultation.count():,} consultations")
fait_consultation.show(5)

# Statistiques
print("\n Statistiques:")
fait_consultation.select(
    F.count("*").alias("total_consultations"),
    F.countDistinct("id_patient").alias("patients_uniques"),
    F.countDistinct("id_prof").alias("professionnels_uniques"),
    F.min("annee").alias("annee_min"),
    F.max("annee").alias("annee_max")
).show()

# Sauvegarde avec PARTITIONNEMENT par ann√©e et mois
print("\n Sauvegarde avec optimisations (partitionnement)...")
fait_consultation.write \
    .mode("overwrite") \
    .partitionBy("annee", "mois") \
    .parquet(f"{GOLD_OUTPUT}/fait_consultation")

print(f" Sauvegard√©: {GOLD_OUTPUT}/fait_consultation")
print(f"   - Partitionn√© par: annee, mois")
print(f"   - Format: Parquet compress√©")

In [None]:
# 2.2 FAIT D√âC√àS (depuis SILVER)
print("\n" + "="*80)
print(" FAIT: fait_deces")
print("="*80)

df_deces_silver = spark.read.parquet(f"{SILVER_BASE}/deces_2019")

fait_deces = df_deces_silver.select(
    F.monotonically_increasing_id().alias("id_deces"),
    
    # Identit√©s anonymis√©es (depuis Silver)
    F.col("nom_hash"),
    F.col("prenom_hash"),
    F.col("acte_deces_hash"),
    
    # Donn√©es d√©mographiques
    F.col("sexe"),
    F.col("date_naissance"),
    F.col("date_deces"),
    F.col("age_deces"),
    
    # Cl√© temporelle
    F.date_format(F.col("date_deces"), "yyyyMMdd").alias("id_temps"),
    F.col("annee_deces").alias("annee"),
    F.col("mois_deces").alias("mois"),
    
    # Lieux
    F.col("code_lieu_naissance"),
    F.col("lieu_naissance"),
    F.col("pays_naissance"),
    F.col("code_lieu_deces")
)

print(f" {fait_deces.count():,} d√©c√®s (2019)")
fait_deces.show(5)

# Statistiques
print("\nüìà Statistiques:")
fait_deces.select(
    F.count("*").alias("total_deces"),
    F.avg("age_deces").alias("age_moyen"),
    F.min("age_deces").alias("age_min"),
    F.max("age_deces").alias("age_max")
).show()

# Sauvegarde
fait_deces.write \
    .mode("overwrite") \
    .partitionBy("annee", "mois") \
    .parquet(f"{GOLD_OUTPUT}/fait_deces")

print(f" Sauvegard√©: {GOLD_OUTPUT}/fait_deces (partitionn√© par annee, mois)")

In [None]:
# 2.3 FAIT HOSPITALISATION (depuis tables AAAA + date - VRAIES DONN√âES)
print("\n" + "="*80)
print("FAIT: fait_hospitalisation (VRAIES DONN√âES)")
print("="*80)

# √âTAPE 1: Charger les tables Bronze AAAA et date
print("üìñ Chargement tables AAAA et date depuis Bronze...")
df_aaaa = spark.read.parquet(f"{SILVER_BASE}/../bronze/postgres/AAAA") \
    .drop("ingestion_timestamp", "ingestion_date")
df_date = spark.read.parquet(f"{SILVER_BASE}/../bronze/postgres/date") \
    .drop("ingestion_timestamp", "ingestion_date")

print(f"   - AAAA: {df_aaaa.count():,} lignes")
print(f"   - date: {df_date.count():,} lignes")

# √âTAPE 2: Ajouter row_id pour jointure par position (les 2 tables ont m√™me nb lignes)
from pyspark.sql.functions import monotonically_increasing_id, to_date, datediff

df_aaaa_idx = df_aaaa.withColumn("row_id", F.monotonically_increasing_id())
df_date_idx = df_date.withColumn("row_id", F.monotonically_increasing_id())

# √âTAPE 3: Jointure par position
df_hospit_raw = df_aaaa_idx.join(df_date_idx, "row_id", "inner")

print(f" Jointure: {df_hospit_raw.count():,} hospitalisations")

# √âTAPE 4: Transformation et nettoyage
fait_hospitalisation = df_hospit_raw.select(
    F.monotonically_increasing_id().alias("id_hospitalisation"),

    # Cl√©s √©trang√®res
    F.col("Num").alias("id_patient"),
    F.col("Code_diag").alias("code_diag"),

    # Dates (conversion dd/MM/yyyy ‚Üí date)
    to_date(F.col("date1"), "dd/MM/yyyy").alias("date_entree"),
    to_date(F.col("date2"), "dd/MM/yyyy").alias("date_sortie"),

    # Cl√©s temporelles
    F.date_format(to_date(F.col("date1"), "dd/MM/yyyy"), "yyyyMMdd").alias("id_temps_entree"),
    F.date_format(to_date(F.col("date2"), "dd/MM/yyyy"), "yyyyMMdd").alias("id_temps_sortie"),

    # Mesures
    F.datediff(
        to_date(F.col("date2"), "dd/MM/yyyy"),
        to_date(F.col("date1"), "dd/MM/yyyy")
    ).alias("duree_sejour_jours"),

    # Dimensions temporelles pour partitionnement
    F.year(to_date(F.col("date1"), "dd/MM/yyyy")).alias("annee"),
    F.month(to_date(F.col("date1"), "dd/MM/yyyy")).alias("mois")
)

# √âTAPE 5: Filtrer donn√©es invalides (dates nulles ou n√©gatives)
fait_hospitalisation = fait_hospitalisation.filter(
    (F.col("date_entree").isNotNull()) &
    (F.col("date_sortie").isNotNull()) &
    (F.col("duree_sejour_jours") >= 0)
)

print(f"{fait_hospitalisation.count():,} hospitalisations valides")
fait_hospitalisation.show(10)

# √âTAPE 6: Statistiques
print("\n Statistiques:")
fait_hospitalisation.select(
    F.count("*").alias("total_hospitalisations"),
    F.countDistinct("id_patient").alias("patients_uniques"),
    F.countDistinct("code_diag").alias("diagnostics_uniques"),
    F.avg("duree_sejour_jours").alias("duree_moyenne_jours"),
    F.min("duree_sejour_jours").alias("duree_min"),
    F.max("duree_sejour_jours").alias("duree_max"),
    F.min("annee").alias("annee_min"),
    F.max("annee").alias("annee_max")
).show()

# Distribution par ann√©e
print("\nüìÖ Distribution par ann√©e:")
fait_hospitalisation.groupBy("annee").agg(
    F.count("*").alias("nb_hospitalisations"),
    F.avg("duree_sejour_jours").alias("duree_moyenne")
).orderBy("annee").show()

# √âTAPE 7: Sauvegarde avec partitionnement
print("\nSauvegarde avec partitionnement par ann√©e et mois...")
fait_hospitalisation.write \
    .mode("overwrite") \
    .partitionBy("annee", "mois") \
    .parquet(f"{GOLD_OUTPUT}/fait_hospitalisation")

print(f"Sauvegard√©: {GOLD_OUTPUT}/fait_hospitalisation")
print(f"   - Partitionn√© par: annee, mois")
print(f"   - Format: Parquet compress√©")
print(f"   - Source: Tables AAAA + date (vraies donn√©es)")

In [None]:
# 2.3 FAIT SATISFACTION (depuis SILVER)
print("\n" + "="*80)
print(" FAIT: fait_satisfaction")
print("="*80)

df_satis_silver = spark.read.parquet(f"{SILVER_BASE}/satisfaction_2019")

fait_satisfaction = df_satis_silver.select(
    F.monotonically_increasing_id().alias("id_satisfaction"),
    F.col("finess"),
    F.lit("20190101").alias("id_temps"),
    F.col("annee"),
    
    # Scores (d√©j√† typ√©s correctement depuis Silver)
    F.col("score_global"),
    F.col("score_accueil"),
    F.col("score_pec_infirmier"),
    F.col("score_pec_medical"),
    F.col("score_chambre"),
    F.col("score_repas"),
    F.col("score_sortie"),
    
    # M√©triques
    F.col("taux_recommandation"),
    F.col("nb_reponses_global").alias("nb_repondants"),
    F.col("nb_recommandations"),
    
    # Classification
    F.col("classement"),
    F.col("evolution")
)

print(f"{fait_satisfaction.count():,} √©valuations de satisfaction (2019)")
fait_satisfaction.show(5)

# Statistiques
print("\n Statistiques:")
fait_satisfaction.select(
    F.count("*").alias("total_evaluations"),
    F.avg("score_global").alias("score_global_moyen"),
    F.avg("taux_recommandation").alias("taux_reco_moyen"),
    F.sum("nb_repondants").alias("total_repondants")
).show()

# Sauvegarde
fait_satisfaction.write \
    .mode("overwrite") \
    .partitionBy("annee") \
    .parquet(f"{GOLD_OUTPUT}/fait_satisfaction")

print(f" Sauvegard√©: {GOLD_OUTPUT}/fait_satisfaction (partitionn√© par annee)")

---

## √âTAPE 3 : V√©rification et Validation du Mod√®le Gold

In [None]:
# Inventaire complet du mod√®le Gold
import os

print("\n" + "="*80)
print(" MOD√àLE GOLD - INVENTAIRE COMPLET")
print("="*80)

gold_tables = []

for table in os.listdir(GOLD_OUTPUT):
    path = f"{GOLD_OUTPUT}/{table}"
    try:
        df = spark.read.parquet(path)
        count = df.count()
        cols = len(df.columns)
        table_type = "DIMENSION" if table.startswith("dim_") else "FAIT"
        
        gold_tables.append({
            "table": table,
            "rows": count,
            "columns": cols,
            "type": table_type
        })
        
        print(f"{'‚úÖ' if table_type == 'DIMENSION' else '...'} {table:30s} | {count:>10,} lignes | {cols:>2} colonnes | {table_type}")
    except Exception as e:
        print(f"‚ö†Ô∏è  {table} - Erreur: {e}")

print("="*80)

In [None]:
# Statistiques globales
import pandas as pd

if len(gold_tables) > 0:
    df_stats = pd.DataFrame(gold_tables)
    
    print("\n R√âSUM√â PAR TYPE")
    print("="*50)
    summary = df_stats.groupby('type').agg({
        'table': 'count',
        'rows': 'sum',
        'columns': 'sum'
    }).rename(columns={'table': 'nb_tables'})
    print(summary)
    
    print("\nüìã D√âTAIL DES TABLES")
    print("="*50)
    display(df_stats)
    
    total_lignes = df_stats['rows'].F.sum()
    print(f"\n TOTAL GOLD LAYER: {total_lignes:,} lignes")

In [None]:
# FAIT_HOSPITALISATION depuis √©pisodes de consultations (Silver)
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

cons = (spark.read.parquet(f"{SILVER_BASE}/consultation")
        .select('id_patient','id_diagnostic','date_consultation')
        .withColumn('date_consultation', F.col('date_consultation').cast('date'))
)

w = W.partitionBy('id_patient').orderBy(F.col('date_consultation'))
w_cum = w.rowsBetween(W.unboundedPreceding, W.currentRow)

prev = F.lag('date_consultation',1).over(w)
new_ep = (prev.isNull() | (F.datediff(F.col('date_consultation'), prev) > 1)).cast('int')

seq = (cons
       .withColumn('new_ep', new_ep)
       .withColumn('episode_seq', F.sum(F.col('new_ep')).over(w_cum))
       .withColumn('episode_id', F.concat_ws('_', F.col('id_patient').cast('string'), F.col('episode_seq').cast('string')))
)

# Mode du diagnostic par √©pisode
dc = seq.groupBy('episode_id','id_patient','id_diagnostic').count()
rnk = W.partitionBy('episode_id').orderBy(F.col('count').desc(), F.col('id_diagnostic'))
top = dc.withColumn('r', F.row_number().over(rnk)).filter(F.col('r')==1)

eps = (seq.groupBy('id_patient','episode_id')
          .agg(F.min('date_consultation').alias('date_entree'),
               F.max('date_consultation').alias('date_sortie'),
               F.count('*').alias('nb_consultations'),
               F.countDistinct('date_consultation').alias('nb_jours_distincts'))
       .join(top.select('episode_id', F.col('id_diagnostic').alias('code_diag')), 'episode_id','left'))

eps = eps.withColumn('duree_sejour_jours', F.datediff(F.col('date_sortie'), F.col('date_entree')) + F.lit(1))
eps = eps.filter( (F.col('duree_sejour_jours')>=2) | (F.col('nb_jours_distincts')>=2) )

fait_h = (eps
          .withColumn('id_hospitalisation', F.monotonically_increasing_id())
          .withColumn('id_temps_entree', F.date_format(F.col('date_entree'),'yyyyMMdd'))
          .withColumn('id_temps_sortie', F.date_format(F.col('date_sortie'),'yyyyMMdd'))
          .withColumn('annee', F.year(F.col('date_entree')))
          .withColumn('mois', F.month(F.col('date_entree')))
          .select('id_hospitalisation','id_patient','code_diag','date_entree','date_sortie',
                  'id_temps_entree','id_temps_sortie','duree_sejour_jours','nb_consultations','annee','mois')
)

print(f"Episodes: {fait_h.count()}")
fait_h.write.mode('overwrite').partitionBy('annee','mois').parquet(f"{GOLD_OUTPUT}/fait_hospitalisation")
print(f"üíæ Sauvegard√©: {GOLD_OUTPUT}/fait_hospitalisation (partitionn√© annee/mois)")


---

---

## MOD√àLE GOLD STAR SCHEMA TERMIN√â

### Ce qui a √©t√© cr√©√© :

#### üî∑ Dimensions (5) :
- **dim_temps** : Dimension temporelle compl√®te (2013-2025)
- **dim_patient** : Patients anonymis√©s (depuis Silver)
- **dim_diagnostic** : Codes diagnostics
- **dim_professionnel** : Professionnels de sant√© + sp√©cialit√©s
- **dim_etablissement** : √âtablissements de sant√© (donn√©es publiques)

#### 4 faits :
- **fait_consultation** : Consultations m√©dicales (partitionn√© ann√©e/mois)
- **fait_hospitalisation** : Hospitalisations avec dur√©e s√©jour (partitionn√© ann√©e/mois) ‚Üê NOUVEAU
- **fait_deces** : D√©c√®s 2019 anonymis√©s (partitionn√© ann√©e/mois)
- **fait_satisfaction** : Scores satisfaction E-Satis 2019 (partitionn√© ann√©e)

### Optimisations :
- **Partitionnement temporel** (ann√©e/mois pour requ√™tes rapides)
- **Format Parquet** (compression ~10x vs CSV)
- **Adaptive Query Execution** (optimisations Spark automatiques)
- **Anonymisation RGPD** (hash SHA-256 des donn√©es sensibles)
- **Typage correct** (integer, double, date)
- **D√©doublonnage** (cl√©s primaires uniques)



**Note importante** : Ce mod√®le Gold est construit depuis les donn√©es **Silver** (nettoy√©es et anonymis√©es), pas depuis Bronze. Flux ETLT.

**Fait_hospitalisation** : Construit depuis des √©pisodes d√©riv√©s des consultations Silver (dates d‚Äôentr√©e/sortie et dur√©es).