# Data Engineering - Assignment 03: JSON Parsing
## Parsing Healthcare Price Transparency Data

**Author:** Greg Sullivan
**Course:** DATA 5035 - Data Engineering

### Objective
Parse nested JSON healthcare data from United Health Services and flatten it into a tabular format.


In [0]:
from pyspark.sql.functions import col, explode

# Once again, took me a while to figure how to access this file.  Actually,
# my challcnge was more about where to place the file.  I eventually 
# figured that out and it worked!
json_path = "/Volumes/workspace/default/data_files/negotiated_rates.json"

# I'm new to Spark, so rough coding to follow.  
# I'm sure there is a better way to do this.
# I'm also sure there is a better way to do the explode and cast.  
# I'm just not sure how to do it, but here we go...
#
# multiline accounts for the nesting of lines within lines
# - a deeply nested data structure
df_raw = spark.read.option("multiline", "true").json(json_path)

print("Schema of raw JSON:")
df_raw.printSchema()
display(df_raw)


For each array element of the out_of_network array, we create a new row.
Then, we extract the fields from each item:

  name - billing item name
  billing_code_type - CPT or HCPCS
  billing_code - the actual code
  description - what the code means
  allowed_amounts - still nested! We'll explode this next

This yields 6 rows (one per billing item), but allowed_amounts is still an array

In [0]:
# Will try the Explode capability from Spark
from pyspark.sql.functions import col, explode

# Explode the out_of_network array
df_items = (df_raw
    .select(explode(col("out_of_network")).alias("item"))
    .select(
        col("item.name").alias("name"),
        col("item.billing_code_type").alias("billing_code_type"),
        col("item.billing_code").alias("billing_code"),
        col("item.description").alias("description"),
        col("item.allowed_amounts").alias("allowed_amounts")
    )
)

print(f"Number of billing items: {df_items.count()}")
display(df_items)

Next, we take each of the 6 rows from the prior cell
We "explode" as each billing item might have multiple allowed amounts
  (different rates for different service codes)
For example, "97140" has 2 allowed amounts (service codes "11" and "12")

Extracts:
  service_code - identifies type of service (like location: office vs hospital)
  billing_class - professional vs institutional
  allowed_amount - the negotiated rate
  providers - still an array! We'll explode this next

This results in more rows now (some billing items had multiple allowed amounts), 
but providers is still an array

In [0]:
# Explode allowed_amounts array
df_amounts = (df_items
    .select(
        col("name"),
        col("billing_code_type"),
        col("billing_code"),
        col("description"),
        explode(col("allowed_amounts")).alias("amount_detail")
    )
    .select(
        col("name"),
        col("billing_code_type"),
        col("billing_code"),
        col("description"),
        col("amount_detail.service_code").alias("service_code"),
        col("amount_detail.billing_class").alias("billing_class"),
        col("amount_detail.payments.allowed_amount").alias("allowed_amount"),
        col("amount_detail.payments.providers").alias("providers")
    )
)

print(f"Number of allowed amount records: {df_amounts.count()}")
display(df_amounts)

Here is our third explosion
It takes each row from the prior cell
  explode(col("providers")) - Each allowed amount might apply to multiple providers

Then, we extract:
  billed_charge - what the provider actually charges (before negotiation)
  npi_array - still an array! One more explosion to go

This results in even MORE rows
  one per provider per allowed amount, but npi_array is still an array

In [0]:
# Explode providers array
df_providers = (df_amounts
    .select(
        col("name"),
        col("billing_code_type"),
        col("billing_code"),
        col("description"),
        col("service_code"),
        col("billing_class"),
        col("allowed_amount"),
        explode(col("providers")).alias("provider_detail")
    )
    .select(
        col("name"),
        col("billing_code_type"),
        col("billing_code"),
        col("description"),
        col("service_code"),
        col("billing_class"),
        col("allowed_amount"),
        col("provider_detail.billed_charge").alias("billed_charge"),
        col("provider_detail.npi").alias("npi_array")
    )
)

print(f"Number of provider records: {df_providers.count()}")
display(df_providers)

Now, take each of the newly created rows from prior explosion
  explode(col("npi_array")) accounting for the possibility that 
    each provider record might have multiple NPIs (National Provider Identifiers)

I note NPI is a double (all .0).  So, I recast as bigint to make it clear integer

This yields the individual NPI numbers

Voila!  Our Final flat table! 
  No more arrays. Every single nested element is now its own row.

In [0]:
# Explode npi array and cast to bigint
df_final = (df_providers
    .select(
        col("name"),
        col("billing_code_type"),
        col("billing_code"),
        col("description"),
        col("service_code"),
        col("billing_class"),
        col("allowed_amount"),
        col("billed_charge"),
        explode(col("npi_array")).alias("npi")
    )
    .withColumn("npi", col("npi").cast("bigint"))
)

print(f"Final flattened records: {df_final.count()}")
display(df_final)

Let's confirm we have the right structure

printSchema() - Shows the data types of each column
show() - Displays first 5 rows as a table

Do our columns match what we were looking for?  YES

This confirms we have the right structure!

In [0]:
# Verify schema
print("Final Schema:")
df_final.printSchema()

# Show sample
df_final.show(5, truncate=False)

# Verify columns match expected format
expected_columns = ['name', 'billing_code_type', 'billing_code', 'description', 
                   'service_code', 'billing_class', 'allowed_amount', 'billed_charge', 'npi']
print(f"\nColumns match expected: {df_final.columns == expected_columns}")

I'll show some summary stats not just for verification,
  but also clarification

count the unique billing codes (how many different procedures)
count the unique NPIs (how many different providers)
group by billing code type (CPT vs HCPCS)
show stats on dollar amounts (min, max, average, etc.)

This gives us insights about the data we just flattened

In [0]:
# Basic statistics
print("Unique billing codes:", df_final.select("billing_code").distinct().count())
print("Unique NPIs:", df_final.select("npi").distinct().count())

print("\nBilling Code Type Distribution:")
df_final.groupBy("billing_code_type").count().show()

print("\nAmount Statistics:")
df_final.select("allowed_amount", "billed_charge").describe().show()

# Concluding Remarks

This was all done with a small number of lines of code, which 
makes me curious about the full power or Spark in this use case.
I see how useful this can be in multi-layer nested data sets, but
also for very large data sets (nested or not).  I look forward
to learning more about using Spark.

No wonder it's nearly impossible to understand medical bills!!!