Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Jan 22, 2026

Optimizes RenderedTaskInstanceFields.delete_old_records() to avoid scanning the RTIF table when determining which records to keep. This fixes scheduler crashes caused by slow RTIF cleanup queries on DAGs with many dynamically mapped tasks.

Problem: For DAGs with 3k+ mapped task instances, the delete query was scanning ~100k RTIF records with a complex NOT EXISTS subquery containing a join, causing statement timeouts and high CPU utilization that prevented the scheduler from heartbeating.

This PR simplifies the query to only use the dag_run table for finding recent run_ids, avoiding the expensive RTIF table scan entirely.


Before (slow - scans RTIF table with join)

DELETE FROM rendered_task_instance_fields 
WHERE dag_id = :dag_id AND task_id = :task_id 
AND NOT EXISTS (
  SELECT 1 FROM (
    SELECT DISTINCT rtif.dag_id, rtif.task_id, rtif.run_id, dr.logical_date 
    FROM rendered_task_instance_fields rtif
    JOIN dag_run dr ON rtif.dag_id = dr.dag_id AND rtif.run_id = dr.run_id 
    WHERE rtif.dag_id = :dag_id AND rtif.task_id = :task_id 
    ORDER BY dr.logical_date DESC LIMIT 30
  ) AS keep
  WHERE keep.dag_id = rtif.dag_id AND keep.task_id = rtif.task_id AND keep.run_id = rtif.run_id
)

After (fast - only queries dag_run table)

DELETE FROM rendered_task_instance_fields 
WHERE dag_id = :dag_id AND task_id = :task_id 
AND run_id NOT IN (
  SELECT run_id FROM dag_run 
  WHERE dag_id = :dag_id 
  ORDER BY run_after DESC LIMIT 30
)

Benchmark Results (PostgreSQL)

Test Setup: 100 DAG runs x 50 mapped TIs = 5,000 RTIF records, keeping 30 most recent runs

Metric Old Query New Query Improvement
Execution Time 233 ms 5.5 ms 42x faster
Buffer Hits 14,618 113 129x fewer

Query Plan Comparison

Old Query - Nested loop with expensive heap fetches:

Delete on rendered_task_instance_fields (actual time=233.12..233.12 rows=0 loops=1)
  Buffers: shared hit=14618
  ->  Nested Loop Anti Join (actual time=51.43..232.91 rows=3500 loops=1)
        Rows Removed by Join Filter: 126750
        ->  Index Scan using rendered_task_instance_fields_pkey ...
        ->  Subquery Scan on anon_1 (actual time=0.010..0.065 rows=26 loops=5000)
              ->  Nested Loop (joins RTIF to dag_run)
                    Rows Removed by Join Filter: 495000
Execution Time: 233.44 ms

New Query - Simple index scan with hashed subplan:

Delete on rendered_task_instance_fields (actual time=5.12..5.12 rows=0 loops=1)
  Buffers: shared hit=113
  ->  Index Scan using rendered_task_instance_fields_pkey ...
        Filter: (NOT (hashed SubPlan 1))
        SubPlan 1
          ->  Limit (cost=8.16..8.17 rows=1 width=524)
                ->  Sort (Sort Key: run_after DESC)
                      ->  Index Scan using idx_dag_run_dag_id on dag_run
                            Index Cond: (dag_id = :dag_id)
Execution Time: 5.55 ms

Behavioral Change

The semantics change slightly:

Scenario Old Behavior New Behavior
Normal task (runs every dag run) Keep last 30 runs Keep last 30 runs
Sparse task (runs occasionally) Keep last 30 executions Keep records from last 30 dag runs

This imo is acceptable because of the following, but looking forward for what others think too:

  1. Most tasks run every DAG run (not sparse)
  2. The purpose is to limit storage, not preserve specific historical records
  3. The performance improvement (42x) far outweighs this edge case

Additional Changes

  • Uses run_after instead of logical_date for ordering (since logical_date can be NULL for manual runs)

How to Reproduce

To reproduce the benchmark, save this script and run inside breeze with PostgreSQL:

breeze run --backend postgres python dev/rtif_benchmark.py
Benchmark Script
"""
Benchmark: RTIF delete_old_records query performance.
Run: breeze run --backend postgres python dev/rtif_benchmark.py
"""
from __future__ import annotations

import time
from datetime import datetime, timedelta, timezone

from sqlalchemy import delete, exists, select, text

from airflow.models.dagrun import DagRun
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState

DAG_ID = "benchmark_dag"
TASK_ID = "benchmark_task"
NUM_RUNS = 100
MAP_INDEX_COUNT = 50
NUM_TO_KEEP = 30


