From d0e22ad6d30d55fa9818967a1dd78963f660a614 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 14:44:04 +0530 Subject: [PATCH 01/18] restore unique constraint on logical date and make it nullable --- .../0032_3_0_0_drop_execution_date_unique.py | 28 +++++++++++++++++-- airflow/models/dagrun.py | 3 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 1 - docs/apache-airflow/migrations-ref.rst | 2 +- 5 files changed, 29 insertions(+), 7 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index 399cc8aff91f3..023df3d77bedf 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -17,9 +17,9 @@ # under the License. """ -Drop ``execution_date`` unique constraint on DagRun. +Make logical_date nullable. -The column has also been renamed to logical_date, although the Python model is +The column has been renamed to logical_date, although the Python model is not changed. This allows us to not need to fix all the Python code at once, but still do the two changes in one migration instead of two. @@ -49,11 +49,21 @@ def upgrade(): "execution_date", new_column_name="logical_date", existing_type=TIMESTAMP(timezone=True), - existing_nullable=False, + existing_nullable=True, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.create_unique_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.create_unique_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) + def downgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -63,6 +73,18 @@ def downgrade(): existing_type=TIMESTAMP(timezone=True), existing_nullable=False, ) + + with op.batch_alter_table("dag_run", schema=None) as batch_op: + with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.drop_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) + with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 35d8af4322c49..826f2a6d4f18c 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -130,7 +130,7 @@ class DagRun(Base, LoggingMixin): id = Column(Integer, primary_key=True) dag_id = Column(StringID(), nullable=False) queued_at = Column(UtcDateTime) - logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=True) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) _state = Column("state", String(50), default=DagRunState.QUEUED) @@ -179,6 +179,7 @@ class DagRun(Base, LoggingMixin): __table_args__ = ( Index("dag_id_state", dag_id, _state), UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"), + UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"), Index("idx_dag_run_dag_id", dag_id), Index( "idx_dag_run_running_dags", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index b058ae34b3783..52fe755bd20b1 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -eb25e0718c9382cdbb02368c9c3e29c90da06ddaba8e8e92d9fc53417b714039 \ No newline at end of file +4afe2ab3d7a9ab2f43e5355b6a720082c81f98f9100e074700e2b30600ce1a1d \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 92ab9bfb7855c..21b055d371d5f 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1852,7 +1852,6 @@ logical_date [TIMESTAMP] - NOT NULL queued_at diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 62013ff8f799c..95572283cf800 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -86,7 +86,7 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Drop ``execution_date`` unique constraint on DagRun. | +| ``1cdc775ca98f`` | ``a2c32e6c7729`` | ``3.0.0`` | Make logical_date nullable. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``a2c32e6c7729`` | ``0bfc26bc256e`` | ``3.0.0`` | Add triggered_by field to DagRun. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From 2b11aed1d773a1b6f5d1f2b8e4735f67673c46b2 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:06:06 +0530 Subject: [PATCH 02/18] restore unique constraint on logical date and make it nullable --- ...tion_date_to_logical_date_and_nullable.py} | 30 ++++++++----------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 14 insertions(+), 18 deletions(-) rename airflow/migrations/versions/{0032_3_0_0_drop_execution_date_unique.py => 0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py} (83%) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py similarity index 83% rename from airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py rename to airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 023df3d77bedf..175840b56de96 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -51,18 +51,17 @@ def upgrade(): existing_type=TIMESTAMP(timezone=True), existing_nullable=True, ) - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.create_unique_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) + batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_logical_date_key", columns=["dag_id", "logical_date"], ) + batch_op.create_unique_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) def downgrade(): @@ -75,17 +74,14 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) - batch_op.drop_constraint( - "dag_run_dag_id_logical_date_key", - columns=["dag_id", "logical_date"], - ) - - with op.batch_alter_table("dag_run", schema=None) as batch_op: + batch_op.drop_constraint( + "dag_run_dag_id_run_id_key", + columns=["dag_id", "run_id"], + ) + batch_op.drop_constraint( + "dag_run_dag_id_logical_date_key", + columns=["dag_id", "logical_date"], + ) batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", columns=["dag_id", "execution_date"], diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 52fe755bd20b1..39b515342f97c 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -4afe2ab3d7a9ab2f43e5355b6a720082c81f98f9100e074700e2b30600ce1a1d \ No newline at end of file +8134c08556ea8dd84625f77c5fa0e3f06dc75393026c07c53b60b364e697d08b \ No newline at end of file From 237360de42e244bf230dd732a66aa68f9168011d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:12:23 +0530 Subject: [PATCH 03/18] fix migration file --- ...name_execution_date_to_logical_date_and_nullable.py | 10 ++-------- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 175840b56de96..1269cf77c5c28 100644 --- a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -74,14 +74,8 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) - batch_op.drop_constraint( - "dag_run_dag_id_logical_date_key", - columns=["dag_id", "logical_date"], - ) + batch_op.drop_constraint("dag_run_dag_id_run_id_key", type_="unique") + batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", columns=["dag_id", "execution_date"], diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 39b515342f97c..e515f06d962cb 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -8134c08556ea8dd84625f77c5fa0e3f06dc75393026c07c53b60b364e697d08b \ No newline at end of file +4517b7d80a3ec9653cec714d7959c961767e154117a2bd5c136c76545467b446 \ No newline at end of file From 18efafba30d482bfcfcc5ca8c1d929d22a41e529 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Wed, 29 Jan 2025 15:36:17 +0530 Subject: [PATCH 04/18] fix migration file --- ...0_0_rename_execution_date_to_logical_date_and_nullable.py | 5 ----- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py index 1269cf77c5c28..b559bd2102485 100644 --- a/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py +++ b/airflow/migrations/versions/0032_3_0_0_rename_execution_date_to_logical_date_and_nullable.py @@ -58,10 +58,6 @@ def upgrade(): "dag_run_dag_id_logical_date_key", columns=["dag_id", "logical_date"], ) - batch_op.create_unique_constraint( - "dag_run_dag_id_run_id_key", - columns=["dag_id", "run_id"], - ) def downgrade(): @@ -74,7 +70,6 @@ def downgrade(): ) with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.drop_constraint("dag_run_dag_id_run_id_key", type_="unique") batch_op.drop_constraint("dag_run_dag_id_logical_date_key", type_="unique") batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index e515f06d962cb..ce2da0b1c30ae 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -4517b7d80a3ec9653cec714d7959c961767e154117a2bd5c136c76545467b446 \ No newline at end of file +1a1ce116676867566f504fdd398304ca1a2cb27cfa9f2e3799b2c2eb9cd38119 \ No newline at end of file From cefc9a340276359695b28d6e16acec416773e960 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Thu, 30 Jan 2025 21:41:49 +0530 Subject: [PATCH 05/18] refactor backfill reprocess logic --- airflow/models/backfill.py | 27 +++++++++++++++++++++++++-- tests/models/test_backfill.py | 24 ++++++++++++------------ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 1564c4feb59a5..5bb5376530250 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -1,4 +1,3 @@ -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -141,6 +140,7 @@ class BackfillDagRunExceptionReason(str, Enum): IN_FLIGHT = "in flight" ALREADY_EXISTS = "already exists" UNKNOWN = "unknown" + CLEARED_RUN = "cleared run" class BackfillDagRun(Base): @@ -194,7 +194,11 @@ def _get_latest_dag_run_row_query(info, session): def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None: non_create_reason = None if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + if dr.clear_number == 0: + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + else: + non_create_reason = BackfillDagRunExceptionReason.CLEARED_RUN + elif reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS elif reprocess_behavior is ReprocessBehavior.FAILED: @@ -262,7 +266,23 @@ def _create_backfill_dag_run( dag_run_conf, backfill_sort_ordinal, session, + from_date, + to_date, ): + from airflow.models import DAG + + dr = session.scalar(_get_latest_dag_run_row_query(info, session)) + if ( + dr + and dr.state not in {DagRunState.RUNNING} + and reprocess_behavior in {ReprocessBehavior.COMPLETED, ReprocessBehavior.FAILED} + ): + DAG.clear_dags( + [dag], + start_date=from_date, + end_date=to_date, + dag_run_state=DagRunState.QUEUED, + ) with session.begin_nested() as nested: dr = session.scalar( with_row_locks( @@ -277,6 +297,7 @@ def _create_backfill_dag_run( # which releases the lock on the latest dag run, since we # are not creating a new one nested.rollback() + print("inside non creation") session.add( BackfillDagRun( backfill_id=backfill_id, @@ -411,6 +432,8 @@ def _create_backfill( reprocess_behavior=br.reprocess_behavior, backfill_sort_ordinal=backfill_sort_ordinal, session=session, + from_date=from_date, + to_date=to_date, ) log.info( "created backfill dag run dag_id=%s backfill_id=%s, info=%s", diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 7b1625e1043ad..a670a30afb64b 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -159,6 +159,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.NONE, { "2021-01-01": 1, + "2021-01-03": 1, + "2021-01-04": 1, + "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -166,9 +170,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.FAILED, { "2021-01-01": 1, - "2021-01-02": 1, "2021-01-03": 1, + "2021-01-04": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -176,11 +181,10 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.COMPLETED, { "2021-01-01": 1, - "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, - "2021-01-05": 1, "2021-01-06": 1, + "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -205,12 +209,8 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): # whether a dag run is created for backfill depends on # the last run for a logical date ("2021-01-02", ["failed"]), - ("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed" - ("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success" - ("2021-01-05", ["success", "success"]), - ("2021-01-06", ["failed", "failed"]), - ("2021-01-07", ["running", "running"]), - ("2021-01-08", ["failed", "running"]), + ("2021-01-05", ["success"]), + ("2021-01-08", ["running"]), ] for state in states ] @@ -266,12 +266,12 @@ def _get_bdr(date): # 2021-01-04 is "failed" so it may or may not be reprocessed depending # on the configuration - bdr = _get_bdr("2021-01-04") + bdr = _get_bdr("2021-01-05") actual_reason = bdr.exception_reason if reprocess_behavior is ReprocessBehavior.FAILED: - assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS + assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN elif reprocess_behavior is ReprocessBehavior.COMPLETED: - assert actual_reason is None + assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN elif reprocess_behavior is ReprocessBehavior.NONE: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS From 8f846067613a258af7cb3135b9c4443f5f4aef37 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Thu, 30 Jan 2025 23:08:48 +0530 Subject: [PATCH 06/18] fixing tests --- airflow/models/backfill.py | 3 +-- tests/www/views/test_views_tasks.py | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 5bb5376530250..0c8d79421c7bb 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -140,7 +140,7 @@ class BackfillDagRunExceptionReason(str, Enum): IN_FLIGHT = "in flight" ALREADY_EXISTS = "already exists" UNKNOWN = "unknown" - CLEARED_RUN = "cleared run" + CLEARED_RUN = "cleared existing run" class BackfillDagRun(Base): @@ -297,7 +297,6 @@ def _create_backfill_dag_run( # which releases the lock on the latest dag run, since we # are not creating a new one nested.rollback() - print("inside non creation") session.add( BackfillDagRun( backfill_id=backfill_id, diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py index 44c4316058171..dfac162f95e59 100644 --- a/tests/www/views/test_views_tasks.py +++ b/tests/www/views/test_views_tasks.py @@ -396,6 +396,7 @@ def test_rendered_k8s_without_k8s(admin_client): def test_tree_trigger_origin_tree_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + clear_db_runs() app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, @@ -415,6 +416,7 @@ def test_tree_trigger_origin_tree_view(app, admin_client): def test_graph_trigger_origin_grid_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + clear_db_runs() app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, @@ -434,6 +436,7 @@ def test_graph_trigger_origin_grid_view(app, admin_client): def test_gantt_trigger_origin_grid_view(app, admin_client): triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {} + clear_db_runs() app.dag_bag.get_dag("example_bash_operator").create_dagrun( run_id="test", run_type=DagRunType.SCHEDULED, From 322b25c27b45aea71d9c6f2acfaf2aa1fd0670a1 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Sat, 1 Feb 2025 17:42:33 +0530 Subject: [PATCH 07/18] fix tests --- .../core_api/routes/public/test_assets.py | 18 +++++++++---- .../core_api/routes/public/test_dag_run.py | 27 +++---------------- .../routes/public/test_task_instances.py | 20 +++++++++----- 3 files changed, 29 insertions(+), 36 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index a48c0da87fc7a..0e9d5d051b511 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -149,7 +149,7 @@ def _create_dag_run(session, num: int = 2): dag_id="source_dag_id", run_id=f"source_run_id_{i}", run_type=DagRunType.MANUAL, - logical_date=DEFAULT_DATE, + logical_date=DEFAULT_DATE + timedelta(days=i), start_date=DEFAULT_DATE, data_interval=(DEFAULT_DATE, DEFAULT_DATE), external_trigger=True, @@ -554,7 +554,9 @@ def test_should_respond_200(self, test_client, session): { "run_id": "source_run_id_1", "dag_id": "source_dag_id", - "logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "logical_date": from_datetime_to_zulu_without_ms( + DEFAULT_DATE + timedelta(days=1) + ), "start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "state": "success", @@ -579,7 +581,9 @@ def test_should_respond_200(self, test_client, session): { "run_id": "source_run_id_2", "dag_id": "source_dag_id", - "logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "logical_date": from_datetime_to_zulu_without_ms( + DEFAULT_DATE + timedelta(days=2) + ), "start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "state": "success", @@ -722,7 +726,9 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "run_id": "source_run_id_1", "dag_id": "source_dag_id", - "logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "logical_date": from_datetime_to_zulu_without_ms( + DEFAULT_DATE + timedelta(days=1) + ), "start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "state": "success", @@ -747,7 +753,9 @@ def test_should_mask_sensitive_extra(self, test_client, session): { "run_id": "source_run_id_2", "dag_id": "source_dag_id", - "logical_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), + "logical_date": from_datetime_to_zulu_without_ms( + DEFAULT_DATE + timedelta(days=2) + ), "start_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "end_date": from_datetime_to_zulu_without_ms(DEFAULT_DATE), "state": "success", diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index b316b0119dd19..ef3049311e87c 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1319,10 +1319,9 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio ) @time_machine.travel(timezone.utcnow(), tick=False) - def test_should_response_200_for_duplicate_logical_date(self, test_client): + def test_should_response_409_for_duplicate_logical_date(self, test_client): RUN_ID_1 = "random_1" RUN_ID_2 = "random_2" - now = timezone.utcnow().isoformat().replace("+00:00", "Z") note = "duplicate logical date test" response_1 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", @@ -1333,28 +1332,8 @@ def test_should_response_200_for_duplicate_logical_date(self, test_client): json={"dag_run_id": RUN_ID_2, "note": note}, ) - assert response_1.status_code == response_2.status_code == 200 - body1 = response_1.json() - body2 = response_2.json() - - for each_run_id, each_body in [(RUN_ID_1, body1), (RUN_ID_2, body2)]: - assert each_body == { - "dag_run_id": each_run_id, - "dag_id": DAG1_ID, - "logical_date": now, - "queued_at": now, - "start_date": None, - "end_date": None, - "data_interval_start": now, - "data_interval_end": now, - "last_scheduling_decision": None, - "run_type": "manual", - "state": "queued", - "external_trigger": True, - "triggered_by": "rest_api", - "conf": {}, - "note": note, - } + assert response_1.status_code == 200 + assert response_2.status_code == 409 @pytest.mark.parametrize( "data_interval_start, data_interval_end", diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py index 80cf61dc684cf..1d29cd5d74964 100644 --- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py @@ -20,6 +20,7 @@ import datetime as dt import itertools import os +from datetime import timedelta from unittest import mock import pendulum @@ -1022,9 +1023,15 @@ def test_should_respond_200_for_dag_id_filter(self, test_client, session): assert count == len(response.json()["task_instances"]) @pytest.mark.parametrize( - "order_by_field", ["start_date", "logical_date", "data_interval_start", "data_interval_end"] + "order_by_field, date", + [ + ("start_date", DEFAULT_DATETIME_1 + timedelta(days=20)), + ("logical_date", DEFAULT_DATETIME_2), + ("data_interval_start", DEFAULT_DATETIME_1 + timedelta(days=5)), + ("data_interval_end", DEFAULT_DATETIME_2 + timedelta(days=8)), + ], ) - def test_should_respond_200_for_order_by(self, order_by_field, test_client, session): + def test_should_respond_200_for_order_by(self, order_by_field, date, test_client, session): dag_id = "example_python_operator" dag_runs = [ @@ -1032,10 +1039,10 @@ def test_should_respond_200_for_order_by(self, order_by_field, test_client, sess dag_id=dag_id, run_id=f"run_{i}", run_type=DagRunType.MANUAL, - logical_date=DEFAULT_DATETIME_1 + dt.timedelta(days=i), + logical_date=date + dt.timedelta(days=i), data_interval=( - DEFAULT_DATETIME_1 + dt.timedelta(days=i), - DEFAULT_DATETIME_1 + dt.timedelta(days=i, hours=1), + date + dt.timedelta(days=i), + date + dt.timedelta(days=i, hours=1), ), ) for i in range(10) @@ -1046,8 +1053,7 @@ def test_should_respond_200_for_order_by(self, order_by_field, test_client, sess self.create_task_instances( session, task_instances=[ - {"run_id": f"run_{i}", "start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} - for i in range(10) + {"run_id": f"run_{i}", "start_date": date + dt.timedelta(minutes=(i + 1))} for i in range(10) ], dag_id=dag_id, ) From c3e29c77ffbff391596333d8a7a476097fc91df3 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 10:49:18 +0530 Subject: [PATCH 08/18] remove default date from logical date in dag run model --- airflow/models/dag.py | 5 +++-- airflow/models/dagrun.py | 12 +++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a0f6e901dc3e9..509602a75ffa8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1719,7 +1719,7 @@ def create_dagrun( self, *, run_id: str, - logical_date: datetime, + logical_date: datetime | None, data_interval: tuple[datetime, datetime], conf: dict | None = None, run_type: DagRunType, @@ -1743,7 +1743,8 @@ def create_dagrun( :meta private: """ - logical_date = timezone.coerce_datetime(logical_date) + if logical_date is not None: + logical_date = timezone.coerce_datetime(logical_date) if data_interval and not isinstance(data_interval, DataInterval): data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index c1f0c8d24e60e..c366a70e22142 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -130,7 +130,7 @@ class DagRun(Base, LoggingMixin): id = Column(Integer, primary_key=True) dag_id = Column(StringID(), nullable=False) queued_at = Column(UtcDateTime) - logical_date = Column(UtcDateTime, default=timezone.utcnow, nullable=True) + logical_date = Column(UtcDateTime, nullable=True) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) _state = Column("state", String(50), default=DagRunState.QUEUED) @@ -1305,8 +1305,14 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None: def task_filter(task: Operator) -> bool: return task.task_id not in task_ids and ( self.run_type == DagRunType.BACKFILL_JOB - or (task.start_date is None or task.start_date <= self.logical_date) - and (task.end_date is None or self.logical_date <= task.end_date) + or ( + task.start_date is None + or (self.logical_date is not None and task.start_date <= self.logical_date) + ) + and ( + task.end_date is None + or (self.logical_date is not None and self.logical_date <= task.end_date) + ) ) created_counts: dict[str, int] = defaultdict(int) From 2b4251d7e8658192721b9fecf3fb6b6a52a3d732 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 16:34:25 +0530 Subject: [PATCH 09/18] fix task_filter --- airflow/models/dagrun.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index c366a70e22142..c1ddc9a808270 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1307,12 +1307,10 @@ def task_filter(task: Operator) -> bool: self.run_type == DagRunType.BACKFILL_JOB or ( task.start_date is None - or (self.logical_date is not None and task.start_date <= self.logical_date) - ) - and ( - task.end_date is None - or (self.logical_date is not None and self.logical_date <= task.end_date) + or self.logical_date is None + or task.start_date <= self.logical_date ) + and (task.end_date is None or self.logical_date is None or self.logical_date <= task.end_date) ) created_counts: dict[str, int] = defaultdict(int) From 90595d68a4a1c7bfbccdf838bcde2ab84541c9ec Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 18:25:32 +0530 Subject: [PATCH 10/18] fix failing tests --- tests/models/test_taskinstance.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index ab43b61cb95e9..4487fbf64413a 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2303,7 +2303,13 @@ def test_outlet_assets(self, create_task_instance, testing_dag_bundle): session.flush() run_id = str(uuid4()) - dr = DagRun(dag1.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag1.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag1.get_task("producing_task_1") task.bash_command = "echo 1" # make it go faster @@ -2362,7 +2368,13 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle): dagbag.collect_dags(only_if_updated=False, safe_mode=False) dagbag.sync_to_db("testing", None, session=session) run_id = str(uuid4()) - dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag_with_fail_task.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag_with_fail_task.get_task("fail_task") ti = TaskInstance(task, run_id=run_id) @@ -2421,7 +2433,13 @@ def test_outlet_assets_skipped(self, testing_dag_bundle): session.flush() run_id = str(uuid4()) - dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING) + dr = DagRun( + dag_with_skip_task.dag_id, + run_id=run_id, + run_type="manual", + state=DagRunState.RUNNING, + logical_date=timezone.utcnow(), + ) session.merge(dr) task = dag_with_skip_task.get_task("skip_task") ti = TaskInstance(task, run_id=run_id) From fd4c069a1e5d8d9c138042ea8b0b3ad26a91be8f Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 3 Feb 2025 23:09:07 +0530 Subject: [PATCH 11/18] make logical date as required field --- .../core_api/datamodels/dag_run.py | 10 +---- .../core_api/openapi/v1-generated.yaml | 8 ++++ .../core_api/routes/public/dag_run.py | 6 ++- .../ui/openapi-gen/requests/schemas.gen.ts | 13 ++++++ airflow/ui/openapi-gen/requests/types.gen.ts | 1 + airflow/ui/src/queries/useTrigger.ts | 2 + .../core_api/routes/public/test_dag_run.py | 42 ++++++++++++++----- 7 files changed, 63 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 78e0254f62240..314216f22ae3e 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,11 +20,10 @@ from datetime import datetime from enum import Enum -from pydantic import AwareDatetime, Field, NonNegativeInt, computed_field, model_validator +from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel from airflow.models import DagRun -from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -82,6 +81,7 @@ class TriggerDAGRunPostBody(StrictBaseModel): """Trigger DAG Run Serializer for POST body.""" dag_run_id: str | None = None + logical_date: AwareDatetime | None data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None @@ -102,12 +102,6 @@ def validate_dag_run_id(self): self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) return self - # Mypy issue https://github.com/python/mypy/issues/1362 - @computed_field # type: ignore[misc] - @property - def logical_date(self) -> datetime: - return timezone.utcnow() - class DAGRunsBatchBody(StrictBaseModel): """List DAG Runs body for batch endpoint.""" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index d06d09428a5d6..96d81a1ac2f71 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -10284,6 +10284,12 @@ components: - type: string - type: 'null' title: Dag Run Id + logical_date: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date data_interval_start: anyOf: - type: string @@ -10306,6 +10312,8 @@ components: title: Note additionalProperties: false type: object + required: + - logical_date title: TriggerDAGRunPostBody description: Trigger DAG Run Serializer for POST body. TriggerResponse: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 80db78d87f512..601101f7dcc7e 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -66,6 +66,7 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -351,7 +352,10 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - logical_date = pendulum.instance(body.logical_date) + if body.logical_date is not None: + logical_date = pendulum.instance(body.logical_date) + else: + logical_date = pendulum.instance(timezone.utcnow()) try: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index bbb7a033c3167..ccef2940cb764 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -5752,6 +5752,18 @@ export const $TriggerDAGRunPostBody = { ], title: "Dag Run Id", }, + logical_date: { + anyOf: [ + { + type: "string", + format: "date-time", + }, + { + type: "null", + }, + ], + title: "Logical Date", + }, data_interval_start: { anyOf: [ { @@ -5794,6 +5806,7 @@ export const $TriggerDAGRunPostBody = { }, additionalProperties: false, type: "object", + required: ["logical_date"], title: "TriggerDAGRunPostBody", description: "Trigger DAG Run Serializer for POST body.", } as const; diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index e687dacaa0188..cf24770927ff7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1402,6 +1402,7 @@ export type TimeDelta = { */ export type TriggerDAGRunPostBody = { dag_run_id?: string | null; + logical_date: string | null; data_interval_start?: string | null; data_interval_end?: string | null; conf?: { diff --git a/airflow/ui/src/queries/useTrigger.ts b/airflow/ui/src/queries/useTrigger.ts index 0a6e6f492abb9..fa94bd97229df 100644 --- a/airflow/ui/src/queries/useTrigger.ts +++ b/airflow/ui/src/queries/useTrigger.ts @@ -101,6 +101,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce const checkDagRunId = dagRunRequestBody.dagRunId === "" ? undefined : dagRunRequestBody.dagRunId; const checkNote = dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note; + const logicalDate = new Date().toISOString(); mutate({ dagId, @@ -109,6 +110,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce dag_run_id: checkDagRunId, data_interval_end: formattedDataIntervalEnd, data_interval_start: formattedDataIntervalStart, + logical_date: logicalDate, note: checkNote, }, }); diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 21955cbe1ba45..bd475f7034dbe 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1145,7 +1145,7 @@ def test_should_respond_200( ): fixed_now = timezone.utcnow().isoformat() - request_json = {"note": note} + request_json = {"note": note, "logical_date": fixed_now} if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id if data_interval_start is not None: @@ -1289,29 +1289,34 @@ def test_should_respond_200( ], ) def test_invalid_data(self, test_client, post_body, expected_detail): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") + post_body["logical_date"] = now response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json=post_body) assert response.status_code == 422 assert response.json() == expected_detail @mock.patch("airflow.models.DAG.create_dagrun") def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_client): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") error_message = "Encountered Error" mock_create_dagrun.side_effect = ValueError(error_message) - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={}) + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert response.json() == {"detail": error_message} def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/inactive/dagRuns", json={}) + response = test_client.post("/public/dags/inactive/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'inactive' not found" def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, session): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/import_errors/dagRuns", json={}) + response = test_client.post("/public/dags/import_errors/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert ( response.json()["detail"] @@ -1320,16 +1325,17 @@ def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, sessio @time_machine.travel(timezone.utcnow(), tick=False) def test_should_response_409_for_duplicate_logical_date(self, test_client): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") RUN_ID_1 = "random_1" RUN_ID_2 = "random_2" note = "duplicate logical date test" response_1 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_1, "note": note}, + json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now}, ) response_2 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_2, "note": note}, + json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now}, ) assert response_1.status_code == 200 @@ -1351,9 +1357,14 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): def test_should_response_422_for_missing_start_date_or_end_date( self, test_client, data_interval_start, data_interval_end ): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"data_interval_start": data_interval_start, "data_interval_end": data_interval_end}, + json={ + "data_interval_start": data_interval_start, + "data_interval_end": data_interval_end, + "logical_date": now, + }, ) assert response.status_code == 422 assert ( @@ -1362,21 +1373,32 @@ def test_should_response_422_for_missing_start_date_or_end_date( ) def test_raises_validation_error_for_invalid_params(self, test_client): + now = timezone.utcnow().isoformat().replace("+00:00", "Z") response = test_client.post( f"/public/dags/{DAG2_ID}/dagRuns", - json={"conf": {"validated_number": 5000}}, + json={"conf": {"validated_number": 5000}, "logical_date": now}, ) assert response.status_code == 400 assert "Invalid input for param validated_number" in response.json()["detail"] def test_response_404(self, test_client): - response = test_client.post("/public/dags/randoms/dagRuns", json={}) + now = timezone.utcnow().isoformat().replace("+00:00", "Z") + response = test_client.post("/public/dags/randoms/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found" def test_response_409(self, test_client): - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID}) + now = timezone.utcnow().isoformat().replace("+00:00", "Z") + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now} + ) assert response.status_code == 409 response_json = response.json() assert "detail" in response_json assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + + def test_null_logical_data(self, test_client): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": "test", "logical_date": None} + ) + assert response.status_code == 200 From 78f8f43fc189c470da882ee743f5659064a31d1d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Tue, 4 Feb 2025 00:23:13 +0530 Subject: [PATCH 12/18] refactor --- .../core_api/datamodels/dag_run.py | 5 ++++- .../core_api/routes/public/dag_run.py | 21 +++++++++---------- airflow/ui/src/queries/useTrigger.ts | 3 +-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 314216f22ae3e..d5d668e25507b 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -20,6 +20,7 @@ from datetime import datetime from enum import Enum +import pendulum from pydantic import AwareDatetime, Field, NonNegativeInt, model_validator from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel @@ -99,7 +100,9 @@ def check_data_intervals(cls, values): @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: - self.dag_run_id = DagRun.generate_run_id(DagRunType.MANUAL, self.logical_date) + self.dag_run_id = DagRun.generate_run_id( + DagRunType.MANUAL, self.logical_date if self.logical_date is not None else pendulum.now("UTC") + ) return self diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 601101f7dcc7e..3d4085be72973 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -66,7 +66,6 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval -from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -352,10 +351,7 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - if body.logical_date is not None: - logical_date = pendulum.instance(body.logical_date) - else: - logical_date = pendulum.instance(timezone.utcnow()) + logical_date = pendulum.instance(body.logical_date) if body.logical_date is not None else None try: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) @@ -366,16 +362,19 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date) + data_interval = dag.timetable.infer_manual_data_interval( + run_after=logical_date or pendulum.now("UTC") + ) - if body.dag_run_id: - run_id = body.dag_run_id - else: - run_id = dag.timetable.generate_run_id( + run_id = ( + body.dag_run_id + if body.dag_run_id + else dag.timetable.generate_run_id( run_type=DagRunType.MANUAL, - logical_date=logical_date, + logical_date=logical_date or pendulum.now("UTC"), data_interval=data_interval, ) + ) dag_run = dag.create_dagrun( run_id=run_id, diff --git a/airflow/ui/src/queries/useTrigger.ts b/airflow/ui/src/queries/useTrigger.ts index fa94bd97229df..19d6f22ad43a6 100644 --- a/airflow/ui/src/queries/useTrigger.ts +++ b/airflow/ui/src/queries/useTrigger.ts @@ -101,7 +101,6 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce const checkDagRunId = dagRunRequestBody.dagRunId === "" ? undefined : dagRunRequestBody.dagRunId; const checkNote = dagRunRequestBody.note === "" ? undefined : dagRunRequestBody.note; - const logicalDate = new Date().toISOString(); mutate({ dagId, @@ -110,7 +109,7 @@ export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSucce dag_run_id: checkDagRunId, data_interval_end: formattedDataIntervalEnd, data_interval_start: formattedDataIntervalStart, - logical_date: logicalDate, + logical_date: null, note: checkNote, }, }); From c8b8577497ec4b1e975dd874408d0efe92eccfe5 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 08:43:18 +0530 Subject: [PATCH 13/18] remove backfill related changes --- airflow/models/backfill.py | 26 ++------------------------ tests/models/test_backfill.py | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 36 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index d4ad9a0100fc0..a2347e76fdc6e 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -140,7 +141,6 @@ class BackfillDagRunExceptionReason(str, Enum): IN_FLIGHT = "in flight" ALREADY_EXISTS = "already exists" UNKNOWN = "unknown" - CLEARED_RUN = "cleared existing run" class BackfillDagRun(Base): @@ -194,11 +194,7 @@ def _get_latest_dag_run_row_query(info, session): def _get_dag_run_no_create_reason(dr, reprocess_behavior: ReprocessBehavior) -> str | None: non_create_reason = None if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - if dr.clear_number == 0: - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - else: - non_create_reason = BackfillDagRunExceptionReason.CLEARED_RUN - + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT elif reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS elif reprocess_behavior is ReprocessBehavior.FAILED: @@ -266,23 +262,7 @@ def _create_backfill_dag_run( dag_run_conf, backfill_sort_ordinal, session, - from_date, - to_date, ): - from airflow.models import DAG - - dr = session.scalar(_get_latest_dag_run_row_query(info, session)) - if ( - dr - and dr.state not in {DagRunState.RUNNING} - and reprocess_behavior in {ReprocessBehavior.COMPLETED, ReprocessBehavior.FAILED} - ): - DAG.clear_dags( - [dag], - start_date=from_date, - end_date=to_date, - dag_run_state=DagRunState.QUEUED, - ) with session.begin_nested() as nested: dr = session.scalar( with_row_locks( @@ -432,8 +412,6 @@ def _create_backfill( reprocess_behavior=br.reprocess_behavior, backfill_sort_ordinal=backfill_sort_ordinal, session=session, - from_date=from_date, - to_date=to_date, ) log.info( "created backfill dag run dag_id=%s backfill_id=%s, info=%s", diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index 9874c79892390..0a1ad5e134921 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -164,10 +164,6 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.NONE, { "2021-01-01": 1, - "2021-01-03": 1, - "2021-01-04": 1, - "2021-01-06": 1, - "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -175,10 +171,9 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.FAILED, { "2021-01-01": 1, + "2021-01-02": 1, "2021-01-03": 1, - "2021-01-04": 1, "2021-01-06": 1, - "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -186,10 +181,11 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): ReprocessBehavior.COMPLETED, { "2021-01-01": 1, + "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, + "2021-01-05": 1, "2021-01-06": 1, - "2021-01-07": 1, "2021-01-09": 1, }, ), @@ -214,8 +210,12 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): # whether a dag run is created for backfill depends on # the last run for a logical date ("2021-01-02", ["failed"]), - ("2021-01-05", ["success"]), - ("2021-01-08", ["running"]), + ("2021-01-03", ["success", "failed"]), # <-- 2021-01-03 is "failed" + ("2021-01-04", ["failed", "success"]), # <-- 2021-01-04 is "success" + ("2021-01-05", ["success", "success"]), + ("2021-01-06", ["failed", "failed"]), + ("2021-01-07", ["running", "running"]), + ("2021-01-08", ["failed", "running"]), ] for state in states ] @@ -271,12 +271,12 @@ def _get_bdr(date): # 2021-01-04 is "failed" so it may or may not be reprocessed depending # on the configuration - bdr = _get_bdr("2021-01-05") + bdr = _get_bdr("2021-01-04") actual_reason = bdr.exception_reason if reprocess_behavior is ReprocessBehavior.FAILED: - assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN + assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS elif reprocess_behavior is ReprocessBehavior.COMPLETED: - assert actual_reason == BackfillDagRunExceptionReason.CLEARED_RUN + assert actual_reason is None elif reprocess_behavior is ReprocessBehavior.NONE: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS From 54674128071cc5d1f8ab873774c80a3760fd57e4 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 11:31:19 +0530 Subject: [PATCH 14/18] implement review comments --- .../api_fastapi/core_api/datamodels/dag_run.py | 3 ++- .../core_api/routes/public/dag_run.py | 18 +++++------------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index d5d668e25507b..9ec31971b7477 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -85,7 +85,6 @@ class TriggerDAGRunPostBody(StrictBaseModel): logical_date: AwareDatetime | None data_interval_start: AwareDatetime | None = None data_interval_end: AwareDatetime | None = None - conf: dict = Field(default_factory=dict) note: str | None = None @@ -97,6 +96,8 @@ def check_data_intervals(cls, values): ) return values + ## when logical date is null, the run id should be generated from run_after + random string. + # TODO we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 3d4085be72973..8ce1372732882 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -66,6 +66,7 @@ from airflow.models import DAG, DagModel, DagRun from airflow.models.dag_version import DagVersion from airflow.timetables.base import DataInterval +from airflow.utils import timezone from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -352,6 +353,7 @@ def trigger_dag_run( ) logical_date = pendulum.instance(body.logical_date) if body.logical_date is not None else None + coerced_logical_date = timezone.coerce_datetime(logical_date) try: dag: DAG = request.app.state.dag_bag.get_dag(dag_id) @@ -363,22 +365,12 @@ def trigger_dag_run( ) else: data_interval = dag.timetable.infer_manual_data_interval( - run_after=logical_date or pendulum.now("UTC") + run_after=coerced_logical_date or pendulum.now("UTC") ) - run_id = ( - body.dag_run_id - if body.dag_run_id - else dag.timetable.generate_run_id( - run_type=DagRunType.MANUAL, - logical_date=logical_date or pendulum.now("UTC"), - data_interval=data_interval, - ) - ) - dag_run = dag.create_dagrun( - run_id=run_id, - logical_date=logical_date, + run_id=cast(str, body.dag_run_id), + logical_date=coerced_logical_date, data_interval=data_interval, run_after=data_interval.end, conf=body.conf, From d521d4df05a7a2711c29d6ed448dd229cf685317 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 11:42:32 +0530 Subject: [PATCH 15/18] add time now to var --- airflow/api_fastapi/core_api/routes/public/dag_run.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 8ce1372732882..a45c43a433f12 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -343,6 +343,7 @@ def trigger_dag_run( ) -> DAGRunResponse: """Trigger a DAG.""" dm = session.scalar(select(DagModel).where(DagModel.is_active, DagModel.dag_id == dag_id).limit(1)) + now = pendulum.now("UTC") if not dm: raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG with dag_id: '{dag_id}' not found") @@ -364,9 +365,7 @@ def trigger_dag_run( end=pendulum.instance(body.data_interval_end), ) else: - data_interval = dag.timetable.infer_manual_data_interval( - run_after=coerced_logical_date or pendulum.now("UTC") - ) + data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date or now) dag_run = dag.create_dagrun( run_id=cast(str, body.dag_run_id), From afc73839aebd34db644c034218d7f5cb7d2c96b4 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 7 Feb 2025 14:19:46 +0530 Subject: [PATCH 16/18] fix tests --- .../core_api/routes/public/test_dag_run.py | 67 ++++++++++++++----- 1 file changed, 51 insertions(+), 16 deletions(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 92e19c839fcd5..25b27b3527622 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1131,12 +1131,7 @@ def _dags_for_trigger_tests(self, session=None): "dag_run_id, note, data_interval_start, data_interval_end", [ ("dag_run_5", "test-note", None, None), - ( - "dag_run_6", - "test-note", - "2024-01-03T00:00:00+00:00", - "2024-01-04T05:00:00+00:00", - ), + ("dag_run_6", "test-note", "2024-01-03T00:00:00+00:00", "2024-01-04T05:00:00+00:00"), (None, None, None, None), ], ) @@ -1145,7 +1140,7 @@ def test_should_respond_200( ): fixed_now = timezone.utcnow().isoformat() - request_json = {"note": note} + request_json = {"note": note, "logical_date": fixed_now} if dag_run_id is not None: request_json["dag_run_id"] = dag_run_id if data_interval_start is not None: @@ -1157,6 +1152,7 @@ def test_should_respond_200( f"/public/dags/{DAG1_ID}/dagRuns", json=request_json, ) + print(f"body is {response.json()}") assert response.status_code == 200 if dag_run_id is None: @@ -1289,29 +1285,34 @@ def test_should_respond_200( ], ) def test_invalid_data(self, test_client, post_body, expected_detail): + now = timezone.utcnow().isoformat() + post_body["logical_date"] = now response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json=post_body) assert response.status_code == 422 assert response.json() == expected_detail @mock.patch("airflow.models.DAG.create_dagrun") def test_dagrun_creation_exception_is_handled(self, mock_create_dagrun, test_client): + now = timezone.utcnow().isoformat() error_message = "Encountered Error" mock_create_dagrun.side_effect = ValueError(error_message) - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={}) + response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert response.json() == {"detail": error_message} def test_should_respond_404_if_a_dag_is_inactive(self, test_client, session): + now = timezone.utcnow().isoformat() self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/inactive/dagRuns", json={}) + response = test_client.post("/public/dags/inactive/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'inactive' not found" def test_should_respond_400_if_a_dag_has_import_errors(self, test_client, session): + now = timezone.utcnow().isoformat() self._dags_for_trigger_tests(session) - response = test_client.post("/public/dags/import_errors/dagRuns", json={}) + response = test_client.post("/public/dags/import_errors/dagRuns", json={"logical_date": now}) assert response.status_code == 400 assert ( response.json()["detail"] @@ -1326,11 +1327,11 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): note = "duplicate logical date test" response_1 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_1, "note": note}, + json={"dag_run_id": RUN_ID_1, "note": note, "logical_date": now}, ) response_2 = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"dag_run_id": RUN_ID_2, "note": note}, + json={"dag_run_id": RUN_ID_2, "note": note, "logical_date": now}, ) assert response_1.status_code == 200 @@ -1370,9 +1371,14 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): def test_should_response_422_for_missing_start_date_or_end_date( self, test_client, data_interval_start, data_interval_end ): + now = timezone.utcnow().isoformat() response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", - json={"data_interval_start": data_interval_start, "data_interval_end": data_interval_end}, + json={ + "data_interval_start": data_interval_start, + "data_interval_end": data_interval_end, + "logical_date": now, + }, ) assert response.status_code == 422 assert ( @@ -1381,21 +1387,50 @@ def test_should_response_422_for_missing_start_date_or_end_date( ) def test_raises_validation_error_for_invalid_params(self, test_client): + now = timezone.utcnow().isoformat() response = test_client.post( f"/public/dags/{DAG2_ID}/dagRuns", - json={"conf": {"validated_number": 5000}}, + json={"conf": {"validated_number": 5000}, "logical_date": now}, ) assert response.status_code == 400 assert "Invalid input for param validated_number" in response.json()["detail"] def test_response_404(self, test_client): - response = test_client.post("/public/dags/randoms/dagRuns", json={}) + now = timezone.utcnow().isoformat() + response = test_client.post("/public/dags/randoms/dagRuns", json={"logical_date": now}) assert response.status_code == 404 assert response.json()["detail"] == "DAG with dag_id: 'randoms' not found" def test_response_409(self, test_client): - response = test_client.post(f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID}) + now = timezone.utcnow().isoformat() + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", json={"dag_run_id": DAG1_RUN1_ID, "logical_date": now} + ) assert response.status_code == 409 response_json = response.json() assert "detail" in response_json assert list(response_json["detail"].keys()) == ["reason", "statement", "orig_error"] + + def test_should_respond_200_with_null_logical_date(self, test_client): + response = test_client.post( + f"/public/dags/{DAG1_ID}/dagRuns", + json={"logical_date": None}, + ) + assert response.status_code == 200 + assert response.json() == { + "dag_run_id": mock.ANY, + "dag_id": DAG1_ID, + "logical_date": None, + "queued_at": mock.ANY, + "start_date": None, + "end_date": None, + "data_interval_start": mock.ANY, + "data_interval_end": mock.ANY, + "last_scheduling_decision": None, + "run_type": "manual", + "state": "queued", + "external_trigger": True, + "triggered_by": "rest_api", + "conf": {}, + "note": None, + } From b91f3fa9e6d33334e94a0fcd2b227dd20907f333 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Mon, 10 Feb 2025 14:56:45 +0530 Subject: [PATCH 17/18] fix review comments --- airflow/api_fastapi/core_api/datamodels/dag_run.py | 4 ++-- airflow/api_fastapi/core_api/routes/public/dag_run.py | 2 +- airflow/models/dag.py | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py index 9ec31971b7477..18be129a1952c 100644 --- a/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -97,12 +97,12 @@ def check_data_intervals(cls, values): return values ## when logical date is null, the run id should be generated from run_after + random string. - # TODO we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged + # TODO: AIP83: we need to modify this validator after https://github.com/apache/airflow/pull/46398 is merged @model_validator(mode="after") def validate_dag_run_id(self): if not self.dag_run_id: self.dag_run_id = DagRun.generate_run_id( - DagRunType.MANUAL, self.logical_date if self.logical_date is not None else pendulum.now("UTC") + DagRunType.MANUAL, self.logical_date or pendulum.now("UTC") ) return self diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index 5adba7baec980..066d7bc06f344 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -356,7 +356,7 @@ def trigger_dag_run( f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", ) - logical_date = pendulum.instance(body.logical_date) if body.logical_date is not None else None + logical_date = timezone.coerce_datetime(body.logical_date) coerced_logical_date = timezone.coerce_datetime(logical_date) try: diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 74584e5318cad..7ebbb48b4b40d 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -1781,8 +1781,7 @@ def create_dagrun( :meta private: """ - if logical_date is not None: - logical_date = timezone.coerce_datetime(logical_date) + logical_date = timezone.coerce_datetime(logical_date) if data_interval and not isinstance(data_interval, DataInterval): data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval)) From ccb7ed1f602478a22400980fdd87581f32123094 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 <43964496+vatsrahul1001@users.noreply.github.com> Date: Mon, 10 Feb 2025 20:35:07 +0530 Subject: [PATCH 18/18] Update tests/api_fastapi/core_api/routes/public/test_dag_run.py Co-authored-by: Wei Lee --- tests/api_fastapi/core_api/routes/public/test_dag_run.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index 61fe114ef33c1..3723232a7cd31 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1160,7 +1160,6 @@ def test_should_respond_200( f"/public/dags/{DAG1_ID}/dagRuns", json=request_json, ) - print(f"body is {response.json()}") assert response.status_code == 200 if dag_run_id is None: