# Spark Project – ACS Patient Analysis

**Learning objective**  
Reproduce every step of sessions 1 → 3 of the Spark workshop: data loading, RDD → DataFrame ingestion, Spark SQL, aggregations/statistics, export and visualization.

*Main dataset*: [`CardioDataSets/acs_patients_df.csv`](https://raw.githubusercontent.com/vincentarelbundock/Rdatasets/master/csv/CardioDataSets/acs_patients_df.csv)

*Persona*: **Dr Emma Martin**, interventional cardiologist at a university hospital (CHU), who wants to track the clinical severity of ACS (acute coronary syndrome) patients and their risk-factor burden.


## SESSION 1 – Objectives, loading, preprocessing


In [3]:
```scala
// 1-A • SparkSession & imports (on Databricks, `spark` already exists and this builder can be skipped)
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("ACS Project")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// 1-B • Data source: local copy of the GitHub CSV (download the "raw" file linked above first)
val acsUrl = "acs_patients_df.csv"

// 1-C • Raw read: header, inferred schema, "NA" read as NULL, then cache
val acsRaw = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .csv(acsUrl)
  .cache()

acsRaw.show(5)
```


```
+--------+---+------+----------------+-------+------+----+------+------+-----------+-------+-----+----+----+----+---+---+-------+
|rownames|age|   sex|cardiogenicShock|  entry|    Dx|  EF|height|weight|        BMI|obesity|   TC|LDLC|HDLC|  TG| DM|HBP|smoking|
+--------+---+------+----------------+-------+------+----+------+------+-----------+-------+-----+----+----+----+---+---+-------+
|       1| 62|  Male|              No|Femoral| STEMI|18.0| 168.0|  72.0|25.51020408|    Yes|215.0| 154|  35| 155|Yes| No| Smoker|
|       2| 78|Female|              No|Femoral| STEMI|18.4| 148.0|  48.0| 21.9138057|     No| NULL|NULL|NULL| 166| No|Yes|  Never|
|       3| 76|Female|             Yes|Femoral| STEMI|20.0|  NULL|  NULL|       NULL|     No| NULL|NULL|NULL|NULL| No|Yes|  Never|
|       4| 89|Female|              No|Femoral| STEMI|21.8| 165.0|  50.0|18.36547291|     No|121.0|  73|  20|  89| No| No|  Never|
|       5| 56|  Male|              No| Radial|NSTEMI|21.8| 162.0|  64.0|24.38652644|     N

spark = org.apache.spark.sql.SparkSession@1e805d0e
acsUrl = acs_patients_df.csv
acsRaw = [rownames: int, age: int ... 16 more fields]
```
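To make the effect of `option("nullValue", "NA")` concrete, here is a minimal pure-Scala sketch (no Spark) of the same idea: every field equal to the token `NA` becomes a missing value before any numeric conversion is attempted. The header and data line are hypothetical, shaped like row 2 of the output above rather than copied from the raw file, and `parseField` is an illustrative helper, not a Spark API.

```scala
// Hypothetical header and line, shaped like row 2 above (not the raw file's exact text)
val header = "rownames,age,sex,TC,LDLC,TG".split(",").toVector
val line   = "2,78,Female,NA,NA,166"

// Mimics option("nullValue", "NA"): the NA token becomes None instead of a string
def parseField(raw: String): Option[String] =
  if (raw == "NA") None else Some(raw)

// split with limit -1 keeps trailing empty fields; zip pairs each column name with its value
val record: Map[String, Option[String]] =
  header.zip(line.split(",", -1).toVector.map(parseField)).toMap

// Numeric conversion only runs on present values, so a missing value never fails to parse
val tc: Option[Double] = record("TC").map(_.toDouble)
val tg: Option[Double] = record("TG").map(_.toDouble)
```

Spark's `inferSchema` goes one step further and picks a column type from all the non-null values, which is why `TC` shows up as `double` and `LDLC` as `int` above.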

In [4]:
```scala
// 1-D • Case class definition (boxed Java types so that numeric fields can be NULL)
case class AcsPatient(
  rownames:   java.lang.Integer,
  age:        java.lang.Integer,
  sex:        String,
  cardiogenicShock: String,
  entry:      String,
  Dx:         String,
  EF:         java.lang.Double,
  height:     java.lang.Double,
  weight:     java.lang.Double,
  BMI:        java.lang.Double,
  obesity:    String,
  TC:         java.lang.Double,
  LDLC:       java.lang.Double,
  HDLC:       java.lang.Double,
  TG:         java.lang.Double,
  DM:         String,
  HBP:        String,
  smoking:    String
)

// 1-E • "Clean" DataFrame (everything is kept here; .na.fill / .na.drop could be applied instead)
val acsClean = acsRaw
val acsDS    = acsClean.as[AcsPatient]  // typed Dataset; integer columns (LDLC, HDLC, TG) are upcast to Double
```


```
defined class AcsPatient
acsClean = [rownames: int, age: int ... 16 more fields]
acsDS = [rownames: int, age: int ... 16 more fields]
```
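Why boxed types? A primitive Scala `Double` cannot hold `null`, whereas a `java.lang.Double` can, so a missing value such as `TC` in row 2 survives the conversion to `AcsPatient`. A minimal sketch, assuming a hypothetical two-field case class `LipidRow` and helper `meanIgnoringNulls`, shows the idiomatic way to consume such fields (`Option(x)` turns `null` into `None`) and mirrors how SQL `AVG` skips NULLs.

```scala
// Hypothetical 2-field subset of AcsPatient: boxed java.lang.Double may be null, Double may not
case class LipidRow(TC: java.lang.Double, HDLC: java.lang.Double)

// Values from rows 1 and 2 of the output above: row 2 has TC and HDLC = NULL
val rows = Seq(LipidRow(215.0, 35.0), LipidRow(null, null))

// Option(x) maps a null reference to None, so missing values are skipped,
// which is also how SQL AVG treats NULL
def meanIgnoringNulls(xs: Seq[java.lang.Double]): Option[Double] = {
  val present = xs.flatMap(x => Option(x)).map(_.doubleValue())
  if (present.isEmpty) None else Some(present.sum / present.size)
}

val avgTC   = meanIgnoringNulls(rows.map(_.TC))
val avgHDLC = meanIgnoringNulls(rows.map(_.HDLC))
```

An alternative design would declare the fields as `Option[Double]`, which Spark encoders also accept; the boxed types keep the case class closer to the raw schema.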

## SESSION 2 – Ingestion, Spark SQL


In [5]:
```scala
// 2-A • RDD → DataFrame round trip + exploration
val acsRDD = acsDS.rdd               // RDD[AcsPatient]
val acsDF  = acsRDD.toDF()           // back to a DataFrame, schema derived from AcsPatient

acsDF.show(10, truncate = false)
acsDF.printSchema()
println(s"Total rows: ${acsDF.count()}")
acsDF.summary().show()
```


```
+--------+---+------+----------------+-------+---------------+----+------+------+-----------+-------+-----+-----+----+-----+---+---+---------+
|rownames|age|sex   |cardiogenicShock|entry  |Dx             |EF  |height|weight|BMI        |obesity|TC   |LDLC |HDLC|TG   |DM |HBP|smoking  |
+--------+---+------+----------------+-------+---------------+----+------+------+-----------+-------+-----+-----+----+-----+---+---+---------+
|1       |62 |Male  |No              |Femoral|STEMI          |18.0|168.0 |72.0  |25.51020408|Yes    |215.0|154.0|35.0|155.0|Yes|No |Smoker   |
|2       |78 |Female|No              |Femoral|STEMI          |18.4|148.0 |48.0  |21.9138057 |No     |NULL |NULL |NULL|166.0|No |Yes|Never    |
|3       |76 |Female|Yes             |Femoral|STEMI          |20.0|NULL  |NULL  |NULL       |No     |NULL |NULL |NULL|NULL |No |Yes|Never    |
|4       |89 |Female|No              |Femoral|STEMI          |21.8|165.0 |50.0  |18.36547291|No     |121.0|73.0 |20.0|89.0 |No |No |Never    |

acsRDD = MapPartitionsRDD[24] at rdd at <console>:33
acsDF = [rownames: int, age: int ... 16 more fields]
```
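`summary()` reports, for each column, count, mean, stddev, min, approximate quartiles (25%, 50%, 75%) and max. As a rough cross-check, here is a pure-Scala sketch of the count/mean/stddev/min/max part on the four ages shown above. It assumes that Spark's `stddev` is the sample standard deviation (dividing by n - 1); the quartiles are left out because Spark computes them approximately.

```scala
// Ages of rows 1 to 4 in the output above
val ages = Seq(62, 78, 76, 89).map(_.toDouble)

val n    = ages.size
val mean = ages.sum / n

// Sample standard deviation: squared deviations divided by (n - 1)
val stddev = math.sqrt(ages.map(a => math.pow(a - mean, 2)).sum / (n - 1))

val (minAge, maxAge) = (ages.min, ages.max)
```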

In [6]:
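```scala
// 2-B • Dimension extraction (star schema)
// monotonically_increasing_id() gives unique but non-consecutive ids and is non-deterministic,
// so each dimension is cached to keep, in practice, the same ids in the fact table and the saved tables.
import org.apache.spark.sql.functions.monotonically_increasing_id

val dimSex = acsDF.select("sex").distinct()
  .withColumn("id_sex", monotonically_increasing_id())
  .cache()

val dimDx = acsDF.select("Dx").distinct()
  .withColumn("id_dx", monotonically_increasing_id())
  .cache()

val dimAccess = acsDF.select(col("entry").alias("access")).distinct()
  .withColumn("id_access", monotonically_increasing_id())
  .cache()

// Fact table: one row per patient, dimension values replaced by their ids
val factACS = acsDF
  .join(dimSex,    Seq("sex"))
  .join(dimDx,     Seq("Dx"))
  .join(dimAccess, acsDF("entry") === dimAccess("access"))
  .drop("sex", "Dx", "entry", "access")
  .cache()
```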


```
dimSex = [sex: string, id_sex: bigint]
dimDx = [Dx: string, id_dx: bigint]
dimAccess = [access: string, id_access: bigint]
factACS = [rownames: int, age: int ... 16 more fields]
```
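The Spark documentation describes `monotonically_increasing_id` as unique and increasing but not consecutive, and non-deterministic because it depends on partition IDs. A swapped-in alternative with reproducible surrogate keys is to sort the distinct values and number them, the same idea as `row_number()` over an ordered window. A minimal pure-Scala sketch on a hypothetical three-patient sample; `buildDim` and the `*Keys` names are illustrative.

```scala
// Hypothetical mini-sample of (sex, Dx, entry) triples shaped like the rows above
val patients = Seq(
  ("Male",   "STEMI",  "Femoral"),
  ("Female", "STEMI",  "Femoral"),
  ("Male",   "NSTEMI", "Radial")
)

// Dimension: distinct values, sorted so the keys do not depend on row order, numbered from 1
def buildDim(values: Seq[String]): Map[String, Long] =
  values.distinct.sorted.zipWithIndex.map { case (v, i) => v -> (i + 1).toLong }.toMap

val sexKeys    = buildDim(patients.map(_._1))
val dxKeys     = buildDim(patients.map(_._2))
val accessKeys = buildDim(patients.map(_._3))

// Fact rows keep only the foreign keys (id_sex, id_dx, id_access)
val fact = patients.map { case (s, d, e) => (sexKeys(s), dxKeys(d), accessKeys(e)) }
```

Because the keys depend only on the set of values, recomputing a dimension always yields the same ids, so the fact table and the saved dimension tables cannot drift apart.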

In [7]:
```scala
// 2-C • Persist the 4 tables as managed tables
// (Delta by default on recent Databricks runtimes; Parquet under spark-warehouse/ in local mode)
dimSex.write.mode("overwrite").saveAsTable("dim_sex")
dimDx.write.mode("overwrite").saveAsTable("dim_dx")
dimAccess.write.mode("overwrite").saveAsTable("dim_access")
factACS.write.mode("overwrite").saveAsTable("fact_acs_patients")
```


In [8]:
```scala
// 2-D • Sample SQL query: number of patients by diagnosis and sex
spark.sql("""
SELECT dx.Dx, s.sex, COUNT(*) AS nb_patients
FROM   fact_acs_patients f
JOIN   dim_dx  dx ON f.id_dx  = dx.id_dx
JOIN   dim_sex s  ON f.id_sex = s.id_sex
GROUP  BY dx.Dx, s.sex
ORDER  BY nb_patients DESC
""").show()
```


```
+---------------+------+-----------+
|             Dx|   sex|nb_patients|
+---------------+------+-----------+
|Unstable Angina|  Male|        247|
|          STEMI|  Male|        220|
|Unstable Angina|Female|        153|
|         NSTEMI|  Male|        103|
|          STEMI|Female|         84|
|         NSTEMI|Female|         50|
+---------------+------+-----------+
```
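These counts can be cross-checked by hand against indicator 1 of Session 3 (`pct_stemi`): 857 patients in total, 220 + 84 = 304 of them STEMI. A small pure-Scala sketch of that arithmetic, using the same formula as the SQL and half-up rounding, which is what Spark's `ROUND` applies.

```scala
// Counts copied from the Dx × sex table above
val counts: Map[(String, String), Int] = Map(
  ("Unstable Angina", "Male")   -> 247,
  ("STEMI", "Male")             -> 220,
  ("Unstable Angina", "Female") -> 153,
  ("NSTEMI", "Male")            -> 103,
  ("STEMI", "Female")           -> 84,
  ("NSTEMI", "Female")          -> 50
)

val total = counts.values.sum
val stemi = counts.toSeq.collect { case ((dx, _), n) if dx == "STEMI" => n }.sum

// SUM(CASE ...) / COUNT(*) * 100, rounded to 2 decimals with HALF_UP like ROUND
val pctStemi = BigDecimal(stemi * 100.0 / total)
  .setScale(2, BigDecimal.RoundingMode.HALF_UP)
  .toDouble
```

The result, 35.47, matches the `pct_stemi` value printed in Session 3.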



## SESSION 3 – Aggregations, export & visualizations


In [9]:
```scala
// 3-A • Indicators (≥ 5 aggregations, including ≥ 4 grouped ones)

// 1. Share of STEMI (%)
spark.sql("""
SELECT ROUND(SUM(CASE WHEN dx.Dx = 'STEMI' THEN 1 ELSE 0 END) / COUNT(*) * 100, 2) AS pct_stemi
FROM   fact_acs_patients f
JOIN   dim_dx dx ON f.id_dx = dx.id_dx
""").show()

// 2. Cardiogenic shock (%) by diagnosis
spark.sql("""
SELECT dx.Dx,
       ROUND(AVG(CASE WHEN cardiogenicShock = 'Yes' THEN 1 ELSE 0 END) * 100, 2) AS shock_pct
FROM   fact_acs_patients f
JOIN   dim_dx dx ON f.id_dx = dx.id_dx
GROUP  BY dx.Dx
ORDER  BY shock_pct DESC
""").show()

// 3. Mean EF by age band
// Note: orderBy on the string label sorts lexically ("40-59" < "60-79" < "<40" < ">=80"), not by age
factACS.withColumn("age_band",
    when($"age" < 40, "<40")
     .when($"age" < 60, "40-59")
     .when($"age" < 80, "60-79")
     .otherwise(">=80"))
  .groupBy("age_band")
  .agg(round(avg("EF"), 1).alias("avg_EF"))
  .orderBy("age_band")
  .show()

// 4. Share (%) of patients with BMI ≥ 30, by sex
// Missing BMI counts as "not obese" (ELSE 0). This threshold differs from the dataset's own
// `obesity` column: row 1 is flagged "Yes" with a BMI of about 25.5.
spark.sql("""
SELECT s.sex,
       ROUND(AVG(CASE WHEN BMI >= 30 THEN 1 ELSE 0 END) * 100, 2) AS obesity_pct
FROM   fact_acs_patients f
JOIN   dim_sex s ON f.id_sex = s.id_sex
GROUP  BY s.sex
""").show()

// 5. Mean total cholesterol (TC) by diabetes status (AVG ignores NULL values)
spark.sql("""
SELECT DM,
       ROUND(AVG(TC), 1) AS avg_TC
FROM   fact_acs_patients
GROUP  BY DM
""").show()
```


```
+---------+
|pct_stemi|
+---------+
|    35.47|
+---------+

+---------------+---------+
|             Dx|shock_pct|
+---------------+---------+
|          STEMI|    15.79|
|         NSTEMI|     2.61|
|Unstable Angina|      0.0|
+---------------+---------+

+--------+------+
|age_band|avg_EF|
+--------+------+
|   40-59|  56.4|
|   60-79|  55.7|
|     <40|  53.3|
|    >=80|  54.7|
+--------+------+

+------+-----------+
|   sex|obesity_pct|
+------+-----------+
|Female|       4.53|
|  Male|       4.04|
+------+-----------+

+---+------+
| DM|avg_TC|
+---+------+
| No| 184.7|
|Yes| 186.2|
+---+------+
```
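The age-band table above is printed in string order, so `<40` appears after `60-79`. A minimal pure-Scala sketch of the banding logic and of the fix: sort by an explicit rank instead of by the label. The ages 35 and 45 and the `bandRank` table are hypothetical; in Spark, a comparable option would be to add `min("age")` to the `agg` and order by that column.

```scala
// Same thresholds as the when/otherwise chain of indicator 3
def ageBand(age: Int): String =
  if (age < 40) "<40"
  else if (age < 60) "40-59"
  else if (age < 80) "60-79"
  else ">=80"

// Hypothetical rank table: position of each label on the age axis
val bandRank: Map[String, Int] = Map("<40" -> 0, "40-59" -> 1, "60-79" -> 2, ">=80" -> 3)

val labels  = Seq(62, 78, 35, 89, 45).map(ageBand).distinct
val lexical = labels.sorted                     // plain string order, as orderBy("age_band") does
val byAge   = labels.sortBy(l => bandRank(l))   // order along the age axis
```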



In [10]:
```scala
// 3-B • CSV export (example: mean EF by age band)
val ageEf = factACS.withColumn("age_band",
    when($"age" < 40, "<40")
     .when($"age" < 60, "40-59")
     .when($"age" < 80, "60-79")
     .otherwise(">=80"))
  .groupBy("age_band")
  .agg(round(avg("EF"), 1).alias("avg_EF"))

// coalesce(1) → a single part-*.csv file inside the out/age_ef/ directory
ageEf.coalesce(1)
  .write.mode("overwrite")
  .option("header", "true")
  .csv("out/age_ef")
```


```
ageEf = [age_band: string, avg_EF: double]
```

### Charts

* **Databricks CE**: show each result with `display(...)` instead of `.show()` (or run the query in a `%sql` cell), then add a visualization (bar, pie, box plot…) to the result table.  
* **Codespaces / local**: import the generated CSV file(s) into a spreadsheet or BI tool (Sheets, Excel, Tableau…) and build three charts illustrating the key indicators for Dr Martin.
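Even with `coalesce(1)`, Spark writes a directory (`out/age_ef/`) rather than a single file, which is awkward to import into a spreadsheet. A minimal pure-Scala sketch, assuming the usual layout of one `part-*.csv` file plus marker files such as `_SUCCESS`; the helper `extractSinglePart` and the target name `out/age_ef.csv` are hypothetical.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Copies the single part-*.csv file of a Spark output directory to a flat file name.
// Fails loudly if the directory holds zero or several part files (e.g. without coalesce(1)).
def extractSinglePart(outDir: File, target: File): File = {
  val parts = Option(outDir.listFiles()).getOrElse(Array.empty[File])
    .filter(f => f.getName.startsWith("part-") && f.getName.endsWith(".csv"))
  require(parts.length == 1, s"expected exactly one part file in $outDir, found ${parts.length}")
  Files.copy(parts.head.toPath, target.toPath, StandardCopyOption.REPLACE_EXISTING)
  target
}
```

Once the export cell has run: `extractSinglePart(new File("out/age_ef"), new File("out/age_ef.csv"))`.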
