In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Healthcare Export") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "/Users/neelkalavadiya/Practicum_Project_Local/iceberg_warehouse") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()

# Suppress all WARNs logs
spark.sparkContext.setLogLevel("ERROR")

25/05/05 12:09:32 WARN Utils: Your hostname, NEELs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.190.186.140 instead (on interface en0)
25/05/05 12:09:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/05 12:09:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read the JSON
json_path = "/Users/neelkalavadiya/Practicum_Project_Local/iceberg_warehouse/raw_dataset/TX/vernon_memorial.csv"
df = spark.read.option("multiline", "true").json(json_path)
df.printSchema()

                                                                                

root
 |-- affirmation: struct (nullable = true)
 |    |-- affirmation: string (nullable = true)
 |    |-- confirm_affirmation: boolean (nullable = true)
 |-- hospital_address: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hospital_location: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- hospital_name: string (nullable = true)
 |-- last_updated_on: string (nullable = true)
 |-- license_information: struct (nullable = true)
 |    |-- license: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- pId: string (nullable = true)
 |-- standard_charge_information: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code_information: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |-- modifiers: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true

In [4]:
from pyspark.sql.functions import col, explode, when, udf, coalesce
from pyspark.sql.types import StringType

manual_pids = {
    "Kindred Hospital Ontario": "106361274",
    "RIVERSIDE COMMUNITY HOSPITAL":"050022"
}

@udf(StringType())
def assign_pid(hospital_name):
    return manual_pids.get(hospital_name)

# Step 3: Check if 'pId' exists
if 'pId' in df.columns:
    df_with_pid = df.withColumn(
        "provider_id",
        when(col("pId").isNotNull(), col("pId")).otherwise(assign_pid(col("hospital_name")))
    )
else:
    df_with_pid = df.withColumn(
        "provider_id", 
        assign_pid(col("hospital_name"))
    )n

license_fields = df.select("license_information.*").schema.names
if "license" in license_fields:
    license_col = "license"
elif "license_number" in license_fields:
    license_col = "license_number"
elif "hospital_license_number" in license_fields:
    license_col = "hospital_license_number"
else:
    raise ValueError("❌ No valid license field found in license_information")

# Step 4: Explode and select
df_exploded = df_with_pid.select(
    col("provider_id"),
    col("hospital_name"),
    col("hospital_address")[0].alias("hospital_address"),
    col("hospital_location")[0].alias("hospital_location"),
    col("last_updated_on"),
    col(f"license_information.{license_col}").alias("license_number"),
    col("license_information.state").alias("license_state"),
    explode("standard_charge_information").alias("sci")
)

In [4]:
#Explode code_information
df_code = df_exploded.select(
    "*",
    col("sci.description").alias("service_description"),
    explode("sci.code_information").alias("code_info"),
    col("sci.standard_charges").alias("charge_arr")
)

In [5]:
#Explode standard_charges
df_charges = df_code.select(
    "*",
    col("code_info.code").alias("code"),
    col("code_info.type").alias("code_type"),
    explode("charge_arr").alias("charge")
)

In [6]:
#Explode payers_information
df_payer = df_charges.select(
    "provider_id", "hospital_name", "hospital_address", "hospital_location",
    "last_updated_on", "license_number", "license_state", "service_description",
    "code", "code_type",
    col("charge.setting").alias("care_setting"),
    col("charge.gross_charge"),
    col("charge.discounted_cash"),
    col("charge.minimum").alias("min_charge"),
    col("charge.maximum").alias("max_charge"),
    explode("charge.payers_information").alias("payer")
)

In [7]:
payer_fields = df_payer.select("payer.*").schema.names
dollar_col = "standard_charge_dollar" if "standard_charge_dollar" in payer_fields else "estimated_amount"

In [8]:
df_final_flat = df_payer.select(
    "*",
    col("payer.payer_name"),
    col("payer.plan_name"),
    col("payer.methodology"),
    col(f"payer.{dollar_col}").alias("negotiated_dollar_amount"),
    col("payer.additional_payer_notes")
)

In [9]:
df_final_flat.show(10)

                                                                                

+-----------+--------------------+--------------------+--------------------+---------------+--------------+-------------+--------------------+--------------------+---------+------------+------------+---------------+----------+----------+--------------------+--------------------+------------------+--------------------+------------------------+----------------------+
|provider_id|       hospital_name|    hospital_address|   hospital_location|last_updated_on|license_number|license_state| service_description|                code|code_type|care_setting|gross_charge|discounted_cash|min_charge|max_charge|               payer|          payer_name|         plan_name|         methodology|negotiated_dollar_amount|additional_payer_notes|
+-----------+--------------------+--------------------+--------------------+---------------+--------------+-------------+--------------------+--------------------+---------+------------+------------+---------------+----------+----------+--------------------+------

In [10]:
parquet_path = "/Users/neelkalavadiya/Practicum_Project_Local/iceberg_warehouse/checkpoint_parquet/ingestion/SantaRosa_Hospital_MedicalCenter_SA_TX.parquet"
df_final_flat.write.mode("overwrite").parquet(parquet_path)


                                                                                

