In [0]:
%pip install faker
dbutils.notebook.exit("Restarting Python to complete installation.")

In [0]:
# Initialize Spark
spark = SparkSession.builder.appName("HealthcareClaimsGenerator").getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS healthcare_claims")



DataFrame[]

In [0]:
# Synthetic Healthcare Claims Data Generator
# For use in Databricks

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import random
import datetime
from faker import Faker

# Initialize Faker to generate realistic data
fake = Faker()
Faker.seed(42)  # For reproducibility
random.seed(42)

# Initialize Spark
spark = SparkSession.builder.appName("HealthcareClaimsGenerator").getOrCreate()

# Define schemas for our data
patient_schema = StructType([
    StructField("patient_id", StringType(), False),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("date_of_birth", DateType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("email", StringType(), True),
    StructField("insurance_id", StringType(), True),
    StructField("created_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True)
])

provider_schema = StructType([
    StructField("provider_id", StringType(), False),
    StructField("provider_name", StringType(), True),
    StructField("provider_type", StringType(), True),
    StructField("npi_number", StringType(), True),
    StructField("specialty", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("email", StringType(), True),
    StructField("created_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True)
])

insurance_schema = StructType([
    StructField("insurance_id", StringType(), False),
    StructField("insurance_name", StringType(), True),
    StructField("plan_type", StringType(), True),
    StructField("plan_number", StringType(), True),
    StructField("contact_phone", StringType(), True),
    StructField("contact_email", StringType(), True),
    StructField("created_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True)
])

claims_schema = StructType([
    StructField("claim_id", StringType(), False),
    StructField("patient_id", StringType(), True),
    StructField("provider_id", StringType(), True),
    StructField("insurance_id", StringType(), True),
    StructField("claim_date", DateType(), True),
    StructField("admission_date", DateType(), True),
    StructField("discharge_date", DateType(), True),
    StructField("claim_type", StringType(), True),
    StructField("diagnosis_codes", ArrayType(StringType()), True),
    StructField("procedure_codes", ArrayType(StringType()), True),
    StructField("place_of_service", StringType(), True),
    StructField("total_charge", DoubleType(), True),
    StructField("copay_amount", DoubleType(), True),
    StructField("insurance_paid", DoubleType(), True),
    StructField("patient_responsibility", DoubleType(), True),
    StructField("claim_status", StringType(), True),
    StructField("denial_reason", StringType(), True),
    StructField("created_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True)
])

claim_details_schema = StructType([
    StructField("claim_detail_id", StringType(), False),
    StructField("claim_id", StringType(), True),
    StructField("service_date", DateType(), True),
    StructField("procedure_code", StringType(), True),
    StructField("diagnosis_code", StringType(), True),
    StructField("charge_amount", DoubleType(), True),
    StructField("units", IntegerType(), True),
    StructField("modifier", StringType(), True),
    StructField("revenue_code", StringType(), True),
    StructField("created_date", TimestampType(), True),
    StructField("updated_date", TimestampType(), True)
])

# Sample data lists for generation
insurance_companies = [
    ("Blue Cross Blue Shield", "PPO", "Commercial"),
    ("Aetna", "HMO", "Commercial"),
    ("UnitedHealthcare", "EPO", "Commercial"),
    ("Cigna", "POS", "Commercial"),
    ("Humana", "HDHP", "Commercial"),
    ("Medicare", "Part A", "Government"),
    ("Medicare", "Part B", "Government"),
    ("Medicaid", "Standard", "Government"),
    ("Tricare", "Prime", "Government"),
    ("Kaiser Permanente", "HMO", "Commercial")
]

provider_types = [
    "Hospital", "Primary Care", "Specialist", "Urgent Care", 
    "Outpatient Facility", "Laboratory", "Imaging Center", 
    "Pharmacy", "Home Health", "Skilled Nursing Facility"
]

specialties = [
    "Family Medicine", "Internal Medicine", "Pediatrics", "Cardiology",
    "Dermatology", "Endocrinology", "Gastroenterology", "Neurology",
    "Obstetrics and Gynecology", "Oncology", "Ophthalmology", "Orthopedics",
    "Psychiatry", "Pulmonology", "Radiology", "Urology", "General Surgery"
]

# ICD-10 Diagnosis codes (sample)
diagnosis_codes = [
    "I10", "E11.9", "J44.9", "F33.1", "M54.5", "K21.9", 
    "J45.909", "I25.10", "N18.9", "G47.00", "M17.9", "E78.5", 
    "J06.9", "R10.9", "Z23", "R07.9", "H60.501", "N39.0", "R51"
]

# CPT Procedure codes (sample)
procedure_codes = [
    "99213", "99214", "99215", "99203", "99204", "99205", 
    "90471", "90715", "J0696", "36415", "80053", "71045", 
    "93000", "20610", "29125", "G0008", "G0009", "97110" 
]

claim_types = ["Inpatient", "Outpatient", "Emergency", "Office Visit", "Laboratory", "Imaging", "Pharmacy"]
claim_statuses = ["Submitted", "In Process", "Pending", "Denied", "Paid", "Appealed", "Voided"]
denial_reasons = ["", "Medical necessity", "Non-covered service", "Duplicate claim", "Invalid code", "Timely filing", "Missing information"]
places_of_service = ["11", "21", "22", "23", "24", "31", "32", "81"]  # Standard CMS POS codes

# Function to generate a random date within range
def random_date(start_date, end_date):
    time_between = end_date - start_date
    days_between = time_between.days
    random_days = random.randrange(days_between)
    return start_date + datetime.timedelta(days=random_days)

# Function to generate patient data
def generate_patients(num_patients=1000):
    patients = []
    
    for i in range(1, num_patients + 1):
        patient_id = f"PT{str(i).zfill(6)}"
        gender = random.choice(["M", "F"])
        first_name = fake.first_name_male() if gender == "M" else fake.first_name_female()
        last_name = fake.last_name()
        
        # Generate date of birth (between 18 and 90 years old)
        dob = random_date(
            datetime.date.today() - datetime.timedelta(days=90*365),
            datetime.date.today() - datetime.timedelta(days=18*365)
        )
        
        # Generate insurance ID (will be linked to insurance table)
        insurance_id = f"INS{str(random.randint(1, 10)).zfill(4)}"
        
        # Generated timestamps for record creation and update
        created_date = datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 730))
        updated_date = created_date + datetime.timedelta(days=random.randint(0, 30))
        
        patients.append((
            patient_id,
            first_name,
            last_name,
            gender,
            dob,
            fake.street_address(),
            fake.city(),
            fake.state_abbr(),
            fake.zipcode(),
            fake.phone_number(),
            fake.email(),
            insurance_id,
            created_date,
            updated_date
        ))
    
    return spark.createDataFrame(patients, patient_schema)

# Function to generate provider data
def generate_providers(num_providers=100):
    providers = []
    
    for i in range(1, num_providers + 1):
        provider_id = f"PR{str(i).zfill(6)}"
        provider_type = random.choice(provider_types)
        specialty = random.choice(specialties) if provider_type in ["Primary Care", "Specialist"] else ""
        
        # Generate NPI (National Provider Identifier)
        npi = str(random.randint(1000000000, 9999999999))
        
        # Generated timestamps for record creation and update
        created_date = datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 1095))
        updated_date = created_date + datetime.timedelta(days=random.randint(0, 30))
        
        providers.append((
            provider_id,
            fake.company() + " " + provider_type,
            provider_type,
            npi,
            specialty,
            fake.street_address(),
            fake.city(),
            fake.state_abbr(),
            fake.zipcode(),
            fake.phone_number(),
            fake.company_email(),
            created_date,
            updated_date
        ))
    
    return spark.createDataFrame(providers, provider_schema)

# Function to generate insurance data
def generate_insurance(num_insurance=10):
    insurances = []
    
    for i in range(1, num_insurance + 1):
        insurance_id = f"INS{str(i).zfill(4)}"
        
        # Select a random insurance company and plan type
        company_info = random.choice(insurance_companies)
        company_name = company_info[0]
        plan_type = company_info[1]
        
        # Generated timestamps for record creation and update
        created_date = datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 1095))
        updated_date = created_date + datetime.timedelta(days=random.randint(0, 30))
        
        insurances.append((
            insurance_id,
            company_name,
            plan_type,
            fake.bothify(text="??###????"),  # Random alphanumeric plan number
            fake.phone_number(),
            fake.company_email(),
            created_date,
            updated_date
        ))
    
    return spark.createDataFrame(insurances, insurance_schema)

