In [0]:
# Mount ADLS Gen2
# Required each time the cluster is restarted which should be only on the first notebook as they run in order

tiers = ["bronze", "silver", "gold"]
adls_paths = {tier: f"abfss://{tier}@stockagedataengineering.dfs.core.windows.net/" for tier in tiers}

# Accessing paths
bronze_adls = adls_paths["bronze"]
silver_adls = adls_paths["silver"]
gold_adls = adls_paths["gold"] 

dbutils.fs.ls(bronze_adls)
dbutils.fs.ls(silver_adls)
dbutils.fs.ls(gold_adls)

[FileInfo(path='abfss://gold@stockagedataengineering.dfs.core.windows.net/Consultation.csv', name='Consultation.csv', size=14466626, modificationTime=1746460728000),
 FileInfo(path='abfss://gold@stockagedataengineering.dfs.core.windows.net/Date.csv', name='Date.csv', size=1908623, modificationTime=1746460724000),
 FileInfo(path='abfss://gold@stockagedataengineering.dfs.core.windows.net/agregations/', name='agregations/', size=0, modificationTime=1746461915000)]

In [0]:
from pyspark.sql.functions import to_timestamp, date_format
import os

# === Traitement du fichier Date.csv ===
df_date = spark.read.option("header", "true").csv(silver_adls + "Date.csv")

# Conversion et formatage
df_date = df_date.withColumn("Date", date_format(to_timestamp("Date"), "dd/MM/yyyy"))

# Écriture dans un dossier temporaire Gold
tmp_path_date = gold_adls + "_tmp_Date/"
(df_date.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(tmp_path_date))

# Récupérer le fichier part-xxxxx.csv
part_file_date = [f.path for f in dbutils.fs.ls(tmp_path_date) if f.name.startswith("part-")][0]
final_path_date = gold_adls + "Date.csv"
dbutils.fs.mv(part_file_date, final_path_date)
dbutils.fs.rm(tmp_path_date, recurse=True)


# === Traitement du fichier Consultation.csv ===
df_consult = spark.read.option("header", "true").csv(silver_adls + "Consultation.csv")

# Conversion et formatage des dates
df_consult = (df_consult
    .withColumn("CheckinDate", date_format(to_timestamp("CheckinDate"), "dd/MM/yyyy"))
    .withColumn("CheckoutDate", date_format(to_timestamp("CheckoutDate"), "dd/MM/yyyy")))

# Écriture dans un dossier temporaire Gold
tmp_path_consult = gold_adls + "_tmp_Consultation/"
(df_consult.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(tmp_path_consult))

# Récupérer le fichier part-xxxxx.csv
part_file_consult = [f.path for f in dbutils.fs.ls(tmp_path_consult) if f.name.startswith("part-")][0]
final_path_consult = gold_adls + "Consultation.csv"
dbutils.fs.mv(part_file_consult, final_path_consult)
dbutils.fs.rm(tmp_path_consult, recurse=True)


True

Agrégations pour Patient.csv

In [0]:
from pyspark.sql.functions import year, when, col
from pyspark.sql import functions as F

# Charger le fichier Patient.csv
df_patient = spark.read.option("header", "true").csv(silver_adls + "Patient.csv")

# ➤ Répartition par sexe
agg_sexe = df_patient.groupBy("Gender").count()

# ➤ Répartition par tranche d’âge
df_patient = df_patient.withColumn("AnneeNaissance", year("Birth Date").cast("int"))
df_patient = df_patient.withColumn("TrancheAge", 
    when(col("AnneeNaissance") >= 2006, "0-18")
    .when(col("AnneeNaissance") >= 1989, "19-35")
    .when(col("AnneeNaissance") >= 1964, "36-60")
    .otherwise("60+"))

agg_age = df_patient.groupBy("TrancheAge").count()

# ➤ Répartition géographique (nationalité)
agg_nationalite = df_patient.groupBy("Nationality").count().orderBy("count", ascending=False)

# Chemin vers le dossier d'agrégations dans le container Gold
agg_path = gold_adls + "agregations/"

