In [1]:
# From notebooks/ folder, go up one level to find outputs/
import duckdb
import pandas as pd
con = duckdb.connect('../outputs/analytics.duckdb', read_only=True)

In [2]:
# 2. Checking all available tables
con.execute("SHOW TABLES").fetchdf()

Unnamed: 0,name
0,claims_raw
1,employees_raw
2,plans_raw


In [3]:
# 3. Checking total counts in staging tables
con.execute("""
    SELECT 
        'employees' as table_name, COUNT(*) as row_count FROM employees_raw
    UNION ALL
    SELECT 'plans', COUNT(*) FROM plans_raw
    UNION ALL
    SELECT 'claims', COUNT(*) FROM claims_raw
""").fetchdf()

Unnamed: 0,table_name,row_count
0,employees,50
1,plans,15
2,claims,200


In [4]:
# 4. Sampling employee data
con.execute("SELECT * FROM employees_raw LIMIT 10").fetchdf()

Unnamed: 0,person_id,full_name,title,email,company_ein,start_date,notes
0,1,Name1,VP HR,user1@bluehorizon.io,33-3333333,2022-03-29,notes: messy row?
1,2,Name2,HR Manager,user2@pinecrestfoods.com,33-3333333,2022-04-18,notes: messy row?
2,3,Name3,,user3@acme.com,11-1111111,2022-04-15,notes: messy row?
3,4,Name4,Sr Engineer,user4@acme.com,22-2222222,2022-01-04,notes: messy row?
4,5,Name5,Director Finance,user5@pinecrestfoods.com,22-2222222,2022-09-04,notes: messy row?
5,6,Name6,VP HR,user6@acme.com,22-2222222,2022-05-08,notes: messy row?
6,7,Name7,Sr Engineer,user7@nowhere.com,33-3333333,2022-09-19,notes: messy row?
7,8,Name8,Sr Engineer,user8@acme.com,22-2222222,2022-06-01,notes: messy row?
8,9,Name9,VP HR,user9@pinecrestfoods.com,22-2222222,2022-03-05,notes: messy row?
9,10,Name10,VP HR,user10@bluehorizon.io,11-1111111,2022-09-25,notes: messy row?


In [5]:
# 5. Employees by company with names
con.execute("""
    SELECT 
        company_ein,
        COUNT(*) as employee_count,
        COUNT(DISTINCT email) as unique_emails
    FROM employees_raw
    GROUP BY company_ein
    ORDER BY employee_count DESC
""").fetchdf()

Unnamed: 0,company_ein,employee_count,unique_emails
0,11-1111111,21,21
1,33-3333333,12,12
2,22-2222222,11,11
3,,6,6


In [6]:
# 6. Viewing all insurance plans
con.execute("""
    SELECT 
        company_ein,
        plan_type,
        carrier_name,
        start_date,
        end_date
    FROM plans_raw
    ORDER BY company_ein, start_date
""").fetchdf()

Unnamed: 0,company_ein,plan_type,carrier_name,start_date,end_date
0,11-1111111,Medical,Cigna,2022-01-03,2022-03-19
1,11-1111111,Vision,Cigna,2022-01-05,2022-06-26
2,11-1111111,Dental,Aetna,2022-02-14,2022-06-11
3,11-1111111,Medical,Aetna,2022-02-20,2022-07-30
4,11-1111111,Medical,Kaiser,2022-03-08,2022-08-21
5,22-2222222,Vision,Aetna,2022-01-01,2022-04-05
6,22-2222222,Dental,Kaiser,2022-02-21,2022-06-18
7,22-2222222,Vision,Cigna,2022-02-22,2022-08-14
8,22-2222222,Vision,Aetna,2022-05-04,2022-10-09
9,22-2222222,Dental,Aetna,2022-05-19,2022-11-04


In [7]:
# 7. Claims summary by company
con.execute("""
    SELECT 
        company_ein,
        COUNT(*) as total_claims,
        SUM(amount) as total_amount,
        AVG(amount) as avg_claim_amount,
        MIN(amount) as min_claim,
        MAX(amount) as max_claim
    FROM claims_raw
    GROUP BY company_ein
    ORDER BY total_amount DESC
""").fetchdf()

Unnamed: 0,company_ein,total_claims,total_amount,avg_claim_amount,min_claim,max_claim
0,11-1111111,77,194564.16,2526.807273,87.59,4981.02
1,22-2222222,67,168692.91,2517.804627,70.24,4896.88
2,33-3333333,56,118433.03,2114.875536,57.8,4784.42


