In [21]:
# import all the modules 

import datetime
import json
import os
import warnings

import pandas as pd
from google.cloud import bigquery, storage
from pyspark.sql import SparkSession

In [22]:
# Create the Google Cloud Storage and BigQuery clients (no explicit project or
# credentials are passed, so the environment's defaults apply).
storage_client = storage.Client()
bq_client = bigquery.Client()

# Start (or reuse, via getOrCreate) the SparkSession for this ingestion job.
spark = (
    SparkSession.builder
    .appName("HospitalMySQLToLanding")
    .getOrCreate()
)

25/08/04 13:39:54 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
25/08/04 13:39:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
25/08/04 13:39:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
25/08/04 13:39:55 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [26]:
# Set up the source-to-sink variables.

# Google Cloud Storage configuration
GCS_BUCKET = "healthcare-bucket-192"
HOSPITAL_NAME = "hospital-a"
LANDING_PATH = f"gs://{GCS_BUCKET}/landing/{HOSPITAL_NAME}/"
# The archive lives inside the hospital's landing folder.
ARCHIVE_PATH = f"{LANDING_PATH}archive"
# Backward-compatible alias for the original (misspelled) name.
ARCHIEVE_PATH = ARCHIVE_PATH
CONFIG_FILE_PATH = f"gs://{GCS_BUCKET}/configs/load_config.csv"

In [27]:
# BigQuery configuration: the audit and pipeline-log tables live in the "temp" dataset.
BQ_PROJECT = "gcpdataengineering-467713"
BQ_AUDIT_TABLE = "{}.temp.audit_log".format(BQ_PROJECT)
BQ_LOG_TABLE = "{}.temp.pipeline_logs".format(BQ_PROJECT)
# NOTE(review): this has no gs:// scheme and is not referenced by the visible cells;
# the BigQuery writes pass GCS_BUCKET directly as temporaryGcsBucket.
BQ_TEMP_PATH = GCS_BUCKET + "/temp/"

In [28]:
# MySQL source configuration.
# Connection settings are read from environment variables so no credentials live in
# the notebook. The password that was previously hard-coded here should be treated
# as exposed and rotated.


def load_mysql_config(env=None):
    """Build the JDBC connection settings for the hospital MySQL source.

    Args:
        env: Mapping to read settings from; defaults to ``os.environ``.
            Recognized keys: MYSQL_JDBC_URL, MYSQL_JDBC_DRIVER, MYSQL_USER,
            MYSQL_PASSWORD.

    Returns:
        dict with ``url``, ``driver``, ``user`` and ``password`` keys, as read by
        the Spark JDBC reader. ``password`` is ``None`` when MYSQL_PASSWORD is not
        set; a RuntimeWarning is emitted in that case.
    """
    env = os.environ if env is None else env
    password = env.get("MYSQL_PASSWORD")
    if not password:
        warnings.warn(
            "MYSQL_PASSWORD is not set; MySQL extraction is likely to fail to authenticate.",
            RuntimeWarning,
            stacklevel=2,
        )
    return {
        # NOTE(review): the default URL disables SSL (useSSL=false) against a
        # public IP; consider enabling TLS for this connection.
        "url": env.get(
            "MYSQL_JDBC_URL",
            "jdbc:mysql://34.9.188.225:3306/hospital_a_db?useSSL=false&allowPublicKeyRetrieval=true",
        ),
        "driver": env.get("MYSQL_JDBC_DRIVER", "com.mysql.cj.jdbc.Driver"),
        "user": env.get("MYSQL_USER", "myuser"),
        "password": password,
    }


MYSQL_CONFIG = load_mysql_config()

In [29]:
# Logging mechanism: events are kept in memory in `log_entries` and echoed to stdout.

log_entries = []


def log_event(event_type, message, table=None):
    """Append a pipeline event to ``log_entries`` and print it.

    Args:
        event_type: Category label, e.g. "INFO", "SUCCESS", "WARNING" or "ERROR".
        message: Human-readable description of the event.
        table: Optional name of the table the event relates to.
    """
    stamp = datetime.datetime.now().isoformat()
    log_entries.append(
        dict(timestamp=stamp, event_type=event_type, message=message, table=table)
    )
    # Echo for visibility in the notebook / job output.
    print(f"[{stamp}]{event_type}-{message}")

In [30]:
# Read the load-config CSV into a DataFrame; each row describes one source table to ingest.

