**SIMPLE ETL TEMPLATE - MULTI-DATA SOURCE JSON VIA S3 + CSV**

In [0]:
%sql
-- Bronze landing table for raw patient records, stored as a Delta table at a
-- fixed external location. Every column is declared as STRING; the date-like
-- fields (date_of_birth, creation_date, last_visit_date) are not typed here.
-- Change Data Feed is switched on through the table properties.
CREATE TABLE IF NOT EXISTS patient_bronze (
  patient_id      STRING,
  member_email    STRING,
  first_name      STRING,
  last_name       STRING,
  date_of_birth   STRING,
  gender          STRING,
  address         STRING,
  city            STRING,
  state           STRING,
  zip_code        STRING,
  phone_number    STRING,
  creation_date   STRING,
  last_visit_date STRING,
  last_known_ip   STRING
)
USING DELTA
LOCATION '/mnt/datalake/patient_bronze/'
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Ingest raw patient JSON files from S3 into the patient_bronze Delta location.
#
# The schema is handed to the stream reader as a DDL-formatted string
# ("name TYPE, name TYPE, ..."). Every bronze column is STRING, mirroring the
# patient_bronze table definition, so the string is built from the column list.
bronze_columns = [
    "patient_id",
    "member_email",
    "first_name",
    "last_name",
    "date_of_birth",
    "gender",
    "address",
    "city",
    "state",
    "zip_code",
    "phone_number",
    "creation_date",
    "last_visit_date",
    "last_known_ip",
]
bronze_schema = ", ".join(f"{column_name} STRING" for column_name in bronze_columns)

raw_path = "s3a://your-generic-bucket/raw/patient_json/"

bronze_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(bronze_schema)
    .load(raw_path)
)

# availableNow processes the files that are currently present and then stops,
# and awaitTermination() blocks until that has finished. Without both, the
# stream would keep running in the background and the batch reads in the
# following cells could run before any data has been written.
bronze_query = (
    bronze_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/datalake/_checkpoints/patient_bronze/")
    .outputMode("append")
    .trigger(availableNow=True)
    .start("/mnt/datalake/patient_bronze/")
)
bronze_query.awaitTermination()

In [0]:
%sql
-- Preview up to ten rows of the bronze table.
-- NOTE(review): bronze holds un-hashed member_email values; clear this cell's
-- output before sharing the notebook.
SELECT *
FROM patient_bronze
LIMIT 10

In [0]:
# Clean and anonymize patient_bronze into the patient_silver table.
#
# Bronze stores the address as `member_email` and the most recent visit as
# `last_visit_date`. Silver exposes them as `email` (SHA-256 hash) and
# `last_activity_date` (DATE), which are the names the validation and join
# cells further down refer to.

from pyspark.sql.functions import col, sha2, concat_ws, to_date

patient_bronze_df = spark.read.format("delta").load("/mnt/datalake/patient_bronze/")

patient_silver_df = (
    patient_bronze_df
    # Hash the email, then drop the raw column so no clear-text address
    # is carried into silver.
    .withColumn("email", sha2(col("member_email"), 256))
    .drop("member_email")
    # to_date() yields NULL for values that do not match the pattern.
    .withColumn("creation_date", to_date(col("creation_date"), "yyyy-MM-dd"))
    .withColumn("last_activity_date", to_date(col("last_visit_date"), "yyyy-MM-dd"))
    .drop("last_visit_date")
)

patient_silver_df.write.format("delta").mode("overwrite").save("/mnt/datalake/patient_silver/")
spark.sql("CREATE TABLE IF NOT EXISTS patient_silver USING DELTA LOCATION '/mnt/datalake/patient_silver/'")

In [0]:
# Data-quality gates for patient_silver_df. Each assert aborts the run with
# its message when the corresponding rule is violated.

# Rule 1: every row carries an email value.
rows_without_email = patient_silver_df.where("email IS NULL").count()
assert rows_without_email == 0, "Missing emails found"

# Rule 2: both date columns are populated (a NULL here means the value was
# missing or could not be parsed upstream).
has_null_date = col("creation_date").isNull() | col("last_activity_date").isNull()
rows_with_null_date = patient_silver_df.where(has_null_date).count()
assert rows_with_null_date == 0, "Date conversion issues found"

# Rule 3: the number of rows equals the number of distinct email values,
# i.e. no email appears on more than one row.
total_rows = patient_silver_df.count()
unique_emails = patient_silver_df.select("email").distinct().count()
assert total_rows == unique_emails, "Duplicate emails found"