In [8]:
# 8. Claims over time (daily aggregation)
con.execute("""
    SELECT 
        DATE_TRUNC('day', service_date) as date,
        COUNT(*) as claim_count,
        SUM(amount) as daily_total
    FROM claims_raw
    GROUP BY DATE_TRUNC('day', service_date)
    ORDER BY date
""").fetchdf()

Unnamed: 0,date,claim_count,daily_total
0,2022-01-02,1,2956.18
1,2022-01-04,1,1338.55
2,2022-01-06,2,4257.43
3,2022-01-07,2,4548.59
4,2022-01-08,1,921.66
...,...,...,...
139,2022-10-22,1,3252.27
140,2022-10-23,1,417.54
141,2022-10-25,1,1781.61
142,2022-10-26,1,3172.99


In [9]:
# 9. Top 10 highest claims
con.execute("""
    SELECT 
        claim_id,
        company_ein,
        service_date,
        amount,
        claim_type
    FROM claims_raw
    ORDER BY amount DESC
    LIMIT 10
""").fetchdf()

Unnamed: 0,claim_id,company_ein,service_date,amount,claim_type
0,85,11-1111111,2022-10-18,4981.02,Medical
1,108,22-2222222,2022-09-15,4896.88,Vision
2,114,11-1111111,2022-06-11,4810.79,Vision
3,176,33-3333333,2022-09-22,4784.42,Dental
4,183,11-1111111,2022-01-26,4735.07,Vision
5,134,22-2222222,2022-03-14,4699.83,Vision
6,141,22-2222222,2022-06-21,4692.6,Dental
7,150,22-2222222,2022-08-21,4643.91,Medical
8,139,33-3333333,2022-06-11,4620.01,Dental
9,138,11-1111111,2022-06-20,4602.14,Medical


In [10]:
# 10. Employees with missing data
con.execute("""
    SELECT 
        COUNT(*) as total_employees,
        SUM(CASE WHEN company_ein IS NULL THEN 1 ELSE 0 END) as missing_ein,
        SUM(CASE WHEN email IS NULL THEN 1 ELSE 0 END) as missing_email,
        SUM(CASE WHEN title IS NULL THEN 1 ELSE 0 END) as missing_title
    FROM employees_raw
""").fetchdf()

Unnamed: 0,total_employees,missing_ein,missing_email,missing_title
0,50,6.0,0.0,8.0


In [11]:

# 11. Plan coverage timeline by company
con.execute("""
    SELECT 
        company_ein,
        plan_type,
        MIN(start_date) as first_coverage,
        MAX(end_date) as last_coverage,
        COUNT(*) as plan_changes
    FROM plans_raw
    GROUP BY company_ein, plan_type
    ORDER BY company_ein, plan_type
""").fetchdf()

Unnamed: 0,company_ein,plan_type,first_coverage,last_coverage,plan_changes
0,11-1111111,Dental,2022-02-14,2022-06-11,1
1,11-1111111,Medical,2022-01-03,2022-08-21,3
2,11-1111111,Vision,2022-01-05,2022-06-26,1
3,22-2222222,Dental,2022-02-21,2022-11-04,2
4,22-2222222,Vision,2022-01-01,2022-10-09,3
5,33-3333333,Dental,2022-06-26,2022-12-17,1
6,33-3333333,Medical,2022-01-27,2022-09-21,2
7,33-3333333,Vision,2022-02-04,2022-08-14,2


In [12]:
# 12. Email domains distribution
con.execute("""
    SELECT 
        SUBSTRING(email, POSITION('@' IN email) + 1) as email_domain,
        COUNT(*) as employee_count
    FROM employees_raw
    WHERE email IS NOT NULL
    GROUP BY email_domain
    ORDER BY employee_count DESC
""").fetchdf()

Unnamed: 0,email_domain,employee_count
0,pinecrestfoods.com,16
1,acme.com,13
2,nowhere.com,11
3,bluehorizon.io,10


In [13]:
# Query 1: Plan Gap Detection
plan_gaps_query = """
WITH company_name_mapping AS (
    SELECT * FROM (VALUES
        ('11-1111111', 'Hillwinds Health'),
        ('22-2222222', 'Summit Solutions'),
        ('33-3333333', 'Peak Performers')
    ) AS t (company_ein, company_name)
),

plans_with_names AS (
    SELECT
        cnm.company_name,
        p.plan_type,
        p.carrier_name AS carrier,
        CAST(p.start_date AS DATE) AS start_date,
        CAST(p.end_date AS DATE) AS end_date
    FROM plans_raw p
    JOIN company_name_mapping cnm ON p.company_ein = cnm.company_ein
),

coverage_periods AS (
    SELECT
        *,
        SUM(is_new_period) OVER (PARTITION BY company_name, plan_type ORDER BY start_date) AS period_group
    FROM (
        SELECT
            *,
            CASE 
                WHEN start_date > (LAG(end_date, 1, '1900-01-01') OVER (PARTITION BY company_name, plan_type ORDER BY start_date) + INTERVAL '1' DAY) THEN 1 
                ELSE 0 
            END AS is_new_period
        FROM plans_with_names
    )
),

stitched_periods AS (
    SELECT
        company_name,
        plan_type,
        MAX(end_date) AS period_end,
        FIRST(carrier) AS previous_carrier,
        LEAD(MIN(start_date), 1) OVER (PARTITION BY company_name, plan_type ORDER BY MIN(start_date)) AS next_period_start,
        LEAD(FIRST(carrier), 1) OVER (PARTITION BY company_name, plan_type ORDER BY MIN(start_date)) AS next_carrier
    FROM coverage_periods
    GROUP BY company_name, plan_type, period_group
)

SELECT
    company_name,
    period_end AS gap_start,
    next_period_start AS gap_end,
    (next_period_start - period_end) AS gap_length_days,
    previous_carrier,
    next_carrier
FROM stitched_periods
WHERE next_period_start IS NOT NULL AND (next_period_start - period_end) > 7
ORDER BY company_name, gap_start;
"""

print("=" * 80)
print("PLAN GAP DETECTION (Gaps > 7 days)")
print("=" * 80)
plan_gaps = con.execute(plan_gaps_query).fetchdf()
print(plan_gaps)
print(f"\nTotal gaps found: {len(plan_gaps)}")

PLAN GAP DETECTION (Gaps > 7 days)
      company_name  gap_start    gap_end  gap_length_days previous_carrier  \
0  Peak Performers 2022-03-24 2022-06-27               95            Aetna   

  next_carrier  
0        Cigna  

Total gaps found: 1


In [14]:
# Query 2: Claims Cost Spike (Rolling 90-day, >200% increase)
claims_spike_query = """
WITH company_name_mapping AS (
    SELECT * FROM (VALUES
        ('11-1111111', 'Hillwinds Health'),
        ('22-2222222', 'Summit Solutions'),
        ('33-3333333', 'Peak Performers')
    ) AS t (company_ein, company_name)
),

claims_with_company AS (
    SELECT
        cnm.company_name,
        c.service_date,
        c.amount AS claim_cost
    FROM claims_raw c
    JOIN company_name_mapping cnm ON c.company_ein = cnm.company_ein
),

daily_costs AS (
    SELECT
        company_name,
        CAST(service_date AS DATE) AS service_date,
        SUM(claim_cost) AS total_daily_cost
    FROM claims_with_company
    GROUP BY 1, 2
),

rolling_windows AS (
    SELECT
        company_name,
        service_date AS window_end,
        (service_date - INTERVAL '89' DAY) AS window_start,
        SUM(total_daily_cost) OVER (
            PARTITION BY company_name 
            ORDER BY service_date 
            ROWS BETWEEN 89 PRECEDING AND CURRENT ROW
        ) AS current_90d_cost,
        SUM(total_daily_cost) OVER (
            PARTITION BY company_name 
            ORDER BY service_date 
            ROWS BETWEEN 179 PRECEDING AND 90 PRECEDING
        ) AS prev_90d_cost
    FROM daily_costs
)

SELECT
    company_name,
    window_start,
    window_end,
    prev_90d_cost,
    current_90d_cost,
    ROUND(CASE 
        WHEN prev_90d_cost = 0 THEN NULL 
        ELSE (current_90d_cost - prev_90d_cost) / prev_90d_cost * 100 
    END, 2) AS pct_change
FROM rolling_windows
WHERE prev_90d_cost IS NOT NULL AND prev_90d_cost > 0 AND (current_90d_cost - prev_90d_cost) / prev_90d_cost > 2.0
ORDER BY company_name, window_end;
"""