def read_config_file(path=None):
    """Read the pipeline load configuration CSV into a Spark DataFrame.

    Args:
        path: GCS URI of the config CSV; defaults to ``CONFIG_FILE_PATH``.

    Returns:
        A Spark DataFrame with one row per configured table. Only ``header=True``
        is passed (no schema inference), so downstream code compares values such
        as ``is_active`` as strings.

    Raises:
        Exception: Whatever Spark raises when the file cannot be read; it is
            logged as an ERROR first and then re-raised.
    """
    path = CONFIG_FILE_PATH if path is None else path
    try:
        df = spark.read.csv(path, header=True)
    except Exception as e:
        log_event("ERROR", f"Failed to read config file {path}: {str(e)}")
        raise
    log_event("INFO", f" ✅ Successfully read the config file: {path}")
    return df

config_df = read_config_file()
config_df.show()


                                                                                

[2025-08-04T13:40:13.320548]INFO- ✅ Successfully read the log file
+-------------------+-------------+------------+-----------+------------+---------+----------+
|           database|   datasource|   tablename|   loadtype|   watermark|is_active|targetpath|
+-------------------+-------------+------------+-----------+------------+---------+----------+
|hospital-a-mysql-db|hospital_a_db|  encounters|Incremental|ModifiedDate|        1|hospital-a|
|hospital-a-mysql-db|hospital_a_db|    patients|Incremental|ModifiedDate|        1|hospital-a|
|hospital-a-mysql-db|hospital_a_db|transactions|Incremental|ModifiedDate|        1|hospital-a|
|hospital-a-mysql-db|hospital_a_db|   providers|       Full|        null|        1|hospital-a|
|hospital-a-mysql-db|hospital_a_db| departments|       Full|        null|        1|hospital-a|
|hospital-b-mysql-db|hospital_b_db|  encounters|Incremental|ModifiedDate|        1|hospital-b|
|hospital-b-mysql-db|hospital_b_db|    patients|Incremental|ModifiedDate|     

In [31]:
def move_existing_files_to_archive(table):
    try:
        prefix = f"landing/{HOSPITAL_NAME}/{table}/"
        blobs = list(storage_client.bucket(GCS_BUCKET).list_blobs(prefix=prefix))
        existing_files = [blob.name for blob in blobs if blob.name.endswith(".json")]
        
        if not existing_files:
            log_event("INFO", f"No existing files for the table {table}", table=table)
            return
        
        for file in existing_files:
            source_blob = storage_client.bucket(GCS_BUCKET).blob(file)

            # Extract the date from filename pattern: table_ddmmyy.json
            filename = file.split("/")[-1]
            date_str = filename.replace(f"{table}_", "").replace(".json", "")
            
            if len(date_str) != 6:
                log_event("WARNING", f"Filename date format incorrect for: {filename}", table=table)
                continue

            day, month, year = date_str[:2], date_str[2:4], date_str[4:]

            # Archive path
            archive_path = f"landing/{HOSPITAL_NAME}/archive/{table}/{year}/{month}/{day}/{filename}"
            destination_blob = storage_client.bucket(GCS_BUCKET).blob(archive_path)

            # Copy and delete
            storage_client.bucket(GCS_BUCKET).copy_blob(source_blob, storage_client.bucket(GCS_BUCKET), destination_blob.name)
            source_blob.delete()

            log_event("INFO", f"Moved {file} to {archive_path}", table=table)
    
    except Exception as e:
        log_event("ERROR", f"Failed to move files to archive for table {table}: {str(e)}", table=table)


In [32]:
def get_latest_watermark(table_name):
    try:
        query = f"""
            SELECT MAX(load_timestamp) AS latest_timestamp
            FROM `{BQ_AUDIT_TABLE}`
            WHERE tablename = '{table_name}' AND data_source = 'hospital_a_db'
        """
        query_job = bq_client.query(query)
        result = query_job.result()
        
        for row in result:
            return row.latest_timestamp.strftime('%Y-%m-%d %H:%M:%S') if row.latest_timestamp else "1900-01-01 00:00:00"
        
        return "1900-01-01 00:00:00"
    
    except Exception as e:
        log_event("ERROR", f"Failed to get watermark for {table_name}: {str(e)}", table=table_name)
        return "1900-01-01 00:00:00"