# Function to generate claims data
def generate_claims(num_claims=5000, patients_df=None, providers_df=None, insurance_df=None):
    if patients_df is None or providers_df is None or insurance_df is None:
        raise ValueError("Patient, provider, and insurance DataFrames are required")
    
    # Collect IDs to use for references
    patient_ids = [row.patient_id for row in patients_df.select("patient_id").collect()]
    provider_ids = [row.provider_id for row in providers_df.select("provider_id").collect()]
    insurance_ids = [row.insurance_id for row in insurance_df.select("insurance_id").collect()]
    
    claims = []
    
    for i in range(1, num_claims + 1):
        claim_id = f"CLM{str(i).zfill(7)}"
        patient_id = random.choice(patient_ids)
        provider_id = random.choice(provider_ids)
        
        # Find the patient's insurance ID
        patient_row = patients_df.filter(F.col("patient_id") == patient_id).first()
        insurance_id = patient_row.insurance_id
        
        # Generate claim dates
        claim_date = random_date(
            datetime.date.today() - datetime.timedelta(days=365),
            datetime.date.today() - datetime.timedelta(days=1)
        )
        
        claim_type = random.choice(claim_types)
        
        # For inpatient claims, add admission and discharge dates
        if claim_type == "Inpatient":
            admission_date = claim_date
            los = random.randint(1, 10)  # Length of stay
            discharge_date = admission_date + datetime.timedelta(days=los)
        else:
            admission_date = None
            discharge_date = None
        
        # Generate diagnosis and procedure codes
        num_dx = random.randint(1, 4)
        num_proc = random.randint(1, 3)
        
        dx_codes = random.sample(diagnosis_codes, num_dx)
        proc_codes = random.sample(procedure_codes, num_proc)
        
        # Generate financial data
        base_charge = random.uniform(100, 5000)
        if claim_type == "Inpatient":
            base_charge *= random.uniform(5, 20)
        
        total_charge = round(base_charge, 2)
        copay_amount = round(random.uniform(0, 50), 2)
        
        # Determine claim status
        claim_status = random.choice(claim_statuses)
        
        # For denied claims, add a denial reason
        if claim_status == "Denied":
            denial_reason = random.choice(denial_reasons[1:])  # Skip the empty reason
        else:
            denial_reason = ""
        
        # Calculate insurance and patient payments based on status
        if claim_status == "Paid":
            insurance_paid = round(total_charge * random.uniform(0.7, 0.95), 2)
            patient_responsibility = round(total_charge - insurance_paid - copay_amount, 2)
        elif claim_status == "Denied":
            insurance_paid = 0
            patient_responsibility = round(total_charge, 2)
        else:
            insurance_paid = 0
            patient_responsibility = 0
        
        # Place of service
        place_of_service = random.choice(places_of_service)
        
        # Generated timestamps for record creation and update
        created_date = datetime.datetime.combine(claim_date, datetime.datetime.min.time()) + datetime.timedelta(days=random.randint(1, 5))
        updated_date = created_date + datetime.timedelta(days=random.randint(0, 30))
        
        claims.append((
            claim_id,
            patient_id,
            provider_id,
            insurance_id,
            claim_date,
            admission_date,
            discharge_date,
            claim_type,
            dx_codes,
            proc_codes,
            place_of_service,
            total_charge,
            copay_amount,
            insurance_paid,
            patient_responsibility,
            claim_status,
            denial_reason,
            created_date,
            updated_date
        ))
    
    return spark.createDataFrame(claims, claims_schema)