def setup_test_data():
    from airflow.utils.db import initdb
    initdb()

    with create_session() as session:
        session.execute(text(f"DELETE FROM rendered_task_instance_fields WHERE dag_id = '{DAG_ID}'"))
        session.execute(text(f"DELETE FROM dag_run WHERE dag_id = '{DAG_ID}'"))
        session.execute(text("ALTER TABLE rendered_task_instance_fields DROP CONSTRAINT IF EXISTS rtif_ti_fkey"))
        session.commit()

        base_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
        for run_num in range(NUM_RUNS):
            run_id = f"run_{run_num:04d}"
            logical_date = base_date + timedelta(days=run_num)
            dr = DagRun(dag_id=DAG_ID, run_id=run_id, logical_date=logical_date,
                        data_interval=(logical_date, logical_date + timedelta(days=1)),
                        run_type="scheduled", state=DagRunState.SUCCESS)
            session.add(dr)
            session.flush()

            for map_idx in range(MAP_INDEX_COUNT):
                session.execute(text("""
                    INSERT INTO rendered_task_instance_fields 
                    (dag_id, task_id, run_id, map_index, rendered_fields)
                    VALUES (:dag_id, :task_id, :run_id, :map_index, :fields)
                """), {"dag_id": DAG_ID, "task_id": TASK_ID, "run_id": run_id, 
                       "map_index": map_idx, "fields": '{"cmd": "echo test"}'})
        session.commit()
    print(f"Created {NUM_RUNS} runs x {MAP_INDEX_COUNT} TIs = {NUM_RUNS * MAP_INDEX_COUNT} RTIF records")


def old_query():
    """Old query with NOT EXISTS and JOIN."""
    tis_to_keep = (
        select(RTIF.dag_id, RTIF.task_id, RTIF.run_id, DagRun.logical_date)
        .where(RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID)
        .join(RTIF.dag_run).distinct()
        .order_by(DagRun.logical_date.desc()).limit(NUM_TO_KEEP)
    ).subquery()
    return delete(RTIF).where(
        RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID,
        ~exists(1).where(tis_to_keep.c.dag_id == RTIF.dag_id, 
                         tis_to_keep.c.task_id == RTIF.task_id, 
                         tis_to_keep.c.run_id == RTIF.run_id))


def new_query():
    """New query with NOT IN on dag_run only."""
    run_ids = select(DagRun.run_id).where(DagRun.dag_id == DAG_ID).order_by(
        DagRun.run_after.desc()).limit(NUM_TO_KEEP)
    return delete(RTIF).where(
        RTIF.dag_id == DAG_ID, RTIF.task_id == TASK_ID,
        RTIF.run_id.not_in(run_ids.scalar_subquery()))


def benchmark(session, stmt, label):
    start = time.perf_counter()
    result = session.execute(stmt)
    elapsed = time.perf_counter() - start
    print(f"{label}: {result.rowcount} rows in {elapsed*1000:.2f} ms")
    return elapsed


def main():
    setup_test_data()
    with create_session() as session:
        old_time = benchmark(session, old_query(), "OLD")
        session.commit()

    setup_test_data()
    with create_session() as session:
        new_time = benchmark(session, new_query(), "NEW")
        session.commit()

    print(f"\nSpeedup: {old_time/new_time:.1f}x faster")


if __name__ == "__main__":
    main()

@kaxil kaxil requested a review from dstandish January 22, 2026 17:18
@kaxil kaxil changed the title Optimize RTIF delete_old_records query for DAGs with many mapped tasks Improve performance of rendered templates cleanup Jan 22, 2026
@kaxil kaxil added the full tests needed We need to run full set of tests for this PR to merge label Jan 22, 2026
@kaxil kaxil force-pushed the rtif-deletion-optimization branch from 8be4834 to ba544bf Compare January 22, 2026 17:22
@kaxil kaxil marked this pull request as ready for review January 22, 2026 17:27
@kaxil
Copy link
Member Author

kaxil commented Jan 22, 2026

Ofcourse it has to be MySQL!

sqlalchemy.exc.NotSupportedError: (MySQLdb.NotSupportedError) (1235, "This version of MySQL doesn't yet support 'LIMIT & IN/ALL/ANY/SOME subquery'")
[SQL: DELETE FROM rendered_task_instance_fields WHERE rendered_task_instance_fields.dag_id = %s AND rendered_task_instance_fields.task_id = %s AND (rendered_task_instance_fields.run_id NOT IN (SELECT dag_run.run_id 
FROM dag_run 
WHERE dag_run.dag_id = %s ORDER BY dag_run.run_after DESC 
 LIMIT %s))]
