In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *


# 1) Locate the source file and load it into a DataFrame

# The JSON document spans multiple lines, so the reader has to run in
# multi-line mode.
path = "/Workspace/Users/sayed@wustl.edu/data_5035_2026/week03/negotiated_rates.json"
df = spark.read.json(path, multiLine=True)


# 2) Flatten nested JSON
#
# Each select() below unpacks one nesting level. explode() emits one output row
# per array element and drops rows whose array is null or empty. `F` is the
# pyspark.sql.functions module.

# Columns carried unchanged from one select() to the next; each list is written
# once here instead of being repeated at every nesting level.
service_cols = ["name", "billing_code_type", "billing_code", "description"]
amount_cols = service_cols + ["service_code", "billing_class", "allowed_amount"]

flat_df = (
    df
    # Explode 'out_of_network' array: one row per service
    .select(F.explode("out_of_network").alias("OutOfNetwork"))

    # Extract service-level fields and explode 'allowed_amounts': one row per allowed amount
    .select(
        *[F.col(f"OutOfNetwork.{c}").alias(c) for c in service_cols],
        F.explode("OutOfNetwork.allowed_amounts").alias("amounts"),
    )

    # Extract allowed_amount fields and explode 'providers': one row per provider
    # NOTE(review): this assumes 'payments' is a struct (not an array) holding
    # 'allowed_amount' and 'providers'; if it is an array it needs its own
    # explode step - confirm with df.printSchema().
    .select(
        *service_cols,
        F.col("amounts.service_code").alias("service_code"),
        F.col("amounts.billing_class").alias("billing_class"),
        F.col("amounts.payments.allowed_amount").alias("allowed_amount"),
        F.explode("amounts.payments.providers").alias("provider"),
    )

    # Select final fields; only the first NPI (index 0 of 'provider.npi') is kept
    .select(
        *amount_cols,
        F.col("provider.billed_charge").alias("billed_charge"),
        F.col("provider.npi")[0].alias("npi"),
    )
)


# 3) Display flattened table

# NOTE(review): `display` is neither defined nor imported in this file;
# presumably it is the notebook runtime's built-in renderer - confirm before
# running this script outside the notebook environment.
display(flat_df)