In [33]:
def extract_and_save_to_landing(table, load_type, watermark_col):
    try:
        # Get latest watermark if incremental
        last_watermark = get_latest_watermark(table) if load_type.lower() == 'incremental' else None
        log_event("INFO", f"Latest watermark for {table}: {last_watermark}", table=table)
        
        # Build the query
        query = f"(SELECT * FROM {table}) AS t" if load_type.lower() == "full" else \
                f"(SELECT * FROM {table} WHERE {watermark_col} > '{last_watermark}') AS t"
        
        # Read data from MySQL
        df = (spark.read.format("jdbc")
              .option("url", MYSQL_CONFIG["url"])
              .option("user", MYSQL_CONFIG["user"])
              .option("password", MYSQL_CONFIG["password"])
              .option("driver", MYSQL_CONFIG["driver"])
              .option("dbtable", query)
              .load())

        log_event("SUCCESS", f"Successfully extracted data from {table}", table=table)
        
        # Prepare file path and upload to GCS
        today = datetime.datetime.today().strftime('%d%m%y')
        JSON_FILE_PATH = f"landing/{HOSPITAL_NAME}/{table}/{table}_{today}.json"

        bucket = storage_client.bucket(GCS_BUCKET)
        blob = bucket.blob(JSON_FILE_PATH)
        # Using Pandas because spark gnerated success files and log files 
        blob.upload_from_string(df.toPandas().to_json(orient="records", lines=True), content_type="application/json")
        
        log_event("SUCCESS", f"JSON file successfully written to gs://{GCS_BUCKET}/{JSON_FILE_PATH}", table=table)
        
        # Prepare and write audit entry to BigQuery
        audit_df = spark.createDataFrame([
            ("hospital_a_db", table, load_type, df.count(), datetime.datetime.now(), "SUCCESS")
        ], ["data_source", "tablename", "load_type", "record_count", "load_timestamp", "status"])

        (audit_df.write.format("bigquery")
         .option("table", BQ_AUDIT_TABLE)
         .option("temporaryGcsBucket", GCS_BUCKET)
         .mode("append")
         .save())

        log_event("SUCCESS", f"Audit log updated for {table}", table=table)

    except Exception as e:
        log_event("ERROR", f"Error processing {table}: {str(e)}", table=table)

        
# Covered so far: configuration variables, the config file, logging, exception/error handling, and reading/writing across multiple sources.

In [34]:
def save_logs_to_gcs(log_entries=None):
    """Upload pipeline log entries to GCS as one pretty-printed JSON file.

    The file is written to ``temp/pipeline_logs/pipeline_log_<YYYYmmddHHMMSS>.json``
    in ``GCS_BUCKET``.

    Args:
        log_entries: List of log-entry dicts to save. When omitted, the
            module-level ``log_entries`` list is used, so a bare
            ``save_logs_to_gcs()`` call works.

    Returns:
        The ``gs://`` URI of the uploaded file.
    """
    if log_entries is None:
        # The parameter shadows the module-level list of the same name.
        log_entries = globals().get("log_entries", [])

    log_filename = f"pipeline_log_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.json"
    log_filepath = f"temp/pipeline_logs/{log_filename}"

    # default=str keeps the dump from failing on any non-JSON-native value.
    json_data = json.dumps(log_entries, indent=4, default=str)

    blob = storage_client.bucket(GCS_BUCKET).blob(log_filepath)
    blob.upload_from_string(json_data, content_type="application/json")

    log_uri = f"gs://{GCS_BUCKET}/{log_filepath}"
    print(f"Logs successfully saved to GCS at {log_uri}")
    return log_uri


In [35]:
# Explicit schema for the log rows. Column order follows the sorted-key order that
# dict-based inference produced before.
# NOTE(review): confirm against the existing pipeline_logs table schema.
_LOG_SCHEMA = "`event_type` STRING, `message` STRING, `table` STRING, `timestamp` STRING"


def save_logs_to_bigquery(entries=None):
    """Append pipeline log entries to the BigQuery log table (BQ_LOG_TABLE).

    An explicit schema is used so the write still works when every entry has
    ``table=None`` (e.g. a run where no table was processed). In that case the
    column type cannot be inferred from the data.

    Args:
        entries: List of log-entry dicts; defaults to the module-level
            ``log_entries``. Nothing is written when the list is empty.
    """
    entries = log_entries if entries is None else entries
    if not entries:
        return
    rows = [
        (e.get("event_type"), e.get("message"), e.get("table"), e.get("timestamp"))
        for e in entries
    ]
    log_df = spark.createDataFrame(rows, schema=_LOG_SCHEMA)
    (log_df.write.format("bigquery")
     .option("table", BQ_LOG_TABLE)
     .option("temporaryGcsBucket", GCS_BUCKET)
     .mode("append")
     .save())
    print("Logs stored in BigQuery for future analysis")


In [None]:
# Ingest only the active tables configured for the 'hospital_a_db' source.
# Logs are persisted in `finally` so a run that raises is still traceable.
try:
    for row in config_df.collect():
        # is_active is read as a string from the CSV; normalize before comparing.
        if str(row["is_active"]).strip() != "1" or row["datasource"] != "hospital_a_db":
            continue
        table = row["tablename"]

        # Step 1: Move any previous JSON files to the archive.
        move_existing_files_to_archive(table)

        # Step 2: Extract new data and write it to the landing zone.
        extract_and_save_to_landing(table, row["loadtype"], row["watermark"])
finally:
    save_logs_to_gcs(log_entries)
    save_logs_to_bigquery()

In [None]:
# Release the SparkSession created at the top of the notebook; run this cell last.
spark.stop()