# Function to generate claim details
def generate_claim_details(claims_df=None):
    if claims_df is None:
        raise ValueError("Claims DataFrame is required")
    
    claim_details = []
    detail_id_counter = 1
    
    # For each claim, generate 1 to 5 line items
    for claim in claims_df.collect():
        claim_id = claim.claim_id
        claim_date = claim.claim_date
        
        # Determine number of line items
        num_details = random.randint(1, 5)
        
        # Get the procedure codes from the claim
        procedure_codes_list = claim.procedure_codes
        diagnosis_codes_list = claim.diagnosis_codes
        
        # If we need more procedure codes than we have, use random ones
        if num_details > len(procedure_codes_list):
            extra_needed = num_details - len(procedure_codes_list)
            extra_codes = random.sample(procedure_codes, extra_needed)
            procedure_codes_list = procedure_codes_list + extra_codes
        
        # Create line items for this claim
        total_charge = 0
        
        for j in range(num_details):
            claim_detail_id = f"DTL{str(detail_id_counter).zfill(8)}"
            detail_id_counter += 1
            
            # Use a procedure code from the claim
            procedure_code = procedure_codes_list[j] if j < len(procedure_codes_list) else random.choice(procedure_codes)
            
            # Use a diagnosis code from the claim
            diagnosis_code = diagnosis_codes_list[0] if diagnosis_codes_list else random.choice(diagnosis_codes)
            
            # Generate units (typically 1, but can be more for certain services)
            units = random.randint(1, 3)
            
            # Generate charge amount
            charge_amount = round(random.uniform(50, 500), 2) * units
            total_charge += charge_amount
            
            # Service date (same as claim date or within a few days)
            service_date = claim_date + datetime.timedelta(days=random.randint(0, 3))
            
            # Modifiers and revenue codes
            modifier = random.choice(["", "25", "59", "XU", "TC", "26"])
            revenue_code = random.choice(["", "0100", "0120", "0250", "0270", "0300", "0320", "0450", "0510", "0636"])
            
            # Generated timestamps for record creation and update
            created_date = claim.created_date
            updated_date = claim.updated_date
            
            claim_details.append((
                claim_detail_id,
                claim_id,
                service_date,
                procedure_code,
                diagnosis_code,
                charge_amount,
                units,
                modifier,
                revenue_code,
                created_date,
                updated_date
            ))
    
    return spark.createDataFrame(claim_details, claim_details_schema)