In [12]:
parquet_path = "/Users/neelkalavadiya/Practicum_Project_Local/iceberg_warehouse/checkpoint_parquet/ingestion/SantaRosa_Hospital_MedicalCenter_SA_TX.parquet"
df_read_back = spark.read.parquet(parquet_path)

In [14]:
spark.sql("DROP TABLE IF EXISTS local.bronze.SantaRosa_Hospital_MedicalCenter_SA_TX")


DataFrame[]

In [15]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS local.bronze")


DataFrame[]

In [16]:
df_read_back.writeTo("local.bronze.SantaRosa_Hospital_MedicalCenter_SA_TX") \
    .using("iceberg") \
    .tableProperty("format-version", "2") \
    .createOrReplace()


                                                                                

In [2]:
spark.sql("SHOW TABLES IN local.bronze").show(truncate=False)


+---------+----------------------------+-----------+
|namespace|tableName                   |isTemporary|
+---------+----------------------------+-----------+
|bronze   |baptist_medical_center_sa_tx|false      |
+---------+----------------------------+-----------+



                                                                                

In [3]:
spark.sql("Select plan_name,code, gross_charge, discounted_cash,standard_charge_percentage From local.bronze.baptist_medical_center_sa_tx where code = '0010U' LIMIT 10").show(truncate=False)


[Stage 1:>                                                          (0 + 1) / 1]

+--------------------------------+-----+------------+---------------+--------------------------+
|plan_name                       |code |gross_charge|discounted_cash|standard_charge_percentage|
+--------------------------------+-----+------------+---------------+--------------------------+
|United Health TX Mgd M/Care     |0010U|null        |null           |null                      |
|Humana Commercial               |0010U|null        |null           |null                      |
|Humana Commercial               |0010U|null        |null           |null                      |
|Molina Mgd M/Care Narrow Network|0010U|null        |null           |null                      |
|Wellpoint CHIP Mgd M/Caid       |0010U|null        |null           |null                      |
|Cigna Health Plan - New Business|0010U|null        |null           |null                      |
|Cigna Health Plan - HMO         |0010U|null        |null           |null                      |
|BCBS-TX Blue Advantage HIX   

                                                                                

In [13]:
spark.sql("Select code, gross_charge, min_charge, max_charge, discounted_cash,max_charge,standard_charge_percentage From local.silver.baptist_medical_center_sa_tx where code = '0007U' LIMIT 10").show(truncate=False)


+-----+------------+----------+----------+---------------+----------+--------------------------+
|code |gross_charge|min_charge|max_charge|discounted_cash|max_charge|standard_charge_percentage|
+-----+------------+----------+----------+---------------+----------+--------------------------+
|0007U|203.0       |1         |405       |203.0          |405       |null                      |
|0007U|203.0       |1         |405       |203.0          |405       |82.26600985221675         |
|0007U|203.0       |1         |405       |203.0          |405       |56.15763546798029         |
|0007U|203.0       |1         |405       |203.0          |405       |38.91625615763547         |
|0007U|203.0       |1         |405       |203.0          |405       |6200.0                    |
|0007U|203.0       |1         |405       |203.0          |405       |37.4384236453202          |
|0007U|203.0       |1         |405       |203.0          |405       |null                      |
|0007U|203.0       |1         

In [6]:
tables = spark.sql("SHOW TABLES IN local.bronze").collect()

for row in tables:
    table_name = row["tableName"]
    full_name = f"local.gold.{table_name}"
    print(f"Dropping table: {full_name}")
    spark.sql(f"DROP TABLE IF EXISTS {full_name}")


Dropping table: local.gold.dim_hospital
Dropping table: local.gold.dim_payer
Dropping table: local.gold.dim_procedure
Dropping table: local.gold.fact_charges


In [7]:
# Extract provider_id from the JSON
provider_id = df_final_flat.select("provider_id").first()["provider_id"]
print(provider_id)

[Stage 1:>                                                          (0 + 1) / 1]

1514675685


                                                                                

In [8]:
# Read all existing tables in bronze namespace and check if provider_id already ingested
table_df = spark.sql("SHOW TABLES IN local.bronze").select("tableName").collect()
print(table_df)

[Row(tableName='missiontrail_baptist_hospital_sa_tx'), Row(tableName='baptist_medical_center_sa_tx')]


In [19]:
for row in table_df:
    print(row)
    existing_table = f"local.bronze.{row['tableName']}"
    print(existing_table)
    existing_df = spark.read.table(existing_table)
    print(existing_df.select("provider_id").first()["provider_id"])

    
    

Row(tableName='missiontrail_baptist_hospital_sa_tx')
local.bronze.missiontrail_baptist_hospital_sa_tx
1514676722
Row(tableName='baptist_medical_center_sa_tx')
local.bronze.baptist_medical_center_sa_tx
1514675685
