In [0]:
from pyspark.sql import SparkSession, Row
from datetime import datetime
import uuid
import traceback

# ---------------------------
# Spark session: reuse the active one if present, otherwise create it
# ---------------------------
spark = (
    SparkSession.builder
    .appName("HealthcareBronzeLayer")
    .getOrCreate()
)

# ---------------------------
# Random UUID4 string identifying this run; stamped on every log row
# ---------------------------
job_id = f"{uuid.uuid4()}"

# ---------------------------
# Logging function
# ---------------------------
def log_step(step_name, status="INFO", message=""):
    """Append a single log record to the ``logs.bronze_logs.log`` Delta table.

    The record carries the module-level ``job_id``, the current UTC time as
    an ISO-8601 string, and the three arguments below. It is written through
    the module-level ``spark`` session.

    Args:
        step_name: Label of the pipeline step being reported.
        status: Status tag stored with the record (defaults to ``"INFO"``).
        message: Free-text detail stored with the record.
    """
    # NOTE(review): utcnow() yields a naive datetime, so the ISO string has no
    # UTC offset; it is also deprecated as of Python 3.12. Kept as-is so new
    # rows match the format of rows already in the table -- confirm before
    # switching to an aware timestamp.
    entry = Row(
        job_id=job_id,
        timestamp=datetime.utcnow().isoformat(),
        step=step_name,
        status=status,
        message=message,
    )

    # One-row DataFrame, appended so earlier log rows are preserved.
    append_writer = spark.createDataFrame([entry]).write.format("delta").mode("append")
    append_writer.saveAsTable("logs.bronze_logs.log")

# Bronze-layer ingestion: read the patient and hospital Delta datasets from
# S3, persist each as a Bronze table (overwriting any previous contents), and
# record every step through log_step. Any failure is logged and re-raised.
try:
    log_step("START", "INFO", "Bronze layer ingestion started")

    # ---------------------------
    # Load Bronze patient data
    # ---------------------------
    # NOTE(review): "hearlth_records" looks like a misspelling of
    # "health_records"; left unchanged because it has to match the real S3
    # prefix -- confirm against the bucket.
    bronze_df = spark.read.format("delta").load("s3://sai-bucket-hospital/delta/hearlth_records/2025-08-21/")
    # count() is a Spark action, so it runs a job over the data just to
    # produce the number for the log message.
    log_step("LOAD_PATIENT", "INFO", f"Loaded {bronze_df.count()} rows from patient Bronze data")

    # ---------------------------
    # Load Bronze hospital data
    # ---------------------------
    bronze_df1 = spark.read.format("delta").load("s3://sai-bucket-hospital/delta/Hospital_details/")
    log_step("LOAD_HOSPITAL", "INFO", f"Loaded {bronze_df1.count()} rows from hospital Bronze data")

    # ---------------------------
    # Save patient Bronze table
    # ---------------------------
    bronze_df.write.format("delta").mode("overwrite").saveAsTable("processed_outputs.bronze.patient")
    log_step("SAVE_PATIENT", "INFO", "Patient Bronze table saved successfully")

    # ---------------------------
    # Save hospital Bronze table
    # ---------------------------
    bronze_df1.write.format("delta").mode("overwrite").saveAsTable("processed_outputs.bronze.hospital")
    log_step("SAVE_HOSPITAL", "INFO", "Hospital Bronze table saved successfully")

    log_step("END", "SUCCESS", "Bronze layer ingestion completed successfully ✅")

except Exception:
    # Writing the failure record is best-effort: if the log table itself is
    # unavailable, the logging error must not replace the ingestion error
    # that the caller needs to see.
    try:
        log_step("ERROR", "FAIL", traceback.format_exc())
    except Exception:
        # Surface the logging failure on stderr instead of raising it.
        traceback.print_exc()
    # Bare raise re-raises the original ingestion exception.
    raise
