In [19]:
from google.cloud import storage, bigquery
import pandas as pd
from pyspark.sql import SparkSession
import datetime
import json
import logging


# Module-level logger; the connection test below reports failures through it.
logger = logging.getLogger(__name__)

# Google Cloud Storage and BigQuery clients
GCS = storage.Client()
BQ = bigquery.Client()

# Spark session
spark = SparkSession.builder.appName("HospitalAMySQLToLanding").getOrCreate()

# Google Cloud Storage (GCS) configuration
GCS_BUCKET = "healthcar032025"
HOPITAL_NAME = "hopital-a"
LANDING_PATH = f"gs://{GCS_BUCKET}/landing/{HOPITAL_NAME}/"
ARCHIVE_PATH = f"gs://{GCS_BUCKET}/landing/{HOPITAL_NAME}/archive/"
CONFIG_FILE_PATH = f"gs://{GCS_BUCKET}/configs/load_config.csv"

# FIXME(security): the database user and password are hardcoded in the notebook
# and the JDBC URL disables SSL. Move the credentials to a secret store or
# environment variables and rotate the password before sharing this notebook.
MYSQL_CONFIG = {
    "url": "jdbc:mysql://34.56.4.219:3306/hopital_a_db?useSSL=false&allowPublicKeyRetrieval=true",
    "driver": "com.mysql.cj.jdbc.Driver",
    "user": "azzedine",
    "password": "ZainaSalem1967@"
}

# Connection test: read the `patients` table over JDBC and show a few rows.
try:
    df = spark.read.jdbc(
        url=MYSQL_CONFIG["url"],
        table="patients",
        properties={
            "user": MYSQL_CONFIG["user"],
            "password": MYSQL_CONFIG["password"],
            "driver": MYSQL_CONFIG["driver"],
        },
    )
    print('leccture')

    df.show(5)

    print("Connexion réussie!")
except Exception:
    # exc_info=True attaches the traceback of the failed connection attempt.
    logger.error("Erreur lors de la connexion à MySQL:", exc_info=True)


leccture
+------------+---------+--------+----------+-----------+--------------------+------+----------+--------------------+------------+
|   PatientID|FirstName|LastName|MiddleName|        SSN|         PhoneNumber|Gender|       DOB|             Address|ModifiedDate|
+------------+---------+--------+----------+-----------+--------------------+------+----------+--------------------+------------+
|HOSP1-000001|     Rick|   Russo|         U|188-23-9828|+1-630-829-7585x0769|Female|1937-06-04|Unit 0915 Box 706...|  2020-05-25|
|HOSP1-000002|  Gregory|  Graham|         B|730-45-8217|  456.746.7289x69233|Female|1937-06-10|9864 Gibson Islan...|  2021-06-05|
|HOSP1-000003|     Mary|    Ryan|         H|348-14-7947|        522-501-5461|Female|1926-08-09|6194 Joseph Turnp...|  2024-09-06|
|HOSP1-000004|   Daniel|   Brown|         D|013-38-1645|     +1-345-608-9409|  Male|1971-10-23|780 Conrad Isle, ...|  2022-04-07|
|HOSP1-000005|     Brad| Carroll|         M|461-53-6290|   963.994.2969x6232|  Ma

In [20]:
# BigQuery configuration
BQ_PROJECT = "hybrid-ridge-457421-t8"
BQ_AUDIT_TABLE = f"{BQ_PROJECT}.temp_dataset.audit_log" # stores the state of each load
BQ_LOG_TABLE = f"{BQ_PROJECT}.temp_dataset.pipeline_logs" # stores the pipeline execution events
# Bucket-relative temp prefix (no gs:// scheme). NOTE(review): not referenced by
# the cells visible here; confirm whether it is still needed.
BQ_TEMP_PATH = f"{GCS_BUCKET}/temp/"  

# In-memory buffer of log entries: log_event appends to it and
# save_logs_to_gcs serialises it to JSON.
log_entries = [] 

def log_event(type_event,message,table=None):
    """Record one pipeline event in memory and echo it to stdout.

    Args:
        type_event: Label of the event (e.g. "INFO", "SUCCESS").
        message: Human-readable description of the event.
        table: Optional name of the table the event relates to.

    The entry is appended to the module-level ``log_entries`` list with the
    current local time as an ISO-8601 ``timestamp``.
    """
    stamp = datetime.datetime.now().isoformat()
    log_entries.append(
        dict(timestamp=stamp, event_type=type_event, message=message, table=table)
    )
    print(f"[{stamp}] {type_event} - {message}")
    
    
def save_logs_to_gcs():
    """Upload the buffered log entries to GCS as a single JSON document.

    The object is written to ``temp/pipeline_logs/`` in ``GCS_BUCKET`` under a
    name carrying the current local time (``pipeline_log_YYYYmmddHHMMSS.json``).
    """
    stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    log_filepath = f"temp/pipeline_logs/pipeline_log_{stamp}.json"

    # Serialise the whole in-memory buffer in one go.
    payload = json.dumps(log_entries, indent=4)

    target = GCS.bucket(GCS_BUCKET).blob(log_filepath)
    target.upload_from_string(payload, content_type="application/json")

    print(f"✅ Logs successfully saved to GCS at gs://{GCS_BUCKET}/{log_filepath}")
    

        
    


