In [30]:
from google.cloud import storage, bigquery
import pandas as pd
from pyspark.sql import SparkSession
import datetime
import json

In [31]:
# Google Cloud clients shared by the cells below. Both are constructed without
# an explicit project or credentials argument.
storage_client = storage.Client()
bq_client = bigquery.Client()

In [32]:
# In-memory buffer of every event logged during this run; the save_logs_*
# helpers later in the notebook read from it.
log_entries = []


def log_event(event_type, message, table=None):
    """Record a pipeline event in ``log_entries`` and echo it to stdout.

    Args:
        event_type: Short severity/category label (e.g. "INFO", "ERROR").
        message: Human-readable description of what happened.
        table: Optional name of the table the event relates to.

    Returns:
        None. The event is appended to the module-level ``log_entries`` list.
    """
    stamp = datetime.datetime.now().isoformat()
    # Key order is kept stable because the entries are later serialised as-is.
    log_entries.append(
        dict(timestamp=stamp, event_type=event_type, message=message, table=table)
    )
    print(f"[{stamp}] {event_type} - {message}")

In [33]:
# Create (or reuse) the Spark session used for every read and write below.
# NOTE(review): the app name says "HospitalA" while HOSPITAL_NAME and the audit
# rows written by this notebook refer to hospital-b — confirm which is intended.
spark = SparkSession.builder.appName("HospitalASqlToLanding").getOrCreate()

In [None]:
# Bucket and hospital identifier that every GCS path in this notebook is built from.
GCP_BUCKET = "healthcare-bucket-0201"
HOSPITAL_NAME = "hospital-b"

# gs:// URIs for this hospital's landing prefix, its archive prefix, and the
# CSV that lists which tables to load.
LANDING_PATH = f"gs://{GCP_BUCKET}/landing/{HOSPITAL_NAME}/"
ARCHIVE_PATH = f"gs://{GCP_BUCKET}/landing/{HOSPITAL_NAME}/archive/"
CONFIG_FILE_PATH = f"gs://{GCP_BUCKET}/configs/load_config.csv"

In [35]:
# BigQuery destinations for audit rows and pipeline logs.
BQ_PROJECT = "learning-0201"
BQ_AUDIT_TABLE = f"{BQ_PROJECT}.temp_dataset.audit_log"
BQ_LOG_TABLE = f"{BQ_PROJECT}.temp_dataset.pipeline_logs"

# Bucket-relative temp location (no gs:// scheme); save_logs_to_bigquery passes
# it as the connector's temporaryGcsBucket option.
BQ_TEMP_PATH = f"{GCP_BUCKET}/temp/"

In [None]:
# JDBC connection settings for the source MySQL database.
# The user and password are intentionally blank here; supply them at run time
# rather than committing real credentials to the notebook.
# NOTE(review): the database in the URL is "hospital-a-db" while the rest of
# this notebook targets hospital-b — confirm this is the intended source.
MYSQL_CONFIG = dict(
    url="jdbc:mysql://10.13.176.3/hospital-a-db?useSSL=true&allowPublicKeyRetrieval=true",
    driver="com.mysql.cj.jdbc.Driver",
    user="",
    password="",
)

In [37]:
def read_config_file():
    """Load the pipeline's CSV load configuration into a Spark DataFrame.

    Reads ``CONFIG_FILE_PATH`` with the first line treated as the header,
    logs an INFO event, and returns the resulting DataFrame.
    """
    csv_reader = spark.read
    config_frame = csv_reader.csv(CONFIG_FILE_PATH, header=True)
    log_event("INFO", "Successfully read the config file")
    return config_frame


# Load the configuration once; the driver loop further down iterates over it.
config_df = read_config_file()

[2026-01-02T09:48:20.809931] INFO - Successfully read the config file


In [39]:
def move_existing_files_to_archive(table):
    """Move a table's previously landed JSON files into the dated archive area.

    Lists objects under ``landing/{HOSPITAL_NAME}/{table}/`` whose names end in
    ``.json``, copies each one to
    ``landing/{HOSPITAL_NAME}/archive/{table}/{year}/{month}/{day}/<file name>``
    and then deletes the original. One INFO event is logged per moved file, or
    a single INFO event when there is nothing to move.

    Args:
        table: Table name used as the sub-folder under the landing prefix.
    """
    bucket = storage_client.bucket(GCP_BUCKET)
    landing_prefix = f"landing/{HOSPITAL_NAME}/{table}/"
    json_names = [
        blob.name
        for blob in bucket.list_blobs(prefix=landing_prefix)
        if blob.name.endswith(".json")
    ]

    if len(json_names) == 0:
        log_event("INFO", f"No existing files for table {table}")
        return

    for object_name in json_names:
        base_name = object_name.rsplit("/", 1)[-1]

        # The text between the last "_" and the first following "." is read as
        # DDMMYYYY (the format the extract step uses when naming files).
        stamp = object_name.rsplit("_", 1)[-1].split(".", 1)[0]
        day, month, year = stamp[:2], stamp[2:4], stamp[-4:]

        archive_path = f"landing/{HOSPITAL_NAME}/archive/{table}/{year}/{month}/{day}/{base_name}"

        # GCS has no rename here: copy to the archive key, then drop the source.
        landed_blob = bucket.blob(object_name)
        bucket.copy_blob(landed_blob, bucket, archive_path)
        landed_blob.delete()

        log_event("INFO", f"Moved {object_name} to {archive_path}")