# === Sauvegarder les résultats un par un ===
def save_aggregation(df, name):
    tmp_path = agg_path + "_tmp_" + name + "/"
    final_path = agg_path + name + ".csv"
    
    (df.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", "true")
       .csv(tmp_path))
    
    part_file = [f.path for f in dbutils.fs.ls(tmp_path) if f.name.startswith("part-")][0]
    dbutils.fs.mv(part_file, final_path)
    dbutils.fs.rm(tmp_path, recurse=True)

# Sauvegardes
save_aggregation(agg_sexe, "repartition_par_sexe")
save_aggregation(agg_age, "repartition_par_tranche_age")
save_aggregation(agg_nationalite, "repartition_par_nationalite")


Agrégations sur Cout.csv et CoutTraitement.csv

In [0]:
from pyspark.sql.functions import col

# Lire les deux fichiers CSV
df_patient = spark.read.option("header", "true").csv(silver_adls + "Patient.csv")
df_consultation = spark.read.option("header", "true").csv(silver_adls + "Consultation.csv")

# Effectuer la jointure interne sur Encontre_ID
df_jointure = df_patient.join(
    df_consultation,
    on="Patient_ID")

In [0]:
df_jointure.head()

Row(Patient_ID='TR754', First Name='Xu', Last Name='Chen', Gender='Male', Birth Date='1922-04-06', Height='173', Weight='62', Marital Status='Divorced', Nationality='Chinese', Blood Type='A+', Encounter_ID='2433', Disease_ID='1490', ResponsibleDoctorID='856', InsuranceKey='82', RoomKey='203', CheckinDate='2024-06-05 21:24:00', CheckoutDate='2024-07-02 20:35:00', CheckinDateKey='20240605', CheckoutDateKey='20240702', Patient_Severity_Score='68.4', RadiologyType='None', RadiologyProcedureCount='0', EndoscopyType='None', EndoscopyProcedureCount='0', CompanionPresent='True')

In [0]:
from pyspark.sql.functions import col

# Lire les deux fichiers CSV
df_Cout = spark.read.option("header", "true").csv(silver_adls + "Cout.csv")

# Effectuer la jointure interne sur Encontre_ID
df_jointure_2= df_jointure.join(
    df_Cout,
    on="Encounter_ID")

In [0]:
df_jointure_2.head()

Row(Encounter_ID='5925', Patient_ID='TRDTW246', First Name='Mei Jia Zi', Last Name='Tai Tian', Gender='Female', Birth Date='1968-07-03', Height='131', Weight='51', Marital Status='Married', Nationality='Japanese', Blood Type='O-', Disease_ID='1449', ResponsibleDoctorID='844', InsuranceKey='82', RoomKey='206', CheckinDate='2024-03-06 12:41:00', CheckoutDate='2024-04-17 11:58:00', CheckinDateKey='20240306', CheckoutDateKey='20240417', Patient_Severity_Score='2.2', RadiologyType='Ultrasound', RadiologyProcedureCount='1', EndoscopyType='None', EndoscopyProcedureCount='0', CompanionPresent='True', CostType='NutritionCost', CostAmount='4410.0')

In [0]:
from pyspark.sql.functions import col

# Lire les deux fichiers CSV
df_Traitement = spark.read.option("header", "true").csv(silver_adls + "Traitement.csv")

# Effectuer la jointure interne sur Encontre_ID
df_jointure_3= df_jointure_2.join(
    df_Traitement,
    on="Encounter_ID")

In [0]:
df_jointure_3.head()

