In [1]:
import pandas as pd

# Chargement des fichiers

In [2]:
# Table contenant les informations sur les prestations remboursées
er_prs_f=pd.read_csv('er_prs_f.csv',sep=';')
print(er_prs_f.head())

   id                         NUM_ENQ  prs_nat_ref exe_soi_dtd exe_soi_dtf  \
0   1  DPXX:000000000000000000000001X         1130  04/03/2013  04/03/2013   
1   2  DPXX:000000000000000000000001X         1331  05/03/2013  05/03/2013   
2   3  DPXX:000000000000000000000001X         3313  05/03/2013  05/03/2013   
3   4  DPXX:000000000000000000000001X         3125  07/03/2013  07/03/2013   
4   5  DPXX:000000000000000000000002X         1130  05/03/2013  01/07/2013   

   pse_spe_cod  pse_act_nat  etb_pre_fin  
0          1.0          NaN          NaN  
1          6.0          NaN  750300360.0  
2          NaN         50.0  750023772.0  
3          NaN         26.0          NaN  
4          6.0         26.0  750023772.0  


In [3]:
# Table contenant des informations sur les professionnels de santé
ir_act_v=pd.read_csv('ir_act_v.csv', sep=';')
print(ir_act_v.head())

   pfs_act_nat             label
0           26  Kinesitherapeute
1           50        Pharmacien


In [4]:
# Table contenant des informations sur les professionnels de santé
ir_spe_v=pd.read_csv('ir_spe_v.csv', sep=';')
print(ir_spe_v.head())

   pfs_spe_cod                label
0            1  Medecin generaliste
1            6           Radiologue


In [5]:
#Table contenant les informations des assurés
ir_ben_r=pd.read_csv('ir_ben_r.csv', sep=';')
print(ir_ben_r.head())

                   NUM_ENQ  ben_sex_cod  ben_nai_ann  ben_nai_moi  \
0  DPXX:00000000000000001X            2         1963           12   
1    DPXX:000000000000002X            1         1971            2   
2    DPXX:000000000000003X            1         1962           12   
3    DPXX:000000000000004X            2         1959            3   
4    DPXX:000000000000005X            1         1998            4   

   ben_res_dpt  ben_res_reg  
0           75          114  
1           93          114  
2           93          114  
3           94          114  
4           93          114  


In [6]:
#Table contenant des informations sur les établissements de santé
t_mcoaae=pd.read_csv('t_mcoaae.csv', sep=';')
print(t_mcoaae.head())

     eta_num                        soc_rai
0  750300360  l'Hopital Prive des Peupliers
1  750023772            Pharmacie Plaisance


# Creation de la table Person(Python)


In [7]:
#Lors de l'importation des donnees de la colonne NUM_ENQ des espaces invisibles ont introduits des erreurs du coup je les enlever 
ir_ben_r['NUM_ENQ'] = ir_ben_r['NUM_ENQ'].str.strip()

In [8]:
# Filtration des données pour Camille Honette (code NUM_ENQ = DPXX:00000000000000001X)
person_data = ir_ben_r[ir_ben_r['NUM_ENQ'] == 'DPXX:00000000000000001X']

In [9]:
if person_data.empty:
    print("Aucune donnée trouvée pour NUM_ENQ = 'DPXX:00000000000000001X'")
else:
    # Création de la table Person
    person = pd.DataFrame({
        'person_id': [1],  # génère arbitrairement ici
        #si ben_sex_cod=1 c'est un homme sin une femme(selon les conventions OMOP)
        'gender_concept_id': [8507 if person_data['ben_sex_cod'].values[0] == 1 else 8532], # Concept OMOP pour Male/female
        'year_of_birth': person_data['ben_nai_ann'].values, #La méthode .values récupère les valeurs de la série
        'month_of_birth': person_data['ben_nai_moi'].values,
        'person_source_value': person_data['NUM_ENQ'].values,
        'localisation_id': [person_data['ben_res_dpt'].values[0]],  # Utilisation du département de résidence comme location_id
        'gender_source_value': person_data['ben_sex_cod'].values
    })

    # Sauvegarde en fichier CSV
    person.to_csv('Person.csv', index=False) # index=False pour éviter d'écrire l'index des lignes dans le fichier
    print('Table Person créée et sauvegardée')


Table Person créée et sauvegardée


# Creation de la table Care Site 

In [10]:
# importer une interface pour interagir avec des bases de données SQLite à partir de Python
import sqlite3

In [11]:
#Connection à la base de donnees SQLite
conn = sqlite3.connect('health_data.db')  
#Creation d'un curseur pour executer les requetes SQL et recuperer les resultats
cursor = conn.cursor()

In [12]:
# Supprimer la table si elle existe déjà
cursor.execute("DROP TABLE IF EXISTS Care_Site;")
#Creation de la table
create_table_query = """
CREATE TABLE IF NOT EXISTS Care_Site (
    cc_site_id INTEGER PRIMARY KEY,
    care_site_name TEXT,
    location_id INTEGER,
    care_site_source_value TEXT
);
"""
cursor.execute(create_table_query)


<sqlite3.Cursor at 0x27ee73998c0>