In [40]:
def get_latest_watermark(table_name, data_source="hospital_b_db"):
    """Return the most recent audited load timestamp for a table.

    Queries ``BQ_AUDIT_TABLE`` for ``MAX(load_timestamp)`` restricted to the
    given table and data source. The data source defaults to
    ``"hospital_b_db"`` — the value this notebook writes into the audit table
    and filters the config on — so that incremental loads actually find their
    own previous audit rows.

    Args:
        table_name: Value matched against the audit table's ``tablename``.
        data_source: Value matched against the audit table's ``data_source``.

    Returns:
        The ``latest_timestamp`` value as returned by BigQuery when an audit
        row exists, otherwise the string ``"1900-01-01 00:00:00"``.
    """
    default_watermark = "1900-01-01 00:00:00"

    # Values are bound as query parameters instead of being spliced into the
    # SQL text, so unusual table names cannot break or alter the query.
    query = f"""
        SELECT MAX(load_timestamp) AS latest_timestamp
        FROM `{BQ_AUDIT_TABLE}`
        WHERE tablename = @table_name AND data_source = @data_source
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
            bigquery.ScalarQueryParameter("data_source", "STRING", data_source),
        ]
    )

    rows = bq_client.query(query, job_config=job_config).result()
    # MAX() without GROUP BY yields a single row whose value is NULL when
    # nothing matched; treat NULL the same as "no row at all".
    for row in rows:
        return row.latest_timestamp if row.latest_timestamp else default_watermark
    return default_watermark

In [None]:
def extract_and_save_to_landing(table, load_type, watermark_col):
    """Extract one MySQL table over JDBC, land it in GCS as JSON lines, and audit the load.

    Args:
        table: Source table name; also used in the landing object path.
        load_type: "full" or "incremental" (case and surrounding whitespace
            are ignored). Any other value is reported as an error instead of
            silently building a malformed incremental query.
        watermark_col: Column compared against the last audited load
            timestamp for incremental loads; unused for full loads.

    Returns:
        None. Failures are not raised: any exception is logged as an ERROR
        event so the caller can carry on with the next table.
    """
    try:
        load_kind = (load_type or "").strip().lower()
        if load_kind not in ("full", "incremental"):
            raise ValueError(f"Unsupported load_type {load_type!r}")

        last_watermark = get_latest_watermark(table) if load_kind == "incremental" else None
        log_event("INFO", f"Latest watermark for {table}: {last_watermark}", table=table)

        if load_kind == "full":
            query = f"(SELECT * FROM {table}) AS t"
        else:
            # NOTE(review): when an audit row exists the watermark is whatever
            # object BigQuery returned, so this literal is its str() form —
            # confirm MySQL accepts that format for this column.
            query = f"(SELECT * FROM {table} WHERE {watermark_col} > '{last_watermark}') AS t"

        df = (spark.read.format("jdbc")
                .option("url", MYSQL_CONFIG["url"])
                .option("user", MYSQL_CONFIG["user"])
                .option("password", MYSQL_CONFIG["password"])
                .option("driver", MYSQL_CONFIG["driver"])
                .option("dbtable", query)
                .load())

        # Pull the rows exactly once. The same in-memory frame feeds both the
        # uploaded file and the audited record count, so the two always agree
        # and the source database is not queried a second time.
        extracted = df.toPandas()
        record_count = len(extracted)

        log_event("SUCCESS", f"✅ Successfully extracted data from {table}", table=table)

        today = datetime.datetime.today().strftime('%d%m%Y')
        JSON_FILE_PATH = f"landing/{HOSPITAL_NAME}/{table}/{table}_{today}.json"

        bucket = storage_client.bucket(GCP_BUCKET)
        blob = bucket.blob(JSON_FILE_PATH)
        blob.upload_from_string(extracted.to_json(orient="records", lines=True), content_type="application/json")

        log_event("SUCCESS", f"✅ JSON file successfully written to gs://{GCP_BUCKET}/{JSON_FILE_PATH}", table=table)

        # Insert Audit Entry
        audit_df = spark.createDataFrame([
            ("hospital_b_db", table, load_type, record_count, datetime.datetime.now(), "SUCCESS")],
            ["data_source", "tablename", "load_type", "record_count", "load_timestamp", "status"])

        (audit_df.write.format("bigquery")
            .option("table", BQ_AUDIT_TABLE)
            .option("temporaryGcsBucket", GCP_BUCKET)
            .mode("append")
            .save())

        log_event("SUCCESS", f"✅ Audit log updated for {table}", table=table)

    except Exception as e:
        # Deliberate best-effort: record the failure and let the remaining
        # tables be processed.
        log_event("ERROR", f"Error processing {table}: {str(e)}", table=table)

In [None]:
# Run archive + extract for every active hospital_b_db entry in the config.
for row in config_df.collect():
    # Skip rows that are switched off or that belong to another data source.
    if row["is_active"] != '1' or row["datasource"] != "hospital_b_db":
        continue

    # Positional unpack: the config row must have exactly seven columns.
    db, src, table, load_type, watermark, _, targetpath = row
    move_existing_files_to_archive(table)
    extract_and_save_to_landing(table, load_type, watermark)

[2026-01-02T10:01:49.639512] INFO - No existing files for table encounters
[2026-01-02T10:01:50.384143] INFO - Latest watermark for encounters: 1900-01-01 00:00:00
[2026-01-02T10:04:01.639040] ERROR - Error processing encounters: An error occurred while calling o238.load.
: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
	at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:165)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:55)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:840)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:416)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:237)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:180)
	at org.apache.spark

In [47]:
def save_logs_to_gcs():
    """Serialise ``log_entries`` to JSON and upload it to the GCS bucket.

    The object is written under ``temp/pipeline_logs/`` with a name that
    carries the current timestamp, and a confirmation line is printed.
    """
    run_stamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    log_filename = f"pipeline_log_{run_stamp}.json"
    object_path = f"temp/pipeline_logs/{log_filename}"

    # Pretty-printed so the file is readable when opened directly.
    payload = json.dumps(log_entries, indent=4)

    target_blob = storage_client.bucket(GCP_BUCKET).blob(object_path)
    target_blob.upload_from_string(payload, content_type="application/json")

    print(f"✅ Logs successfully saved to GCS at gs://{GCP_BUCKET}/{object_path}")

In [48]:
def save_logs_to_bigquery():
    """Append the buffered ``log_entries`` to the BigQuery log table.

    Does nothing when no events have been logged. The DataFrame is built with
    an explicit all-string schema rather than inferred from the dicts:
    inference fails when every entry has ``table=None`` (Spark cannot
    determine the column type), which happens whenever only table-less events
    were logged.
    """
    # Imported here so this cell works on its own without editing the
    # notebook's import cell.
    from pyspark.sql.types import StringType, StructField, StructType

    if not log_entries:
        return

    # Columns are listed alphabetically, which is the order Spark produces
    # when it infers a schema from dicts.
    log_schema = StructType([
        StructField("event_type", StringType(), True),
        StructField("message", StringType(), True),
        StructField("table", StringType(), True),
        StructField("timestamp", StringType(), True),
    ])

    def _as_text(value):
        # Keep NULLs as NULLs; stringify anything else so a non-string message
        # cannot fail schema verification.
        return None if value is None else str(value)

    log_rows = [
        tuple(_as_text(entry.get(field.name)) for field in log_schema.fields)
        for entry in log_entries
    ]
    log_df = spark.createDataFrame(log_rows, schema=log_schema)

    # NOTE(review): temporaryGcsBucket receives BQ_TEMP_PATH ("<bucket>/temp/")
    # here, whereas the audit write passes the bare bucket name — confirm the
    # connector is meant to get a path in this call.
    (log_df.write.format("bigquery")
        .option("table", BQ_LOG_TABLE)
        .option("temporaryGcsBucket", BQ_TEMP_PATH)
        .mode("append")
        .save())
    print("✅ Logs stored in BigQuery for future analysis")

In [49]:
# Flush the in-memory log buffer: first as a JSON object in GCS, then as rows
# appended to the BigQuery log table.
save_logs_to_gcs()
save_logs_to_bigquery()

✅ Logs successfully saved to GCS at gs://healthcare-bucket-0201/temp/pipeline_logs/pipeline_log_20260102103028.json


                                                                                

✅ Logs stored in BigQuery for future analysis
