Skip to content

Commit

Permalink
Avoid re-fetching DAG run in TriggerDagRunOperator (#27635)
Browse files Browse the repository at this point in the history
  • Loading branch information
Adityamalik123 committed Nov 16, 2022
1 parent c9c257d commit 4637c9e
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
4 changes: 1 addition & 3 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ def _trigger_dag(
dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)

if dag_run:
raise DagRunAlreadyExists(
f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
)
raise DagRunAlreadyExists(dag_run=dag_run, execution_date=execution_date, run_id=run_id)

run_conf = None
if conf:
Expand Down
11 changes: 10 additions & 1 deletion airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import datetime
import warnings
from http import HTTPStatus
from typing import Any, NamedTuple, Sized
from typing import TYPE_CHECKING, Any, NamedTuple, Sized

if TYPE_CHECKING:
from airflow.models import DagRun


class AirflowException(Exception):
Expand Down Expand Up @@ -185,6 +188,12 @@ class DagRunNotFound(AirflowNotFoundException):
class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run entry."""

def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
super().__init__(
f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
)
self.dag_run = dag_run


class DagFileExists(AirflowBadRequest):
"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder."""
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def execute(self, context: Context):
dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
dag_run = e.dag_run
else:
raise e
if dag_run is None:
Expand Down
32 changes: 32 additions & 0 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ def test_trigger_dagrun_twice(self):
assert triggered_dag_run.execution_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)

def test_trigger_dagrun_with_scheduled_dag_run(self):
"""Test TriggerDagRunOperator with custom execution_date and scheduled dag_run."""
utc_now = timezone.utcnow()
task = TriggerDagRunOperator(
task_id="test_trigger_dagrun_with_execution_date",
trigger_dag_id=TRIGGERED_DAG_ID,
execution_date=utc_now,
dag=self.dag,
poke_interval=1,
reset_dag_run=True,
wait_for_completion=True,
)
run_id = f"scheduled__{utc_now.isoformat()}"
with create_session() as session:
dag_run = DagRun(
dag_id=TRIGGERED_DAG_ID,
execution_date=utc_now,
state=State.SUCCESS,
run_type="scheduled",
run_id=run_id,
)
session.add(dag_run)
session.commit()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
assert len(dagruns) == 1
triggered_dag_run = dagruns[0]
assert triggered_dag_run.external_trigger
assert triggered_dag_run.execution_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)

def test_trigger_dagrun_with_templated_execution_date(self):
"""Test TriggerDagRunOperator with templated execution_date."""
task = TriggerDagRunOperator(
Expand Down

0 comments on commit 4637c9e

Please sign in to comment.