# Assignment 02: Polars on EHR Event Logs

Build a Polars pipeline that summarizes diagnosis prevalence from synthetic EHR events. Use lazy scans, filtering, joins, and group-bys to compute site-level diabetes prevalence.

## Setup


In [None]:
import polars as pl
import yaml
from pathlib import Path
from datetime import datetime
from generate_test_data import generate_test_data

# Report the active Polars release so version-specific behaviour is traceable.
print("Polars version:", pl.__version__)
print("Environment ready!")

## Configuration


In [None]:
# Parse the assignment settings; safe_load only builds plain Python objects.
with open("config.yaml") as config_file:
    config = yaml.safe_load(config_file)

# Echo the configured input locations so a wrong path is spotted early.
print("Config loaded:")
for label, key in (
    ("Patients", "patients_path"),
    ("Sites", "sites_path"),
    ("Events", "events_path"),
    ("ICD-10 lookup", "icd10_path"),
):
    print(f"  {label}: {config['data'][key]}")

## Generate data


In [None]:
# Both the dataset size and the target directory are read from the config.
SIZE = config["data"]["size"]
DATA_DIR = Path(config["data"]["dir"])

# Ensure the target directory (and any missing parents) exists before
# handing it to the generator; a pre-existing directory is not an error.
DATA_DIR.mkdir(exist_ok=True, parents=True)

# Generate data - "medium" takes ~10 seconds on my laptop
generate_test_data(output_dir=DATA_DIR, size=SIZE)

## Hints (optional)

- Distinct patient counts: call `.unique()` before `group_by()`.
  - Example: `events.select(["site_id", "patient_id"]).unique()`
- Prefix filter for ICD-10: `pl.col("code").str.starts_with(prefix)`
- Optional polish: `.fill_null(0)` after a left join, and `.round(3)` on prevalence

## Part 1: Lazy Data Loading

Use `pl.scan_parquet()` to create LazyFrames without loading data into memory.


In [None]:
# Build one LazyFrame per input table; nothing is read from disk yet.
patients = pl.scan_parquet(config["data"]["patients_path"])
sites = pl.scan_parquet(config["data"]["sites_path"])
events = pl.scan_parquet(config["data"]["events_path"])
icd10 = pl.scan_parquet(config["data"]["icd10_path"])

# Check schemas (fast, still lazy): only the schema is resolved here.
for title, frame in (("Patients", patients), ("Events", events)):
    print(f"{title} schema:")
    print(frame.collect_schema())

## Part 2: Filter and Prep Events

Filter to the assignment date window and extract ICD-10 diagnosis events.


In [None]:
# Lower bound of the assignment date window, read from the config as an
# ISO-8601 string.
start_date = datetime.fromisoformat(config["data"]["start_date"])

# Parse the string timestamps into Datetime so they compare against
# start_date as dates rather than as text.
events_parsed = events.with_columns(pl.col("event_ts").str.strptime(pl.Datetime))

# Keep only events inside the window (inclusive of start_date).
events_filtered = events_parsed.filter(pl.col("event_ts") >= start_date)

# Diagnosis events are the rows whose record_type is ICD-10-CM.
dx_events = events_filtered.filter(pl.col("record_type") == "ICD-10-CM")

# Preview five rows. The limit is applied BEFORE collect so it becomes part of
# the lazy plan; collecting first would materialise every diagnosis event just
# to display five of them.
dx_events.head(5).collect()

## Part 3: Diagnosis Prevalence by Site

Compute the share of patients per site with a type 2 diabetes diagnosis (prevalence expressed as a fraction between 0 and 1).


In [30]:
# ICD-10 code prefix that identifies the diabetes diagnoses of interest.
prefix = config["data"]["diabetes_prefix"]

# Diagnosis events whose code starts with the configured prefix.
dx_diabetes = dx_events.filter(pl.col("code").str.starts_with(prefix))

# Denominator: distinct patients with any in-window event, per site.
# De-duplicating (site_id, patient_id) first means each patient counts once.
total_persite = (
    events_filtered.select(["site_id", "patient_id"])
    .unique()
    .group_by("site_id")
    .agg(pl.count("patient_id").alias("patients_seen"))
)

# Numerator: distinct patients with a matching diabetes diagnosis, per site.
diabetes_persite = (
    dx_diabetes.select(["site_id", "patient_id"])
    .unique()
    .group_by("site_id")
    .agg(pl.count("patient_id").alias("diabetes_patients"))
)

dx_summary = (
    # Left join keeps every site that saw patients, even with no diabetes dx.
    total_persite.join(diabetes_persite, on="site_id", how="left")
    # Sites absent from diabetes_persite come back as null after the left
    # join; they have zero diabetes patients, so record 0 rather than null
    # (otherwise their prevalence would also be null).
    .with_columns(pl.col("diabetes_patients").fill_null(0))
    # Attach descriptive site attributes.
    .join(sites.select(["site_id", "site_name", "site_type"]), on="site_id", how="left")
    # Prevalence as a fraction of patients seen (0-1, not a percentage).
    .with_columns(
        (pl.col("diabetes_patients") / pl.col("patients_seen")).alias("diabetes_prevalence")
    )
)

# Resolve and show the output schema without executing the query.
print(dx_summary.collect_schema())

Schema({'site_id': String, 'patients_seen': UInt32, 'diabetes_patients': UInt32, 'site_name': String, 'site_type': String, 'diabetes_prevalence': Float64})


## Part 4: Collect and Export


In [31]:
# Execute the query with the streaming engine. `.lazy()` is a no-op on a
# LazyFrame and converts a DataFrame back to lazy, so this cell can be re-run
# after dx_summary has already been collected without raising AttributeError.
dx_summary = dx_summary.lazy().collect(engine="streaming")

# Default outputs directory used by the assignment layout.
outputs_dir = Path("outputs")
outputs_dir.mkdir(parents=True, exist_ok=True)

# The actual destinations come from the config and may live outside
# `outputs/`; create each file's parent directory so the writes cannot fail
# on a missing folder.
parquet_path = Path(config["outputs"]["dx_summary_parquet"])
csv_path = Path(config["outputs"]["dx_summary_csv"])
for out_path in (parquet_path, csv_path):
    out_path.parent.mkdir(parents=True, exist_ok=True)

# Write the same summary table in both formats.
dx_summary.write_parquet(parquet_path)
dx_summary.write_csv(csv_path)

print("Outputs ready")

Outputs ready


## Validation


In [29]:
# Paths of the files the export step is expected to have produced.
outputs = [
    config["outputs"][key]
    for key in ("dx_summary_parquet", "dx_summary_csv")
]

# Collect every expected path that is not present on disk.
missing = list(filter(lambda candidate: not Path(candidate).exists(), outputs))
if not missing:
    print("All outputs created")
else:
    print("Missing outputs:", missing)

All outputs created


## Next Steps (Optional)

1. Run `python -m pytest .github/tests/test_assignment.py -v` in your terminal.
2. Use exploratory data analysis (EDA) or visualization techniques to get a feel for the dataset.
