Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid re-fetching DAG run in TriggerDagRunOperator #27635

Merged
merged 11 commits into from
Nov 16, 2022
Merged
4 changes: 1 addition & 3 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ def _trigger_dag(
dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)

if dag_run:
raise DagRunAlreadyExists(
f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"
)
raise DagRunAlreadyExists(dag_run=dag_run, execution_date=execution_date, run_id=run_id)

run_conf = None
if conf:
Expand Down
11 changes: 10 additions & 1 deletion airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import datetime
import warnings
from http import HTTPStatus
from typing import Any, NamedTuple, Sized
from typing import TYPE_CHECKING, Any, NamedTuple, Sized

if TYPE_CHECKING:
from airflow.models import DagRun


class AirflowException(Exception):
Expand Down Expand Up @@ -185,6 +188,12 @@ class DagRunNotFound(AirflowNotFoundException):
class DagRunAlreadyExists(AirflowBadRequest):
    """
    Raise when creating a DAG run for DAG which already has DAG run entry.

    :param dag_run: the already-existing DAG run; exposed as the ``dag_run`` attribute
    :param execution_date: execution date that was requested for the new run
    :param run_id: run id that was requested for the new run
    """

    def __init__(self, dag_run: DagRun, execution_date: datetime.datetime, run_id: str) -> None:
        super().__init__(
            f"A DAG Run already exists for DAG {dag_run.dag_id} at {execution_date} with run id {run_id}"
        )
        self.dag_run = dag_run
        self.execution_date = execution_date
        self.run_id = run_id

    def __reduce__(self):
        # The default BaseException reduction rebuilds the instance from ``self.args``, which only
        # holds the formatted message. Calling this three-argument constructor with that single
        # string fails, so pickle/copy are given the original constructor arguments instead.
        return type(self), (self.dag_run, self.execution_date, self.run_id)


class DagFileExists(AirflowBadRequest):
"""Raise when a DAG ID is still in DagBag i.e., DAG file is in DAG folder."""
Expand Down
2 changes: 1 addition & 1 deletion airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def execute(self, context: Context):
dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
dag.clear(start_date=parsed_execution_date, end_date=parsed_execution_date)
dag_run = DagRun.find(dag_id=dag.dag_id, run_id=run_id)[0]
dag_run = e.dag_run
else:
raise e
if dag_run is None:
Expand Down
32 changes: 32 additions & 0 deletions tests/operators/test_trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ def test_trigger_dagrun_twice(self):
assert triggered_dag_run.execution_date == utc_now
self.assert_extra_link(triggered_dag_run, task, session)

def test_trigger_dagrun_with_scheduled_dag_run(self):
    """Test TriggerDagRunOperator with custom execution_date and scheduled dag_run."""
    logical_date = timezone.utcnow()
    # A run with this id is inserted before the operator runs, so the operator meets an
    # already-existing run for the same execution date.
    scheduled_run_id = f"scheduled__{logical_date.isoformat()}"
    operator = TriggerDagRunOperator(
        task_id="test_trigger_dagrun_with_execution_date",
        trigger_dag_id=TRIGGERED_DAG_ID,
        execution_date=logical_date,
        dag=self.dag,
        poke_interval=1,
        reset_dag_run=True,
        wait_for_completion=True,
    )
    with create_session() as session:
        session.add(
            DagRun(
                dag_id=TRIGGERED_DAG_ID,
                execution_date=logical_date,
                state=State.SUCCESS,
                run_type="scheduled",
                run_id=scheduled_run_id,
            )
        )
        session.commit()
        operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        # Exactly one run must remain for the triggered DAG after the operator ran.
        found_runs = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all()
        assert len(found_runs) == 1
        only_run = found_runs[0]
        assert only_run.external_trigger
        assert only_run.execution_date == logical_date
        self.assert_extra_link(only_run, operator, session)

def test_trigger_dagrun_with_templated_execution_date(self):
"""Test TriggerDagRunOperator with templated execution_date."""
task = TriggerDagRunOperator(
Expand Down