Speed up TaskInstance bulk insert on PostgreSQL with unnest#66868
Draft
safaehar wants to merge 2 commits into
DagRun._create_task_instances bulk-inserts every TaskInstance for a
DagRun in a single call. On PostgreSQL the ORM bulk_insert_mappings
path emits a multi-row INSERT ... VALUES (...), (...) with one bind
tuple per row, which scales poorly for large DagRuns (mapped task
expansion, wide DAGs).
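To make the scaling concern concrete, here is a rough count of bind parameters for the two statement shapes, taking the "one bind tuple per row" description above at face value. The function name is hypothetical and the 35-column figure is the approximate width quoted in the PR summary; this is arithmetic only, not a measurement.

```python
def bind_param_counts(rows: int, cols: int) -> tuple:
    """Return (multi-row VALUES count, unnest count) of bind parameters."""
    values_params = rows * cols  # one placeholder per cell
    unnest_params = cols  # one array placeholder per column
    return values_params, unnest_params


# 1000 task instances at roughly 35 columns each (figure from the PR summary).
counts = bind_param_counts(rows=1000, cols=35)
```

With the array form, the number of placeholders stays fixed at the column count however many rows are inserted; only the array lengths grow.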
This change branches on dialect and, on PostgreSQL, emits
INSERT INTO task_instance (<cols>)
SELECT * FROM unnest(:c1::t1[], :c2::t2[], ...)
so the driver sends one typed array per column instead of one bind
tuple per row. The statement is built once per dict-shape from
TaskInstance.__mapper__ (so new columns flow through automatically)
and cached at module scope. Other backends and the mutation-hook
path (hook_is_noop=False) are unchanged.
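The shape of this approach can be sketched in plain Python without a database. This is a minimal illustration and not the PR's code: `PG_TYPES`, `build_unnest_sql`, and `to_column_arrays` are hypothetical names, the type map is hard-coded here whereas the PR derives it from `TaskInstance.__mapper__`, and column order is simply sorted rather than taken from the mapper. The sketch writes `CAST(... AS ...)` instead of the `::` shorthand.

```python
from functools import lru_cache

# Hypothetical column -> PostgreSQL element type map; the PR derives this
# from TaskInstance.__mapper__ instead of hard-coding it.
PG_TYPES = {
    "task_id": "VARCHAR",
    "map_index": "INTEGER",
    "state": "VARCHAR",
}


@lru_cache(maxsize=None)
def build_unnest_sql(keys: frozenset) -> str:
    """Build one statement per dict shape; cached so repeat shapes reuse it."""
    cols = sorted(keys)  # fixed order so the SQL text is deterministic
    casts = ", ".join(f"CAST(:{c} AS {PG_TYPES[c]}[])" for c in cols)
    return (
        f"INSERT INTO task_instance ({', '.join(cols)}) "
        f"SELECT * FROM unnest({casts})"
    )


def to_column_arrays(rows: list) -> dict:
    """Transpose row-major dicts into one list per column."""
    if not rows:
        return {}
    keys = frozenset(rows[0])
    if any(frozenset(r) != keys for r in rows):
        raise ValueError("all rows must share the same keys")
    return {k: [r[k] for r in rows] for k in keys}


rows = [
    {"task_id": "extract", "map_index": -1, "state": None},
    {"task_id": "load", "map_index": 0, "state": None},
]
sql = build_unnest_sql(frozenset(rows[0]))
params = to_column_arrays(rows)
```

Keying the cache on `frozenset(keys)` means dicts with the same columns in a different insertion order still share one statement, which matches the "built once per dict-shape" wording above.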
TaskInstance.insert_mapping now fills ``id`` and ``updated_at``
explicitly so the unnest path does not have to replicate
SQLAlchemy's column-default application; the values match the
existing column ``default=`` callables, so behavior is preserved
across all backends.
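The pre-fill idea can be sketched as follows. This is an assumption-laden illustration rather than the actual `insert_mapping` body: `with_explicit_defaults` is a hypothetical helper, `uuid.uuid4()` stands in for the uuid7 default the PR reuses, and `datetime.now(timezone.utc)` stands in for Airflow's `timezone.utcnow()`.

```python
import uuid
from datetime import datetime, timezone


def with_explicit_defaults(mapping: dict) -> dict:
    """Return a copy of mapping with id and updated_at filled if absent."""
    filled = dict(mapping)
    # uuid4 is a stand-in: the PR uses the same uuid7 default as the column.
    filled.setdefault("id", str(uuid.uuid4()))
    # Stand-in for Airflow's timezone.utcnow(): an aware UTC timestamp.
    filled.setdefault("updated_at", datetime.now(timezone.utc))
    return filled


row = with_explicit_defaults({"task_id": "extract", "map_index": -1})
```

Because the values are present in the dict before it reaches the database layer, a raw statement that bypasses the ORM's default handling still receives every column it needs.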
Summary
When the scheduler creates a DagRun, DagRun._create_task_instances bulk-inserts every TaskInstance for that run in a single call. On PostgreSQL this currently goes through SQLAlchemy ORM's bulk_insert_mappings, which emits a multi-row INSERT ... VALUES (...), (...): one bind tuple per row, ~35 columns each. The wire payload scales with rows × columns, which is costly for DagRuns with mapped-task expansion or wide DAGs.
This PR adds a PostgreSQL-only fast path that instead emits INSERT INTO task_instance (<cols>) SELECT * FROM unnest(:c1::t1[], :c2::t2[], ...): one typed array per column, so the payload scales with columns + rows and the planner sees a single static statement regardless of batch size.
The dispatch follows the existing dialect-branch precedent in airflow/dag_processing/collection.py::activate_assets_if_possible. Other backends (MySQL, SQLite) and the task_instance_mutation_hook path (which needs per-object ORM access) are unchanged.
Details
airflow/models/dagrun.py:
- _build_postgres_unnest_insert(keys): builds the INSERT … SELECT * FROM unnest(…) statement from TaskInstance.__mapper__. The SQL column list, ordering, and PG element types are all derived from the mapper (UtcDateTime → TIMESTAMP WITH TIME ZONE[], ExtendedJSON → JSONB[], ExecutorConfigType → BYTEA[], etc.), so new columns flow through without code changes here. The cast is injected by SQLAlchemy via bindparam(type_=postgresql.ARRAY(col.type)) rather than hand-rolled, which avoids a real footgun: text() placeholder parsing breaks on :id::UUID[] without an intervening space.
- _bulk_insert_task_instance_dicts_postgres(task_dicts, session): materializes the dict iterator, looks up the cached statement by frozenset(keys), and executes with column-major arrays.
- _create_task_instances now branches on get_dialect_name(session) inside the hook_is_noop arm.
TaskInstance.insert_mapping now fills id (via the same uuid7 default as the column) and updated_at (via timezone.utcnow()) explicitly, so the unnest path does not need to replicate SQLAlchemy's column-default application. The pre-fill is behaviour-equivalent for non-Postgres backends because bulk_insert_mappings would have applied the same defaults.
Tests
Added in airflow-core/tests/unit/models/test_dagrun.py:
- TestPostgresUnnestBulkInsert
- … bulk_insert_mappings)
- … BYTEA[] / JSONB[] / TIMESTAMP WITH TIME ZONE[] / VARCHAR(1000)[] casts when compiled for postgres
- … task_display_name) even when the Python attr is _task_display_property_value
- test_create_task_instances_uses_unnest_path_on_postgres: dispatch goes through the helper, not bulk_insert_mappings.
- test_create_task_instances_uses_bulk_insert_mappings_on_non_postgres: sqlite/mysql keep the existing path.
- test_create_task_instances_mutation_hook_still_uses_bulk_save_objects: non-noop hook stays on the ORM path even on PG.
All 8 new tests + the 167 other tests in test_dagrun.py pass locally on SQLite. mypy-airflow-core and ruff are clean.
Benchmarks
Driving motivation is a measured speedup on a downstream fork at Datadog. Numbers from infra-staging to follow before flipping out of draft.
Test plan
- Run breeze testing core-tests --backend postgres --test-type Core -k "TestPostgresUnnest or test_create_task_instances" against postgres.
- Rename airflow-core/newsfragments/pr_number.improvement.rst → <this-PR-number>.improvement.rst once the PR number is assigned.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines