From e715223d20fd7f96f92c9cc3cafc781a7afec8bf Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 3 Jan 2023 11:44:11 +0100 Subject: [PATCH] Fix "airflow tasks render" cli command for mapped task instances The fix was to use the 'template_fields' attr directly since both mapped and unmapped tasks now have that attribute. I also had to use ti.task instead of the task from dag.get_task due to this error: `AttributeError: 'DecoratedMappedOperator' object has no attribute 'templates_dict'` and I wonder if this is a bug --- airflow/cli/commands/task_command.py | 9 ++-- tests/cli/commands/test_task_command.py | 62 +++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py index 3561890a56eac3..1824bc722ac3bb 100644 --- a/airflow/cli/commands/task_command.py +++ b/airflow/cli/commands/task_command.py @@ -594,21 +594,22 @@ def task_test(args, dag=None): @cli_utils.action_cli(check_db=False) @suppress_logs_and_warning -def task_render(args): +def task_render(args, dag=None): """Renders and displays templated fields for a given task.""" - dag = get_dag(args.subdir, args.dag_id) + if not dag: + dag = get_dag(args.subdir, args.dag_id) task = dag.get_task(task_id=args.task_id) ti, _ = _get_ti( task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory" ) ti.render_templates() - for attr in task.__class__.template_fields: + for attr in task.template_fields: print( textwrap.dedent( f""" # ---------------------------------------------------------- # property: {attr} # ---------------------------------------------------------- - {getattr(task, attr)} + {getattr(ti.task, attr)} """ ) ) diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py index 1c012890dc1e1f..eb50c3635c903d 100644 --- a/tests/cli/commands/test_task_command.py +++ b/tests/cli/commands/test_task_command.py @@ -41,6 +41,7 @@ from airflow.exceptions import AirflowException, DagRunNotFound from airflow.models import DagBag, DagRun, Pool, TaskInstance from airflow.models.serialized_dag import SerializedDagModel +from airflow.operators.bash import BashOperator from airflow.utils import timezone from airflow.utils.session import create_session from airflow.utils.state import State @@ -390,6 +391,67 @@ def test_task_render(self): assert 'echo "2016-01-01"' in output assert 'echo "2016-01-08"' in output + def test_mapped_task_render(self): + """ + tasks render should render and displays templated fields for a given mapping task + """ + with redirect_stdout(io.StringIO()) as stdout: + task_command.task_render( + self.parser.parse_args( + [ + "tasks", + "render", + "test_mapped_classic", + "consumer_literal", + "2022-01-01", + "--map-index", + "0", + ] + ) + ) + # the dag test_mapped_classic has op_args=[[1], [2], [3]], so the first mapping task should have + # op_args=[1] + output = stdout.getvalue() + assert "[1]" in output + assert "[2]" not in output + assert "[3]" not in output + assert "property: op_args" in output + + def test_mapped_task_render_with_template(self, dag_maker): + """ + tasks render should render and displays templated fields for a given mapping task + """ + with dag_maker() as dag: + templated_command = """ + {% for i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + {% endfor %} + """ + commands = [templated_command, "echo 1"] + + BashOperator.partial(task_id="some_command").expand(bash_command=commands) + + with redirect_stdout(io.StringIO()) as stdout: + task_command.task_render( + self.parser.parse_args( + [ + "tasks", + "render", + "test_dag", + "some_command", + "2022-01-01", + "--map-index", + "0", + ] + ), + dag=dag, + ) + + output = stdout.getvalue() + assert 'echo "2022-01-01"' in output + assert 'echo "2022-01-08"' in output + def test_cli_run_when_pickle_and_dag_cli_method_selected(self): """ tasks run should return an AirflowException when invalid pickle_id is passed