Skip to content

feat(core): add per-mapper max_fan_out override for partition fan-out cap#67184

Draft
Lee-W wants to merge 28 commits into
apache:mainfrom
astronomer:partition-fanout-max-keys
Draft

feat(core): add per-mapper max_fan_out override for partition fan-out cap#67184
Lee-W wants to merge 28 commits into
apache:mainfrom
astronomer:partition-fanout-max-keys

Conversation

@Lee-W
Copy link
Copy Markdown
Member

@Lee-W Lee-W commented May 19, 2026

Why

a per-mapper cap so each PartitionMapper instance can override the global [scheduler] partition_fanout_max_keys (implemented in #66030)
Lives on the mapper rather than the Dag because one Dag can bind several mappers
with very different fan-out profiles.

closes: #65760

What

  • Add max_fan_out: int | None = None to PartitionMapper base (SDK + core). Validator rejects 0, negatives, non-int, and bool. None → global; positive int → override.
  • Thread the kwarg through every subclass that overrides __init__, and through both serialization paths (per-class .serialize() and the encoders.py singledispatch overrides). Field emitted only when non-None — pre-change payloads round-trip byte-identical.
  • assets/manager.py: read the cap per target Dag, swap a cap_source fragment into the audit-log extra (max_fan_out=N vs. existing [scheduler] partition_fanout_max_keys=N).

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Lee-W added 28 commits May 19, 2026 19:10
this is still not ideal, but at least it's not super wrong now
…t ordering

StartOfWeekMapper and StartOfQuarterMapper now derive their decode_downstream
regex from output_format itself, so users can re-order strftime directives
and {name} placeholders (e.g. "Q{quarter}/%Y") without having to override
decode_downstream. Malformed output_format — empty {}, non-identifier
placeholder names, duplicate %X directives, duplicate {name} placeholders —
raises ValueError at mapper construction instead of an opaque re.error from
deep inside a scheduler tick or UI route.
…ag_runs list

Drop the SQL "count distinct assets with any log" subquery and always
compute total_received via the Python rollup-aware helper. The list
endpoint previously returned different numbers for the same APDR
depending on whether the caller filtered by dag_id (rollup-aware,
counts upstream window keys) or queried globally (SQL approximation,
counts assets with any log) — same field, different semantics, very
confusing for any UI consumer.

The N+1 cost of per-Dag timetable loads was already paid in the
global branch for total_required, so adding a single batched log
fetch keeps the existing query budget while making the contract
identical across both views. _compute_received_count now skips
asset_ids that are no longer required (active=False) so the relaxed
log query doesn't over-count.
StartOfWeekMapper now always uses ISO weeks (Monday) and
StartOfMonthMapper always emits the 1st of the month. Custom
fiscal boundaries can still be expressed by pairing a user-defined
source mapper with the existing windows.
The next_run_assets and partitioned_dag_runs endpoints used to load
and deserialize the full timetable on every request just to read
mapper attributes (is_rollup) and required-key counts. Cache mapper
metadata per asset on DagModel during Dag sync via a new
``partition_mapper_info`` JSON column, so the UI resolves mapper
attributes from the cache and only loads the timetable when
``to_upstream`` evaluation for rollup mappers is actually needed.
``partition_mapper_info`` now iterates every asset in ``asset_condition``
and uses ``get_partition_mapper``, so a Dag configured with
``default_partition_mapper=RollupMapper(...)`` (the primary documented
pattern) is correctly reported as rollup. Previously the list was built
from ``partition_mapper_config`` only, leaving ``has_rollup_mappers``
False and silently disabling rollup UI behaviour.

Also: extract the shared ``load_partitioned_timetable`` helper and log
on deserialization failure; coerce NULL ``source_partition_key`` to
``""`` in the scheduler to match the UI normalisation.
Old serialized rows or hand-crafted partial dicts caused a KeyError on
DagModel.is_rollup_asset and has_rollup_mappers. Switch to .get() with
a False default so the read side is resilient to schema evolution.
Add docstrings explaining accepted strftime directives, round-trip
requirements, and why regex compilation happens eagerly at construction
time rather than lazily inside the scheduler loop.
Covers the previously untested MonthWindow case in
test_window_serialize_round_trip. Uses input_format="%Y-%m-%d" instead
of "%Y-%m" to prevent 29 day keys from collapsing to the same value and
masking decode failures.
DayWindow always generates 24 naive hourly steps. When paired with a
local-timezone source mapper, spring-forward gaps make one expected
upstream key unattainable so the rollup can never complete; fall-back
causes the extra hour to be excluded from the expected set.

Add a warning block to DayWindow's docstring, two tests (one pinning
the naive-24 invariant, one xfail documenting the spring-forward
under-yield), and a Known Limitations section to the AIP-76 newsfragment.
Clarify that inactive assets are filtered from the UI progress query
but their PartitionedAssetKeyLog rows are preserved, so re-activating
an asset automatically resumes rollup accumulation without data loss.
… time

Add PartitionMapper.__init_subclass__ that raises TypeError when a
subclass overrides exactly one side of the decode/encode pair. An
unpaired override silently breaks RollupMapper.to_upstream by producing
non-str members, causing the scheduler's upstream-window check to never
satisfy and leaving the Dag run held forever with no diagnostic.

MRO-based comparison (cls.method is not PartitionMapper.method) is used
rather than __dict__ lookup so intermediate base classes such as
_BaseTemporalMapper are handled correctly.
A bad partition mapper previously wrote a new Log row on every scheduler
tick (once per second), flooding the audit log. Add a process-level
_partition_audit_seen set to SchedulerJobRunner that deduplicates by
(dag_id, asset_name, asset_uri): after the first entry the scheduler
still logs the exception at ERROR level each tick (useful for ops) but
stops inserting into the Log table. The set resets on restart, so one
fresh entry is written after a config fix and re-deploy.

Also add three scheduler-side evidence tests:
- audit log deduplication across two consecutive ticks
- rollup survives a simulated scheduler restart with partial key arrival
- duplicate PAKL rows do not prevent rollup completion (set semantics)
Replace the hardcoded MAX_PARTITION_DAG_RUNS_PER_TICK=500 with a new
[scheduler] max_partition_dag_runs_to_create_per_loop config option
(default 500). The value is read once in SchedulerJobRunner.__init__
alongside the other self._* conf reads, per the invariant that all
conf access stays out of the scheduler loop.
Composes upstream_mapper + window + (optional) downstream_mapper,
symmetric to RollupMapper. New [scheduler] partition_fanout_max_keys
caps the downstream keys per upstream event.
… cap

Add a `max_fan_out: int | None = None` parameter to both the SDK and core
`PartitionMapper` base classes, threaded through every subclass constructor
that defines its own `__init__`. The validator rejects 0, negatives, floats,
strings, and booleans (bool is an int subclass — excluded explicitly).

Serialization is updated in two parallel paths:
- Per-class `serialize()` / `deserialize()` methods (custom-mapper path).
- `encoders.py` singledispatch overrides (built-in-mapper path).

Both paths write `max_fan_out` only when non-None, preserving byte-identical
output for default-constructed mappers (zero-bloat contract).

In `manager.py`, the single-shot `max_downstream_keys` read is deleted;
the mapper object is retained after the `to_downstream` call so its
`max_fan_out` attribute is readable at the boundary check. The effective
cap is computed per target-Dag as either the mapper's own cap or the
global `[scheduler] partition_fanout_max_keys`. The `Log.extra` string
uses `max_fan_out=N` when the per-mapper cap trips, and keeps the existing
`[scheduler] partition_fanout_max_keys=N` wording when the global trips.

Co-Authored-By: wei.lee@astronomer.io
@boring-cyborg boring-cyborg Bot added area:API Airflow's REST/HTTP API area:ConfigTemplates labels May 19, 2026
@boring-cyborg boring-cyborg Bot added area:DAG-processing area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler area:task-sdk area:UI Related to UI/UX. For Frontend Developers. backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch kind:documentation labels May 19, 2026
@Lee-W Lee-W changed the title feat(AIP-76): window feat(core): add per-mapper max_fan_out override for partition fan-out cap May 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:API Airflow's REST/HTTP API area:ConfigTemplates area:DAG-processing area:db-migrations PRs with DB migration area:dev-tools area:Scheduler including HA (high availability) scheduler area:task-sdk area:UI Related to UI/UX. For Frontend Developers. backport-to-v3-2-test Mark PR with this label to backport to v3-2-test branch kind:documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Fan-out: explosion cap and error behavior

1 participant