# Generate the data
print("Generating patient data...")
patients_df = generate_patients(1000)

print("Generating provider data...")
providers_df = generate_providers(100)

print("Generating insurance data...")
insurance_df = generate_insurance(10)

print("Generating claims data...")
claims_df = generate_claims(5000, patients_df, providers_df, insurance_df)

print("Generating claim details...")
claim_details_df = generate_claim_details(claims_df)


print("Data generation complete!")

# Save data to Delta tables
print("Saving to Delta tables...")
patients_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.patients")
providers_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.providers")
insurance_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.insurance")
claims_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.claims")
claim_details_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.claim_details")

# Sample queries to verify data
print("\nSample patients:")
spark.sql("SELECT * FROM healthcare_claims.patients LIMIT 5").show(truncate=False)

print("\nClaim counts by status:")
spark.sql("""
  SELECT claim_status, COUNT(*) as claim_count 
  FROM healthcare_claims.claims 
  GROUP BY claim_status 
  ORDER BY claim_count DESC
""").show()

print("\nTop diagnosis codes:")
spark.sql("""
  SELECT diagnosis_code, COUNT(*) as frequency 
  FROM healthcare_claims.claim_details 
  GROUP BY diagnosis_code 
  ORDER BY frequency DESC 
  LIMIT 10
""").show()

print("\nAverage claim amount by claim type:")
spark.sql("""
  SELECT claim_type, 
         COUNT(*) as claim_count,
         ROUND(AVG(total_charge), 2) as avg_charge 
  FROM healthcare_claims.claims 
  GROUP BY claim_type 
  ORDER BY avg_charge DESC
""").show()

Generating patient data...
Generating provider data...
Generating insurance data...
Generating claims data...
Generating claim details...
Saving to Delta tables...


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-3035526575224895>, line 453[0m
[1;32m    451[0m [38;5;66;03m# Save data to Delta tables[39;00m
[1;32m    452[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124mSaving to Delta tables...[39m[38;5;124m"[39m)
[0;32m--> 453[0m patients_df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124mhealthcare_claims.patients[39m[38;5;124m"[39m)
[1;32m    454[0m providers_df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5

In [0]:
# Save data to Delta tables
print("Saving to Delta tables...")
patients_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.patients")
providers_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.providers")
insurance_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.insurance")
claims_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.claims")
claim_details_df.write.format("delta").mode("overwrite").saveAsTable("healthcare_claims.claim_details")

# Sample queries to verify data
print("\nSample patients:")
spark.sql("SELECT * FROM healthcare_claims.patients LIMIT 5").show(truncate=False)

print("\nClaim counts by status:")
spark.sql("""
  SELECT claim_status, COUNT(*) as claim_count 
  FROM healthcare_claims.claims 
  GROUP BY claim_status 
  ORDER BY claim_count DESC
""").show()

print("\nTop diagnosis codes:")
spark.sql("""
  SELECT diagnosis_code, COUNT(*) as frequency 
  FROM healthcare_claims.claim_details 
  GROUP BY diagnosis_code 
  ORDER BY frequency DESC 
  LIMIT 10
""").show()

print("\nAverage claim amount by claim type:")
spark.sql("""
  SELECT claim_type, 
         COUNT(*) as claim_count,
         ROUND(AVG(total_charge), 2) as avg_charge 
  FROM healthcare_claims.claims 
  GROUP BY claim_type 
  ORDER BY avg_charge DESC
""").show()

Saving to Delta tables...

Sample patients:
+----------+----------+---------+------+-------------+-----------------------------+---------------+-----+--------+------------------+----------------------------+------------+--------------------------+--------------------------+
|patient_id|first_name|last_name|gender|date_of_birth|address                      |city           |state|zip_code|phone             |email                       |insurance_id|created_date              |updated_date              |
+----------+----------+---------+------+-------------+-----------------------------+---------------+-----+--------+------------------+----------------------------+------------+--------------------------+--------------------------+
|PT000001  |Mark      |Johnson  |M     |1937-06-26   |32181 Johnson Course Apt. 389|New Jamesside  |MT   |29394   |394.802.6542x351  |howardmaurice@example.com   |INS0005     |2024-05-31 18:05:06.42537 |2024-06-07 18:05:06.42537 |
|PT000002  |Robert    |Blair    