In [13]:
# Insertion des données avec IGNORE pour éviter les doublons
# 75 est le code de departement pour Paris
# J'ai attribué 1 et 2 arbitrairement comme clé unique
# INSERT OR IGNORE signifie que si une entrée avec le même care_site_id existe déjà, l'insertion sera ignorée pour éviter les doublons
insert_data_query = """
INSERT OR IGNORE INTO Care_Site (cc_site_id, care_site_name, location_id, care_site_source_value)
VALUES
(1, 'Hôpital Privé des Peupliers', 75, '750300360'), 
(2, 'Pharmacie Plaisance', 75, '750023772');
"""
# Execution de la requete
cursor.execute(insert_data_query)
# Validation des changements
conn.commit()
# Verification que les donnees ont etes inserees
df = pd.read_sql_query("SELECT * FROM Care_Site;", conn)
print(df)
#Fermeture de la connexion
conn.close()

   cc_site_id               care_site_name  location_id care_site_source_value
0           1  Hôpital Privé des Peupliers           75              750300360
1           2          Pharmacie Plaisance           75              750023772


# Creation de la table Provider

In [14]:
# Le fichier source pour les spécialités est ir_spe_v.csv et les données sur les professionnels de santé sont dans er_prs_f.csv
!pip install --upgrade pyspark
# Importation des modules
from pyspark.sql import SparkSession #pour travailler avec dataframes
from pyspark.sql.functions import expr #pour executer des expressions SQL sur les colonnes de DF
# Création de la session Spark avec le nom de l'application OMOP_Provider 
spark = SparkSession.builder.appName("OMOP_Provider").getOrCreate()
# Chargement des données source 
ir_spe_v = pd.read_csv('ir_spe_v.csv', sep=';')
er_prs_f = pd.read_csv('er_prs_f.csv', sep=';')
# Conversion en DataFrame pandas en DataFrame Spark
spe_df = spark.createDataFrame(ir_spe_v)
prs_df = spark.createDataFrame(er_prs_f)
# Filtration des professionnels de santé pour Camille Honette (NUM_ENQ = DPXX:00000000000000001X)
prs_filtered = prs_df.filter(prs_df['NUM_ENQ'] == 'DPXX:00000000000000001X')
# Jointure pour obtenir les spécialités
# specialty_source_value dérivé de pse_spe_cod de prs_filtered
# provider_source_value dérivé de pfs_spe_cod de spe_df
# specialty_concept_id !!
# spe_df contient les specialites medicales
# toutes les lignes de prs_filtered seront conservees mm si elles n'ont pas de correspondance dans spe_df
# on va joindre 2 DF pour associer les professionelles de sante a leur specialite
# j'ai utilise left comme type de jointure pour assurer qu'il ne se produit pas aucune perte de donnees
provider = prs_filtered.join(spe_df, prs_filtered.pse_spe_cod == spe_df.pfs_spe_cod, 'left') \
# renommer les colonnes apres la jointure
    .selectExpr("pse_spe_cod as specialty_source_value", "pfs_spe_cod as provider_source_value") \
#Provider_id genere automatiquement avce 'monotonically_increasing_id()'
    .withColumn("provider_id", expr("monotonically_increasing_id()"))
try:
    # sauvegarde dans un fichier parquet(un format de fichier columnar optimisé pour les traitements Big Data)
    provider.write.parquet("Provider.parquet")
    print("Table Provider créée et sauvegardée.")
except Exception as e:
    print("Erreur lors de la sauvegarde en fichier Parquet : ", e)

#Probleme de communication entre saprk le processus python du coup je vais essayé autre méthode



  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


Erreur lors de la sauvegarde en fichier Parquet :  An error occurred while calling o82.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3) (192.168.0.4 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.r

In [15]:
# Installer PySpark si nécessaire
!pip install --upgrade pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr



In [16]:
# Pour optimiser l’utilisation des ressources disponibles lors du traitement de données localement avec Sparkr
spark = SparkSession.builder \
    .appName("OMOP_Provider") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.network.timeout", "600s") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "8") \
    .master("local[*]") \
    .getOrCreate()

In [17]:
# Chargement des données source
ir_spe_v = pd.read_csv('ir_spe_v.csv', sep=';')
er_prs_f = pd.read_csv('er_prs_f.csv', sep=';')

In [18]:
# Conversion en DataFrame Spark
spe_df = spark.createDataFrame(ir_spe_v)
prs_df = spark.createDataFrame(er_prs_f)

  if is_categorical_dtype(series.dtype):
  sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


In [19]:
# Filtration des professionnels de santé pour Camille Honette (NUM_ENQ = DPXX:00000000000000001X)
prs_filtered = prs_df.filter(prs_df['NUM_ENQ'] == 'DPXX:00000000000000001X')

In [20]:
# Jointure pour obtenir les spécialités
provider = prs_filtered.join(spe_df, prs_filtered.pse_spe_cod == spe_df.pfs_spe_cod, 'left') \
    .selectExpr("pse_spe_cod as specialty_source_value", "pfs_spe_cod as provider_source_value") \
    .withColumn("provider_id", expr("monotonically_increasing_id()"))

In [21]:
# Répartir les données en 4 partitions pour mieux gérer les ressources
provider = provider.repartition(4)

In [22]:
# Sauvegarde du résultat au format Parquet avec gestion des erreurs
try:
    provider.write.mode("overwrite").parquet("Provider.parquet")
    print("Table Provider créée et sauvegardée avec succès.")
except Exception as e:
    print("Erreur lors de la sauvegarde en fichier Parquet : ", e)

# Meme erreur rencontre j'espere que l'ont discuttent pour que je puisse comprendre mieux le probleme


Erreur lors de la sauvegarde en fichier Parquet :  An error occurred while calling o165.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 2.0 failed 1 times, most recent failure: Lost task 13.0 in stage 2.0 (TID 30) (192.168.0.4 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spa