In [21]:
def get_lastes_watemark(table_name):
    """Return the most recent load timestamp recorded for ``table_name``.

    Queries ``BQ_AUDIT_TABLE`` for ``MAX(load_timestamp)`` among the rows whose
    ``tablename`` equals ``table_name`` and whose ``data_source`` is
    ``"hospital_a_db"`` (the value the audit rows are written with).

    Args:
        table_name: Name of the source table as stored in the audit log.

    Returns:
        The ``latest_timestamp`` value returned by BigQuery when one exists,
        otherwise the string ``"1900-01-01 00:00:00"``.
        NOTE(review): callers interpolate this value into a MySQL literal;
        confirm the rendered format is accepted by the MySQL server in use.
    """
    default_watermark = "1900-01-01 00:00:00"

    # Only the table identifier is formatted into the SQL text; the filter
    # values are bound as query parameters rather than string-interpolated.
    query = f"""
        SELECT MAX(load_timestamp) AS latest_timestamp
        FROM `{BQ_AUDIT_TABLE}`
        WHERE tablename = @table_name AND data_source = @data_source
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
            bigquery.ScalarQueryParameter("data_source", "STRING", "hospital_a_db"),
        ]
    )
    rows = BQ.query(query, job_config=job_config).result()

    # Only the first row matters; its value is NULL when no load was recorded.
    latest = next((row.latest_timestamp for row in rows), None)
    return latest if latest else default_watermark
    

In [22]:
def extract_and_save_in_gcs(table_name,load_type,watermark_col):
    """Extract one MySQL table and land it in GCS as newline-delimited JSON.

    Args:
        table_name: Source table to read over JDBC.
        load_type: ``"incremental"`` (case-insensitive) restricts the read to
            rows whose ``watermark_col`` is greater than the latest audited
            load timestamp; any other value reads the whole table.
        watermark_col: Column compared against the watermark for incremental
            loads.

    On success the file is written to
    ``landing/<HOPITAL_NAME>/<table_name>/<table_name>_<ddmmYYYY>.json`` and an
    audit row is appended to ``BQ_AUDIT_TABLE``. Any exception is caught and
    recorded through ``log_event`` with the ``"ERREUR"`` type; nothing is
    re-raised.
    """
    try:
        if load_type.lower() == 'incremental':
            last_watermark = get_lastes_watemark(table_name)
            query = f"""
            (SELECT * FROM {table_name} WHERE {watermark_col} > '{last_watermark}') AS t
            """
        else:
            query = f"(SELECT * FROM {table_name}) AS t"

        df = (spark.read.format("jdbc")
                .option("url", MYSQL_CONFIG["url"])
                .option("user", MYSQL_CONFIG["user"])
                .option("password", MYSQL_CONFIG["password"])
                .option("driver", MYSQL_CONFIG["driver"])
                .option("dbtable", query)
                .load())

        # Materialise the rows exactly once: the same snapshot feeds both the
        # uploaded file and the audited record count (no second JDBC read).
        records = df.toPandas()
        log_event("SUCCESS", f" extract des donnees  reussie de {table_name}", table=table_name)

        today = datetime.datetime.today().strftime('%d%m%Y')
        JSON_FILE_PATH = f"landing/{HOPITAL_NAME}/{table_name}/{table_name}_{today}.json"

        bucket = GCS.bucket(GCS_BUCKET)
        blob = bucket.blob(JSON_FILE_PATH)
        blob.upload_from_string(records.to_json(orient="records", lines=True), content_type="application/json")

        log_event("SUCCÈS", f" Fichier JSON enregistré avec succès dans gs://{GCS_BUCKET}/{JSON_FILE_PATH}", table=table_name)

        # Audit row describing this load
        audit_df = spark.createDataFrame([
            ("hospital_a_db", table_name, load_type, len(records), datetime.datetime.now(), "SUCCESS")],
            ["data_source", "tablename", "load_type", "record_count", "load_timestamp", "status"])

        (audit_df.write.format("bigquery")
            .option("table", BQ_AUDIT_TABLE)
            .option("temporaryGcsBucket", GCS_BUCKET)
            .mode("append")
            .save())

    except Exception as e:
        log_event("ERREUR", f"Erreur lors du traitement de la table {table_name} : {str(e)}", table=table_name)

        

In [23]:
def move_existing_files_to_archive(table):
    """Move the landed JSON files of ``table`` into the dated archive folder.

    Every object under ``landing/<HOPITAL_NAME>/<table>/`` whose name ends with
    ``.json`` is copied to
    ``landing/<HOPITAL_NAME>/archive/<table>/<yyyy>/<mm>/<dd>/<filename>`` and
    then deleted from its original location. The date is taken from the
    ``ddmmYYYY`` suffix of the file name (the part after the last underscore).

    Args:
        table: Table whose landing folder is archived.
    """
    # The bucket handle does not change per file, so build it once.
    bucket = GCS.bucket(GCS_BUCKET)
    landing_prefix = f"landing/{HOPITAL_NAME}/{table}/"
    existing_files = [
        blob.name
        for blob in bucket.list_blobs(prefix=landing_prefix)
        if blob.name.endswith(".json")
    ]

    if not existing_files:
        # Tagged with the table like every other table-scoped log entry.
        log_event("INFO", f"Aucun fichier existant pour la table {table}", table=table)
        return

    for file in existing_files:
        source_blob = bucket.blob(file)

        # Extract the ddmmYYYY date from the file name
        date_part = file.split("_")[-1].split(".")[0]
        year, month, day = date_part[-4:], date_part[2:4], date_part[:2]

        archive_path = f"landing/{HOPITAL_NAME}/archive/{table}/{year}/{month}/{day}/{file.split('/')[-1]}"

        # A move is a copy to the archive followed by deleting the original.
        bucket.copy_blob(source_blob, bucket, archive_path)
        source_blob.delete()

        log_event("INFO", f"Fichier {file} déplacé vers {archive_path}", table=table)
    
    
    
    
    

In [25]:
def read_config_file():
    """Load the pipeline configuration CSV into a Spark DataFrame.

    Reads ``CONFIG_FILE_PATH`` with the first line treated as the header,
    records an ``INFO`` event, and returns the resulting DataFrame.
    """
    config = spark.read.csv(CONFIG_FILE_PATH, header=True)
    log_event("INFO", "✅ Fichier de configuration lu avec succès")
    return config

# Read the load configuration, then process every active table of this source.
config_df = read_config_file()

try:
    for row in config_df.collect():
        if row["is_active"] == '1' and row["datasource"] == "hospital_a_db":
            # Positional unpacking: assumes the config CSV has exactly seven
            # columns in this order — TODO confirm against load_config.csv.
            db, src, table, load_type, watermark, _, targetpath = row
            move_existing_files_to_archive(table)
            extract_and_save_in_gcs(table, load_type, watermark)
finally:
    # Persist the run log even when a table fails part-way through the loop,
    # otherwise every event buffered so far would be lost.
    save_logs_to_gcs()


[2025-05-04T17:19:00.673663] INFO - ✅ Fichier de configuration lu avec succès
[2025-05-04T17:19:00.862381] INFO - Aucun fichier existant pour la table encounters
[2025-05-04T17:19:02.197242] SUCCESS -  extract des donnees  reussie de encounters


                                                                                

[2025-05-04T17:19:03.122841] SUCCÈS -  Fichier JSON enregistré avec succès dans gs://healthcar032025/landing/hopital-a/encounters/encounters_04052025.json


                                                                                

[2025-05-04T17:19:11.281347] INFO - Aucun fichier existant pour la table patients
[2025-05-04T17:19:12.412858] SUCCESS -  extract des donnees  reussie de patients
[2025-05-04T17:19:13.158620] SUCCÈS -  Fichier JSON enregistré avec succès dans gs://healthcar032025/landing/hopital-a/patients/patients_04052025.json


                                                                                

[2025-05-04T17:19:19.562205] INFO - Aucun fichier existant pour la table transactions
[2025-05-04T17:19:20.580737] SUCCESS -  extract des donnees  reussie de transactions


                                                                                

[2025-05-04T17:19:21.840479] SUCCÈS -  Fichier JSON enregistré avec succès dans gs://healthcar032025/landing/hopital-a/transactions/transactions_04052025.json


                                                                                

[2025-05-04T17:19:27.544239] INFO - Fichier landing/hopital-a/providers/providers_04052025.json déplacé vers landing/hopital-a/archive/providers/2025/05/04/providers_04052025.json
[2025-05-04T17:19:27.776329] SUCCESS -  extract des donnees  reussie de providers
[2025-05-04T17:19:28.119657] SUCCÈS -  Fichier JSON enregistré avec succès dans gs://healthcar032025/landing/hopital-a/providers/providers_04052025.json


                                                                                

[2025-05-04T17:19:33.680377] INFO - Fichier landing/hopital-a/departments/departments_04052025.json déplacé vers landing/hopital-a/archive/departments/2025/05/04/departments_04052025.json
[2025-05-04T17:19:33.921582] SUCCESS -  extract des donnees  reussie de departments
[2025-05-04T17:19:34.270278] SUCCÈS -  Fichier JSON enregistré avec succès dans gs://healthcar032025/landing/hopital-a/departments/departments_04052025.json


                                                                                

✅ Logs successfully saved to GCS at gs://healthcar032025/temp/pipeline_logs/pipeline_log_20250504171940.json