In [0]:
# Ingest customer claims data (CSV to claims_silver)
#
# Column name / Spark SQL type pairs describing the claims CSV layout.
claims_fields = [
    ("claim_id", "STRING"),
    ("member_id", "STRING"),
    ("member_email", "STRING"),
    ("claim_type", "STRING"),  # e.g., inpatient, outpatient, pharmacy
    ("provider_id", "STRING"),
    ("provider_name", "STRING"),
    ("service_start_date", "STRING"),
    ("service_end_date", "STRING"),
    ("claim_submission_date", "STRING"),
    ("claim_status", "STRING"),  # e.g., approved, denied, pending
    ("diagnosis_code", "STRING"),  # ICD-10 codes
    ("procedure_code", "STRING"),  # CPT/HCPCS codes
    ("billed_amount", "FLOAT"),
    ("allowed_amount", "FLOAT"),
    ("paid_amount", "FLOAT"),
    ("patient_responsibility", "FLOAT"),
    ("last_processed_date", "STRING"),
    ("payer_name", "STRING")
]

# The stream reader's .schema() takes a StructType or a DDL-formatted string,
# not a list of tuples, so the pairs are rendered as "name TYPE, name TYPE, ...".
claims_schema = ", ".join(f"{field_name} {field_type}" for field_name, field_type in claims_fields)

csv_path = "/mnt/datalake/raw/patient_claims_csv/"

claims_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", True)
    .schema(claims_schema)
    .load(csv_path)
)

# Process the files that are currently available, then stop. Waiting for the
# query to finish guarantees the Delta location exists and is populated before
# the table is registered on top of it and before later cells read it.
claims_query = (
    claims_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/datalake/_checkpoints/claims_silver/")
    .outputMode("append")
    .trigger(availableNow=True)
    .start("/mnt/datalake/claims_silver/")
)
claims_query.awaitTermination()

spark.sql("CREATE TABLE IF NOT EXISTS claims_silver USING DELTA LOCATION '/mnt/datalake/claims_silver/'")

In [0]:
# Validate the claims data that was written to the claims_silver Delta path.
# claims_df is a streaming DataFrame and batch actions such as count() are not
# allowed on it, so the checks run against a batch read of the written output.
claims_written_df = spark.read.format("delta").load("/mnt/datalake/claims_silver/")

assert len(claims_written_df.columns) > 0, "Claims CSV missing columns"
# limit(1) is enough to prove the table is non-empty without a full count.
assert claims_written_df.limit(1).count() > 0, "Claims data is empty"

In [0]:
# Join patient_silver and claims_silver into patient_claims_gold
#
# claims_silver identifies the member by a `member_email` column, while the
# join key on the patient side is `email`. The silver cell above hashes emails
# with SHA-256, so the claims email is hashed the same way to obtain a
# matching `email` key.
# NOTE(review): assumes patient_silver.email is the SHA-256 hex digest of the
# same address found in claims member_email - confirm against real data.

from pyspark.sql.functions import col, sha2

patient_silver_df = spark.read.format("delta").load("/mnt/datalake/patient_silver/")
claims_silver_df = spark.read.format("delta").load("/mnt/datalake/claims_silver/")

# Derive the hashed key and drop the raw address so it does not reach gold.
claims_keyed_df = (
    claims_silver_df
    .withColumn("email", sha2(col("member_email"), 256))
    .drop("member_email")
)

patient_claims_gold_df = patient_silver_df.join(claims_keyed_df, on="email", how="inner")

patient_claims_gold_df.write.format("delta").mode("overwrite").save("/mnt/datalake/patient_gold/")
spark.sql("CREATE TABLE IF NOT EXISTS patient_gold USING DELTA LOCATION '/mnt/datalake/patient_gold/'")


In [0]:
%sql
-- Perform any additional DML operations for further transformations.

In [0]:
# Final gates on patient_claims_gold_df.

# The inner join must have produced at least one row.
gold_row_count = patient_claims_gold_df.count()
assert gold_row_count > 0, "Gold table has no data after join"

# Optional metric gate: runs only when a `total_claims` column is present,
# and rejects any negative value in it.
has_total_claims = "total_claims" in patient_claims_gold_df.columns
if has_total_claims:
    negative_claim_rows = patient_claims_gold_df.where(col("total_claims") < 0)
    assert negative_claim_rows.count() == 0, "Invalid claims values detected"

print("All validations passed!")

In [0]:
%sql
-- Final tables are ready for consumption (dashboards, ML models, analytics, etc.).