Skip to content

Commit

Permalink
Raise error when DagRun fails while running dag test (#36517)
Browse files Browse the repository at this point in the history
**Motivation**:

Currently, when using `airflow dags test`, there is no easy way to know programmatically if a DagRun fails since the state is not stored in DB. The way to do know relies on log lines as below:

```bash
state=$(airflow dags test exception_dag | grep "DagRun Finished" | awk -F, '{for(i=1;i<=NF;i++) if ($i ~ / state=/) print $i}' | awk -F= '{print $2}') if [[ $state == "failed" ]]; then exit 1 else exit 0 fi
```

This PR adds will return an exit code 1 when `airflow dags test` command if DagRun fails and makes it easy to integrate in CI for testing.

(cherry picked from commit 383ad31)
  • Loading branch information
kaxil authored and ephraimbuddy committed Jan 11, 2024
1 parent c93209b commit 21b58ec
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
5 changes: 4 additions & 1 deletion airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
execution_date = args.execution_date or timezone.utcnow()
dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)
dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
dr: DagRun = dag.test(execution_date=execution_date, run_conf=run_conf, session=session)
show_dagrun = args.show_dagrun
imgcat = args.imgcat_dagrun
filename = args.save_dagrun
Expand All @@ -536,6 +536,9 @@ def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> No
if show_dagrun:
print(dot_graph.source)

if dr and dr.state == DagRunState.FAILED:
raise SystemExit("DagRun failed")


@cli_utils.action_cli
@providers_configuration_loaded
Expand Down
11 changes: 11 additions & 0 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.config import conf_vars
Expand Down Expand Up @@ -747,6 +748,16 @@ def test_dag_test(self, mock_get_dag):
]
)

@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_dag_test_fail_raise_error(self, mock_get_dag):
execution_date_str = DEFAULT_DATE.isoformat()
mock_get_dag.return_value.test.return_value = DagRun(
dag_id="example_bash_operator", execution_date=DEFAULT_DATE, state=DagRunState.FAILED
)
cli_args = self.parser.parse_args(["dags", "test", "example_bash_operator", execution_date_str])
with pytest.raises(SystemExit, match=r"DagRun failed"):
dag_command.dag_test(cli_args)

@mock.patch("airflow.cli.commands.dag_command.get_dag")
@mock.patch("airflow.utils.timezone.utcnow")
def test_dag_test_no_execution_date(self, mock_utcnow, mock_get_dag):
Expand Down

0 comments on commit 21b58ec

Please sign in to comment.