Row(Encounter_ID='5925', Patient_ID='TRDTW246', First Name='Mei Jia Zi', Last Name='Tai Tian', Gender='Female', Birth Date='1968-07-03', Height='131', Weight='51', Marital Status='Married', Nationality='Japanese', Blood Type='O-', Disease_ID='1449', ResponsibleDoctorID='844', InsuranceKey='82', RoomKey='206', CheckinDate='2024-03-06 12:41:00', CheckoutDate='2024-04-17 11:58:00', CheckinDateKey='20240306', CheckoutDateKey='20240417', Patient_Severity_Score='2.2', RadiologyType='Ultrasound', RadiologyProcedureCount='1', EndoscopyType='None', EndoscopyProcedureCount='0', CompanionPresent='True', CostType='NutritionCost', CostAmount='4410.0', Treatment_ID='3398', Treatment_Type='Therapy', Treatment_Name='Dialysis', Follow_Up='Yes', Complications='No', Drug_Boxes_Used='0', Therapy_Sessions='75', Drug_Cost='0', Surgery_Cost='0', Post_Surgery_Care_Cost='0', Education_Rehab_Cost='75000', Hospital_Drug_Quantity='0', Discharge_Drug_Quantity='0', Total_Drug_Quantity='0')

In [0]:
from pyspark.sql.functions import year, when, col
from pyspark.sql import functions as F

# ➤ Coût total par patient

agg_couttotal = df_jointure_3.groupBy("Patient_ID").agg(F.sum("CostAmount").alias("Montant"))

# ➤ Coût moyen par consultation
agg_coutmoyerparconsultation = df_jointure_3.groupBy("Encounter_ID").agg(F.avg("CostAmount").alias("CoutMoyen"))

# ➤ Coût total des traitements
from pyspark.sql import functions as F

df_jointure_3 = df_jointure_3.withColumn(
    "Cout_Total_Par_Encontre",
    col("Drug_Cost").cast("double") +
    col("Surgery_Cost").cast("double") +
    col("Post_Surgery_Care_Cost").cast("double") +
    col("Education_Rehab_Cost").cast("double")
)

agg_couttotaltraitement = df_jointure_3.groupBy("Treatment_ID") \
    .agg(F.sum("Cout_Total_Par_Encontre").alias("CoutTotal")) \
    
# Chemin vers le dossier d'agrégations dans le container Gold
agg_path = gold_adls + "agregations/"

# === Sauvegarder les résultats un par un ===
def save_aggregation(df, name):
    tmp_path = agg_path + "_tmp_" + name + "/"
    final_path = agg_path + name + ".csv"
    
    (df.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", "true")
       .csv(tmp_path))
    
    part_file = [f.path for f in dbutils.fs.ls(tmp_path) if f.name.startswith("part-")][0]
    dbutils.fs.mv(part_file, final_path)
    dbutils.fs.rm(tmp_path, recurse=True)

# Sauvegardes
save_aggregation(agg_couttotal, " Coût total par patient")
save_aggregation(agg_coutmoyerparconsultation, "Coût moyen par consultation")
save_aggregation(agg_couttotaltraitement, "Coût total des traitements")

In [0]:
# Liste des fichiers à exclure
exclude_files = {"Consultation.csv", "Date.csv"}

# Liste tous les fichiers CSV dans le container Silver
silver_files = [f for f in dbutils.fs.ls(silver_adls) if f.name.endswith(".csv") and f.name not in exclude_files]

for file in silver_files:
    filename = file.name.replace(".csv", "")
    tmp_input_path = silver_adls + filename + ".csv"

    # Lire le fichier depuis Silver
    df = spark.read.option("header", "true").csv(tmp_input_path)

    # Écrire dans un dossier temporaire sous Gold
    tmp_output_path = gold_adls + "_tmp_" + filename + "/"
    (df.coalesce(1)
       .write
       .mode("overwrite")
       .option("header", "true")
       .csv(tmp_output_path))

    # Récupérer le nom du fichier part-xxxxx.csv
    tmp_files = dbutils.fs.ls(tmp_output_path)
    part_file = [f.path for f in tmp_files if f.name.startswith("part-")][0]

    # Définir le chemin final dans Gold
    final_output_path = gold_adls + filename + ".csv"

    # Déplacer le fichier final et supprimer le dossier temporaire
    dbutils.fs.mv(part_file, final_output_path)
    dbutils.fs.rm(tmp_output_path, recurse=True)

print("✅ Tous les fichiers (sauf Consultation.csv et Date.csv) ont été copiés dans le container Gold.")


✅ Tous les fichiers (sauf Consultation.csv et Date.csv) ont été copiés dans le container Gold.
