# 🧠 ClaimGuard: Delta Lake Storage & Querying (Spark + PySpark)

This notebook demonstrates how to store, query, and manipulate CMS healthcare data using the **Delta Lake format** in PySpark, enabling efficient, versioned, and production-grade data workflows.

---

## ⚙️ 1. Setup: Spark + Delta Lake in Google Colab
Install Java, Apache Spark, Delta Lake, and required Python packages.


In [None]:
# Install Java, Spark, Delta Lake, and dependencies
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar -xzf spark-3.3.2-bin-hadoop3.tgz
!pip install -q findspark
# Pin delta-spark to the same release as the delta-core package configured below
!pip install -q delta-spark==2.2.0

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"
# Load the Delta package and enable its SQL extensions when PySpark launches the JVM
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.2.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'


# Initialize Spark
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ClaimGuard Delta") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
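The pip package and the Maven coordinate in `PYSPARK_SUBMIT_ARGS` need to refer to the same Delta release, and that release has to match the Spark line in use (Delta 2.2.0 targets Spark 3.3.x). A minimal sketch that derives both strings from one version constant so they cannot drift apart; `DELTA_VERSION`, `delta_submit_args`, and `pip_requirement` are hypothetical names introduced here for illustration.

```python
DELTA_VERSION = "2.2.0"  # assumption: the Delta release used in the setup cell
SCALA_VERSION = "2.12"   # Scala build of the default Spark 3.3.2 binary distribution


def delta_submit_args(delta_version: str = DELTA_VERSION,
                      scala_version: str = SCALA_VERSION) -> str:
    """Build a PYSPARK_SUBMIT_ARGS value that loads Delta and enables its SQL extensions."""
    package = f"io.delta:delta-core_{scala_version}:{delta_version}"
    confs = [
        "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    ]
    conf_flags = " ".join(f"--conf {conf}" for conf in confs)
    return f"--packages {package} {conf_flags} pyspark-shell"


def pip_requirement(delta_version: str = DELTA_VERSION) -> str:
    """Return the matching pip requirement string for the Python bindings."""
    return f"delta-spark=={delta_version}"


submit_args = delta_submit_args()
print(submit_args)
print(pip_requirement())
```

The first printed string can be assigned to `os.environ["PYSPARK_SUBMIT_ARGS"]` before `findspark.init()`, and the second passed to `pip install`.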


### 📥 Load and Convert Cleaned Data

Read the cleaned CMS dataset from Parquet format and save it in Delta Lake format for scalable storage and downstream querying.

In [None]:
df_spark = spark.read.parquet("cleaned_claimguard_data.parquet")
df_spark.show(5)

+------------+-------------------------+-------------------+--------------------+--------------------+--------------------+-------------+---------------------------+-------------------+-------------+------------------+-------------------------+----------------+-------------+-------------------------------+----------+--------------------+--------------------+----------------+-------------------+--------------+------------------------------+------------------------+-------------------------------+-------------------------------+------------------------------------+---------------------+------------------+-----------------+-----------+----------+--------------------------+-------------+----+-----------------------+------------------------+
|Provider_NPI|Provider_Last_Or_Org_Name|Provider_First_Name|Provider_Credentials|Provider_Entity_Code|    Provider_Street1|Provider_City|Provider_State_Abbreviation|Provider_State_FIPS|Provider_Zip5|Provider_RUCA_Code|Provider_RUCA_Description|Provider_

### Save, Load, and Explore Delta Table

Write the DataFrame to a Delta table, then read the saved table back and inspect sample rows to ensure the data loaded correctly.

In [None]:
delta_path = "/content/claimguard_delta"
df_spark.write.format("delta").mode("overwrite").save(delta_path)
print("✅ Saved as Delta at:", delta_path)

✅ Saved as Delta at: /content/claimguard_delta


In [None]:
df_delta = spark.read.format("delta").load(delta_path)
df_delta.select("Provider_Type", "Average_Medicare_Allowed_Amount").show(5)

+-------------+-------------------------------+
|Provider_Type|Average_Medicare_Allowed_Amount|
+-------------+-------------------------------+
|           62|                   3.4457692308|
|           62|                   29.115454545|
|           62|                   9.5868965517|
|           62|                   16.585263158|
|           62|                   7.6710526316|
+-------------+-------------------------------+
only showing top 5 rows



### Basic Aggregations

Run grouped queries to summarize provider behavior by `Provider_Type`, such as average allowed amount and total services.

In [None]:
df_delta.groupBy("Provider_Type").agg(
    {"Average_Medicare_Allowed_Amount": "mean", "Total_Services": "sum"}
).show(10)


+-------------+-------------------+------------------------------------+
|Provider_Type|sum(Total_Services)|avg(Average_Medicare_Allowed_Amount)|
+-------------+-------------------+------------------------------------+
|           31|          2038857.5|                  100.67739697534164|
|           85|  5335348.600000001|                  105.69917522633529|
|           65|5.522865610000001E7|                   172.1520481641235|
|           53|       6.53324632E7|                   75.53394038264204|
|           78|      1.455997641E8|                   41.45128302260158|
|           34|          2603854.6|                  134.65000216499647|
|          101|       4.62016059E7|                   133.0504833698309|
|           81|       2.73132279E7|                   80.20079915324428|
|           28|         1.626272E7|                  137.31836290814545|
|           76|       3.71796642E7|                   45.98524676843427|
+-------------+-------------------+------------------------------------+
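Note that `avg(Average_Medicare_Allowed_Amount)` is an unweighted mean of per-row averages, so a row with one service counts as much as a row with thousands. The sketch below contrasts it with a service-weighted mean on toy numbers. The rows are hypothetical values, not CMS data, and the comparison assumes each row's `Average_Medicare_Allowed_Amount` is an average taken over that row's `Total_Services`.

```python
# Toy rows (hypothetical values): (provider_type, average_allowed_amount, total_services)
rows = [
    ("A", 10.0, 100.0),
    ("A", 50.0, 1.0),
    ("B", 20.0, 10.0),
    ("B", 40.0, 30.0),
]


def summarize(rows):
    """Return {provider_type: (unweighted_mean, service_weighted_mean, total_services)}."""
    groups = {}
    for provider_type, avg_amount, services in rows:
        g = groups.setdefault(
            provider_type, {"n": 0, "sum_avg": 0.0, "sum_amount": 0.0, "services": 0.0}
        )
        g["n"] += 1
        g["sum_avg"] += avg_amount                # feeds the unweighted mean
        g["sum_amount"] += avg_amount * services  # total allowed amount for the row
        g["services"] += services
    return {
        provider_type: (
            g["sum_avg"] / g["n"],
            g["sum_amount"] / g["services"],
            g["services"],
        )
        for provider_type, g in groups.items()
    }


summary = summarize(rows)
for provider_type, (unweighted, weighted, services) in summary.items():
    print(provider_type, round(unweighted, 2), round(weighted, 2), services)
```

Both toy groups have the same unweighted mean, yet their weighted means differ. In PySpark the weighted variant can be written as a ratio of two sums, for example `F.sum(F.col("Average_Medicare_Allowed_Amount") * F.col("Total_Services")) / F.sum("Total_Services")`.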

### Clean and Overwrite Delta Table

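Remove rows whose `Provider_Type` is `Unknown` and overwrite the Delta table with the filtered data. The overwrite creates a new table version, so the original write stays readable as version 0 through time travel.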


In [None]:
df_filtered = df_delta.filter("Provider_Type != 'Unknown'")
df_filtered.write.format("delta").mode("overwrite").save(delta_path)

In [None]:
# Time travel: read version 0 (the first write) to confirm the pre-filter data is still available
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
df_v0.show(5)

+------------+-------------------------+-------------------+--------------------+--------------------+----------------+-------------+---------------------------+-------------------+-------------+------------------+-------------------------+----------------+-------------+-------------------------------+----------+--------------------+--------------------+----------------+-------------------+--------------+------------------------------+------------------------+-------------------------------+-------------------------------+------------------------------------+---------------------+-----------------+------------------+------------+----------+--------------------------+-------------+----+-----------------------+------------------------+
|Provider_NPI|Provider_Last_Or_Org_Name|Provider_First_Name|Provider_Credentials|Provider_Entity_Code|Provider_Street1|Provider_City|Provider_State_Abbreviation|Provider_State_FIPS|Provider_Zip5|Provider_RUCA_Code|Provider_RUCA_Description|Provider_Country
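The `versionAsOf` read works because Delta records every commit as a numbered JSON file under the table's `_delta_log` directory. A minimal sketch that lists those commits using only the standard library; `list_delta_commits` is a hypothetical helper, and it assumes the log sits on the local filesystem and that each commit carries a `commitInfo` action (Delta on Spark writes one, although the protocol treats it as optional).

```python
import json
from pathlib import Path


def list_delta_commits(table_path):
    """Return [(version, operation)] read from the table's _delta_log JSON commit files."""
    log_dir = Path(table_path) / "_delta_log"
    commits = []
    for commit_file in sorted(log_dir.glob("*.json")):
        if not commit_file.stem.isdigit():  # keep only numbered commit files
            continue
        operation = None
        with commit_file.open() as fh:
            for line in fh:  # one JSON action per line
                line = line.strip()
                if not line:
                    continue
                action = json.loads(line)
                if "commitInfo" in action:
                    operation = action["commitInfo"].get("operation")
        commits.append((int(commit_file.stem), operation))
    return commits


delta_path = "/content/claimguard_delta"  # path used earlier in this notebook
if (Path(delta_path) / "_delta_log").is_dir():
    for version, operation in list_delta_commits(delta_path):
        print(version, operation)
```

On the Spark side, `DeltaTable.forPath(spark, delta_path).history()` returns the same information as a DataFrame.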

### Update and Delete Rows

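Use the `DeltaTable` API to modify rows in place:
- Update: change provider type from `'Unknown'` to `'Other'`
- Delete: remove rows where `Average_Medicare_Payment_Amount = 0`

Because the previous step already filtered out the `Unknown` rows, the update matches no rows here; it is included to show the syntax.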


In [None]:
from delta.tables import DeltaTable

# Load table as DeltaTable object (for update/delete support)
delta_tbl = DeltaTable.forPath(spark, delta_path)

# Update rows where provider is "Unknown"
delta_tbl.update(
    condition="Provider_Type = 'Unknown'",
    set={"Provider_Type": "'Other'"}
)


In [None]:
# Delete rows where Medicare payment amount is zero
delta_tbl.delete("Average_Medicare_Payment_Amount = 0")

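This command deletes the matching rows in place and commits the change as a new table version, so earlier versions remain readable through time travel. It is useful for dropping uninformative or dirty records.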
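A delete is committed as soon as it runs, so it can help to count the matching rows first. A minimal sketch, assuming `delta_tbl` is the `DeltaTable` object created earlier; `delete_with_preview` is a hypothetical helper, not part of the Delta API, built on `DeltaTable.toDF()` and `DeltaTable.delete()`.

```python
def delete_with_preview(delta_tbl, condition: str, dry_run: bool = True) -> int:
    """Count rows matching `condition`; delete them only when dry_run is False.

    `delta_tbl` is expected to be a delta.tables.DeltaTable and `condition`
    a SQL predicate string, as in the delete call in this notebook.
    """
    matching = delta_tbl.toDF().filter(condition).count()
    if not dry_run and matching > 0:
        delta_tbl.delete(condition)  # commits a new table version
    return matching
```

Calling `delete_with_preview(delta_tbl, "Average_Medicare_Payment_Amount = 0")` only reports the count; passing `dry_run=False` performs the delete.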

### Append New Data

Simulate real-world batch ingestion by appending new rows to the Delta table.


In [None]:
# Stand-in for a new batch: re-append the first 100 rows of the original DataFrame
df_spark.limit(100).write.format("delta").mode("append").save(delta_path)

The append adds rows to the Delta table and creates a new version, which suits pipelines that ingest data in batches over time.

### Schema Evolution (Add New Columns)

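Add a new derived column to the dataset and overwrite the Delta table with `overwriteSchema` enabled, which replaces the table schema so the new column is accepted. Because `df_new` is built from `df_spark`, this overwrite also replaces the rows changed by the earlier filter, update, delete, and append steps.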


In [None]:
# Overwrite with a new column added
df_new = df_spark.withColumn("New_Feature", df_spark["Total_Services"] * 0.1)

df_new.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(delta_path)
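`overwriteSchema` replaces both the data and the schema. When the goal is to keep the existing rows and only add columns, Delta's `mergeSchema` write option on an append is the usual alternative. A minimal sketch, assuming `df` is a Spark DataFrame whose columns are a superset of the table's; `append_with_new_columns` is a hypothetical helper name.

```python
def append_with_new_columns(df, table_path: str) -> None:
    """Append `df` to a Delta table, letting Delta add any columns the table lacks."""
    (
        df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")  # add new columns instead of failing on mismatch
        .save(table_path)
    )
```

With this approach, rows written before the schema change read back with `null` in the new column, for example after `append_with_new_columns(df_new.limit(100), delta_path)`.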

### Final Summary Queries

Analyze the updated Delta table by:
- Listing top provider types by frequency
- Showing average allowed amounts by provider type


In [None]:
# Re-read the table so the queries run against its latest version and schema
df_delta = spark.read.format("delta").load(delta_path)

# Top provider types by frequency
df_delta.groupBy("Provider_Type").count().orderBy("count", ascending=False).show(10)

# Average allowed amount by provider type
df_delta.groupBy("Provider_Type").agg({
    "Average_Medicare_Allowed_Amount": "mean"
}).orderBy("avg(Average_Medicare_Allowed_Amount)", ascending=False).show(10)


+-------------+-------+
|Provider_Type|  count|
+-------------+-------+
|           24|1137764|
|           44| 868292|
|           62| 851917|
|           27| 813473|
|           79| 518182|
|           11| 385653|
|           78| 363800|
|           69| 268450|
|           65| 225800|
|           25| 225054|
+-------------+-------+
only showing top 10 rows

+-------------+------------------------------------+
|Provider_Type|avg(Average_Medicare_Allowed_Amount)|
+-------------+------------------------------------+
|            6|                  1340.2382018495653|
|           10|                  438.32553001481483|
|            3|                   393.0762001347046|
|           97|                  391.60537250813576|
|            5|                   364.6944896931984|
|           89|                   342.8490343792653|
|           22|                  341.76157582184885|
|           60|                   316.0292935428091|
|          102|                   266.1219179049365|
| 

## Conclusion

In this notebook, we demonstrated how to process, store, and manage healthcare claims data using **Delta Lake with PySpark**. Here's a quick summary:

### What we did:
- Set up Spark and Delta Lake in Google Colab, running Spark in local mode
- Converted cleaned CMS data from Parquet to **Delta format**
- Ran aggregation queries and explored provider-level billing behavior
- Performed **row-level updates** and **deletes**, and changed the table schema (**schema evolution**)
- Read an earlier table version with **time travel** and simulated **batch appends**

### Why we did it:
Plain file formats like CSV or Parquet have no built-in support for row-level updates, rollbacks, or atomic multi-file writes. Delta Lake adds **transactional capabilities** (commits recorded in a transaction log) on top of Parquet files, making the data lake suitable for production-grade pipelines.

### How this helps:
Delta format provides:
- Fast reads/writes for large healthcare datasets
- Auditability via versioning
- Cleaner ML workflows with schema evolution and update support

### Project Integration:
This Delta table will serve as the **source of truth** for the modeling and monitoring stages in the **ClaimGuard** pipeline, enabling efficient feature extraction, drift detection, and retraining using MLflow and XGBoost.

> **Next step:** Feed this Delta-backed dataset into an ML pipeline for embedding generation, service categorization, and billing prediction.

