
Refactor FHIR ingest to flattened YAML-driven tables #2

Merged

evanfebrianto merged 9 commits into feat/ehrmamba from feat/fhir-flattened-pipeline-876d on Apr 12, 2026

Conversation

@evanfebrianto (Owner) commented Apr 11, 2026

Summary

Adds MIMIC-IV FHIR (NDJSON) dataset support to PyHealth with a flattened, YAML-driven ingestion pipeline.

Architecture

The ingestion pipeline runs in two phases:

  1. Phase 1 — NDJSON → Flattened Parquet: Stream FHIR resources from *.ndjson.gz files and normalize each resource type (Patient, Encounter, Condition, Observation, MedicationRequest, Procedure) into a flat 2D Parquet table using buffered writes (50K rows/batch). The full dataset is never held in memory at once.
  2. Phase 2 — Standard PyHealth pipeline: Feed the prepared Parquet tables through BaseDataset via a custom load_table override, producing global_event_df.parquet. Downstream task processing operates on standard Patient objects.
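
A minimal sketch of the Phase 1 streaming loop, assuming a per-resource flatten function and a flush callback standing in for the Parquet writer (both names are illustrative, not the repo's actual API):

```python
import gzip
import json

BATCH_SIZE = 50_000  # the PR flushes every 50K rows

def flatten_patient(resource):
    """Flatten one FHIR Patient resource into a flat row (illustrative subset)."""
    return {
        "patient_id": resource.get("id"),
        "gender": resource.get("gender"),
        "birth_date": resource.get("birthDate"),
    }

def stream_ndjson_to_rows(path, flatten, flush, batch_size=BATCH_SIZE):
    """Stream an *.ndjson.gz file line by line, flushing flat rows in
    batches so the full dataset is never held in memory."""
    buffer = []
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            buffer.append(flatten(json.loads(line)))
            if len(buffer) >= batch_size:
                flush(buffer)
                buffer = []
    if buffer:  # flush the final partial batch
        flush(buffer)
```

In the real pipeline, flush would presumably append a batch to a Parquet writer; the buffering logic is the point here.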

What's included

  • Dataset: MIMIC4FHIRDataset — FHIR NDJSON ingest with YAML config (mimic4_fhir.yaml), streaming Parquet flattening, concept vocabulary (ConceptVocab), and CEHR timeline construction.
  • Task: MPFClinicalPredictionTask — patient-level mortality prediction with MPF boundary tokens and CEHR-aligned sequences.
  • Model: EHRMambaCEHR — single-stream Mamba classifier with CEHR embeddings (concept, time-delta, age, visit segment).
  • Tests: Full synthetic coverage — no real MIMIC-IV data required.
  • Example: End-to-end training script with quick-test mode, ablation grid, and real-data support.

Key design decisions

  • base_dataset.py is unmodified — all FHIR-specific logic (UTC tz-stripping, null patient_id handling) is self-contained in MIMIC4FHIRDataset.load_table.
  • Uses the existing Patient class — no custom FHIRPatient object.
  • Cache identity includes schema version and YAML digest for proper invalidation.
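
The FHIR-specific cleanup in load_table could look roughly like this sketch; _strip_utc and clean_rows are hypothetical names illustrating the UTC tz-stripping and null patient_id handling described above, not the repo's actual functions:

```python
from datetime import datetime, timezone

def _strip_utc(ts):
    """FHIR timestamps are typically tz-aware ISO 8601 strings; this sketch
    converts them to UTC and drops tzinfo (an assumption about the internal
    naive-timestamp convention the PR describes)."""
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt

def clean_rows(rows):
    """Drop rows with a null patient_id and normalize timestamps."""
    out = []
    for row in rows:
        if not row.get("patient_id"):
            continue  # FHIR exports can contain resources with no subject
        if row.get("timestamp"):
            row = {**row, "timestamp": _strip_utc(row["timestamp"])}
        out.append(row)
    return out
```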

Tested

All tests pass with synthetic data: pixi run -e test test reports 905 tests OK, 67 skipped.

Contributor

Name: Evan Febrianto
NetID / email: evanf3 / evanf3@illinois.edu

Contribution type

Full pipeline (dataset + task + model) for reproducing EHRMamba-style training on tokenized FHIR timelines from MIMIC-IV FHIR NDJSON exports.


Note

Medium Risk
Medium risk because it replaces the MIMIC-IV FHIR ingest/storage format and task tokenization inputs, which could change cohort selection, timestamps, and downstream features despite added tests.

Overview
Refactors MIMIC4FHIRDataset ingestion. Instead of writing a single sharded global_event_df of raw FHIR JSON, the dataset now streams NDJSON and normalizes it into flattened per-resource Parquet tables (patient, encounter, condition, observation, medication_request, procedure) under flattened_tables/. A new tables: schema in mimic4_fhir.yaml, combined with a dataset load_table() override, then rebuilds global_event_df via the standard BaseDataset pipeline.

Updates cohort limiting (max_patients) to filter the normalized tables and adds glob_patterns support (with targeted PhysioNet defaults). Removes the legacy FHIRPatient object path so MPF sequence building and labeling operate directly on tabular Patient rows. Refreshes the example script, docs, and tests accordingly, including deceased-boolean parsing safeguards and new flattened-column assertions.
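
The new tables: schema might look roughly like this; the key names below are assumptions for illustration, not the actual contents of mimic4_fhir.yaml:

```yaml
# Illustrative shape only; key names are assumed, not the repo's actual config.
tables:
  condition:
    file_path: "flattened_tables/condition.parquet"
    patient_id: "patient_id"
    timestamp: "recorded_date"
    attributes:
      - "code"
      - "code_system"
      - "encounter_id"
```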

Reviewed by Cursor Bugbot for commit a5b6aac. Bugbot is set up for automated code reviews on this repo. Configure here.

…nd improve documentation

Updated the MIMIC4FHIRDataset to accept multiple glob patterns for NDJSON file ingestion, allowing for more flexible file matching. Introduced a new utility function, sorted_ndjson_files, to handle file sorting and deduplication. Enhanced the YAML configuration for MIMIC-IV FHIR exports and improved related documentation to clarify usage and ingestion flow. Added tests to validate glob pattern functionality and ensure robust error handling.
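
A plausible sketch of the sorted_ndjson_files utility described here — resolving multiple glob patterns, deduplicating, and returning a deterministic order (the actual signature in the PR may differ):

```python
from pathlib import Path

def sorted_ndjson_files(root, patterns):
    """Resolve each glob pattern against root, deduplicate matches, and
    return them in sorted order for reproducible ingestion."""
    seen = set()
    files = []
    for pattern in patterns:
        for path in Path(root).glob(pattern):
            resolved = path.resolve()
            if resolved not in seen:  # same file may match several patterns
                seen.add(resolved)
                files.append(resolved)
    return sorted(files)
```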
…oad_table maintenance note

- Add functools and operator imports (latent NameError with multi-col timestamps)
- Rename encounter_id to resource_id in _flatten_resource_to_table_row (was misleading)
- Add maintenance comment mapping load_table deviations to BaseDataset.load_table
@evanfebrianto evanfebrianto force-pushed the feat/fhir-flattened-pipeline-876d branch from 69bd295 to 6eee918 Compare April 12, 2026 17:03
@evanfebrianto evanfebrianto marked this pull request as ready for review April 12, 2026 17:04
- Introduced _normalize_deceased_boolean_for_storage function to accurately map various representations of deceasedBoolean to stored values.
- Updated _flatten_resource_to_table_row to utilize the new normalization function, ensuring consistent handling of deceasedBoolean values.
- Added unit tests to validate the correct parsing and storage of deceasedBoolean, including edge cases for string and JSON boolean values.
- Added a timestamp field to the test case for deceasedBoolean in the MIMIC4FHIRDataset to ensure accurate representation of patient events. This change enhances the clarity and completeness of the test data used in unit tests.
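
The normalization described in the first bullet might look like this sketch; the repo's _normalize_deceased_boolean_for_storage may map values differently:

```python
def normalize_deceased_boolean(value):
    """Map the various representations of FHIR deceasedBoolean (JSON bools,
    strings like "true"/"false", None) to a canonical stored value.
    Unrecognized representations are stored as missing."""
    if value is None:
        return None
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in {"true", "1"}:
            return True
        if lowered in {"false", "0"}:
            return False
    return None
```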
…on_to_flat_tables

- Updated the stream_fhir_ndjson_to_flat_tables function to guarantee that writers are always closed in a finally block, even in the event of errors. This change enhances resource management and prevents potential file handling issues during NDJSON processing.
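
The try/finally pattern this commit introduces can be sketched as follows; open_writer and process are stand-ins for the real writer factory and per-file processing, not the repo's actual names:

```python
def stream_to_flat_tables(resource_paths, open_writer, process):
    """Open one writer per resource table and guarantee every opened writer
    is closed even if processing raises partway through."""
    writers = {}
    try:
        for resource_type, path in resource_paths.items():
            writers[resource_type] = open_writer(resource_type)
            process(resource_type, path, writers[resource_type])
    finally:
        # Close everything that was opened, regardless of errors above.
        for writer in writers.values():
            writer.close()
```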
@evanfebrianto evanfebrianto merged commit 49a2363 into feat/ehrmamba Apr 12, 2026
1 check passed
@evanfebrianto evanfebrianto deleted the feat/fhir-flattened-pipeline-876d branch April 12, 2026 17:56

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.



encounter_visit_idx = {
encounter_id: visit_idx
for visit_idx, (_, encounter_id) in enumerate(encounter_rows)
}

Encounter duplicate IDs silently drop earlier visit index

Low Severity

encounter_visit_idx and encounter_start_by_id are built from encounter_rows via dict comprehensions that iterate in sorted-time order. If two encounter rows share the same encounter_id (possible in messy FHIR exports or after deduplication failures), the later entry silently overwrites the earlier one. Events linked to that ID would receive the wrong visit_idx and start time, corrupting CEHR sequence visit segmentation without any warning.
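
One way to address this would be to detect duplicates explicitly rather than letting a dict comprehension overwrite; this is a sketch of a possible fix, not the repo's actual code:

```python
import warnings

def build_encounter_visit_idx(encounter_rows):
    """Build an encounter_id -> visit index mapping, warning on duplicate
    IDs and keeping the first (earliest) occurrence instead of silently
    overwriting it."""
    visit_idx_by_id = {}
    for visit_idx, (_, encounter_id) in enumerate(encounter_rows):
        if encounter_id in visit_idx_by_id:
            warnings.warn(
                f"Duplicate encounter_id {encounter_id!r}: keeping first "
                f"visit index {visit_idx_by_id[encounter_id]}"
            )
            continue
        visit_idx_by_id[encounter_id] = visit_idx
    return visit_idx_by_id
```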

Additional Locations (1)

