_Updated date: November 14, 2025_

# 🎓 Databricks Workshop: HQRI Risk Adjustment & Analytics
**Healthcare Quality Reporting & Improvement - DBSQL Analytics Workshop**

---

## 📚 Workshop Objectives

By the end of this workshop, you will be able to:

1. ✅ Build **Medallion Architecture** pipelines for risk adjustment data
2. ✅ Calculate **HCC (Hierarchical Condition Category) risk scores** for Medicare Advantage
3. ✅ Create **encounter datamart** tables for CMS submissions
4. ✅ Perform **data quality audits** for regulatory compliance
5. ✅ Build **Gold layer analytics** for revenue impact and Star Ratings
6. ✅ Forecast risk scores using **Databricks SQL (DBSQL)**

---

## 🏥 HQRI Use Case: Medicare Risk Adjustment

**HQRI (Healthcare Quality Reporting & Improvement)** manages Humana's risk adjustment and quality reporting for Medicare Advantage:

- 💰 **Risk Score Calculations**: Determine CMS payments based on member health status
- 📊 **Encounter Datamart**: Aggregate and validate claims/encounters for CMS
- ⭐ **Star Ratings**: Drive quality metrics that impact bonus payments
- ✅ **Compliance & Audits**: Ensure federal regulatory compliance
- 📈 **Revenue Optimization**: Accurate coding and forecasting maximize financial performance

### 🗂️ Dataset Overview

We'll work with **Medicare risk adjustment data**:
- **Members**: Medicare Advantage enrollees with demographics
- **Claims**: Medical encounters with diagnosis codes
- **Diagnoses**: ICD-10 codes mapped to HCC categories
- **Providers**: Healthcare providers submitting encounters
- **Procedures**: Services rendered and billed

---



# Databricks Medallion Pipeline for a Healthcare Payer


## Risk Adjustment & HCC Modeling Concepts

### 🏥 Medicare Risk Adjustment Overview

CMS uses **HCC (Hierarchical Condition Category)** models to adjust capitation payments based on beneficiary health status:

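1. **ICD-10 Diagnosis Codes** → mapped to **HCC categories**
2. **HCC categories** → assigned **coefficient weights** (within a disease hierarchy, only the most severe HCC counts)
3. **Risk Score** = Sum of HCC weights + demographic factors (age, sex, disability status)
4. **Payment** ≈ Base rate × Risk Score (simplified; CMS also applies normalization and coding-intensity adjustments)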

**Example** (illustrative weights; the HCC values match the simulated reference table used later in this workshop):
- Member with diabetes (HCC 19, weight 0.318) and heart failure (HCC 85, weight 0.368)
- Base demographic score: 0.500
- **Total Risk Score = 0.500 + 0.318 + 0.368 = 1.186**
- If base payment = $10,000, **CMS pays: $10,000 × 1.186 = $11,860**
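The example's arithmetic can be reproduced with a short Python sketch of the simplified formula (demographic score plus HCC coefficients, multiplied by a flat base rate). The function names are illustrative and are not part of any CMS or Databricks API.

```python
def risk_score(demographic_score: float, hcc_coefficients: list[float]) -> float:
    """Simplified additive risk score: demographic factor plus HCC coefficients."""
    return round(demographic_score + sum(hcc_coefficients), 3)


def projected_payment(base_rate: float, score: float) -> float:
    """Simplified payment: the base rate scaled by the member's risk score."""
    return round(base_rate * score, 2)


# Values from the example: diabetes (HCC 19) and heart failure (HCC 85)
score = risk_score(0.500, [0.318, 0.368])
payment = projected_payment(10_000, score)
print(score, payment)  # 1.186 11860.0
```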

### ⭐ CMS Star Ratings Impact

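Star Ratings (1-5 stars, in half-star increments) affect bonus payments:
- **4+ stars**: Quality Bonus Payment (QBP), a 5% increase to the plan's benchmark (10% in qualifying "double bonus" counties)
- **5 stars**: The QBP, plus beneficiaries can switch into the plan through a special enrollment period outside the annual election period
- Ratings are based on quality measures (clinical outcomes, patient experience, access)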

### 📊 HQRI Data Model

For risk adjustment, key tables include:
- **Encounters/Claims**: Medical services with diagnosis codes
- **Diagnosis-to-HCC Mapping**: ICD-10 → HCC crosswalk
- **HCC Coefficients**: CMS model weights by year
- **Members**: Demographics and enrollment status
- **Risk Scores**: Calculated member-level risk scores
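To make the relationship between these tables concrete, here is a plain-Python sketch of the diagnosis-to-HCC lookup, the same idea as the `LEFT JOIN` used later to build `diagnosis_with_hcc`. The crosswalk rows are a subset of the workshop's simulated values (not official CMS coefficients), and `map_diagnosis` is a hypothetical helper name.

```python
# Subset of the workshop's simulated crosswalk: ICD-10 code -> (HCC category, coefficient)
HCC_REFERENCE = {
    "E11.9": (19, 0.318),   # Type 2 Diabetes
    "I50.9": (85, 0.368),   # Heart Failure
    "I10": (0, 0.000),      # Hypertension: no payment HCC in the demo data
    "J44.9": (111, 0.328),  # COPD
}


def map_diagnosis(code: str) -> dict:
    """Left-join one diagnosis code onto the crosswalk.

    The code is trimmed and upper-cased, mirroring UPPER(TRIM(...)) in the SQL join.
    Codes missing from the crosswalk keep None for the HCC fields, as with a LEFT JOIN.
    """
    normalized = code.strip().upper()
    hcc_category, coefficient = HCC_REFERENCE.get(normalized, (None, None))
    is_hcc = 1 if hcc_category is not None and hcc_category > 0 else 0
    return {
        "diagnosis_code": normalized,
        "hcc_category": hcc_category,
        "hcc_coefficient": coefficient,
        "is_hcc": is_hcc,
    }


for raw_code in ["E11.9", " i10 ", "Z00.00"]:
    print(map_diagnosis(raw_code))
```

Hypertension (`I10`) matches the crosswalk but maps to HCC 0, so it is flagged `is_hcc = 0`, just like a code that is absent from the crosswalk.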

<div style="display: flex; justify-content: space-between;">
  <img src="https://user-gen-media-assets.s3.amazonaws.com/gpt4o_images/5c87faea-3e60-4f71-826d-42d04f6cdc0b.png" alt="Dimensional Model" width="400" height="350">
  <img src="https://user-gen-media-assets.s3.amazonaws.com/gpt4o_images/6826c275-d462-4c07-a978-43fe9c40f3ed.png" alt="Data Vault" width="400" height="350">
</div>

**Resources:**
- [CMS Risk Adjustment Model](https://www.cms.gov/medicare/payment/medicare-advantage-rates-statistics/risk-adjustment)
- [Implementing Dimensional Modeling on Databricks](https://www.databricks.com/blog/implementing-dimensional-data-warehouse-databricks-sql-part-1)







# 🔄 Why Migrate from SAS to Databricks?

### Common HQRI Challenges with SAS

Healthcare organizations using SAS for risk adjustment analytics face several limitations:

1. **🐌 Performance Bottlenecks**
   - Single-server processing limits scalability
   - Large member populations (millions of records) cause memory issues
   - Batch processing takes hours or overnight

2. **💰 Cost Concerns**
   - Expensive annual licensing fees
   - Additional costs for SAS/ACCESS, SAS Enterprise Guide, SAS Visual Analytics
   - Hardware upgrades needed for growing data volumes

3. **🔧 Development Complexity**
   - Multiple PROC steps required for simple operations
   - Limited modern SQL features (no EXPLODE, limited window functions)
   - SAS macros difficult to maintain and debug
   - Separate tools needed for visualization and dashboards

4. **☁️ Cloud Migration Challenges**
   - Legacy on-premises architecture
   - Difficult integration with cloud data lakes
   - Limited real-time analytics capabilities

### Databricks Advantages for HQRI

| **Capability** | **Impact** |
|----------------|------------|
| **Distributed Processing** | Scale out across a cluster to process billions of encounter rows |
| **Modern SQL** | Window functions, CTEs, EXPLODE - cleaner, more maintainable code |
| **Unified Platform** | SQL, Python, R, dashboards, ML - all in one place |
| **Delta Lake** | ACID transactions + time travel for audit compliance |
| **Unity Catalog** | Row/column-level security for PHI/PII protection |
| **Real-time Analytics** | Streaming + batch unified for immediate insights |
| **Cost Efficiency** | Pay-per-use vs. fixed licensing, auto-scaling |


---


# SETUP

Run the next few cells to create the catalog, schemas, and volume, and to download the sample data.

In [0]:
dbutils.widgets.text("catalog", "my_catalog", "Catalog")
dbutils.widgets.text("bronze_db", "payer_bronze", "Bronze DB")
dbutils.widgets.text("silver_db", "payer_silver", "Silver DB")
dbutils.widgets.text("gold_db", "payer_gold", "Gold DB")

catalog = dbutils.widgets.get("catalog")
bronze_db = dbutils.widgets.get("bronze_db")
silver_db = dbutils.widgets.get("silver_db")
gold_db = dbutils.widgets.get("gold_db")

path = f"/Volumes/{catalog}/{bronze_db}/payer/files/"

print(f"Catalog: {catalog}")
print(f"Bronze DB: {bronze_db}")
print(f"Silver DB: {silver_db}")
print(f"Gold DB: {gold_db}")
print(f"Path: {path}")

In [0]:
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")

spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {bronze_db}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {silver_db}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {gold_db}")

spark.sql(f"CREATE VOLUME IF NOT EXISTS {bronze_db}.payer")

# Create the landing folders inside the volume
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/files/claims")
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/files/diagnosis")
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/files/procedures")
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/files/members")
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/files/providers")
dbutils.fs.mkdirs(f"/Volumes/{catalog}/{bronze_db}/payer/downloads")

In [0]:
import io
import os
import shutil
import zipfile

import requests

# URL of the sample data archive (pinned to a specific commit)
url = "https://github.com/bigdatavik/databricksfirststeps/blob/6b225621c3c010a2734ab604efd79c15ec6c71b8/data/Payor_Archive.zip?raw=true"

# Download the ZIP file and stop early if the request failed
response = requests.get(url, timeout=120)
response.raise_for_status()
zip_file = zipfile.ZipFile(io.BytesIO(response.content))

# Extract the archive into the downloads folder of the volume
base_path = f"/Volumes/{catalog}/{bronze_db}/payer/downloads"
zip_file.extractall(base_path)

# Subfolder under downloads/ for each extracted file
paths = {
    "claims.csv": f"{base_path}/claims",
    "diagnoses.csv": f"{base_path}/diagnosis",
    "procedures.csv": f"{base_path}/procedures",
    "member.csv": f"{base_path}/members",
    "providers.csv": f"{base_path}/providers"
}

# Create the destination directories if they do not exist
for dest_path in paths.values():
    os.makedirs(dest_path, exist_ok=True)

# Move each extracted file into its subfolder
for file_name, dest_path in paths.items():
    source_file = f"{base_path}/{file_name}"
    if os.path.exists(source_file):
        os.rename(source_file, f"{dest_path}/{file_name}")

# Copy each file into the landing folder that COPY INTO reads from.
# diagnoses.csv and member.csv are renamed to diagnosis.csv and members.csv on the way.
files_path = f"/Volumes/{catalog}/{bronze_db}/payer/files"
copies = [
    (f"{base_path}/claims/claims.csv", f"{files_path}/claims/claims.csv"),
    (f"{base_path}/diagnosis/diagnoses.csv", f"{files_path}/diagnosis/diagnosis.csv"),
    (f"{base_path}/procedures/procedures.csv", f"{files_path}/procedures/procedures.csv"),
    (f"{base_path}/members/member.csv", f"{files_path}/members/members.csv"),
    (f"{base_path}/providers/providers.csv", f"{files_path}/providers/providers.csv"),
]
for src, dst in copies:
    shutil.copy(src, dst)
    print(f"Copied to {dst}")



# 🚀 Let's Build Your First Data Pipeline!

---

## Workshop Roadmap

```
📥 Bronze Layer    →    🔧 Silver Layer    →    ⭐ Gold Layer    →    📊 Analytics
   (Raw Data)          (Cleaned Data)        (Business Tables)      (Insights)
```

In the following sections, we'll build a complete data pipeline following the **Medallion Architecture**:

1. **Bronze Layer**: Ingest raw CSV files into Delta tables
2. **Silver Layer**: Clean, deduplicate, and transform data
3. **Gold Layer**: Create enriched analytics tables
4. **Analytics**: Generate insights and visualizations

Let's get started! 🎉

# 📥 Bronze/Silver Layers – Streamlined Data Preparation

---

## Overview: Simplified Bronze & Silver

For HQRI analytics, we'll **streamline** Bronze and Silver layers to quickly get to Gold layer insights:

### Bronze Layer (Raw Data Landing)
- 📂 Ingest encounter data "as-is" using `COPY INTO`
- 💾 Store in Delta Lake for audit trails
- ⏱️ Retain raw history for compliance (Delta time travel reaches back only as far as the table's retention settings allow)

### Silver Layer (Clean & Validate)
- 🧹 Remove duplicates and validate data quality
- 🔄 Map ICD-10 codes to HCC categories
- ✅ Apply business rules for CMS submission eligibility

> **💡 Focus**: We'll execute Bronze/Silver steps efficiently so we can spend more time on **Gold layer analytics** that drive business value!

---



## Step 1: Verify Source Files

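Let's first check that our source files are available. (The SQL cells in this notebook hard-code the default widget values `my_catalog`, `payer_bronze`, `payer_silver`, and `payer_gold`; update them if you changed the widgets.)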

In [0]:
%sql
LIST '/Volumes/my_catalog/payer_bronze/payer/files/claims/'

## Step 2: Load Data with COPY INTO

### 📖 Understanding COPY INTO

`COPY INTO` is Databricks' recommended command for loading data from cloud storage into Delta tables.

**Key Benefits:**
- ✅ **Idempotent**: Safely re-run without duplicating data
- ✅ **Incremental**: Only loads new files automatically
- ✅ **Schema Evolution**: Can merge new columns with `mergeSchema` option
- ✅ **Atomic**: Either succeeds completely or rolls back

**Syntax:**
```sql
COPY INTO <table_name>
FROM '<source_path>'
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS('mergeSchema' = 'true')
```
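The idempotent, incremental behavior comes from COPY INTO skipping source files it has already loaded into the target table. The toy model below illustrates only that file-level bookkeeping in plain Python; it is not how Databricks implements COPY INTO, and the function, variable, and file names are hypothetical.

```python
def files_to_load(available: list[str], already_loaded: set[str], force: bool = False) -> list[str]:
    """Return the files a COPY INTO-style load would pick up on this run.

    Files already recorded as loaded are skipped, unless force=True, which
    mirrors COPY_OPTIONS ('force' = 'true') by reloading every file.
    """
    if force:
        return list(available)
    return [f for f in available if f not in already_loaded]


# `loaded` stands in for the load state COPY INTO tracks for the target table
loaded: set[str] = set()

# First run: both files are new
batch = files_to_load(["claims_1.csv", "claims_2.csv"], loaded)
loaded.update(batch)

# Re-run after a new file lands: only the new file is picked up
batch = files_to_load(["claims_1.csv", "claims_2.csv", "claims_3.csv"], loaded)
loaded.update(batch)
print(batch)  # ['claims_3.csv']

# force=True reloads everything, so rows from earlier files are appended again
print(files_to_load(["claims_1.csv", "claims_2.csv", "claims_3.csv"], loaded, force=True))
```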

📚 **Learn More:**
- [COPY INTO Documentation](https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-copy-into)
- [COPY INTO Examples](https://learn.microsoft.com/en-us/azure/databricks/ingestion/cloud-object-storage/copy-into/)


### Loading Data with SQL

In [0]:
%sql
-- Load Claims Data into Bronze Table
CREATE TABLE IF NOT EXISTS payer_bronze.claims_raw;
COPY INTO payer_bronze.claims_raw FROM
(SELECT
*
FROM '/Volumes/my_catalog/payer_bronze/payer/files/claims/')
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true',
               'inferSchema' = 'true',
               'delimiter' = ',')
COPY_OPTIONS ('mergeSchema' = 'true', 'force' = 'true');

-- NOTE: 'force' = 'true' is used here for demo purposes only, so every file is reloaded (and its rows appended again) on each run; the Silver step's DISTINCT removes these exact duplicates. In production, omit this option so COPY INTO processes only new files.


-- Load Diagnosis Data into Bronze Table
CREATE TABLE IF NOT EXISTS payer_bronze.diagnosis_raw;
COPY INTO payer_bronze.diagnosis_raw FROM
(SELECT
*
FROM '/Volumes/my_catalog/payer_bronze/payer/files/diagnosis/')

FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true',
               'inferSchema' = 'true',
               'delimiter' = ',')
COPY_OPTIONS ('mergeSchema' = 'true');


-- Load Members Data into Bronze Table
CREATE TABLE IF NOT EXISTS payer_bronze.members_raw;
COPY INTO payer_bronze.members_raw FROM
(SELECT
*
FROM '/Volumes/my_catalog/payer_bronze/payer/files/members/')

FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true',
               'inferSchema' = 'true',
               'delimiter' = ',')
COPY_OPTIONS ('mergeSchema' = 'true');


-- Load Procedures Data into Bronze Table
CREATE TABLE IF NOT EXISTS payer_bronze.procedures_raw;
COPY INTO payer_bronze.procedures_raw FROM
(SELECT
*
FROM '/Volumes/my_catalog/payer_bronze/payer/files/procedures/')
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true',
               'inferSchema' = 'true',
               'delimiter' = ',')
COPY_OPTIONS ('mergeSchema' = 'true');


-- Load Providers Data into Bronze Table
CREATE TABLE IF NOT EXISTS payer_bronze.providers_raw;
COPY INTO payer_bronze.providers_raw FROM
(SELECT
*
FROM '/Volumes/my_catalog/payer_bronze/payer/files/providers/')
FILEFORMAT = CSV
FORMAT_OPTIONS('header' = 'true',
               'inferSchema' = 'true',
               'delimiter' = ',')
COPY_OPTIONS ('mergeSchema' = 'true');


### 🐍 Alternative: Loading Data with PySpark

While SQL is great for batch loading, PySpark gives you more programmatic control. Here's how to load the same data using PySpark:

In [0]:
# Example: Load data using PySpark

# Let Spark infer the schema from the CSV header and values
claims_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/Volumes/my_catalog/payer_bronze/payer/files/claims/")

# Display first 10 rows
display(claims_df.limit(10))

# Show schema
print("Claims Schema:")
claims_df.printSchema()

# Get row count
print(f"\nTotal rows loaded: {claims_df.count()}")

# Write to Delta table (this creates or replaces the table)
# claims_df.write \
#     .format("delta") \
#     .mode("overwrite") \
#     .saveAsTable("payer_bronze.claims_raw_pyspark")


## Silver Layer – Transforms for Risk Adjustment

Now let's clean the data and prepare it for HCC risk scoring:

**Key Silver Transformations:**
- ✅ Deduplicate encounters
- ✅ Validate diagnosis codes (ICD-10 format)
- ✅ Filter to eligible members (Medicare Advantage, active enrollment)
- ✅ Map diagnoses to HCC categories
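The Silver cells in this workshop do not include an ICD-10 format check. A minimal sketch is a regular expression for the ICD-10-CM code shape: a letter, a digit, a digit or letter, then optionally a dot and one to four more characters. This checks format only (it does not confirm that a code exists in the official code set), and the pattern and `is_icd10_format` name are assumptions for illustration.

```python
import re
from typing import Optional

# Assumed ICD-10-CM shape: letter, digit, digit or letter, then optionally "." plus 1-4 characters
ICD10_PATTERN = re.compile(r"^[A-Z][0-9][0-9A-Z](\.[0-9A-Z]{1,4})?$")


def is_icd10_format(code: Optional[str]) -> bool:
    """Return True if the trimmed, upper-cased code matches the ICD-10-CM shape."""
    if code is None:
        return False
    return ICD10_PATTERN.fullmatch(code.strip().upper()) is not None


for code in ["E11.9", "i25.10", "I10", "E11.", "119.0", None]:
    print(code, is_icd10_format(code))
```

In a Silver cell, the same pattern could be applied with PySpark's `Column.rlike`, for example `upper(trim(col("diagnosis_code"))).rlike(ICD10_PATTERN.pattern)`; the anchors matter there because `rlike` matches anywhere in the string.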

---

> **💡 Speed Focus**: We'll run these transforms quickly and move on to Gold layer analytics!



## Step 1: Transform Bronze to Silver (SQL)

Let's clean and transform our Bronze tables. We'll demonstrate with multiple examples using both **SQL** and **PySpark**.

In [0]:
%sql
-- Create silver schema
CREATE SCHEMA IF NOT EXISTS payer_silver;


-- Members: select relevant fields, cast types, drop exact duplicate rows
-- (DISTINCT applies to the whole selected row, not just member_id)
CREATE OR REPLACE TABLE payer_silver.members AS
SELECT DISTINCT
  CAST(member_id AS STRING) AS member_id,
  TRIM(first_name) AS first_name,
  TRIM(last_name) AS last_name,
  CAST(birth_date AS DATE) AS birth_date,
  gender,
  plan_id,
  CAST(effective_date AS DATE) AS effective_date
FROM payer_bronze.members_raw
WHERE member_id IS NOT NULL;


-- Claims: drop exact duplicate rows, missing claim IDs, and non-positive charges
CREATE OR REPLACE TABLE payer_silver.claims AS
SELECT DISTINCT
  claim_id,
  member_id,
  provider_id,
  CAST(claim_date AS DATE) AS claim_date,
  ROUND(total_charge, 2) AS total_charge,
  LOWER(claim_status) AS claim_status
FROM payer_bronze.claims_raw
WHERE claim_id IS NOT NULL AND total_charge > 0;


-- Providers: drop exact duplicate rows
CREATE OR REPLACE TABLE payer_silver.providers AS
SELECT DISTINCT
  provider_id,
  npi,
  provider_name,
  specialty,
  address,
  city,
  state
FROM payer_bronze.providers_raw
WHERE provider_id IS NOT NULL;


## Step 2: Transform with PySpark

Now let's see how to do the same transformations using PySpark. This approach is more flexible for complex business logic.
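One difference worth noticing between the two styles: `SELECT DISTINCT` in the SQL cell removes only rows that are identical in every selected column, while `dropDuplicates(['claim_id', 'procedure_code'])` in the PySpark cell keeps one row per key even when other columns differ (and Spark does not guarantee which row survives). The plain-Python sketch below illustrates the two semantics on hypothetical rows.

```python
# Hypothetical procedure rows: (claim_id, procedure_code, amount)
rows = [
    ("C1", "99213", 120.0),
    ("C1", "99213", 120.0),  # exact duplicate
    ("C1", "99213", 125.0),  # same key, different amount
    ("C2", "80053", 45.0),
]

# SELECT DISTINCT: drop rows identical in every column (insertion order preserved here)
distinct_rows = list(dict.fromkeys(rows))

# dropDuplicates(['claim_id', 'procedure_code']): keep one row per key.
# This sketch keeps the first row seen; Spark makes no such guarantee.
by_key = {}
for row in rows:
    by_key.setdefault((row[0], row[1]), row)
key_deduped = list(by_key.values())

print(len(distinct_rows), len(key_deduped))  # 3 2
```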

### Example: Transform Procedures Table with PySpark


In [0]:
from pyspark.sql.functions import col, trim, upper, round as spark_round, when, regexp_replace

# Read from Bronze
procedures_bronze = spark.table("payer_bronze.procedures_raw")

# Clean and cast the amount column
procedures_bronze_clean = procedures_bronze.withColumn(
    "amount_clean",
    regexp_replace(col("amount"), "[^0-9.]", "").cast("double")
)

# Apply transformations
procedures_silver = procedures_bronze_clean \
    .dropDuplicates(['claim_id', 'procedure_code']) \
    .filter(col("claim_id").isNotNull()) \
    .filter(col("amount_clean") > 0) \
    .select(
        col("claim_id"),
        upper(trim(col("procedure_code"))).alias("procedure_code"),
        trim(col("procedure_desc")).alias("procedure_desc"),
        spark_round(col("amount_clean"), 2).alias("amount"),
        when(col("amount_clean") < 100, "Low")
        .when(col("amount_clean") < 500, "Medium")
        .when(col("amount_clean") < 1000, "High")
        .otherwise("Very High").alias("cost_category")
    )

# Show sample data
print("Transformed Procedures (first 10 rows):")
display(procedures_silver.limit(10))

# Show statistics
print("\nCost Category Distribution:")
display(procedures_silver.groupBy("cost_category").count().orderBy("cost_category"))

# Write to Silver table
procedures_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("payer_silver.procedures")


# 🤖 Using Databricks AI Assistant

---

Databricks AI Assistant can help you write code, understand data, and troubleshoot issues!

### How to Use AI Assistant:
1. Click the AI Assistant icon
2. Ask questions in natural language
3. Get code suggestions and explanations

### Example Prompts to Try:
- "How do I calculate the total claims by specialty?"
- "Show me how to create a window function for running totals"
- "What does spark.table() command do?"
- "Help me debug this PySpark error"

---



## 🎯 YOUR TURN! (3 mins)
Ask Databricks Assistant: "How do I calculate the total claims by specialty in SQL?"

# ⭐ Gold Layer – HQRI Risk Adjustment Analytics

---

## What is the HQRI Gold Layer?

The **Gold Layer** is where we deliver **business value** for HQRI. Here we create analytics tables that directly support:

- 💰 **Risk Score Calculations**: Member-level HCC risk scores for CMS payments
- 📊 **Encounter Datamart**: CMS submission-ready data with quality validations
- ⭐ **Star Ratings**: Quality metrics that impact bonus payments
- 📈 **Revenue Forecasting**: Projected payments based on risk scores
- ✅ **Compliance Audits**: Data quality metrics for regulatory requirements
- 👥 **Member Stratification**: High-risk vs. low-risk population segmentation
- 🏥 **Provider Performance**: Risk capture rates by provider

> **🎯 HQRI Value**: Each Gold table directly answers a business question that impacts revenue, compliance, or quality!

---

## Example 1: HCC Risk Score Calculation

### 🎯 Business Goal
Calculate member-level risk scores based on HCC categories to determine CMS payments.

**Risk Score Formula:**
```
Risk Score = Demographic Score + Σ(HCC Coefficients)
```
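This workshop models the demographic part of the formula as a simple age band, from 0.350 under 65 up to 0.750 at 80 and over. These bands are the workshop's simplification, not CMS's actual demographic factors, which also vary by sex, Medicaid eligibility, and original reason for entitlement. The sketch below mirrors the band lookup and computes age in completed years (subtracting birth years alone would overstate age for members whose birthday has not yet occurred in the current year); the function names are illustrative.

```python
from datetime import date


def age_in_years(birth_date: date, as_of: date) -> int:
    """Completed years of age as of a given date."""
    had_birthday = (as_of.month, as_of.day) >= (birth_date.month, birth_date.day)
    return as_of.year - birth_date.year - (0 if had_birthday else 1)


def demographic_score(age: int) -> float:
    """Workshop age bands (simulated values, mirroring the SQL CASE expression)."""
    if age < 65:
        return 0.350
    if age <= 69:
        return 0.450
    if age <= 74:
        return 0.550
    if age <= 79:
        return 0.650
    return 0.750


birth = date(1955, 6, 15)
for as_of in (date(2025, 6, 14), date(2025, 6, 15)):
    age = age_in_years(birth, as_of)
    print(as_of, age, demographic_score(age))
# 2025-06-14 69 0.45
# 2025-06-15 70 0.55
```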

### 📊 SAS vs. Databricks Comparison

Let's compare how this is traditionally done in **SAS** vs. **Databricks SQL** and **PySpark**:

---

#### Traditional SAS Approach

```sas
/* SAS: HCC Risk Score Calculation */
/* Step 1: Create HCC reference table (pipe-delimited so descriptions can contain spaces) */
DATA work.hcc_reference;
    LENGTH icd10_code $10 diagnosis_desc $40;
    INFILE DATALINES DLM='|';
    INPUT icd10_code $ diagnosis_desc $ hcc_category hcc_coefficient;
    DATALINES;
E11.9|Type 2 Diabetes|19|0.318
I50.9|Heart Failure|85|0.368
I10|Hypertension|0|0.000
J44.9|COPD|111|0.328
N18.3|CKD Stage 3|138|0.237
;
RUN;

/* Step 2: Merge claims with diagnoses and HCC mapping */
PROC SQL;
    CREATE TABLE work.diagnosis_with_hcc AS
    SELECT 
        d.claim_id,
        d.diagnosis_code,
        d.diagnosis_desc,
        h.hcc_category,
        h.hcc_coefficient,
        CASE 
            WHEN h.hcc_category IS NOT NULL AND h.hcc_category > 0 
            THEN 1 ELSE 0 
        END AS is_hcc
    FROM work.diagnosis_raw AS d
    LEFT JOIN work.hcc_reference AS h
        ON UPCASE(STRIP(d.diagnosis_code)) = UPCASE(STRIP(h.icd10_code));
QUIT;

/* Step 3: Calculate member-level risk scores */
PROC SQL;
    CREATE TABLE work.member_hccs AS
    SELECT DISTINCT
        c.member_id,
        m.first_name,
        m.last_name,
        m.birth_date,
        m.gender,
        m.plan_id,
        dh.hcc_category,
        dh.hcc_coefficient,
        dh.diagnosis_code,
        INTCK('YEAR', m.birth_date, TODAY(), 'C') AS age  /* completed years */
    FROM work.claims AS c
    INNER JOIN work.members AS m 
        ON c.member_id = m.member_id
    INNER JOIN work.diagnosis_with_hcc AS dh 
        ON c.claim_id = dh.claim_id
    WHERE dh.is_hcc = 1;
QUIT;

/* Step 4: Aggregate and calculate final risk scores */
PROC SQL;
    CREATE TABLE work.member_risk_scores AS
    SELECT 
        member_id,
        first_name,
        last_name,
        birth_date,
        gender,
        plan_id,
        age,
        CASE 
            WHEN age < 65 THEN 0.350
            WHEN age BETWEEN 65 AND 69 THEN 0.450
            WHEN age BETWEEN 70 AND 74 THEN 0.550
            WHEN age BETWEEN 75 AND 79 THEN 0.650
            ELSE 0.750
        END AS demographic_score,
        SUM(hcc_coefficient) AS hcc_score,
        COUNT(DISTINCT hcc_category) AS hcc_count,
        CALCULATED demographic_score + CALCULATED hcc_score AS total_risk_score,
        ROUND((CALCULATED demographic_score + CALCULATED hcc_score) * 10000, 0.01) 
            AS projected_annual_payment
    FROM work.member_hccs
    GROUP BY member_id, first_name, last_name, birth_date, gender, plan_id, age
    ORDER BY total_risk_score DESC;
QUIT;
```

**SAS Challenges:**
- ❌ Multiple PROC SQL steps required
- ❌ Intermediate tables clutter WORK library
- ❌ Limited scalability with large datasets
- ❌ No automatic optimization or parallelization
- ❌ Complex syntax for array aggregations (HCC categories list)

---

Now let's see how **Databricks SQL** simplifies this!

In [0]:
%sql
-- Create gold schema
CREATE SCHEMA IF NOT EXISTS payer_gold;

-- Step 1a: Create HCC Mapping Reference Table (simulated for demo)
CREATE OR REPLACE TABLE payer_gold.hcc_reference (
  icd10_code STRING,
  diagnosis_desc STRING,
  hcc_category INT,
  hcc_coefficient DOUBLE
);

INSERT INTO payer_gold.hcc_reference VALUES
('E11.9', 'Type 2 Diabetes', 19, 0.318),
('I50.9', 'Heart Failure', 85, 0.368),
('I10', 'Hypertension', 0, 0.000),
('J44.9', 'COPD', 111, 0.328),
('N18.3', 'CKD Stage 3', 138, 0.237),
('F32.9', 'Depression', 59, 0.309),
('E78.5', 'Hyperlipidemia', 0, 0.000),
('I25.10', 'CAD', 88, 0.184);

-- Step 1b: Join Diagnoses to HCC Categories (diagnoses are read from Bronze; this workshop builds no Silver diagnosis table)
CREATE OR REPLACE TABLE payer_gold.diagnosis_with_hcc AS
SELECT
  d.claim_id,
  d.diagnosis_code,
  d.diagnosis_desc,
  h.hcc_category,
  h.hcc_coefficient,
  CASE WHEN h.hcc_category IS NOT NULL AND h.hcc_category > 0 THEN 1 ELSE 0 END as is_hcc
FROM payer_bronze.diagnosis_raw d
LEFT JOIN payer_gold.hcc_reference h 
  ON UPPER(TRIM(d.diagnosis_code)) = UPPER(TRIM(h.icd10_code));

-- Step 1c: Calculate Member-Level HCC Risk Scores
CREATE OR REPLACE TABLE payer_gold.member_risk_scores AS
WITH member_hccs AS (
  SELECT DISTINCT
    c.member_id,
    m.first_name,
    m.last_name,
    m.birth_date,
    m.gender,
    m.plan_id,
    dh.hcc_category,
    dh.hcc_coefficient,
    dh.diagnosis_code,
    FLOOR(MONTHS_BETWEEN(CURRENT_DATE(), m.birth_date) / 12) as age  -- completed years
  FROM payer_silver.claims c
  INNER JOIN payer_silver.members m ON c.member_id = m.member_id
  INNER JOIN payer_gold.diagnosis_with_hcc dh ON c.claim_id = dh.claim_id
  WHERE dh.is_hcc = 1
),
member_scores AS (
  SELECT
    member_id,
    first_name,
    last_name,
    birth_date,
    gender,
    plan_id,
    age,
    CASE 
      WHEN age < 65 THEN 0.350
      WHEN age BETWEEN 65 AND 69 THEN 0.450
      WHEN age BETWEEN 70 AND 74 THEN 0.550
      WHEN age BETWEEN 75 AND 79 THEN 0.650
      ELSE 0.750
    END as demographic_score,
    SUM(hcc_coefficient) as hcc_score,
    COUNT(DISTINCT hcc_category) as hcc_count,
    COLLECT_SET(hcc_category) as hcc_categories,
    COLLECT_SET(diagnosis_code) as diagnosis_codes
  FROM member_hccs
  GROUP BY member_id, first_name, last_name, birth_date, gender, plan_id, age
)
SELECT
  *,
  demographic_score + hcc_score as total_risk_score,
  ROUND((demographic_score + hcc_score) * 10000, 2) as projected_annual_payment
FROM member_scores
ORDER BY total_risk_score DESC;

### 🚀 Databricks SQL Advantages

**Databricks SQL Benefits:**
- ✅ **Single Statement for the Scores**: The risk-score logic fits in one CREATE TABLE AS with CTEs
- ✅ **Advanced Functions**: COLLECT_SET() for array aggregation (no direct PROC SQL equivalent)
- ✅ **Automatic Optimization**: Query engine optimizes joins and aggregations
- ✅ **Scalability**: Distributed processing handles billions of rows
- ✅ **Unity Catalog**: Built-in governance, lineage tracking, and access control
- ✅ **Delta Lake**: ACID transactions, time travel, schema evolution
- ✅ **Scheduled Refresh**: Can run as a scheduled job or be triggered when new data arrives
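One detail to watch in the aggregation: `member_hccs` is distinct on diagnosis code as well as HCC, so `SUM(hcc_coefficient)` adds one coefficient per (HCC, diagnosis code) row. With the demo crosswalk (one ICD-10 code per payment HCC) that equals one coefficient per HCC, but with a real crosswalk, where many ICD-10 codes map to the same HCC, an HCC could be counted more than once. The plain-Python sketch below reduces to distinct member-HCC pairs before summing; the `E13.9` → HCC 19 row is an assumed mapping added to show the effect, and CMS hierarchies are not applied. In SQL, the same idea would be a CTE over `SELECT DISTINCT member_id, hcc_category, hcc_coefficient` feeding the `SUM`.

```python
from collections import defaultdict

# Hypothetical member_hccs rows: (member_id, hcc_category, hcc_coefficient, diagnosis_code)
member_hccs = [
    ("M1", 19, 0.318, "E11.9"),
    ("M1", 19, 0.318, "E13.9"),  # assumed second code mapping to HCC 19
    ("M1", 85, 0.368, "I50.9"),
]

naive_score = defaultdict(float)        # SUM over every (HCC, code) row
member_hcc_pairs = defaultdict(dict)    # member_id -> {hcc_category: coefficient}
diagnosis_codes = defaultdict(set)      # like COLLECT_SET(diagnosis_code)

for member_id, hcc, coef, dx in member_hccs:
    naive_score[member_id] += coef
    member_hcc_pairs[member_id][hcc] = coef  # one entry per distinct HCC
    diagnosis_codes[member_id].add(dx)

for member_id, hccs in member_hcc_pairs.items():
    hcc_score = round(sum(hccs.values()), 3)
    print(member_id, round(naive_score[member_id], 3), hcc_score,
          len(hccs), sorted(hccs), sorted(diagnosis_codes[member_id]))
# M1 1.004 0.686 2 [19, 85] ['E11.9', 'E13.9', 'I50.9']
```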

### 💡 PySpark Alternative

For complex business logic or programmatic control, use PySpark:


In [0]:
# PySpark Example: HCC Risk Score Calculation
from pyspark.sql.functions import (
    col, current_date, floor, months_between, sum as _sum, countDistinct,
    collect_set, when, round as spark_round
)

# Read tables
claims = spark.table("payer_silver.claims")
members = spark.table("payer_silver.members")
diagnosis_with_hcc = spark.table("payer_gold.diagnosis_with_hcc")

# Join and calculate member HCCs; age is computed in completed years
member_hccs = claims \
    .join(members, "member_id") \
    .join(diagnosis_with_hcc, "claim_id") \
    .filter(col("is_hcc") == 1) \
    .withColumn("age", floor(months_between(current_date(), col("birth_date")) / 12)) \
    .select(
        "member_id", "first_name", "last_name", "birth_date", "gender",
        "plan_id", "age", "hcc_category", "hcc_coefficient", "diagnosis_code"
    ) \
    .distinct()

# Calculate demographic scores and aggregate HCC scores
member_risk_scores_pyspark = member_hccs \
    .withColumn("demographic_score",
        when(col("age") < 65, 0.350)
        .when(col("age") <= 69, 0.450)
        .when(col("age") <= 74, 0.550)
        .when(col("age") <= 79, 0.650)
        .otherwise(0.750)
    ) \
    .groupBy("member_id", "first_name", "last_name", "birth_date", 
             "gender", "plan_id", "age", "demographic_score") \
    .agg(
        _sum("hcc_coefficient").alias("hcc_score"),
        countDistinct("hcc_category").alias("hcc_count"),
        collect_set("hcc_category").alias("hcc_categories"),
        collect_set("diagnosis_code").alias("diagnosis_codes")
    ) \
    .withColumn("total_risk_score", col("demographic_score") + col("hcc_score")) \
    .withColumn("projected_annual_payment", 
                spark_round(col("total_risk_score") * 10000, 2)) \
    .orderBy(col("total_risk_score").desc())

# Display results
print("📊 PySpark Risk Score Calculation - Top 10 Members:")
display(member_risk_scores_pyspark.limit(10))

# Optionally write to table
# member_risk_scores_pyspark.write.format("delta").mode("overwrite") \
#     .saveAsTable("payer_gold.member_risk_scores_pyspark")


## 🎯 YOUR TURN! (3 mins)
Ask Databricks Assistant: "Calculate top 20 members by risk score in SQL"


## Example 2: Revenue Forecast & Impact Analysis

### 💰 Business Goal
Project total CMS revenue based on risk scores to support financial planning.

**Key Metrics:**
- Total member population
- Average risk score
- Projected annual revenue
- Revenue by plan and risk tier
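It can help to see these metrics on a few rows before writing any SQL. This plain-Python sketch groups hypothetical member scores by plan and applies the tier thresholds used in this example (high risk at 1.5 or above, low risk below 1.0); the plan IDs and scores are made up, and it assumes one row per member, as in `member_risk_scores`.

```python
from collections import defaultdict

# Hypothetical member_risk_scores rows: (member_id, plan_id, total_risk_score, projected_annual_payment)
rows = [
    ("M1", "PLAN_A", 1.186, 11860.00),
    ("M2", "PLAN_A", 0.800, 8000.00),
    ("M3", "PLAN_B", 1.620, 16200.00),
]

summary = defaultdict(lambda: {"members": 0, "revenue": 0.0, "scores": [], "high": 0, "low": 0})
for _, plan_id, score, payment in rows:
    s = summary[plan_id]
    s["members"] += 1                # one row per member assumed
    s["revenue"] += payment
    s["scores"].append(score)
    s["high"] += int(score >= 1.5)   # high-risk tier
    s["low"] += int(score < 1.0)     # low-risk tier

# Plans ordered by total projected revenue, highest first
for plan_id, s in sorted(summary.items(), key=lambda kv: kv[1]["revenue"], reverse=True):
    avg_score = round(sum(s["scores"]) / s["members"], 3)
    print(plan_id, s["members"], avg_score, round(s["revenue"], 2), s["high"], s["low"])
# PLAN_A 2 0.993 19860.0 0 1
# PLAN_B 1 1.62 16200.0 1 0
```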


### 📊 SAS vs. Databricks SQL Comparison

#### Traditional SAS Approach

```sas
/* SAS: Revenue Forecast by Plan */
PROC SQL;
    CREATE TABLE work.revenue_forecast AS
    SELECT 
        plan_id,
        COUNT(DISTINCT member_id) AS total_members,
        ROUND(AVG(total_risk_score), 0.001) AS avg_risk_score,
        ROUND(MIN(total_risk_score), 0.001) AS min_risk_score,
        ROUND(MAX(total_risk_score), 0.001) AS max_risk_score,
        SUM(projected_annual_payment) AS total_projected_revenue,
        ROUND(AVG(projected_annual_payment), 0.01) AS avg_payment_per_member,
        SUM(CASE WHEN total_risk_score >= 1.5 THEN 1 ELSE 0 END) AS high_risk_members,
        SUM(CASE WHEN total_risk_score < 1.0 THEN 1 ELSE 0 END) AS low_risk_members
    FROM work.member_risk_scores
    GROUP BY plan_id
    ORDER BY total_projected_revenue DESC;
QUIT;

/* Export to Excel for reporting */
PROC EXPORT DATA=work.revenue_forecast
    OUTFILE='/path/to/revenue_forecast.xlsx'
    DBMS=XLSX REPLACE;
RUN;

/* Generate summary report */
PROC PRINT DATA=work.revenue_forecast;
    TITLE 'Revenue Forecast by Plan';
RUN;
```

**SAS Limitations:**
- ❌ Manual export steps for reporting
- ❌ No real-time dashboard integration
- ❌ Limited to single-server processing
- ❌ Requires additional tools for visualization
- ❌ Static reports need manual refresh

#### Databricks SQL Approach



## 🎯 YOUR TURN! (3 mins)
Work with Databricks Assistant to convert the SAS code below to SQL. Create a new table called "revenue_forecast" in the "payer_gold" schema (if you have time, convert it to PySpark as well):

```sas
PROC SQL;
    CREATE TABLE work.revenue_forecast AS
    SELECT 
        plan_id,
        COUNT(DISTINCT member_id) AS total_members,
        ROUND(AVG(total_risk_score), 0.001) AS avg_risk_score,
        ROUND(MIN(total_risk_score), 0.001) AS min_risk_score,
        ROUND(MAX(total_risk_score), 0.001) AS max_risk_score,
        SUM(projected_annual_payment) AS total_projected_revenue,
        ROUND(AVG(projected_annual_payment), 0.01) AS avg_payment_per_member,
        SUM(CASE WHEN total_risk_score >= 1.5 THEN 1 ELSE 0 END) AS high_risk_members,
        SUM(CASE WHEN total_risk_score < 1.0 THEN 1 ELSE 0 END) AS low_risk_members
    FROM work.member_risk_scores
    GROUP BY plan_id
    ORDER BY total_projected_revenue DESC;
QUIT;
```
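One translation detail to watch: in SAS, the second argument of `ROUND` is a rounding unit (`ROUND(x, 0.001)` rounds to the nearest 0.001), whereas in Databricks SQL the second argument of `ROUND` is a number of decimal places, so the equivalent is `ROUND(x, 3)`. Both round halves away from zero (Databricks `ROUND` uses HALF_UP mode). The plain-Python sketch below shows the correspondence using `decimal`; the helper names are hypothetical, values are passed as strings to avoid binary floating-point artifacts, and it assumes the unit is a power of ten, as in this code.

```python
from decimal import Decimal, ROUND_HALF_UP


def sas_round(x: str, unit: str) -> Decimal:
    """Round x to the nearest multiple of a power-of-ten unit, halves away from zero."""
    return Decimal(x).quantize(Decimal(unit), rounding=ROUND_HALF_UP)


def unit_to_places(unit: str) -> int:
    """Decimal places implied by a power-of-ten rounding unit, e.g. "0.001" -> 3."""
    return -Decimal(unit).as_tuple().exponent


print(sas_round("1.2345", "0.001"), unit_to_places("0.001"))   # 1.235 3
print(sas_round("11859.996", "0.01"), unit_to_places("0.01"))  # 11860.00 2
```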


In [0]:
%sql
-- Display the revenue forecast (uncomment after creating payer_gold.revenue_forecast)
-- SELECT * FROM payer_gold.revenue_forecast;



## Example 3: HCC Distribution Analysis

### 📈 Business Goal
Understand which HCC categories drive the most revenue and identify coding opportunities.

This helps HQRI:
- Identify high-value diagnoses for provider education
- Monitor HCC capture rates
- Find gaps in documentation


### 📊 SAS vs. Databricks SQL Comparison

#### Traditional SAS Approach

```sas
/* SAS: HCC Distribution Analysis */
/* Step 1: Unnest HCC categories from member_risk_scores */
/* Note: SAS doesn't have native array explosion like SQL EXPLODE */
/* Must use DATA step with ARRAY processing */

DATA work.hcc_exploded;
    SET work.member_risk_scores;
    ARRAY hccs hcc_cat1-hcc_cat10;  /* Assumes max 10 HCCs per member */
    
    DO i = 1 TO DIM(hccs);
        IF hccs[i] NE . THEN DO;
            hcc_category = hccs[i];
            OUTPUT;
        END;
    END;
    DROP hcc_cat1-hcc_cat10 i;
RUN;

/* Step 2: Join with HCC reference and aggregate */
PROC SQL;
    CREATE TABLE work.hcc_distribution AS
    SELECT 
        he.hcc_category,
        r.diagnosis_desc,
        r.hcc_coefficient,
        COUNT(DISTINCT he.member_id) AS member_count,
        COUNT(DISTINCT he.plan_id) AS plan_count,
        ROUND(r.hcc_coefficient * COUNT(DISTINCT he.member_id) * 10000, 0.01) 
            AS total_revenue_impact,
        ROUND(r.hcc_coefficient * 10000, 0.01) AS revenue_per_member
    FROM work.hcc_exploded AS he
    INNER JOIN work.hcc_reference AS r 
        ON he.hcc_category = r.hcc_category
    GROUP BY he.hcc_category, r.diagnosis_desc, r.hcc_coefficient
    ORDER BY total_revenue_impact DESC;
QUIT;
```

**SAS Challenges:**
- ❌ No native EXPLODE function - requires manual array processing
- ❌ Must pre-define array size (max HCCs per member)
- ❌ Two-step process: DATA step + PROC SQL
- ❌ Complex logic for dynamic array sizes
- ❌ Performance issues with large datasets
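Both versions follow the same pattern: explode the HCC array, then aggregate. The plain-Python sketch below shows that logic without a cluster. It turns each member's variable-length HCC list into one row per HCC (the job `EXPLODE` does), inner-joins those rows to a reference lookup, and counts distinct members and plans per HCC. The sample members, HCC labels, and coefficients are hypothetical placeholders and are not official CMS weights. `BASE_RATE = 10000` is the same multiplier the SAS code uses.

```python
from collections import defaultdict

# Hypothetical sample: each member carries a variable-length list of HCCs,
# like the hcc_categories array column in member_risk_scores.
members = [
    {"member_id": "M1", "plan_id": "P1", "hcc_categories": ["HCC18", "HCC85"]},
    {"member_id": "M2", "plan_id": "P1", "hcc_categories": ["HCC18"]},
    {"member_id": "M3", "plan_id": "P2", "hcc_categories": []},
]

# Hypothetical reference table; coefficients are illustrative, not CMS-published weights.
hcc_reference = {
    "HCC18": {"diagnosis_desc": "Diabetes with Chronic Complications", "hcc_coefficient": 0.30},
    "HCC85": {"diagnosis_desc": "Congestive Heart Failure", "hcc_coefficient": 0.33},
}

BASE_RATE = 10000  # same placeholder multiplier as the SAS example


def explode(rows):
    """Yield one (member_id, plan_id, hcc_category) tuple per array element, like EXPLODE."""
    for row in rows:
        for hcc in row["hcc_categories"]:  # no fixed array size needed
            yield row["member_id"], row["plan_id"], hcc


def hcc_distribution(rows, reference):
    members_by_hcc = defaultdict(set)
    plans_by_hcc = defaultdict(set)
    for member_id, plan_id, hcc in explode(rows):
        if hcc in reference:  # inner join: HCCs missing from the reference are dropped
            members_by_hcc[hcc].add(member_id)
            plans_by_hcc[hcc].add(plan_id)

    result = []
    for hcc, member_ids in members_by_hcc.items():
        coef = reference[hcc]["hcc_coefficient"]
        result.append({
            "hcc_category": hcc,
            "diagnosis_desc": reference[hcc]["diagnosis_desc"],
            "hcc_coefficient": coef,
            "member_count": len(member_ids),        # COUNT(DISTINCT member_id)
            "plan_count": len(plans_by_hcc[hcc]),   # COUNT(DISTINCT plan_id)
            "total_revenue_impact": round(coef * len(member_ids) * BASE_RATE, 2),
            "revenue_per_member": round(coef * BASE_RATE, 2),
        })
    # ORDER BY total_revenue_impact DESC
    return sorted(result, key=lambda r: r["total_revenue_impact"], reverse=True)


for row in hcc_distribution(members, hcc_reference):
    print(row)
```

Member M3 has no HCCs, so it produces no exploded rows. An `EXPLODE` on an empty array behaves the same way.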

#### Databricks SQL Approach - Single Statement!


## 🎯 YOUR TURN! (3 mins)
Work with Databricks Assistant to convert the SAS code below to Databricks SQL. Create a new table named `hcc_distribution` in the `payer_gold` schema (if you have time, convert it to PySpark as well):

```sas
/* SAS: HCC Distribution Analysis */
/* Step 1: Unnest HCC categories from member_risk_scores */
/* Note: SAS doesn't have native array explosion like SQL EXPLODE */
/* Must use DATA step with ARRAY processing */

DATA work.hcc_exploded;
    SET work.member_risk_scores;
    ARRAY hccs hcc_cat1-hcc_cat10;  /* Assumes max 10 HCCs per member */
    
    DO i = 1 TO DIM(hccs);
        IF hccs[i] NE . THEN DO;
            hcc_category = hccs[i];
            OUTPUT;
        END;
    END;
    DROP hcc_cat1-hcc_cat10 i;
RUN;

/* Step 2: Join with HCC reference and aggregate */
PROC SQL;
    CREATE TABLE work.hcc_distribution AS
    SELECT 
        he.hcc_category,
        r.diagnosis_desc,
        r.hcc_coefficient,
        COUNT(DISTINCT he.member_id) AS member_count,
        COUNT(DISTINCT he.plan_id) AS plan_count,
        ROUND(r.hcc_coefficient * COUNT(DISTINCT he.member_id) * 10000, 0.01) 
            AS total_revenue_impact,
        ROUND(r.hcc_coefficient * 10000, 0.01) AS revenue_per_member
    FROM work.hcc_exploded AS he
    INNER JOIN work.hcc_reference AS r 
        ON he.hcc_category = r.hcc_category
    GROUP BY he.hcc_category, r.diagnosis_desc, r.hcc_coefficient
    ORDER BY total_revenue_impact DESC;
QUIT;
```

In [0]:
%sql
-- -- View HCC distribution
-- SELECT * FROM payer_gold.hcc_distribution;



## Example 4: Data Quality & Compliance Audit

### ✅ Business Goal
Ensure encounter data meets CMS submission standards and identify data quality issues.

**CMS Requirements:**
- Valid diagnosis codes (ICD-10 format)
- Complete member demographics
- Valid provider NPIs
- Service dates within coverage period
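The audit query further down only checks that an NPI is present. Format-level checks for the diagnosis-code and NPI requirements can be sketched in plain Python. The sketch rests on two assumptions. First, the ICD-10-CM pattern (a letter, a digit, an alphanumeric, then an optional decimal point and up to four alphanumerics) checks only the shape of a code, not whether it exists in the current code set. Second, the NPI check applies the Luhn check digit with the `80840` prefix used for NPIs. The function names are hypothetical.

```python
import re

# Shape-only ICD-10-CM check: letter, digit, alphanumeric, then an optional
# decimal point and 1-4 alphanumerics. Does NOT confirm the code is in the code set.
ICD10_FORMAT = re.compile(r"^[A-Z][0-9][0-9A-Z](\.?[0-9A-Z]{1,4})?$")


def is_icd10_format(code):
    return bool(code) and ICD10_FORMAT.match(code.strip().upper()) is not None


def is_valid_npi(npi):
    """10 digits, last digit = Luhn check digit computed over '80840' + first 9 digits."""
    if npi is None:
        return False
    npi = npi.strip()
    if len(npi) != 10 or not npi.isdigit():
        return False
    payload = "80840" + npi[:9]
    total = 0
    # Double every second digit, starting with the rightmost payload digit.
    for i, ch in enumerate(reversed(payload)):
        digit = int(ch)
        if i % 2 == 0:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    check_digit = (10 - total % 10) % 10
    return check_digit == int(npi[9])


print(is_icd10_format("E11.9"), is_icd10_format("11E.9"))  # True False
print(is_valid_npi("1234567893"), is_valid_npi("1234567890"))  # True False
```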


### 📊 SAS vs. Databricks SQL Comparison

#### Traditional SAS Approach

```sas
/* SAS: Data Quality Audit - Requires Multiple Queries */

/* Query 1: Total Encounters */
PROC SQL;
    CREATE TABLE work.audit_total AS
    SELECT 
        'Total Encounters' AS metric,
        COUNT(*) AS record_count,
        . AS pct_of_total,
        'INFO' AS severity
    FROM work.claims;
QUIT;

/* Query 2: Encounters Missing Diagnosis */
PROC SQL;
    CREATE TABLE work.audit_missing_dx AS
    SELECT 
        'Encounters Missing Diagnosis' AS metric,
        COUNT(DISTINCT c.claim_id) AS record_count,
        CALCULATED record_count * 100.0 / 
            (SELECT COUNT(*) FROM work.claims) AS pct_of_total,
        'ERROR' AS severity
    FROM work.claims AS c
    LEFT JOIN work.diagnosis_raw AS d 
        ON c.claim_id = d.claim_id
    WHERE d.claim_id IS NULL;
QUIT;

/* Query 3: HCC Mapping Rate */
PROC SQL;
    CREATE TABLE work.audit_hcc_mapped AS
    SELECT 
        'Diagnoses Mapped to HCC' AS metric,
        COUNT(*) AS record_count,
        CALCULATED record_count * 100.0 / 
            (SELECT COUNT(*) FROM work.diagnosis_raw) AS pct_of_total,
        'INFO' AS severity
    FROM work.diagnosis_with_hcc
    WHERE is_hcc = 1;
QUIT;

/* Repeat for remaining checks... */

/* Combine all audit results */
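DATA work.data_quality_audit;
    /* Declare lengths first: otherwise SET takes each length from the first table,
       truncating longer metric/severity values and 'REVIEW' in status */
    LENGTH metric $ 40 severity $ 7 status $ 6;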
    SET work.audit_total
        work.audit_missing_dx
        work.audit_hcc_mapped
        /* ... other audit tables ... */;
        
    /* Add status flag */
    IF severity = 'ERROR' AND record_count > 0 THEN status = 'FAIL';
    ELSE IF severity = 'WARNING' AND pct_of_total > 5 THEN status = 'REVIEW';
    ELSE status = 'PASS';
RUN;

/* Sort and display */
PROC SORT DATA=work.data_quality_audit;
    BY severity DESCENDING pct_of_total;
RUN;
```

**SAS Challenges:**
- ❌ Each audit check is typically written as a separate query
- ❌ Manual combination with DATA step
- ❌ Multiple intermediate tables clutter workspace
- ❌ Difficult to maintain as audit rules grow
- ❌ No WITH clause (CTEs) in PROC SQL, so the checks can't be grouped into one named query block
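In either tool the roll-up logic is the same. Each check contributes one row (the `UNION ALL`), a status rule is applied to every row, and the results are sorted by severity first. The plain-Python sketch below uses hypothetical counts and percentages.

```python
# Hypothetical check results: (metric, record_count, pct_of_total, severity).
checks = [
    ("Total Encounters", 1000, None, "INFO"),
    ("Encounters Missing Diagnosis", 12, 1.2, "ERROR"),
    ("Members Without Risk Scores", 80, 8.0, "WARNING"),
    ("Providers Missing NPI", 0, 0.0, "ERROR"),
    ("Claims with Invalid Status", 10, 1.0, "WARNING"),
]

SEVERITY_ORDER = {"ERROR": 1, "WARNING": 2, "INFO": 3}


def audit_status(severity, record_count, pct_of_total):
    """Same rule as the SAS DATA step and the Databricks CASE expression."""
    if severity == "ERROR" and record_count > 0:
        return "FAIL"
    if severity == "WARNING" and pct_of_total > 5:
        return "REVIEW"
    return "PASS"


def build_audit(check_rows):
    audit = []
    for metric, record_count, pct, severity in check_rows:  # one row per check, like UNION ALL
        pct = 0.0 if pct is None else pct  # like COALESCE(pct_of_total, 0.0)
        audit.append({
            "metric": metric,
            "record_count": record_count,
            "pct_of_total": pct,
            "severity": severity,
            "status": audit_status(severity, record_count, pct),
        })
    # Severity first (ERROR, WARNING, INFO), then highest percentage first
    audit.sort(key=lambda r: (SEVERITY_ORDER.get(r["severity"], 3), -r["pct_of_total"]))
    return audit


for row in build_audit(checks):
    print(row["severity"], row["status"], row["metric"])
```

Adding a new rule means adding one tuple. The status logic and the sort order stay in one place.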

#### Databricks SQL Approach - Elegant & Maintainable!


In [0]:
%sql
-- Data Quality Audit for CMS Submission
CREATE OR REPLACE TABLE payer_gold.data_quality_audit AS
WITH encounter_checks AS (
  SELECT
    'Total Encounters' as metric,
    COUNT(*) as record_count,
    NULL as pct_of_total,
    'INFO' as severity
  FROM payer_silver.claims

  UNION ALL

  SELECT
    'Encounters Missing Diagnosis',
    COUNT(DISTINCT c.claim_id),
    ROUND(
      COUNT(DISTINCT c.claim_id) * 100.0 / (SELECT COUNT(*) FROM payer_silver.claims),
      2
    ),
    'ERROR'
  FROM payer_silver.claims c
  LEFT JOIN payer_bronze.diagnosis_raw d ON c.claim_id = d.claim_id
  WHERE d.claim_id IS NULL

  UNION ALL

  SELECT
    'Diagnoses Mapped to HCC',
    COUNT(*),
    ROUND(
      COUNT(*) * 100.0 / (SELECT COUNT(*) FROM payer_bronze.diagnosis_raw),
      2
    ),
    'INFO'
  FROM payer_gold.diagnosis_with_hcc
  WHERE is_hcc = 1

  UNION ALL

  SELECT
    'Members Without Risk Scores',
    COUNT(DISTINCT m.member_id),
    ROUND(
      COUNT(DISTINCT m.member_id) * 100.0 / (SELECT COUNT(*) FROM payer_silver.members),
      2
    ),
    'WARNING'
  FROM payer_silver.members m
  LEFT JOIN payer_gold.member_risk_scores r ON m.member_id = r.member_id
  WHERE r.member_id IS NULL

  UNION ALL

  SELECT
    'Providers Missing NPI',
    COUNT(*),
    ROUND(
      COUNT(*) * 100.0 / (SELECT COUNT(*) FROM payer_silver.providers),
      2
    ),
    'ERROR'
  FROM payer_silver.providers
  WHERE npi IS NULL OR TRIM(npi) = ''

  UNION ALL

  SELECT
    'Claims with Invalid Status',
    COUNT(*),
    ROUND(
      COUNT(*) * 100.0 / (SELECT COUNT(*) FROM payer_silver.claims),
      2
    ),
    'WARNING'
  FROM payer_silver.claims
  WHERE claim_status IS NULL OR claim_status NOT IN ('approved', 'paid', 'pending')
)
SELECT
  metric,
  record_count,
  COALESCE(pct_of_total, 0.0) as pct_of_total,
  severity,
  CASE
    WHEN severity = 'ERROR' AND record_count > 0 THEN 'FAIL'
    WHEN severity = 'WARNING' AND pct_of_total > 5 THEN 'REVIEW'
    ELSE 'PASS'
  END as status
FROM encounter_checks
ORDER BY
  CASE severity WHEN 'ERROR' THEN 1 WHEN 'WARNING' THEN 2 ELSE 3 END,
  pct_of_total DESC;

In [0]:
%sql
-- View data quality audit results
SELECT * FROM payer_gold.data_quality_audit;


## Example 5: Member Risk Stratification

### 👥 Business Goal
Segment members by risk level to support care management and intervention programs.

**Risk Tiers:**
- **Very High Risk** (Score ≥ 2.0): Intensive care management
- **High Risk** (1.5 ≤ Score < 2.0): Enhanced monitoring
- **Moderate Risk** (1.0 ≤ Score < 1.5): Standard care
- **Low Risk** (Score < 1.0): Preventive care focus
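The tier boundaries are easiest to get right when thresholds are checked from highest to lowest, as the `CASE` expression below does. A score that falls exactly on a cut point (1.0, 1.5, or 2.0) goes to the higher tier. A minimal plain-Python sketch:

```python
def risk_tier(total_risk_score: float) -> str:
    # Highest threshold first: the first matching branch wins, like SQL CASE.
    if total_risk_score >= 2.0:
        return "Very High Risk"
    if total_risk_score >= 1.5:
        return "High Risk"
    if total_risk_score >= 1.0:
        return "Moderate Risk"
    return "Low Risk"


for score in (0.85, 1.0, 1.49, 1.5, 2.0, 2.7):
    print(f"{score:>5} -> {risk_tier(score)}")
```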


In [0]:
%sql
-- Member Risk Stratification
CREATE OR REPLACE TABLE payer_gold.member_risk_stratification AS
SELECT
  member_id,
  CONCAT(first_name, ' ', last_name) as member_name,
  age,
  gender,
  plan_id,
  total_risk_score,
  hcc_count,
  projected_annual_payment,
  CASE
    WHEN total_risk_score >= 2.0 THEN 'Very High Risk'
    WHEN total_risk_score >= 1.5 THEN 'High Risk'
    WHEN total_risk_score >= 1.0 THEN 'Moderate Risk'
    ELSE 'Low Risk'
  END as risk_tier,
  CASE
    WHEN total_risk_score >= 2.0 THEN 'Intensive Care Management Required'
    WHEN total_risk_score >= 1.5 THEN 'Enhanced Monitoring Recommended'
    WHEN total_risk_score >= 1.0 THEN 'Standard Care Protocol'
    ELSE 'Preventive Care Focus'
  END as care_recommendation,
  hcc_categories as active_hcc_list
FROM payer_gold.member_risk_scores;


In [0]:
%sql
-- Summary statistics by risk tier
SELECT
  risk_tier,
  COUNT(*) as member_count,
  ROUND(AVG(age), 1) as avg_age,
  ROUND(AVG(total_risk_score), 3) as avg_risk_score,
  ROUND(AVG(hcc_count), 1) as avg_hcc_count,
  SUM(projected_annual_payment) as total_revenue,
  ROUND(AVG(projected_annual_payment), 2) as avg_payment_per_member
FROM payer_gold.member_risk_stratification
GROUP BY risk_tier
ORDER BY avg_risk_score DESC;



## Example 6: Provider Performance on Risk Capture

### 🏥 Business Goal
Identify which providers excel at documenting HCC conditions to guide provider education.

**Key Metrics:**
- Members per provider
- Average risk score of provider's panel
- HCC capture rate
- Revenue attributed to provider
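Both versions below join claims to member risk scores and then to diagnoses. Assuming `diagnosis_with_hcc` holds one row per claim diagnosis, that last join is one-to-many. Member-level values such as `projected_annual_payment` then repeat once per diagnosis row, and a plain row count counts diagnosis rows rather than claims. The plain-Python sketch below uses hypothetical rows to show the inflation and one way to avoid it: keep one payment per provider and member, then sum.

```python
from collections import defaultdict

# Hypothetical rows after joining claims -> member risk scores -> diagnoses.
# Claim C1 carries two HCC diagnoses, so member M1's payment appears on two rows.
joined = [
    {"provider_id": "PR1", "claim_id": "C1", "member_id": "M1", "hcc_category": "HCC18", "projected_annual_payment": 12000.0},
    {"provider_id": "PR1", "claim_id": "C1", "member_id": "M1", "hcc_category": "HCC85", "projected_annual_payment": 12000.0},
    {"provider_id": "PR1", "claim_id": "C2", "member_id": "M2", "hcc_category": "HCC18", "projected_annual_payment": 9000.0},
]

# Summing straight over the joined rows counts M1's payment twice.
naive_revenue = defaultdict(float)
for r in joined:
    naive_revenue[r["provider_id"]] += r["projected_annual_payment"]

# Keep one payment per (provider, member) first, then sum.
payment_by_member = {(r["provider_id"], r["member_id"]): r["projected_annual_payment"] for r in joined}
deduped_revenue = defaultdict(float)
for (provider_id, _member_id), payment in payment_by_member.items():
    deduped_revenue[provider_id] += payment

row_count = len(joined)                                 # what COUNT(claim_id) returns
distinct_claims = len({r["claim_id"] for r in joined})  # what COUNT(DISTINCT claim_id) returns

print(dict(naive_revenue), dict(deduped_revenue))  # {'PR1': 33000.0} {'PR1': 21000.0}
print(row_count, distinct_claims)                   # 3 2
```

In PySpark, one way to apply the same fix is to select `provider_id`, `member_id`, and `projected_annual_payment`, call `.distinct()`, and only then aggregate.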


### 📊 SAS vs. PySpark Comparison

#### Traditional SAS Approach

```sas
/* SAS: Provider Performance on Risk Capture */
PROC SQL;
    CREATE TABLE work.provider_performance AS
    SELECT 
        p.provider_id,
        p.provider_name,
        p.specialty,
        p.city,
        p.state,
        COUNT(DISTINCT c.member_id) AS unique_members,
        COUNT(DISTINCT c.claim_id) AS total_encounters,
        ROUND(AVG(m.total_risk_score), 0.001) AS avg_member_risk_score,
        SUM(dh.hcc_coefficient) AS total_hcc_value,
        COUNT(DISTINCT dh.hcc_category) AS unique_hccs_captured,
        ROUND(SUM(m.projected_annual_payment), 0.01) AS attributed_revenue,
        ROUND(COUNT(DISTINCT dh.hcc_category) / COUNT(DISTINCT c.member_id), 0.01) 
            AS hcc_capture_rate
    FROM work.claims AS c
    INNER JOIN work.providers AS p 
        ON c.provider_id = p.provider_id
    INNER JOIN work.member_risk_scores AS m 
        ON c.member_id = m.member_id
    INNER JOIN work.diagnosis_with_hcc AS dh 
        ON c.claim_id = dh.claim_id
    GROUP BY p.provider_id, p.provider_name, p.specialty, p.city, p.state
    ORDER BY attributed_revenue DESC;
QUIT;

/* Create top providers report */
PROC PRINT DATA=work.provider_performance(OBS=20);
    TITLE 'Top 20 Providers by Risk Capture Performance';
    VAR provider_name specialty unique_members avg_member_risk_score 
        unique_hccs_captured attributed_revenue hcc_capture_rate;
RUN;
```

**SAS Limitations:**
- ❌ Must calculate derived metrics (hcc_capture_rate) in the same SELECT
- ❌ No withColumn() equivalent for cleaner syntax
- ❌ Large joins are limited to a single server's resources
- ❌ Static output - no interactive display
- ❌ Requires separate PROC PRINT for visualization

#### PySpark Approach - More Flexible & Scalable!


In [0]:
# Import sum as _sum so Python's built-in sum() is not shadowed in later cells
from pyspark.sql.functions import countDistinct, avg, sum as _sum, round as spark_round, col

# Provider Performance Analysis
claims = spark.table("payer_silver.claims")
providers = spark.table("payer_silver.providers")
members = spark.table("payer_gold.member_risk_scores")
diagnosis_hcc = spark.table("payer_gold.diagnosis_with_hcc")

# Join claims to providers, member risk scores, and HCC-mapped diagnoses
provider_performance = claims \
    .join(providers, "provider_id") \
    .join(members, "member_id") \
    .join(diagnosis_hcc, "claim_id") \
    .groupBy(
        "provider_id",
        "provider_name",
        "specialty",
        "city",
        "state"
    ) \
    .agg(
        countDistinct("member_id").alias("unique_members"),
        # Distinct claims: the diagnosis join produces one row per claim diagnosis
        countDistinct("claim_id").alias("total_encounters"),
        spark_round(avg("total_risk_score"), 3).alias("avg_member_risk_score"),
        _sum(col("hcc_coefficient")).alias("total_hcc_value"),
        countDistinct(col("hcc_category")).alias("unique_hccs_captured"),
        spark_round(_sum("projected_annual_payment"), 2).alias("attributed_revenue")
    ) \
    .withColumn(
        "hcc_capture_rate",
        spark_round(col("unique_hccs_captured") / col("unique_members"), 2)
    ) \
    .orderBy(col("attributed_revenue").desc())

# Display top providers
print("🏥 Top 20 Providers by Risk Capture Performance:")
display(provider_performance.limit(20))

# Save to Gold table
provider_performance.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("payer_gold.provider_risk_capture_performance")



## Example 7: Encounter Datamart for CMS Submission

### 📊 Business Goal
Create a CMS-ready encounter datamart with all required fields and validations.

**CMS Submission Requirements:**
- Valid member enrollment
- Complete encounter details (dates, provider, diagnosis)
- Proper diagnosis code formatting (ICD-10)
- Service within coverage period
- Provider has valid NPI
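The datamart below encodes these requirements as an ordered set of checks, and the first failing check decides the status. The plain-Python sketch below mirrors that cascade. The dictionary keys follow the datamart's column names. The sample encounter is hypothetical, and the sketch assumes `claim_date` and `effective_date` are always populated.

```python
from datetime import date


def submission_validation_status(enc: dict) -> str:
    """Checks run in the same order as the SQL CASE; the first failing check wins.
    Assumes claim_date and effective_date are always populated."""
    if enc.get("member_id") is None:
        return "FAIL: Missing Member"
    npi = enc.get("npi")
    if npi is None or npi.strip() == "":
        return "FAIL: Missing Provider NPI"
    if enc.get("diagnosis_code") is None:
        return "FAIL: Missing Diagnosis"
    if enc["claim_date"] < enc["effective_date"]:
        return "FAIL: Service Before Coverage"
    if enc.get("claim_status") not in ("approved", "paid"):
        return "WARNING: Invalid Status"
    return "PASS"


def cms_submission_ready(enc: dict) -> int:
    # 1 only when every check passes, matching the 0/1 flag in the datamart
    return int(submission_validation_status(enc) == "PASS")


base = {"member_id": "M1", "npi": "1234567893", "diagnosis_code": "E11.9",
        "claim_date": date(2025, 3, 1), "effective_date": date(2025, 1, 1),
        "claim_status": "paid"}

print(submission_validation_status(base))                                       # PASS
print(submission_validation_status({**base, "npi": "  "}))                      # FAIL: Missing Provider NPI
print(submission_validation_status({**base, "claim_date": date(2024, 12, 1)}))  # FAIL: Service Before Coverage
print(submission_validation_status({**base, "claim_status": None}))             # WARNING: Invalid Status
```

One difference from SQL is worth keeping in mind. In SQL, `NULL NOT IN (...)` evaluates to NULL rather than TRUE, so a `CASE` branch needs an explicit `IS NULL` test to catch a missing status. Python's `not in` has no such gap.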


In [0]:
%sql
-- Encounter Datamart for CMS Submission
CREATE OR REPLACE TABLE payer_gold.encounter_datamart_cms AS
SELECT
  -- Encounter identifiers
  c.claim_id as encounter_id,
  c.claim_date as encounter_date,
  c.claim_status as encounter_status,
  
  -- Member information
  m.member_id,
  m.first_name,
  m.last_name,
  m.birth_date,
  m.gender,
  m.plan_id,
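  -- Completed years of age (subtracting YEAR() values overstates age before the birthday)
  FLOOR(MONTHS_BETWEEN(CURRENT_DATE(), m.birth_date) / 12) as member_age,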
  
  -- Provider information
  p.provider_id,
  p.npi as provider_npi,
  p.provider_name,
  p.specialty as provider_specialty,
  p.state as provider_state,
  
  -- Diagnosis information
  d.diagnosis_code as icd10_code,
  d.diagnosis_desc,
  d.hcc_category,
  d.hcc_coefficient,
  d.is_hcc,
  
  -- Claim financial
  c.total_charge,
  
  -- Data quality flags
  CASE 
    WHEN m.member_id IS NULL THEN 'FAIL: Missing Member'
    WHEN p.npi IS NULL OR TRIM(p.npi) = '' THEN 'FAIL: Missing Provider NPI'
    WHEN d.diagnosis_code IS NULL THEN 'FAIL: Missing Diagnosis'
    WHEN c.claim_date < m.effective_date THEN 'FAIL: Service Before Coverage'
    WHEN c.claim_status IS NULL OR c.claim_status NOT IN ('approved', 'paid') THEN 'WARNING: Invalid Status'
    ELSE 'PASS'
  END as submission_validation_status,
  
  -- Submission flag
  CASE 
    WHEN m.member_id IS NOT NULL 
     AND p.npi IS NOT NULL 
     AND TRIM(p.npi) != ''
     AND d.diagnosis_code IS NOT NULL
     AND c.claim_date >= m.effective_date
     AND c.claim_status IN ('approved', 'paid')
    THEN 1 
    ELSE 0 
  END as cms_submission_ready,
  
  CURRENT_TIMESTAMP() as datamart_created_at
  
FROM payer_silver.claims c
INNER JOIN payer_silver.members m ON c.member_id = m.member_id
INNER JOIN payer_silver.providers p ON c.provider_id = p.provider_id
LEFT JOIN payer_gold.diagnosis_with_hcc d ON c.claim_id = d.claim_id;


In [0]:
%sql
-- CMS Submission Readiness Summary
SELECT
  submission_validation_status,
  COUNT(*) as encounter_count,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as pct_of_total,
  SUM(cms_submission_ready) as ready_for_submission
FROM payer_gold.encounter_datamart_cms
GROUP BY submission_validation_status
ORDER BY encounter_count DESC;



## 🔄 Migration Summary: SAS to Databricks

### Key Advantages of Databricks Over SAS for Gold Layer Analytics

| Feature | SAS | Databricks SQL/PySpark |
|---------|-----|------------------------|
| **Array Operations** | Manual ARRAY processing | Native COLLECT_SET(), EXPLODE() |
| **Query Complexity** | Multiple PROC SQL steps | Single CTE-based queries |
| **Scalability** | Single-server memory limits | Distributed processing (petabyte scale) |
| **Window Functions** | Not available in PROC SQL (DATA step LAG/RETAIN instead) | ANSI window functions (OVER, PARTITION BY) |
| **Real-time Analytics** | Batch-only | Streaming + batch unified |
| **Visualization** | Separate tools (PROC GPLOT, SAS VA) | Built-in interactive dashboards |
| **Governance** | Manual security setup | Unity Catalog (row/column security) |
| **Data Versioning** | Limited (e.g., generation data sets) | Delta Lake time travel |
| **Cloud Native** | Legacy architecture | Modern cloud-optimized |
| **Cost** | Expensive licensing | Pay-per-use consumption |
| **Programming** | SAS language only | SQL, Python, R, Scala, Java |

### 💡 Migration Best Practices

1. **Start with Simple Queries**: Begin with straightforward PROC SQL → Databricks SQL translations
2. **Leverage CTEs**: Replace multi-step SAS DATA/PROC steps with Common Table Expressions
3. **Use PySpark for Complex Logic**: When business rules are complex, PySpark offers more flexibility than SAS macros
4. **Embrace Modern Functions**: COLLECT_SET, EXPLODE, and window functions simplify code
5. **Unity Catalog = SAS Library**: Map SAS libraries to Unity Catalog schemas
6. **Delta Lake = SAS Datasets**: Delta tables take the place of SAS datasets and add ACID transactions, schema enforcement, and time travel at scale

---


## Example 8: Understanding Lazy Evaluation & Deterministic Execution

### 🎯 Learning Goal
Understand Spark's **lazy evaluation** model and best practices to ensure **deterministic execution** in production pipelines.

---

### What is Lazy Evaluation?

**Lazy Evaluation** means Spark doesn't execute transformations immediately. Instead, it builds a **logical plan** (DAG - Directed Acyclic Graph) and only executes when an **action** is called.

#### Transformations (Lazy) vs Actions (Eager)

| **Transformations (Lazy)** | **Actions (Eager)** |
|----------------------------|---------------------|
| `.select()`, `.filter()`, `.where()` | `.count()`, `.collect()`, `.show()` |
| `.join()`, `.groupBy()`, `.agg()` | `.write.save()`, `.write.saveAsTable()` |
| `.withColumn()`, `.drop()`, `.distinct()` | `.first()`, `.take()`, `.foreach()` |
| `.union()`, `.orderBy()` | `.display()` (Databricks specific) |

**Key Insight:** Transformations return DataFrames but don't execute. Actions trigger the entire execution plan.
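Python generators offer a rough analogy that runs without a cluster. Building a generator pipeline executes nothing, and only consuming it (the "action") pulls rows through every step. The analogy is loose: a generator can be consumed only once, while a Spark plan re-executes on every action. The sample rows are hypothetical.

```python
reads = {"count": 0}


def read_claims():
    """Stand-in for a table scan; the body runs only when iteration starts."""
    reads["count"] += 1
    yield from [
        {"member_id": "M1", "claim_status": "approved", "total_charge": 1500.0},
        {"member_id": "M1", "claim_status": "denied", "total_charge": 200.0},
        {"member_id": "M2", "claim_status": "approved", "total_charge": 300.0},
    ]


# "Transformations": each step wraps the previous one; nothing executes yet.
source = read_claims()
approved = (r for r in source if r["claim_status"] == "approved")
flagged = ({**r, "high_value_flag": r["total_charge"] > 1000} for r in approved)
reads_before_action = reads["count"]

# "Action": consuming the pipeline pulls rows through every step at once.
result = list(flagged)

print(reads_before_action, reads["count"], len(result))  # 0 1 2
```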

---

### Why Does This Matter for HQRI Pipelines?

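1. **Non-Deterministic Behavior**: Each action re-executes the plan from the source. Recomputing the same transformation can therefore return different results if the source table changes between actions, or if the plan uses non-deterministic functions (e.g., `rand()`, `uuid()`, non-deterministic UDFs).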

2. **Performance Issues**: Re-reading and re-transforming large datasets (like claims with millions of rows) wastes compute resources.

3. **Debugging Difficulty**: Errors in a transformation surface only when an action runs, often many steps later. Forcing evaluation at key stages makes failures easier to trace.

4. **Data Quality**: For compliance audits, you need deterministic, repeatable results.
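The first point can be shown in plain Python. A value computed on demand from a source (like a view or an un-materialized DataFrame) can change between two actions if new rows land in between. A materialized snapshot (like a CTAS or `saveAsTable` result) returns the same answer each time. The list and its values are hypothetical.

```python
claims_table = [100.0, 250.0]  # hypothetical source table that another job can append to


def view_total():
    # Recomputed from the live source on every call, like querying a view.
    return sum(claims_table)


snapshot = list(claims_table)  # materialized once, like CREATE TABLE AS SELECT


def snapshot_total():
    return sum(snapshot)


first_view = view_total()
claims_table.append(50.0)  # an upstream load lands between two "actions"
second_view = view_total()

print(first_view, second_view)             # 350.0 400.0
print(snapshot_total(), snapshot_total())  # 350.0 350.0
```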

---

### Best Practices for Deterministic Execution

1. **Cache intermediate results** that are reused multiple times
2. **Use checkpointing** for very long lineage chains (e.g., iterative or looping transformations)
3. **Force evaluation with `.count()`** at critical pipeline stages
4. **Write to Delta tables** to persist intermediate results
5. **Use `.explain()`** to understand execution plans

Let's see these practices in action!


### 🧪 Demo 1: Lazy Evaluation in Action

Let's demonstrate how Spark builds execution plans without executing transformations.


In [0]:
import time
from pyspark.sql.functions import col, count, sum as _sum, round as spark_round

print("=" * 80)
print("DEMONSTRATION: Lazy Evaluation")
print("=" * 80)

# Read claims data
print("\n1️⃣ Reading claims table (TRANSFORMATION - no execution yet)...")
start_time = time.time()
claims_df = spark.table("payer_silver.claims")
elapsed = time.time() - start_time
print(f"   ⏱️ Time elapsed: {elapsed:.4f} seconds")
print("   ✅ DataFrame created instantly - no data read yet!")

# Apply multiple transformations
print("\n2️⃣ Applying transformations (LAZY - still no execution)...")
start_time = time.time()

# Filter to approved claims
approved_claims = claims_df.filter(col("claim_status") == "approved")
print("   • Filtered to approved claims")

# Add computed column
claims_with_flag = approved_claims.withColumn(
    "high_value_flag",
    col("total_charge") > 1000
)
print("   • Added high-value flag column")

# Group and aggregate
claims_summary = claims_with_flag.groupBy("member_id").agg(
    count("*").alias("total_claims"),
    _sum("total_charge").alias("total_charges"),
    spark_round(_sum("total_charge") / count("*"), 2).alias("avg_claim_amount")
)
print("   • Grouped by member_id and aggregated")

elapsed = time.time() - start_time
print(f"\n   ⏱️ Time elapsed for 3 transformations: {elapsed:.4f} seconds")
print("   ✅ All transformations completed instantly!")
print("   💡 No actual computation happened - Spark just built an execution plan")

# Now trigger execution with an ACTION
print("\n3️⃣ Triggering execution with .count() ACTION...")
start_time = time.time()
record_count = claims_summary.count()
elapsed = time.time() - start_time
print(f"   ⏱️ Time elapsed: {elapsed:.4f} seconds")
print(f"   ✅ Action executed! Found {record_count:,} members with approved claims")
print("   💡 This is when Spark actually read data and performed all transformations")

print("\n" + "=" * 80)
print("KEY TAKEAWAY: Transformations are lazy, actions are eager!")
print("=" * 80)


### 🔍 Demo 2: Understanding Execution Plans with `.explain()`

Use `.explain()` to see Spark's execution plan before running expensive operations.


In [0]:
from pyspark.sql.functions import col, year, current_date

print("=" * 80)
print("VIEWING EXECUTION PLAN with .explain()")
print("=" * 80)

# Create a complex query
members = spark.table("payer_silver.members")
claims = spark.table("payer_silver.claims")

complex_query = members \
    .withColumn("age", year(current_date()) - year(col("birth_date"))) \
    .filter(col("age") >= 65) \
    .join(claims, "member_id") \
    .filter(col("total_charge") > 500) \
    .groupBy("plan_id") \
    .count()

print("\n📋 Physical Execution Plan:")
print("-" * 80)
complex_query.explain(mode="simple")

print("\n" + "=" * 80)
print("💡 KEY INSIGHTS FROM EXECUTION PLAN:")
print("=" * 80)
print("1. Spark optimizes the query before execution (Catalyst Optimizer)")
print("2. Filters are pushed down early (reducing data to process)")
print("3. Joins are optimized based on data size")
print("4. You can spot expensive operations (e.g., SortMergeJoin, Shuffle)")
print("\n💡 Use explain() to optimize your queries BEFORE running them on production data!")
print("=" * 80)


### ⚡ Demo 3: Forcing Deterministic Execution with Caching

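**Problem:** If you use the same DataFrame multiple times without caching, Spark recomputes it for every action. This is slow, and results can change if the source data changes between actions.

**Solution:** Use `.cache()` or `.persist()` to keep intermediate results after the first action so later actions reuse them (`.persist()` lets you choose the storage level, e.g., memory or disk).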

_Note: Caching and persist operations are not supported on Databricks serverless compute (as of November 14, 2025)._
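Because `.cache()` can't run on serverless compute, here is a Python analogy of what caching buys, with no cluster required. Three "actions" run against an expensive source, first directly and then through a tiny memoizer that stores the first result. `ClaimsSource`, `cached`, and the sample rows are hypothetical stand-ins, not Spark APIs.

```python
class ClaimsSource:
    """Stand-in for re-reading and re-joining members + claims; counts scans."""

    def __init__(self):
        self.scans = 0

    def scan(self):
        self.scans += 1
        return [("P1", 120.0), ("P1", 80.0), ("P2", 300.0)]  # hypothetical (plan_id, total_charge)


def cached(fn):
    """Tiny memoizer: the first call computes, later calls reuse the stored result."""
    store = []

    def wrapper():
        if not store:
            store.append(fn())
        return store[0]

    return wrapper


def run_three_actions(get_rows):
    record_count = len(get_rows())                                # like .count()
    rows = get_rows()
    avg_charge = sum(charge for _, charge in rows) / len(rows)    # like .agg(avg(...))
    plan_count = len({plan for plan, _ in get_rows()})            # like .groupBy().count().count()
    return record_count, avg_charge, plan_count


uncached_source = ClaimsSource()
uncached_result = run_three_actions(uncached_source.scan)

cached_source = ClaimsSource()
cached_result = run_three_actions(cached(cached_source.scan))

print(uncached_source.scans, cached_source.scans)  # 3 1
```

Unlike this memoizer, Spark's cache is best-effort. Evicted partitions are recomputed from lineage, so caching helps performance but does not by itself freeze results if the source changes.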

In [0]:
import time
from pyspark.sql.functions import col, count, avg, sum as _sum, year, current_date

print("=" * 80)
print("DEMONSTRATION: Caching for Deterministic Execution & Performance")
print("=" * 80)

# Create a complex transformation that we'll use multiple times
members = spark.table("payer_silver.members")
claims = spark.table("payer_silver.claims")

# Complex intermediate result
print("\n1️⃣ Creating complex intermediate DataFrame (members + claims)...")
intermediate_df = members.join(
    claims, "member_id"
).filter(
    col("total_charge") > 0
).withColumn(
    "age", year(current_date()) - year(col("birth_date"))
)

print("   ✅ DataFrame created (no execution yet - lazy!)")

# All operations will recompute the DataFrame
print("\n" + "=" * 80)
print("❌ CACHING NOT SUPPORTED ON SERVERLESS - All operations recompute")
print("=" * 80)

print("\n2️⃣ First operation: Count records...")
start_time = time.time()
count_result = intermediate_df.count()
time_1 = time.time() - start_time
print(f"   ⏱️ Time: {time_1:.4f} seconds | Result: {count_result:,} records")

print("\n3️⃣ Second operation: Calculate average charge...")
start_time = time.time()
avg_charge = intermediate_df.agg(
    avg("total_charge").alias("avg_charge")
).collect()[0]["avg_charge"]
time_2 = time.time() - start_time
print(f"   ⏱️ Time: {time_2:.4f} seconds | Result: ${avg_charge:.2f}")

print("\n4️⃣ Third operation: Group by plan_id...")
start_time = time.time()
plan_count = intermediate_df.groupBy("plan_id").count().count()
time_3 = time.time() - start_time
print(f"   ⏱️ Time: {time_3:.4f} seconds | Result: {plan_count} plans")

total_time_no_cache = time_1 + time_2 + time_3
print(f"\n⏱️ Total time: {total_time_no_cache:.4f} seconds")
print("💡 Each operation re-read and re-joined the data!")

print("\n" + "=" * 80)
print("📊 PERFORMANCE SUMMARY")
print("=" * 80)
print(f"Total time: {total_time_no_cache:.4f} seconds")
print("\n💡 Caching is not available on serverless compute. Use classic (all-purpose or job) compute to compare cached vs. uncached timings.")
print("=" * 80)

### 💾 Demo 4: Checkpointing & Writing to Delta for Production Pipelines

**Caching** is great for interactive analysis, but for **production ETL pipelines**, you need durable, fault-tolerant checkpoints.

#### When to Use Each Technique:

| Technique | Use Case | Durability | Performance |
|-----------|----------|------------|-------------|
| **`.cache()`** | Interactive analysis, multiple operations on same data | ❌ Not durable (memory/local disk, lost on cluster restart) | ⚡ Fastest (in-memory) |
| **`.persist()`** | More control over storage level (memory, disk, etc.) | ⚠️ Configurable storage level, but lost on cluster restart | ⚡ Fast |
| **`.checkpoint()`** | Long lineage chains, fault tolerance (requires a checkpoint directory) | ✅ Durable (disk/cloud storage) | 🐢 Slower (writes to disk) |
| **`.write.saveAsTable()`** | Production pipelines, audit trails | ✅ Durable (Delta Lake) | 🐢 Slower (but enables time travel!) |

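In production HQRI pipelines, a common pattern is to write each critical stage to a Delta table with `saveAsTable()` and read it back with `spark.table()`. Downstream steps then start from the persisted result instead of re-running the full lineage. The exercise solution below follows this pattern.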
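The sketch below is a Python analogy of the persist-then-read-back pattern behind `.write.saveAsTable()`, runnable without a cluster. It persists a stage to a JSON file, validates the persisted copy, and starts the next stage from it. The JSON file stands in for a Delta table only loosely, since it has none of Delta's ACID guarantees or time travel. The data and the function names `write_stage` and `read_stage` are hypothetical.

```python
import json
import os
import tempfile

# Hypothetical claims; a JSON file stands in for a Delta table.
claims = [
    {"claim_id": "C1", "member_id": "M1", "claim_status": "approved", "total_charge": 1500.0},
    {"claim_id": "C2", "member_id": "M1", "claim_status": "denied", "total_charge": 200.0},
    {"claim_id": "C3", "member_id": "M2", "claim_status": "approved", "total_charge": 300.0},
]


def write_stage(rows, path):
    """Persist a stage's output (the role saveAsTable plays)."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(rows, f)


def read_stage(path):
    """Read a persisted stage back (the role spark.table plays)."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


checkpoint_dir = tempfile.mkdtemp()
stage1_path = os.path.join(checkpoint_dir, "approved_claims.json")

# Stage 1: transform, persist, then validate the persisted copy (like .count()).
write_stage([c for c in claims if c["claim_status"] == "approved"], stage1_path)
stage1 = read_stage(stage1_path)
if not stage1:
    raise ValueError("Stage 1 produced no rows; stopping before Stage 2")

# Stage 2 starts from the checkpoint, not from the original claims list.
charges_by_member = {}
for c in stage1:
    charges_by_member[c["member_id"]] = charges_by_member.get(c["member_id"], 0.0) + c["total_charge"]

print(len(stage1), charges_by_member)  # 2 {'M1': 1500.0, 'M2': 300.0}
```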


### 🆚 Demo 5: SQL vs. PySpark - Lazy Evaluation Differences

While both **Databricks SQL** and **PySpark** use Spark's execution engine, they handle lazy evaluation slightly differently.


In [0]:
from pyspark.sql.functions import col, year, current_date  # used in Approach 2 below

print("=" * 80)
print("SQL vs PySpark: Lazy Evaluation Differences")
print("=" * 80)

print("\n📊 DATABRICKS SQL:")
print("-" * 80)
print("• CREATE TABLE AS SELECT (CTAS) is an ACTION - executes immediately")
print("• CREATE OR REPLACE VIEW is LAZY - defines query but doesn't execute")
print("• SELECT queries in notebooks execute immediately (action)")
print("• CACHE TABLE forces materialization")

print("\n🐍 PYSPARK:")
print("-" * 80)
print("• Transformations (.select, .filter, .join) are LAZY")
print("• Actions (.count, .show, .collect, .write.saveAsTable) execute immediately")
print("• .createOrReplaceTempView() is LAZY (just registers view)")
print("• .cache() is LAZY until first action")

print("\n" + "=" * 80)
print("EXAMPLE: Creating Gold Tables with Different Approaches")
print("=" * 80)

# Approach 1: SQL CTAS (executes immediately)
print("\n1️⃣ SQL CTAS - Executes Immediately")
print("-" * 80)

import time
start = time.time()
spark.sql("""
    CREATE OR REPLACE TABLE payer_gold.member_summary_sql AS
    SELECT 
        member_id,
        first_name,
        last_name,
        plan_id,
        YEAR(CURRENT_DATE()) - YEAR(birth_date) as age
    FROM payer_silver.members
    WHERE member_id IS NOT NULL
""")
elapsed = time.time() - start
row_count = spark.sql("SELECT COUNT(*) as cnt FROM payer_gold.member_summary_sql").collect()[0]["cnt"]
print(f"   ⏱️ Time: {elapsed:.4f} seconds")
print(f"   ✅ Table created with {row_count:,} rows")
print("   💡 CTAS executed immediately - table is ready to query")

# Approach 2: PySpark Lazy + Write (deferred until write)
print("\n2️⃣ PySpark Transformations + Write")
print("-" * 80)

start = time.time()
members = spark.table("payer_silver.members")
member_summary_df = members \
    .filter(col("member_id").isNotNull()) \
    .select(
        "member_id",
        "first_name", 
        "last_name",
        "plan_id",
        (year(current_date()) - year(col("birth_date"))).alias("age")
    )
transform_time = time.time() - start
print(f"   ⏱️ Transformation time: {transform_time:.4f} seconds")
print("   💡 Transformations completed instantly (lazy!)")

start = time.time()
member_summary_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("payer_gold.member_summary_pyspark")
write_time = time.time() - start
print(f"   ⏱️ Write time: {write_time:.4f} seconds")
print("   ✅ Table written to Delta Lake")

# Approach 3: SQL VIEW (lazy - no execution)
print("\n3️⃣ SQL VIEW - Lazy Definition")
print("-" * 80)

start = time.time()
spark.sql("""
    CREATE OR REPLACE VIEW payer_gold.member_summary_view AS
    SELECT 
        member_id,
        first_name,
        last_name,
        plan_id,
        YEAR(CURRENT_DATE()) - YEAR(birth_date) as age
    FROM payer_silver.members
    WHERE member_id IS NOT NULL
""")
elapsed = time.time() - start
print(f"   ⏱️ Time: {elapsed:.4f} seconds")
print("   ✅ View created instantly (no data processed)")
print("   💡 View is just a query definition - executes when queried")

# Query the view (this executes it)
start = time.time()
view_count = spark.sql("SELECT COUNT(*) as cnt FROM payer_gold.member_summary_view").collect()[0]["cnt"]
elapsed = time.time() - start
print("\n   Querying the view:")
print(f"   ⏱️ Time: {elapsed:.4f} seconds")
print(f"   ✅ Result: {view_count:,} rows")
print("   💡 View executes query every time it's queried (recomputes data)")

print("\n" + "=" * 80)
print("🎯 KEY TAKEAWAYS")
print("=" * 80)
print("1. SQL CTAS creates tables immediately (good for materialization)")
print("2. SQL VIEWs defer execution (good for dynamic queries)")
print("3. PySpark transformations are lazy (gives control over execution)")
print("4. Use CTAS/saveAsTable for Gold layer tables (deterministic)")
print("5. Use VIEWs for frequently-changing business logic")
print("=" * 80)


## 🎯 YOUR TURN! Practice Exercise (after today's session)

### Exercise: Build a Deterministic Provider Performance Pipeline

**Scenario:** You need to create a Gold layer table that calculates provider performance metrics. The pipeline needs to be:
- ✅ Deterministic (reproducible results)
- ✅ Performant (uses caching appropriately)
- ✅ Production-ready (includes checkpoints and validation)

**Requirements:**

1. **Stage 1:** Join claims with diagnosis and HCC data
   - Filter to only approved claims with HCC categories
   - Checkpoint this intermediate result to `payer_gold.claims_with_hcc_checkpoint`
   - Validate with `.count()`

2. **Stage 2:** Calculate provider-level metrics from the checkpoint
   - Metrics needed:
     - Total claims per provider
     - Unique HCC categories captured
     - Average HCC coefficient per claim
     - Total attributed revenue
   - Cache this result for multiple uses

3. **Stage 3:** Generate two outputs from cached data
   - High performers: Providers with > 5 unique HCCs
   - Specialty summary: Average metrics by specialty

4. **Clean up:** Unpersist cache when done

**Starter Code:**


In [0]:
# TODO: Complete this exercise following best practices for deterministic execution

from pyspark.sql.functions import col, count, countDistinct, avg, sum as _sum, round as spark_round

print("🎯 EXERCISE: Provider Performance Pipeline")
print("=" * 80)

# Stage 1: Join and checkpoint
print("\n📋 Stage 1: Creating claims with HCC data...")
# TODO: Read tables
claims = spark.table("payer_silver.claims")
providers = spark.table("payer_silver.providers")
diagnosis_hcc = spark.table("payer_gold.diagnosis_with_hcc")

# TODO: Join claims with providers and diagnosis_hcc
# TODO: Filter to approved claims with HCC categories (is_hcc == 1)
# TODO: Write to checkpoint table 'payer_gold.claims_with_hcc_checkpoint'
# TODO: Validate with .count()

# YOUR CODE HERE:
claims_with_hcc = None  # Replace with your transformation

# Uncomment when ready:
# claims_with_hcc.write.format("delta").mode("overwrite").saveAsTable("payer_gold.claims_with_hcc_checkpoint")
# stage1_count = spark.table("payer_gold.claims_with_hcc_checkpoint").count()
# print(f"✅ Stage 1 complete: {stage1_count:,} claims with HCC data")


# Stage 2: Calculate provider metrics and cache
print("\n📋 Stage 2: Calculating provider metrics...")
# TODO: Read from checkpoint
# TODO: Group by provider_id, provider_name, specialty
# TODO: Calculate metrics (total claims, unique HCCs, avg coefficient, total revenue)
# TODO: Cache the result
# TODO: Trigger caching with .count()

# YOUR CODE HERE:
provider_metrics = None  # Replace with your aggregation

# Uncomment when ready:
# provider_metrics_cached = provider_metrics.cache()
# stage2_count = provider_metrics_cached.count()
# print(f"✅ Stage 2 complete: {stage2_count:,} providers cached")


# Stage 3: Generate outputs from cached data
print("\nüìã Stage 3: Generating reports...")
# TODO: Filter to high performers (> 5 unique HCCs)
# TODO: Calculate specialty summary

# YOUR CODE HERE:
# high_performers = provider_metrics_cached.filter(...)
# specialty_summary = provider_metrics_cached.groupBy(...)

# print(f"✅ Found {high_performers.count()} high-performing providers")
# print(f"✅ Created summary for {specialty_summary.count()} specialties")


# Stage 4: Clean up
# TODO: Unpersist cache
# YOUR CODE HERE:
# provider_metrics_cached.unpersist()
# print("\n✅ Cache cleared")

print("\n" + "=" * 80)
print("üí° TIP: Check the solution in the next cell!")
print("=" * 80)


### 💡 Solution: Provider Performance Pipeline

from pyspark.sql.functions import (
    col, count, countDistinct, avg, sum as _sum, round as spark_round
)

print("✅ SOLUTION: Provider Performance Pipeline")
print("=" * 80)

# Stage 1: Join and checkpoint
print("\nüìã Stage 1: Creating claims with HCC data...")

claims = spark.table("payer_silver.claims")
providers = spark.table("payer_silver.providers")
diagnosis_hcc = spark.table("payer_gold.diagnosis_with_hcc")

claims_with_hcc = (
    claims
    .join(providers, "provider_id")
    .join(diagnosis_hcc, "claim_id")
    .filter(col("claim_status") == "approved")
    .filter(col("is_hcc") == 1)
    .select(
        "claim_id",
        "provider_id",
        "provider_name",
        "specialty",
        "hcc_category",
        "hcc_coefficient",
        "total_charge"
    )
)

claims_with_hcc.write.format("delta").mode("overwrite").option(
    "overwriteSchema", "true"
).saveAsTable("payer_gold.claims_with_hcc_checkpoint")

stage1_count = spark.table("payer_gold.claims_with_hcc_checkpoint").count()
print(f"✅ Stage 1 complete: {stage1_count:,} claims with HCC data checkpointed")

# Stage 2: Calculate provider metrics (not cached: each action below recomputes the aggregation from the checkpoint)
print("\n📋 Stage 2: Calculating provider metrics...")

claims_checkpoint = spark.table("payer_gold.claims_with_hcc_checkpoint")

provider_metrics = claims_checkpoint.groupBy(
    "provider_id",
    "provider_name",
    "specialty"
).agg(
    # countDistinct: a claim with several diagnosis rows after the join is counted once
    countDistinct("claim_id").alias("total_claims"),
    countDistinct("hcc_category").alias("unique_hcc_count"),
    spark_round(avg("hcc_coefficient"), 3).alias("avg_hcc_coefficient"),
    # Simplified revenue proxy: summed HCC coefficients x a flat 10,000 multiplier
    spark_round(_sum("hcc_coefficient") * 10000, 2).alias("total_attributed_revenue")
)

stage2_count = provider_metrics.count()
print(f"✅ Stage 2 complete: {stage2_count:,} providers with metrics")

# Stage 3: Generate outputs from provider_metrics (no cache)
print("\nüìã Stage 3: Generating reports...")

high_performers = provider_metrics.filter(col("unique_hcc_count") > 5)
high_performer_count = high_performers.count()
print(f"✅ Found {high_performer_count:,} high-performing providers")

print("\nüèÜ Top 5 High Performers:")
display(
    high_performers.orderBy(col("unique_hcc_count").desc()).limit(5)
)

specialty_summary = provider_metrics.groupBy("specialty").agg(
    count("provider_id").alias("provider_count"),
    spark_round(avg("total_claims"), 1).alias("avg_claims_per_provider"),
    spark_round(avg("unique_hcc_count"), 1).alias("avg_hccs_captured"),
    spark_round(avg("total_attributed_revenue"), 2).alias("avg_revenue_per_provider")
)
specialty_count = specialty_summary.count()
print(f"\n✅ Created summary for {specialty_count} specialties")

print("\nüìä Specialty Performance Summary:")
display(
    specialty_summary.orderBy(col("avg_revenue_per_provider").desc())
)

# Stage 4: Clean up (no cache to unpersist)
print("\nüìã Stage 4: Cleanup...")

print("\n" + "=" * 80)
print("üéâ Pipeline complete! All stages used deterministic execution practices.")
print("=" * 80)
print("\nüìù Key Practices Demonstrated:")
print("   1. ‚úÖ Wrote checkpoint to Delta table (durable)")
print("   2. ‚úÖ Validated each stage with .count()")
print("   3. ‚úÖ Read from checkpoint (deterministic)")
print("=" * 80)
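In this solution, `provider_metrics` is not cached. In general, that means each action depending on it (the `.count()` calls and both `display()` calls) re-runs the aggregation from the checkpoint table. Caching it, as the exercise's Stage 2 suggests, uses executor memory to skip those repeats. The plain-Python sketch below imitates that behaviour with a hypothetical `LazyFrame` class, which records transformations and only evaluates them when an action such as `count()` is called. It is an analogy for Spark's lazy evaluation and `.cache()`/`.unpersist()`, not a description of how Spark works internally.

```python
from collections import defaultdict


class LazyFrame:
    """Toy stand-in for a Spark DataFrame: transformations are recorded, actions execute."""

    def __init__(self, compute_fn, stats):
        self._compute_fn = compute_fn   # produces this frame's rows when evaluated
        self.stats = stats              # shared counter of source reads
        self._cache_enabled = False
        self._cached_rows = None

    @classmethod
    def from_source(cls, read_fn):
        stats = {"source_reads": 0}

        def compute():
            stats["source_reads"] += 1  # stands in for scanning the checkpoint table
            return list(read_fn())

        return cls(compute, stats)

    def transform(self, fn):
        # Transformation: builds a new plan on top of this one; nothing runs yet.
        return LazyFrame(lambda: fn(self._rows()), self.stats)

    def cache(self):
        # Like Spark, caching is lazy: rows are stored by the next action.
        self._cache_enabled = True
        return self

    def unpersist(self):
        self._cache_enabled = False
        self._cached_rows = None
        return self

    def _rows(self):
        if self._cached_rows is not None:
            return self._cached_rows
        rows = self._compute_fn()
        if self._cache_enabled:
            self._cached_rows = rows
        return rows

    def count(self):
        # Action: forces evaluation of the whole plan (or reads the cache).
        return len(self._rows())


# Hypothetical checkpoint rows: (provider_id, hcc_category)
CHECKPOINT = [("P1", f"H{i}") for i in range(1, 7)] + [("P2", "H1"), ("P3", "H2")]


def unique_hccs_by_provider(rows):
    hccs = defaultdict(set)
    for provider_id, hcc in rows:
        hccs[provider_id].add(hcc)
    return [(provider_id, len(codes)) for provider_id, codes in sorted(hccs.items())]


def run_pipeline(use_cache):
    """Run three actions on the same metrics frame; return how often the source was read."""
    source = LazyFrame.from_source(lambda: CHECKPOINT)
    metrics = source.transform(unique_hccs_by_provider)
    if use_cache:
        metrics.cache()
    metrics.count()                                                    # like stage2_count
    metrics.transform(lambda rs: [r for r in rs if r[1] > 5]).count()  # high performers
    metrics.transform(lambda rs: rs[:5]).count()                       # like a top-5 display
    if use_cache:
        metrics.unpersist()
    return source.stats["source_reads"]


print("source reads without cache:", run_pipeline(use_cache=False))
print("source reads with cache:   ", run_pipeline(use_cache=True))
```

Without `cache()`, every action re-reads the source. With it, only the first action does, and `unpersist()` then releases the stored rows, mirroring the exercise's Stage 4.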

# AI/BI

Intelligent analytics for everyone!

Databricks AI/BI is a new type of business intelligence product designed to provide a deep understanding of your data's semantics, enabling self-service data analysis for everyone in your organization. AI/BI is built on a compound AI system that draws insights from the full lifecycle of your data across the Databricks platform, including ETL pipelines, lineage, and other queries.

<img src="https://www.databricks.com/sites/default/files/2025-05/hero-image-ai-bi-v2-2x.png?v=1748417271" alt="Databricks AI/BI" width="600" height="500">

# Genie

Talk with your data

Now everyone can get insights from data simply by asking questions in natural language.

<img src="https://www.databricks.com/sites/default/files/2025-06/ai-bi-genie-hero.png?v=1749162682" alt="Databricks AI/BI Genie" width="600" height="500">


# 🎓 Workshop Summary & Next Steps

## 🎉 Congratulations!

You've completed the Databricks Healthcare Payer Analytics Workshop! Let's review what you learned:

---

## ✅ What You Accomplished

### 1. **Medallion Architecture**
- ✅ Built a complete **Bronze → Silver → Gold** pipeline
- ✅ Understood data quality improvement at each layer
- ✅ Created analytics-ready datasets

### 2. **Data Engineering Skills**
- ✅ Loaded data using **COPY INTO**
- ✅ Transformed data with **SQL and PySpark**
- ✅ Applied data quality checks and validations
- ✅ Created aggregations and derived metrics

### 3. **Analytics & Visualization**
- ✅ Generated business insights from data
- ✅ Created interactive visualizations
- ✅ Performed statistical analysis
- ✅ Built executive dashboards

### 4. **Databricks Platform**
- ✅ Worked with **Unity Catalog**
- ✅ Used **Delta Lake** for reliable data storage
- ✅ Leveraged **AI Assistant** for code help
- ✅ Applied performance optimization techniques

### 5. **Lazy Evaluation & Deterministic Execution**
- ✅ Understood **Spark's lazy evaluation** model (transformations vs actions)
- ✅ Used **`.cache()`** and **`.persist()`** for performance optimization
- ✅ Implemented **checkpointing** with Delta tables for production pipelines
- ✅ Applied **`.count()`** to force evaluation at critical pipeline stages
- ✅ Used **`.explain()`** to analyze and optimize execution plans
- ✅ Built **deterministic, reproducible** pipelines for compliance
- ✅ Practiced proper **memory management** (unpersist caches)
- ✅ Compared **SQL CTAS vs VIEWs** for different use cases
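Checkpointing matters for determinism as well as speed. Every action re-evaluates a lazy plan, so if anything the plan reads can change between actions, two actions on the same DataFrame can disagree. Writing the result to a Delta table once and reading only that table afterwards pins later stages to one snapshot. The plain-Python sketch below illustrates the idea. `live_source` is a hypothetical feed that returns one more row on each read, and a Python list plays the role of the checkpoint table. It is an analogy, not Spark or Delta code.

```python
import itertools

_reads = itertools.count(1)


def live_source():
    """Hypothetical upstream feed: each read returns one more row than the previous read."""
    n = next(_reads)
    return [{"claim_id": f"C{i}", "is_hcc": 1} for i in range(n)]


def hcc_claims_plan():
    """A 'lazy plan': it re-reads the source every time it is evaluated."""
    return [row for row in live_source() if row["is_hcc"] == 1]


# Two actions on the lazy plan see different inputs and disagree.
lazy_count_1 = len(hcc_claims_plan())
lazy_count_2 = len(hcc_claims_plan())

# Checkpoint: evaluate once, keep the rows, and read only the stored copy afterwards.
checkpoint_table = hcc_claims_plan()
checkpoint_count_1 = len(checkpoint_table)
checkpoint_count_2 = len(checkpoint_table)

print("lazy plan counts:   ", lazy_count_1, lazy_count_2)
print("checkpointed counts:", checkpoint_count_1, checkpoint_count_2)
```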

### 6. **Refer to the Best Practices!**
Best practices notebook: _**[Reference] Best Practices**_

---

## 📝 Feedback

We'd love to hear your thoughts on this workshop!

**What worked well?** What could be improved? **What topics do you want to learn next?**

---

## 🙏 Thank You!

Thank you for participating in this workshop. We hope you found it valuable and are excited to continue your Databricks journey! 🚀

---
