Step 1: Verify the Flatten Logic
Build and validate the logic in Python before converting to Spark.

In [0]:
import json

# Load the JSON file
with open('negotiated_rates.json', 'r') as f:
    data = json.load(f)

# Preview the top-level keys
print('Top-level keys:', list(data.keys()))
print('Number of out_of_network entries:', len(data['out_of_network']))
print()

# Preview one entry to understand the structure
print(json.dumps(data['out_of_network'][0], indent=2))

In [0]:
# Flatten the nested JSON into a list of dictionaries

#   1. Each item in out_of_network
#   2. Each item in allowed_amounts (different service_code / provider groups)
#   3. Each provider in payments.providers, and each npi within that provider

flat_rows = []

for procedure in data['out_of_network']:
    # Pull the procedure-level fields
    name            = procedure['name']
    billing_code_type = procedure['billing_code_type']
    billing_code    = procedure['billing_code']
    description     = procedure['description']

    # Explode level 1: each allowed_amount entry
    for amount in procedure['allowed_amounts']:
        service_code    = amount['service_code']
        billing_class   = amount['billing_class']
        allowed_amount  = amount['payments']['allowed_amount']

        # Explode level 2: each provider within this amount
        for provider in amount['payments']['providers']:
            billed_charge = provider['billed_charge']

            # Explode level 3: each NPI in the provider's npi list
            for npi in provider['npi']:
                flat_rows.append({
                    'name':              name,
                    'billing_code_type': billing_code_type,
                    'billing_code':      billing_code,
                    'description':       description,
                    'service_code':      service_code,
                    'billing_class':     billing_class,
                    'allowed_amount':    allowed_amount,
                    'billed_charge':     billed_charge,
                    'npi':               int(npi)   # Cast to int to drop the .0
                })

print(f'Total flattened rows: {len(flat_rows)}')
print()

# Display all rows for verification
for i, row in enumerate(flat_rows):
    print(f'--- Row {i} ---')
    for k, v in row.items():
        print(f'  {k:20s} {v}')
    print()

Step 2: Convert to Spark

In [0]:
from pyspark.sql.functions import explode, col

# Step 2a: Read JSON - use multiline=True since this is a single JSON object
df = spark.read.option("multiLine", "true").json('/Workspace/Users/jay.kline21@outlook.com/data-5035-2026/week03/negotiated_rates.json')

# Look at the schema that Spark inferred
df.printSchema()

In [0]:
# Step 2b: Explode out_of_network list into one row per procedure
df_procedures = df_raw.select(
    explode(col('out_of_network')).alias('procedure')
)

df_procedures.printSchema()
df_procedures.show(truncate=False)

In [0]:
# Step 2c: Pull procedure-level fields and explode allowed_amounts
df_amounts = df_procedures.select(
    col('procedure.name').alias('name'),
    col('procedure.billing_code_type').alias('billing_code_type'),
    col('procedure.billing_code').alias('billing_code'),
    col('procedure.description').alias('description'),
    explode(col('procedure.allowed_amounts')).alias('amount')
)

df_amounts.printSchema()
df_amounts.show(truncate=False)

In [0]:
# Step 2d: Pull amount-level fields and explode providers
df_providers = df_amounts.select(
    col('name'),
    col('billing_code_type'),
    col('billing_code'),
    col('description'),
    col('amount.service_code').alias('service_code'),
    col('amount.billing_class').alias('billing_class'),
    col('amount.payments.allowed_amount').alias('allowed_amount'),
    explode(col('amount.payments.providers')).alias('provider')
)

df_providers.printSchema()
df_providers.show(truncate=False)

In [0]:
# Step 2e: Pull provider-level fields and explode npi list
df_npis = df_providers.select(
    col('name'),
    col('billing_code_type'),
    col('billing_code'),
    col('description'),
    col('service_code'),
    col('billing_class'),
    col('allowed_amount'),
    col('provider.billed_charge').alias('billed_charge'),
    explode(col('provider.npi')).alias('npi')
)

df_npis.printSchema()
df_npis.show(truncate=False)

In [0]:
# Step 2f: Cast NPI
df_final = df_npis.withColumn('npi', col('npi').cast('long'))

# Display the final table
df_final.show(truncate=False)

print(f'Total rows: {df_final.count()}')
df_final.printSchema()

In [0]:
# Build a Spark DataFrame from the Python list
df_python = spark.createDataFrame(flat_rows)

print('Python row count: ', df_python.count())
print('Spark  row count: ', df_final.count())
print()

# Sort both by the same columns so ordering is deterministic, then compare
sort_cols = ['billing_code', 'service_code', 'npi']

df_python_sorted = df_python.orderBy(sort_cols)
df_spark_sorted  = df_final.orderBy(sort_cols)

print('--- Python Output ---')
df_python_sorted.show(truncate=False)

print('--- Spark Output ---')
df_spark_sorted.show(truncate=False)

In [0]:
# Just call display() on your final dataframe
display(df_final)