In [1]:
from pyspark.sql import SparkSession

In [2]:
# Étape 2 : Créer la session Spark avec les drivers JDBC
spark = SparkSession.builder \
    .appName("Test MySQL Ingestion") \
    .config("spark.driver.extraClassPath", "/usr/local/spark/jars/*") \
    .getOrCreate()

In [3]:

#  Étape 3 : Lire la table MySQL
df_mysql = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:mysql://mysql:3306/hospital_legacy") \
    .option("dbtable", "patients_legacy") \
    .option("user", "source_reader") \
    .option("password", "Source_Read_2024!") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()

In [4]:
df_mysql.show(50)

+----------+------+---------+--------------+----+--------------------+-----------+-----------+---------+-------------+
|patient_id|   nom|   prenom|date_naissance|sexe|             adresse|      ville|code_postal|telephone|date_creation|
+----------+------+---------+--------------+----+--------------------+-----------+-----------+---------+-------------+
|         1|Diallo|  Aminata|    1985-06-12|   F|Rue des Manguiers 12|      Dakar|      11000|771234567|   2020-01-15|
|         2|    Ba|   Moussa|    1978-03-22|   M|Avenue Blaise Diagne|     ThiÃ¨s|      21000|772345678|   2019-11-03|
|         3|   Sow|    Fatou|    1990-09-10|   F|      Rue 4, MÃ©dina|      Dakar|      11000|773456789|   2021-05-20|
|         4|Ndiaye| Ibrahima|    1982-12-01|   M|   CitÃ© Keur Gorgui|      Dakar|      11000|774567890|   2018-07-12|
|         5|  Fall|      Awa|    1995-04-18|   F|     Rue des Jardins|Saint-Louis|      32000|775678901|   2022-02-28|
|         6|  Diop|   Cheikh|    1975-08-30|   M

In [5]:
# Lire le fichier CSV
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/home/jovyan/work/data/patients_source.csv")

# Afficher les premières lignes
df_csv.show(10)

+-----------+------+--------+--------------+----+--------------------+-------------+-------------+-----------+
|external_id|   nom|  prenom|date_naissance|sexe|               email|     num_secu|source_system|date_import|
+-----------+------+--------+--------------+----+--------------------+-------------+-------------+-----------+
|     SRC001|Diallo| Aminata|    12/06/1985|   F|aminata.diallo@ex...|2850612345678|   SOURCE_CSV| 2024-01-01|
|     SRC002|    Ba|  Moussa|    22/03/1978|   M|moussa.ba@example...|2780322456789|   SOURCE_CSV| 2024-01-01|
|     SRC003|   Sow|   Fatou|    10/09/1990|   F|fatou.sow@example...|2900912345678|   SOURCE_CSV| 2024-01-01|
|     SRC004|Ndiaye|Ibrahima|    01/12/1982|   M|ibrahima.ndiaye@e...|2821212345678|   SOURCE_CSV| 2024-01-01|
|     SRC005|  Fall|     Awa|    18/04/1995|   F|awa.fall@example.com|2950412345678|   SOURCE_CSV| 2024-01-01|
|     SRC006|  Diop|  Cheikh|    30/08/1975|   M|cheikh.diop@examp...|2750812345678|   SOURCE_CSV| 2024-01-01|
|

In [6]:
import requests
import pandas as pd

# URL de l'API (depuis le conteneur Spark, utilise le nom du service Docker)
url = "http://api-medical:8000/visits"  # ← adapte ce endpoint selon ton API

# Requête GET
response = requests.get(url)

# Vérifie la réponse
if response.status_code == 200:
    data = response.json()
    df_api = pd.DataFrame(data)
    display(df_api)
else:
    print(f"Erreur {response.status_code} : {response.text}")


Unnamed: 0,count,visits
0,8,"{'visit_id': 8, 'patient_id': 8, 'date_visite'..."
1,8,"{'visit_id': 7, 'patient_id': 7, 'date_visite'..."
2,8,"{'visit_id': 6, 'patient_id': 6, 'date_visite'..."
3,8,"{'visit_id': 5, 'patient_id': 5, 'date_visite'..."
4,8,"{'visit_id': 4, 'patient_id': 4, 'date_visite'..."
5,8,"{'visit_id': 3, 'patient_id': 3, 'date_visite'..."
6,8,"{'visit_id': 2, 'patient_id': 2, 'date_visite'..."
7,8,"{'visit_id': 1, 'patient_id': 1, 'date_visite'..."


In [7]:
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://medical-minio:9000")
hadoop_conf.set("fs.s3a.access.key", "medicaladmin")
hadoop_conf.set("fs.s3a.secret.key", "MedicalSecurePass123!")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

In [8]:
bronze_path = "s3a://bronze"

In [9]:
# 1. MySQL → MinIO 
df_mysql.write.mode("overwrite").parquet(f"{bronze_path}/patients_legacy")

In [10]:
# 2. CSV → MinIO 
df_csv.write.mode("overwrite").parquet(f"{bronze_path}/patients_source")

In [14]:


# Si ce n’est pas déjà fait
spark = SparkSession.builder.getOrCreate()

# Conversion Pandas → Spark
df_api_spark = spark.createDataFrame(df_api)

# Écriture dans MinIO
df_api_spark.coalesce(1).write.mode("overwrite").parquet(f"{bronze_path}/visits")
