diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py index 20b55d56a3f9a..01078aa38e8ee 100644 --- a/airflow/cli/cli_parser.py +++ b/airflow/cli/cli_parser.py @@ -1170,6 +1170,7 @@ class GroupCommand(NamedTuple): args=( ARG_DAG_ID, ARG_EXECUTION_DATE, + ARG_CONF, ARG_SUBDIR, ARG_SHOW_DAGRUN, ARG_IMGCAT_DAGRUN, diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py index 559c0e75c3be2..ddfce4ebdbc72 100644 --- a/airflow/cli/commands/dag_command.py +++ b/airflow/cli/commands/dag_command.py @@ -451,6 +451,13 @@ def dag_list_dag_runs(args, dag=None, session=NEW_SESSION): @cli_utils.action_cli def dag_test(args, session=None): """Execute one single DagRun for a given DAG and execution date, using the DebugExecutor.""" + run_conf = None + if args.conf: + try: + run_conf = json.loads(args.conf) + except ValueError as e: + raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}") + dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False) try: @@ -458,6 +465,7 @@ def dag_test(args, session=None): executor=DebugExecutor(), start_date=args.execution_date, end_date=args.execution_date, + conf=run_conf, # Always run the DAG at least once even if no logical runs are # available. This does not make a lot of sense, but Airflow has # been doing this prior to 2.2 so we keep compatibility. diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py index f6c85468a5648..59df405629902 100644 --- a/tests/cli/commands/test_dag_command.py +++ b/tests/cli/commands/test_dag_command.py @@ -17,6 +17,7 @@ # under the License. import contextlib import io +import json import os import tempfile import unittest @@ -607,6 +608,40 @@ def test_dag_test(self, mock_get_dag, mock_executor): executor=mock_executor.return_value, start_date=cli_args.execution_date, end_date=cli_args.execution_date, + conf=None, + run_at_least_once=True, + ), + ] + ) + + @mock.patch("airflow.cli.commands.dag_command.DebugExecutor") + @mock.patch("airflow.cli.commands.dag_command.get_dag") + def test_dag_test_conf(self, mock_get_dag, mock_executor): + cli_args = self.parser.parse_args( + [ + 'dags', + 'test', + 'example_bash_operator', + DEFAULT_DATE.isoformat(), + "-c", + "{\"dag_run_conf_param\": \"param_value\"}", + ] + ) + dag_command.dag_test(cli_args) + + mock_get_dag.assert_has_calls( + [ + mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'), + mock.call().clear( + start_date=cli_args.execution_date, + end_date=cli_args.execution_date, + dag_run_state=False, + ), + mock.call().run( + executor=mock_executor.return_value, + start_date=cli_args.execution_date, + end_date=cli_args.execution_date, + conf=json.loads(cli_args.conf), run_at_least_once=True, ), ] @@ -636,6 +671,7 @@ def test_dag_test_show_dag(self, mock_get_dag, mock_executor, mock_render_dag): executor=mock_executor.return_value, start_date=cli_args.execution_date, end_date=cli_args.execution_date, + conf=None, run_at_least_once=True, ), ]