Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1036.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added canonical Stage 1 dataset-build substep and artifact specifications.
16 changes: 16 additions & 0 deletions docs/engineering/pipeline-map.md
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,22 @@ def impute_source_variables(data: Dict[str, Dict[int, np.ndarray]], state_fips:

Re-impute ACS/SIPP/ORG/SCF variables from donor surveys.

### `policyengine_us_data.build_datasets.artifacts.stage_1_artifact_specs`

```python
def stage_1_artifact_specs() -> tuple[DatasetArtifactSpec, ...]
```

Return all artifact specs known to the Stage 1 dataset build.

### `policyengine_us_data.build_datasets.specs.stage_1_step_specs`

```python
def stage_1_step_specs() -> tuple[DatasetBuildStepSpec, ...]
```

Return the canonical Stage 1 dataset-build substage specs.

### `policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder`

```python
Expand Down
6 changes: 3 additions & 3 deletions docs/engineering/skills/documentation_review.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ Check that changed pipeline behavior has a durable documentation surface:
- Edges describe real data, artifact, validation, or orchestration relationships.
- `status` and `stability` values are honest for transitional code.
- `validation_commands` are focused and point to existing tests or scripts.
- Generated docs build when decorator, Pydoc, or map source changes. PRs do not
need to refresh checked-in generated artifacts manually; the push workflow
publishes those artifacts from automation.
- Generated docs build successfully after decorator, Pydoc, or map source
changes. PRs that change decorator metadata, Pydoc-facing source, or
`docs/pipeline_map.yaml` should refresh the checked-in generated artifacts in
the same change.
- Stale architecture names, folder names, and artifact names are not preserved in
durable documentation sources or generated output.

Expand Down
21 changes: 13 additions & 8 deletions docs/engineering/skills/pipeline_docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ those flows.
- `docs/engineering/pipeline-map.md`

The generated JSON and Markdown files are published artifacts, not hand-authored
source. PRs should update decorators, docstrings, and `docs/pipeline_map.yaml`;
CI checks that the generated artifacts build. On pushes to `main`, automation
regenerates and commits the published artifacts with the version/changelog
commit.
source. PRs should update decorators, docstrings, and `docs/pipeline_map.yaml`,
then regenerate the checked-in artifacts in the same change so reviewers see the
pipeline docs that will ship. On pushes to `main`, automation may refresh those
artifacts again with the version/changelog commit.

## Annotation Rules

Expand Down Expand Up @@ -50,10 +50,15 @@ waypoint is being migrated, set `status="transitional"` and use

## Update Workflow

After adding or changing annotations or `docs/pipeline_map.yaml`, rely on the PR
`Pipeline docs build` check to prove the generated artifacts can be produced. To
inspect the generated outputs locally without touching tracked files, write them
to a temporary directory:
After adding or changing annotations or `docs/pipeline_map.yaml`, regenerate the
tracked pipeline docs:

```bash
uv run --no-sync --with pyyaml python scripts/extract_pipeline_docs.py
```

If you only need to inspect the generated outputs locally without touching
tracked files, write them to a temporary directory:

```bash
out_dir="$(mktemp -d)"
Expand Down
80 changes: 78 additions & 2 deletions docs/generated/pipeline_api.json
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@
"docstring": "Build all datasets with preemption-resilient checkpointing.\n\nArgs:\n upload: Whether to upload completed datasets.\n branch: Git branch to build from.\n sequential: Use sequential (non-parallel) execution.\n clear_checkpoints: Clear existing checkpoints before starting.\n skip_tests: Skip running the test suite (useful for calibration runs).\n skip_enhanced_cps: Skip enhanced_cps.py and small_enhanced_cps.py\n (useful for calibration runs that only need source_imputed H5).\n skip_stage_5: Skip source-imputed CPS and small enhanced CPS after\n enhanced_cps_2024.h5 is built.\n stage_only: Upload to HF staging only, without promoting a release.\n version: policyengine-us-data package version used for staging and\n dataset-build contracts.",
"id": "build_datasets",
"kind": "function",
"line": 569,
"line": 536,
"metadata": {
"api_refs": [
"modal_app.data_build.build_datasets"
Expand Down Expand Up @@ -999,7 +999,7 @@
"docstring": "Build CPS before PUF because PUF pension imputation loads CPS_2024.",
"id": "cps_puf_build_phase",
"kind": "function",
"line": 437,
"line": 404,
"metadata": {
"api_refs": [
"modal_app.data_build.run_cps_then_puf_phase"
Expand Down Expand Up @@ -3463,6 +3463,82 @@
"signature": "def reconcile_ss_subcomponents(data: Dict[str, Dict[int, np.ndarray]], n_cps: int, time_period: int) -> None",
"source_file": "policyengine_us_data/calibration/puf_impute.py"
},
"stage_1_dataset_artifact_specs": {
"docstring": "Return all artifact specs known to the Stage 1 dataset build.",
"id": "stage_1_dataset_artifact_specs",
"kind": "function",
"line": 230,
"metadata": {
"api_refs": [
"policyengine_us_data.build_datasets.artifacts.stage_1_artifact_specs"
],
"artifacts_out": [
"uprating_factors.csv",
"acs_2022.h5",
"irs_puf_2015.h5",
"cps_2024.h5",
"puf_2024.h5",
"extended_cps_2024.h5",
"enhanced_cps_2024.h5",
"enhanced_cps_2024.clone_diagnostics.json",
"calibration_log.csv",
"stratified_extended_cps_2024.h5",
"source_imputed_stratified_extended_cps_2024.h5",
"small_enhanced_cps_2024.h5",
"source_imputed_stratified_extended_cps.h5",
"policy_data.db",
"build_log.txt",
"data_build_checkpoint_stats.json"
],
"description": "Canonical artifact inventory for Stage 1 dataset-build outputs.",
"id": "stage_1_dataset_artifact_specs",
"label": "Stage 1 Dataset Artifact Specs",
"node_type": "library",
"pathways": [
"data_build",
"stage_contracts",
"pipeline_docs"
],
"source_file": "policyengine_us_data/build_datasets/artifacts.py",
"stability": "stable",
"status": "current",
"validation_commands": [
"uv run pytest tests/unit/test_build_dataset_specs.py"
]
},
"object_path": "policyengine_us_data.build_datasets.artifacts.stage_1_artifact_specs",
"signature": "def stage_1_artifact_specs() -> tuple[DatasetArtifactSpec, ...]",
"source_file": "policyengine_us_data/build_datasets/artifacts.py"
},
"stage_1_dataset_build_specs": {
"docstring": "Return the canonical Stage 1 dataset-build substage specs.",
"id": "stage_1_dataset_build_specs",
"kind": "function",
"line": 87,
"metadata": {
"api_refs": [
"policyengine_us_data.build_datasets.specs.stage_1_step_specs"
],
"description": "Canonical substage taxonomy for Stage 1 dataset-build contracts, step manifests, and pipeline documentation.",
"id": "stage_1_dataset_build_specs",
"label": "Stage 1 Dataset Build Specs",
"node_type": "library",
"pathways": [
"data_build",
"stage_contracts",
"pipeline_docs"
],
"source_file": "policyengine_us_data/build_datasets/specs.py",
"stability": "stable",
"status": "current",
"validation_commands": [
"uv run pytest tests/unit/test_build_dataset_specs.py"
]
},
"object_path": "policyengine_us_data.build_datasets.specs.stage_1_step_specs",
"signature": "def stage_1_step_specs() -> tuple[DatasetBuildStepSpec, ...]",
"source_file": "policyengine_us_data/build_datasets/specs.py"
},
"staging_upload": {
"docstring": "Upload files to HuggingFace staging only.\n\nGCS is updated during promote_publish, not here.\nPromote must be run separately via promote_publish.",
"id": "staging_upload",
Expand Down
58 changes: 58 additions & 0 deletions docs/generated/pipeline_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -1498,6 +1498,64 @@
"uv run pytest tests/unit/calibration/test_source_impute.py"
]
},
{
"api_refs": [
"policyengine_us_data.build_datasets.artifacts.stage_1_artifact_specs"
],
"artifacts_out": [
"uprating_factors.csv",
"acs_2022.h5",
"irs_puf_2015.h5",
"cps_2024.h5",
"puf_2024.h5",
"extended_cps_2024.h5",
"enhanced_cps_2024.h5",
"enhanced_cps_2024.clone_diagnostics.json",
"calibration_log.csv",
"stratified_extended_cps_2024.h5",
"source_imputed_stratified_extended_cps_2024.h5",
"small_enhanced_cps_2024.h5",
"source_imputed_stratified_extended_cps.h5",
"policy_data.db",
"build_log.txt",
"data_build_checkpoint_stats.json"
],
"description": "Canonical artifact inventory for Stage 1 dataset-build outputs.",
"id": "stage_1_dataset_artifact_specs",
"label": "Stage 1 Dataset Artifact Specs",
"node_type": "library",
"pathways": [
"data_build",
"stage_contracts",
"pipeline_docs"
],
"source_file": "policyengine_us_data/build_datasets/artifacts.py",
"stability": "stable",
"status": "current",
"validation_commands": [
"uv run pytest tests/unit/test_build_dataset_specs.py"
]
},
{
"api_refs": [
"policyengine_us_data.build_datasets.specs.stage_1_step_specs"
],
"description": "Canonical substage taxonomy for Stage 1 dataset-build contracts, step manifests, and pipeline documentation.",
"id": "stage_1_dataset_build_specs",
"label": "Stage 1 Dataset Build Specs",
"node_type": "library",
"pathways": [
"data_build",
"stage_contracts",
"pipeline_docs"
],
"source_file": "policyengine_us_data/build_datasets/specs.py",
"stability": "stable",
"status": "current",
"validation_commands": [
"uv run pytest tests/unit/test_build_dataset_specs.py"
]
},
{
"api_refs": [
"policyengine_us_data.calibration.unified_matrix_builder.UnifiedMatrixBuilder"
Expand Down
41 changes: 4 additions & 37 deletions modal_app/data_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from modal_app.images import cpu_image as image # noqa: E402
from policyengine_us_data.__version__ import __version__ as DATA_PACKAGE_VERSION # noqa: E402
from policyengine_us_data.build_datasets import stage_1_script_outputs # noqa: E402
from policyengine_us_data.pipeline_metadata import pipeline_node # noqa: E402
from policyengine_us_data.pipeline_schema import PipelineNode # noqa: E402
from policyengine_us_data.stage_contracts import ( # noqa: E402
Expand Down Expand Up @@ -95,43 +96,9 @@ def snapshot(self) -> dict[str, int]:
}


# Script to output file mapping for checkpointing
# Values can be a single file path (str) or a list of file paths
SCRIPT_OUTPUTS = {
"policyengine_us_data/utils/uprating.py": (
"policyengine_us_data/storage/uprating_factors.csv"
),
"policyengine_us_data/datasets/acs/acs.py": (
"policyengine_us_data/storage/acs_2022.h5"
),
"policyengine_us_data/datasets/puf/irs_puf.py": (
"policyengine_us_data/storage/irs_puf_2015.h5"
),
"policyengine_us_data/datasets/cps/cps.py": (
"policyengine_us_data/storage/cps_2024.h5"
),
"policyengine_us_data/datasets/puf/puf.py": (
"policyengine_us_data/storage/puf_2024.h5"
),
"policyengine_us_data/datasets/cps/extended_cps.py": (
"policyengine_us_data/storage/extended_cps_2024.h5"
),
# enhanced_cps.py produces both the dataset and calibration log
"policyengine_us_data/datasets/cps/enhanced_cps.py": [
"policyengine_us_data/storage/enhanced_cps_2024.h5",
"policyengine_us_data/storage/enhanced_cps_2024.clone_diagnostics.json",
"calibration_log.csv",
],
"policyengine_us_data/calibration/create_stratified_cps.py": (
"policyengine_us_data/storage/stratified_extended_cps_2024.h5"
),
"policyengine_us_data/calibration/create_source_imputed_cps.py": (
"policyengine_us_data/storage/source_imputed_stratified_extended_cps_2024.h5"
),
"policyengine_us_data/datasets/cps/small_enhanced_cps.py": (
"policyengine_us_data/storage/small_enhanced_cps_2024.h5"
),
}
# Script to output file mapping for checkpointing.
# Values can be a single file path (str) or a list of file paths.
SCRIPT_OUTPUTS = stage_1_script_outputs()

CPS_BUILD_SCRIPT = "policyengine_us_data/datasets/cps/cps.py"
PUF_BUILD_SCRIPT = "policyengine_us_data/datasets/puf/puf.py"
Expand Down
46 changes: 9 additions & 37 deletions modal_app/step_manifests/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from dataclasses import dataclass
from typing import TypeAlias

from policyengine_us_data.build_datasets import (
STAGE_1_BUILD_DATASETS,
STAGE_1_BUILD_STEP_SPECS,
)


@dataclass(frozen=True)
class PipelineSubstepSpec:
Expand Down Expand Up @@ -32,44 +37,11 @@ def _substep(id: str, title: str, parent_id: str) -> PipelineSubstepSpec:


BUILD_DATASETS = PipelineStepSpec(
id="1_build_datasets",
id=STAGE_1_BUILD_DATASETS,
title="Build datasets",
substeps=(
_substep(
"1a_raw_data_download",
"Raw data download",
"1_build_datasets",
),
_substep(
"1b_base_dataset_construction",
"Base dataset construction",
"1_build_datasets",
),
_substep(
"1c_extended_cps_puf_clone",
"Extended CPS PUF clone",
"1_build_datasets",
),
_substep(
"1d_enhanced_cps_reweighting",
"Enhanced CPS reweighting",
"1_build_datasets",
),
_substep(
"1e_stratified_cps",
"Stratified CPS",
"1_build_datasets",
),
_substep(
"1f_source_imputation",
"Source imputation",
"1_build_datasets",
),
_substep(
"1g_stage_base_datasets",
"Stage base datasets",
"1_build_datasets",
),
substeps=tuple(
_substep(spec.id, spec.title, spec.parent_id)
for spec in STAGE_1_BUILD_STEP_SPECS
),
)
RAW_DATA_DOWNLOAD = BUILD_DATASETS.substeps[0]
Expand Down
27 changes: 27 additions & 0 deletions policyengine_us_data/build_datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""Canonical Stage 1 dataset-build specifications."""

from .artifacts import (
DatasetArtifactSpec,
STAGE_1_ARTIFACT_SPECS,
stage_1_artifact_specs,
stage_1_contract_artifact_specs,
stage_1_script_outputs,
)
from .specs import (
DatasetBuildStepSpec,
STAGE_1_BUILD_DATASETS,
STAGE_1_BUILD_STEP_SPECS,
stage_1_step_specs,
)

# Public API of this package: exactly the names re-exported above from the
# sibling ``artifacts`` and ``specs`` modules.
__all__ = [
"DatasetArtifactSpec",
"DatasetBuildStepSpec",
"STAGE_1_ARTIFACT_SPECS",
"STAGE_1_BUILD_DATASETS",
"STAGE_1_BUILD_STEP_SPECS",
"stage_1_artifact_specs",
"stage_1_contract_artifact_specs",
"stage_1_script_outputs",
"stage_1_step_specs",
]
Loading