# Assignment 02: Polars on EHR Event Logs

Build a Polars pipeline that summarizes diagnosis prevalence from synthetic EHR events. Use lazy scans, filtering, joins, and group-bys to compute site-level diabetes prevalence.

## Setup


In [1]:
import polars as pl
import yaml
from pathlib import Path
from datetime import datetime
from generate_test_data import generate_test_data

print(f"Polars version: {pl.__version__}")
print("Environment ready!")

Polars version: 1.33.1
Environment ready!


## Configuration


In [2]:
with open("config.yaml") as f:
    config = yaml.safe_load(f)

print("Config loaded:")
print(f"  Patients: {config['data']['patients_path']}")
print(f"  Sites: {config['data']['sites_path']}")
print(f"  Events: {config['data']['events_path']}")
print(f"  ICD-10 lookup: {config['data']['icd10_path']}")

Config loaded:
  Patients: data/patients.parquet
  Sites: data/sites.parquet
  Events: data/events.parquet
  ICD-10 lookup: data/icd10_codes.parquet


## Generate data


In [3]:
SIZE = config["data"]["size"]
DATA_DIR = Path(config["data"]["dir"])

# Create output directory if it doesn't exist
DATA_DIR.mkdir(parents=True, exist_ok=True)


In [4]:

# Generate data - "medium" takes ~10 seconds on my laptop
generate_test_data(size=SIZE, output_dir=DATA_DIR)

INFO Loading codebooks
INFO Generating sites
INFO Generating patients
INFO Generating events


KeyboardInterrupt: 

## Hints (optional)

- Distinct patient counts: call `.unique()` before `group_by()`.
  - Example: `events.select(["site_id", "patient_id"]).unique()`
- Prefix filter for ICD-10: `pl.col("code").str.starts_with(prefix)`
- Optional polish: `.fill_null(0)` after a left join, and `.round(3)` on prevalence

## Part 1: Lazy Data Loading

Use `pl.scan_parquet()` to create LazyFrames without loading data into memory.


In [5]:
# TODO: Scan patients, sites, events, and ICD-10 lookup
patients = pl.scan_parquet("data/patients.parquet")
sites = pl.scan_parquet("data/sites.parquet")
events = pl.scan_parquet("data/events.parquet")
icd10 = pl.scan_parquet("data/icd10_codes.parquet")

# Check schemas (fast, still lazy)
if patients is not None:
    print("Patients schema:")
    print(patients.collect_schema())

if events is not None:
    print("Events schema:")
    print(events.collect_schema())

Patients schema:
Schema({'patient_id': String, 'dob': String, 'gender': String, 'zip_code': String, 'home_site_id': String})
Events schema:
Schema({'event_id': String, 'patient_id': String, 'site_id': String, 'event_ts': String, 'record_type': String, 'code': String})


## Part 2: Filter and Prep Events

Filter to the assignment date window and extract ICD-10 diagnosis events.


In [6]:
start_date = datetime.fromisoformat(config["data"]["start_date"])

# TODO: parse event_ts to Datetime
# TODO: filter events to event_ts >= start_date
# TODO: filter to record_type == "ICD-10-CM" for diagnosis events
events_datetime = events.with_columns(pl.col("event_ts").str.to_datetime())

events_filtered = events_datetime.filter(pl.col("event_ts") >= start_date)

dx_events = events_filtered.filter(pl.col("record_type") == "ICD-10-CM")

## Part 3: Diagnosis Prevalence by Site

Compute the percent of patients per site with a type 2 diabetes diagnosis.


In [29]:
prefix = config["data"]["diabetes_prefix"]

# TODO: Filter dx_events to ICD-10 codes starting with prefix
dx_filter = dx_events.filter(pl.col("code").str.starts_with(prefix))
# TODO: total patients per site (unique patient_id from events_filtered)
total_patients = events_filtered.group_by("site_id").agg([pl.col("patient_id").n_unique().alias("total_pateints")])
# TODO: diabetes patients per site (unique patient_id from filtered dx)
diabetes_total = dx_filter.group_by("site_id").agg([pl.col("patient_id").n_unique().alias("total_diabetes")])
# TODO: join counts + site names, calculate prevalence

dx_summary = diabetes_total.join(total_patients,on='site_id', how='full')
#dx_summary['prevalence'] = dx_summary['total_diabetes_pateints']/dx_summary["total_pateints"]
if dx_summary is not None:
    print(dx_summary.collect_schema())

Schema({'site_id': String, 'total_diabetes': UInt32, 'site_id_right': String, 'total_pateints': UInt32})


## Part 4: Collect and Export


In [None]:
# TODO: collect dx_summary using streaming engine
t0 = datetime.perf_counter()
streaming = dx_summary.collect(engine="streaming")
t_default = datetime.perf_counter() - t0
# TODO: create outputs directory
OUTPUT_DIR = Path()
# TODO: write Parquet + CSV outputs using config paths
dx_summary.sink_parquet("outputs/diabetes_by_site.parquet")

if dx_summary is not None:
    print("Outputs ready")

## Validation


In [None]:
outputs = [
    config["outputs"]["dx_summary_parquet"],
    config["outputs"]["dx_summary_csv"],
]

missing = [path for path in outputs if not Path(path).exists()]
if missing:
    print("Missing outputs:", missing)
else:
    print("All outputs created")

## Next Steps (Optional)

1. Run `python -m pytest .github/tests/test_assignment.py -v` in your terminal.
2. Use exploratory data analysis (EDA) or visualization techniques to get a feel for the dataset