print("\n" + "=" * 80)
print("CLAIMS COST SPIKES (>200% increase in rolling 90-day window)")
print("=" * 80)
cost_spikes = con.execute(claims_spike_query).fetchdf()
print(cost_spikes)
print(f"\nTotal spikes found: {len(cost_spikes)}")


CLAIMS COST SPIKES (>200% increase in rolling 90-day window)
Empty DataFrame
Columns: [company_name, window_start, window_end, prev_90d_cost, current_90d_cost, pct_change]
Index: []

Total spikes found: 0


In [15]:
# Query 3: Employee Roster Mismatch
roster_mismatch_query = """
WITH company_name_mapping AS (
    SELECT * FROM (VALUES
        ('11-1111111', 'Hillwinds Health'),
        ('22-2222222', 'Summit Solutions'),
        ('33-3333333', 'Peak Performers')
    ) AS t (company_ein, company_name)
),

company_expected_counts AS (
    SELECT company_name, expected_employees
    FROM (VALUES
        ('Hillwinds Health', 60),
        ('Summit Solutions', 45),
        ('Peak Performers', 40)
    ) AS t (company_name, expected_employees)
),

observed_employee_counts AS (
    SELECT
        cnm.company_name,
        COUNT(DISTINCT e.person_id) AS observed_employees
    FROM employees_raw e
    JOIN company_name_mapping cnm ON e.company_ein = cnm.company_ein
    GROUP BY cnm.company_name
),

comparison AS (
    SELECT
        cec.company_name,
        cec.expected_employees AS expected,
        oec.observed_employees AS observed,
        ROUND(ABS(oec.observed_employees - cec.expected_employees) * 100.0 / cec.expected_employees, 2) AS pct_diff
    FROM company_expected_counts cec
    JOIN observed_employee_counts oec ON cec.company_name = oec.company_name
)

SELECT
    company_name,
    expected,
    observed,
    pct_diff,
    CASE
        WHEN pct_diff > 100 THEN 'Critical'
        WHEN pct_diff > 50 THEN 'High'
        WHEN pct_diff > 20 THEN 'Medium'
        ELSE 'Low'
    END AS severity
FROM comparison
ORDER BY pct_diff DESC;
"""

print("\n" + "=" * 80)
print("EMPLOYEE ROSTER MISMATCH")
print("=" * 80)
roster_mismatch = con.execute(roster_mismatch_query).fetchdf()
print(roster_mismatch)
print(f"\nCompanies analyzed: {len(roster_mismatch)}")


EMPLOYEE ROSTER MISMATCH
       company_name  expected  observed  pct_diff severity
0  Summit Solutions        45        11     75.56     High
1   Peak Performers        40        12     70.00     High
2  Hillwinds Health        60        21     65.00     High

Companies analyzed: 3


In [17]:
# Load the cleaned parquet data from ETL
clean_data = pd.read_parquet('../outputs/clean_data.parquet')
print(f"Clean records: {len(clean_data)}")
clean_data.head()

Clean records: 50


Unnamed: 0,person_id,full_name,title,email,company_ein,start_date,notes,row_id,personal_email,phone_number
0,1,Name1,VP HR,user1@bluehorizon.io,33-3333333,2022-03-29,notes: messy row?,0,,
1,2,Name2,HR Manager,user2@pinecrestfoods.com,33-3333333,2022-04-18,notes: messy row?,1,,
2,3,Name3,Unknown,user3@acme.com,11-1111111,2022-04-15,notes: messy row?,2,,
3,4,Name4,Sr Engineer,user4@acme.com,22-2222222,2022-01-04,notes: messy row?,3,,
4,5,Name5,Director Finance,user5@pinecrestfoods.com,22-2222222,2022-09-04,notes: messy row?,4,,


In [19]:
# Load validation errors
validation_errors = pd.read_csv('../outputs/validation_errors.csv')
print(f"Validation errors: {len(validation_errors)}")
validation_errors

Validation errors: 8


Unnamed: 0,row_id,field,error_reason
0,2,title,Missing title - no value to forward/backward fill
1,16,title,Missing title - no value to forward/backward fill
2,21,title,Missing title - no value to forward/backward fill
3,22,title,Missing title - no value to forward/backward fill
4,27,title,Missing title - no value to forward/backward fill
5,31,title,Missing title - no value to forward/backward fill
6,32,title,Missing title - no value to forward/backward fill
7,45,title,Missing title - no value to forward/backward fill
