Skip to content

Commit

Permalink
Fix "airflow tasks render" cli command for mapped task instances (#28698
Browse files Browse the repository at this point in the history
)

The fix was to use the 'template_fields' attr directly since both mapped and unmapped
tasks now have that attribute.
I also had to use ti.task instead of the task from dag.get_task due to this error:
`AttributeError: 'DecoratedMappedOperator' object has no attribute 'templates_dict'` and
I wonder if this is a bug

(cherry picked from commit 1da17be)
  • Loading branch information
ephraimbuddy committed Jan 12, 2023
1 parent 2528a5b commit b4d65eb
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
9 changes: 5 additions & 4 deletions airflow/cli/commands/task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,21 +591,22 @@ def task_test(args, dag=None):

@cli_utils.action_cli(check_db=False)
@suppress_logs_and_warning
def task_render(args):
def task_render(args, dag=None):
"""Renders and displays templated fields for a given task."""
dag = get_dag(args.subdir, args.dag_id)
if not dag:
dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(
task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
)
ti.render_templates()
for attr in task.__class__.template_fields:
for attr in task.template_fields:
print(
textwrap.dedent(
f""" # ----------------------------------------------------------
# property: {attr}
# ----------------------------------------------------------
{getattr(task, attr)}
{getattr(ti.task, attr)}
"""
)
)
Expand Down
62 changes: 62 additions & 0 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.exceptions import AirflowException, DagRunNotFound
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
Expand Down Expand Up @@ -389,6 +390,67 @@ def test_task_render(self):
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output

def test_mapped_task_render(self):
"""
tasks render should render and displays templated fields for a given mapping task
"""
with redirect_stdout(io.StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(
[
"tasks",
"render",
"test_mapped_classic",
"consumer_literal",
"2022-01-01",
"--map-index",
"0",
]
)
)
# the dag test_mapped_classic has op_args=[[1], [2], [3]], so the first mapping task should have
# op_args=[1]
output = stdout.getvalue()
assert "[1]" in output
assert "[2]" not in output
assert "[3]" not in output
assert "property: op_args" in output

def test_mapped_task_render_with_template(self, dag_maker):
"""
tasks render should render and displays templated fields for a given mapping task
"""
with dag_maker() as dag:
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
commands = [templated_command, "echo 1"]

BashOperator.partial(task_id="some_command").expand(bash_command=commands)

with redirect_stdout(io.StringIO()) as stdout:
task_command.task_render(
self.parser.parse_args(
[
"tasks",
"render",
"test_dag",
"some_command",
"2022-01-01",
"--map-index",
"0",
]
),
dag=dag,
)

output = stdout.getvalue()
assert 'echo "2022-01-01"' in output
assert 'echo "2022-01-08"' in output

def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
"""
tasks run should return an AirflowException when invalid pickle_id is passed
Expand Down

0 comments on commit b4d65eb

Please sign in to comment.