[parameters: ('test_dag', 'f', 'test_dag', 30)]

Copy link
Contributor

@dstandish dstandish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i never like to request changes, but i think you need to rename the config here so just wanted to signal that

@kaxil kaxil added this to the Airflow 3.2.0 milestone Jan 22, 2026
kaxil added 4 commits January 29, 2026 01:08
The delete_old_records query was scanning the RTIF table with a complex
NOT EXISTS subquery containing a join, causing statement timeouts and
scheduler crashes for DAGs with 3k+ mapped task instances (~100k RTIF
records).

Simplified the query to only use the dag_run table for finding recent
run_ids, avoiding the expensive RTIF table scan entirely. Benchmarks
show ~38x improvement (330ms -> 6ms for 5000 records).

Changes:
- Replaced NOT EXISTS with simple NOT IN on dag_run table
- Uses run_after instead of logical_date (avoids NULL for manual runs)
- Renamed config to max_num_rendered_ti_fields_per_dag_run with
  backward-compatible deprecation of old name
- Added test for sparse task behavior to document the semantic change

Note: Retention is now based on N most recent dag runs rather than N
most recent task executions. For sparse/conditional tasks, this may
result in fewer historical records being retained.
The delete_old_records query was scanning the RTIF table with a complex
NOT EXISTS subquery containing a join, causing statement timeouts and
scheduler crashes for DAGs with 3k+ mapped task instances (~100k RTIF
records).

Simplified the query to only use the dag_run table for finding recent
run_ids, avoiding the expensive RTIF table scan entirely. Benchmarks
show ~38x improvement (330ms -> 6ms for 5000 records).

Changes:
- Replaced NOT EXISTS with simple NOT IN on dag_run table
- Uses run_after instead of logical_date (avoids NULL for manual runs)
- Renamed config to max_num_rendered_ti_fields_per_dag_run with
  backward-compatible deprecation of old name
- Added test for sparse task behavior to document the semantic change

Note: Retention is now based on N most recent dag runs rather than N
most recent task executions. For sparse/conditional tasks, this may
result in fewer historical records being retained.
@kaxil kaxil force-pushed the rtif-deletion-optimization branch from 20195b1 to 68ddfdf Compare January 29, 2026 01:13
@kaxil kaxil requested a review from dstandish January 29, 2026 01:16
@kaxil kaxil merged commit 2db2896 into apache:main Jan 29, 2026
128 checks passed
@kaxil kaxil deleted the rtif-deletion-optimization branch January 29, 2026 16:29
sanchalitorpe-source pushed a commit to sanchalitorpe-source/airflow that referenced this pull request Jan 30, 2026
The delete_old_records query was scanning the RTIF table with a complex
NOT EXISTS subquery containing a join, causing statement timeouts and
scheduler crashes for DAGs with 3k+ mapped task instances (~100k RTIF
records).

Simplified the query to only use the dag_run table for finding recent
run_ids, avoiding the expensive RTIF table scan entirely. Benchmarks
show ~38x improvement (330ms -> 6ms for 5000 records).

Changes:
- Replaced NOT EXISTS with simple NOT IN on dag_run table
- Uses run_after instead of logical_date (avoids NULL for manual runs)
- Renamed config to max_num_rendered_ti_fields_per_dag_run with
  backward-compatible deprecation of old name
- Added test for sparse task behavior to document the semantic change

Note: Retention is now based on N most recent dag runs rather than N
most recent task executions. For sparse/conditional tasks, this may
result in fewer historical records being retained.

* Improve performance of rendered templates cleanup

The delete_old_records query was scanning the RTIF table with a complex
NOT EXISTS subquery containing a join, causing statement timeouts and
scheduler crashes for DAGs with 3k+ mapped task instances (~100k RTIF
records).

Simplified the query to only use the dag_run table for finding recent
run_ids, avoiding the expensive RTIF table scan entirely. Benchmarks
show ~38x improvement (330ms -> 6ms for 5000 records).

Changes:
- Replaced NOT EXISTS with simple NOT IN on dag_run table
- Uses run_after instead of logical_date (avoids NULL for manual runs)
- Renamed config to max_num_rendered_ti_fields_per_dag_run with
  backward-compatible deprecation of old name
- Added test for sparse task behavior to document the semantic change

Note: Retention is now based on N most recent dag runs rather than N
most recent task executions. For sparse/conditional tasks, this may
result in fewer historical records being retained.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants