Skip to content

Commit

Permalink
Show custom instance names for a mapped task in UI (apache#36797)
Browse files Browse the repository at this point in the history
Co-authored-by: Brent Bovenzi <brent.bovenzi@gmail.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
3 people authored and utkarsharma2 committed Apr 22, 2024
1 parent 15e9925 commit a603294
Show file tree
Hide file tree
Showing 26 changed files with 203 additions and 7 deletions.
7 changes: 7 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3626,6 +3626,13 @@ components:
type: string
sla_miss:
$ref: "#/components/schemas/SLAMiss"
rendered_map_index:
description: |
Rendered name of an expanded task instance, if the task is mapped.
*New in version 2.9.0*
type: string
nullable: true
rendered_fields:
description: |
JSON object describing rendered fields.
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class Meta:
executor_config = auto_field()
note = auto_field()
sla_miss = fields.Nested(SlaMissSchema, dump_default=None)
rendered_map_index = auto_field()
rendered_fields = JsonObjectField(dump_default={})
trigger = fields.Nested(TriggerSchema)
triggerer_job = fields.Nested(JobSchema)
Expand Down
1 change: 1 addition & 0 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> XComArg:
expand_input=EXPAND_INPUT_EMPTY, # Don't use this; mapped values go to op_kwargs_expand_input.
partial_kwargs=partial_kwargs,
task_id=task_id,
map_index_template=partial_kwargs.pop("map_index_template", None),
params=partial_params,
deps=MappedOperator.deps_for(self.operator_class),
operator_extra_links=self.operator_class.operator_extra_links,
Expand Down
5 changes: 5 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def partial(
priority_weight: int | ArgNotSet = NOTSET,
weight_rule: str | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
map_index_template: str | None | ArgNotSet = NOTSET,
max_active_tis_per_dag: int | None | ArgNotSet = NOTSET,
max_active_tis_per_dagrun: int | None | ArgNotSet = NOTSET,
on_execute_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback] | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -289,6 +290,7 @@ def partial(
"dag": dag,
"task_group": task_group,
"task_id": task_id,
"map_index_template": map_index_template,
"start_date": start_date,
"end_date": end_date,
"owner": owner,
Expand Down Expand Up @@ -781,6 +783,7 @@ def __init__(
resources: dict[str, Any] | None = None,
run_as_user: str | None = None,
task_concurrency: int | None = None,
map_index_template: str | None = None,
max_active_tis_per_dag: int | None = None,
max_active_tis_per_dagrun: int | None = None,
executor_config: dict | None = None,
Expand Down Expand Up @@ -933,6 +936,7 @@ def __init__(
self.max_active_tis_per_dag: int | None = max_active_tis_per_dag
self.max_active_tis_per_dagrun: int | None = max_active_tis_per_dagrun
self.do_xcom_push: bool = do_xcom_push
self.map_index_template: str | None = map_index_template
self.multiple_outputs: bool = multiple_outputs

self.doc_md = doc_md
Expand Down Expand Up @@ -1572,6 +1576,7 @@ def get_serialized_fields(cls):
"is_setup",
"is_teardown",
"on_failure_fail_dagrun",
"map_index_template",
}
)
DagContext.pop_context_managed_dag()
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def _expand(self, expand_input: ExpandInput, *, strict: bool) -> MappedOperator:
expand_input=expand_input,
partial_kwargs=partial_kwargs,
task_id=task_id,
map_index_template=partial_kwargs.pop("map_index_template", None),
params=self.params,
deps=MappedOperator.deps_for(self.operator_class),
operator_extra_links=self.operator_class.operator_extra_links,
Expand Down Expand Up @@ -280,6 +281,7 @@ class MappedOperator(AbstractOperator):
end_date: pendulum.DateTime | None
upstream_task_ids: set[str] = attr.ib(factory=set, init=False)
downstream_task_ids: set[str] = attr.ib(factory=set, init=False)
map_index_template: str | None

_disallow_kwargs_override: bool
"""Whether execution fails if ``expand_input`` has duplicates to ``partial_kwargs``.
Expand Down
21 changes: 18 additions & 3 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,7 @@ def get_triggering_events() -> dict[str, list[DatasetEvent | DatasetEventPydanti
"inlets": task.inlets,
"logical_date": logical_date,
"macros": macros,
"map_index_template": task.map_index_template,
"next_ds": get_next_ds(),
"next_ds_nodash": get_next_ds_nodash(),
"next_execution_date": get_next_execution_date(),
Expand Down Expand Up @@ -1252,6 +1253,7 @@ class TaskInstance(Base, LoggingMixin):
pid = Column(Integer)
executor_config = Column(ExecutorConfigType(pickler=dill))
updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow)
rendered_map_index = Column(String(64))

external_executor_id = Column(StringID())

Expand Down Expand Up @@ -2512,7 +2514,12 @@ def signal_handler(signum, frame):
self.task.params = context["params"]

with set_current_context(context):
task_orig = self.render_templates(context=context)
dag = self.task.get_dag()
if dag is not None:
jinja_env = dag.get_template_env()
else:
jinja_env = None
task_orig = self.render_templates(context=context, jinja_env=jinja_env)

if not test_mode:
rtif = RenderedTaskInstanceFields(ti=self, render_templates=False)
Expand Down Expand Up @@ -2547,10 +2554,16 @@ def signal_handler(signum, frame):
# Execute the task
with set_current_context(context):
result = self._execute_task(context, task_orig)

# Run post_execute callback
# Is never MappedOperator at this point
self.task.post_execute(context=context, result=result) # type: ignore[union-attr]

# DAG authors define map_index_template at the task level
if jinja_env is not None and (template := context.get("map_index_template")) is not None:
rendered_map_index = self.rendered_map_index = jinja_env.from_string(template).render(context)
self.log.info("Map index rendered as %s", rendered_map_index)

Stats.incr(f"operator_successes_{self.task.task_type}", tags=self.stats_tags)
# Same metric with tagging
Stats.incr("operator_successes", tags={**self.stats_tags, "task_type": self.task.task_type})
Expand Down Expand Up @@ -2922,7 +2935,9 @@ def overwrite_params_with_dag_run_conf(self, params: dict, dag_run: DagRun):
self.log.debug("Updating task params (%s) with DagRun.conf (%s)", params, dag_run.conf)
params.update(dag_run.conf)

def render_templates(self, context: Context | None = None) -> Operator:
def render_templates(
self, context: Context | None = None, jinja_env: jinja2.Environment | None = None
) -> Operator:
"""Render templates in the operator fields.
If the task was originally mapped, this may replace ``self.task`` with
Expand All @@ -2937,7 +2952,7 @@ def render_templates(self, context: Context | None = None) -> Operator:
# unmapped BaseOperator created by this function! This is because the
# MappedOperator is useless for template rendering, and we need to be
# able to access the unmapped task instead.
original_task.render_template_fields(context)
original_task.render_template_fields(context, jinja_env)

return original_task

Expand Down
1 change: 1 addition & 0 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
"ds_nodash",
"expanded_ti_count",
"inlets",
"map_index_template",
"next_ds",
"next_ds_nodash",
"outlets",
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class TaskInstancePydantic(BaseModelPydantic, LoggingMixin):
pid: Optional[int]
executor_config: Any
updated_at: Optional[datetime]
rendered_map_index: Optional[str]
external_executor_id: Optional[str]
trigger_id: Optional[int]
trigger_timeout: Optional[datetime]
Expand Down
3 changes: 2 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,8 @@
"_log_config_logger_name": { "type": "string" },
"_is_mapped": { "const": true, "$comment": "only present when True" },
"expand_input": { "type": "object" },
"partial_kwargs": { "type": "object" }
"partial_kwargs": { "type": "object" },
"map_index_template": { "type": "string" }
},
"dependencies": {
"expand_input": ["partial_kwargs", "_is_mapped"],
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,7 @@ def deserialize_operator(cls, encoded_op: dict[str, Any]) -> Operator:
task_group=None,
start_date=None,
end_date=None,
map_index_template=None,
disallow_kwargs_override=encoded_op["_disallow_kwargs_override"],
expand_input_attr=encoded_op["_expand_input_attr"],
)
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"inlets",
"logical_date",
"macros",
"map_index_template",
"next_ds",
"next_ds_nodash",
"next_execution_date",
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/context.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class Context(TypedDict, total=False):
inlets: list
logical_date: DateTime
macros: Any
map_index_template: str
next_ds: str | None
next_ds_nodash: str | None
next_execution_date: DateTime | None
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/static/js/components/Table/Cells.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import { getMetaValue } from "src/utils";
import { useContainerRef } from "src/context/containerRef";
import { SimpleStatus } from "src/dag/StatusBox";

interface CellProps {
export interface CellProps {
cell: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
value: any;
Expand Down
16 changes: 14 additions & 2 deletions airflow/www/static/js/dag/details/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import {
import { getDagRunLabel, getMetaValue, getTask } from "src/utils";
import useSelection from "src/dag/useSelection";
import Time from "src/components/Time";
import { useGridData } from "src/api";
import { useGridData, useTaskInstance } from "src/api";
import RunTypeIcon from "src/components/RunTypeIcon";

import BreadcrumbText from "./BreadcrumbText";
Expand All @@ -45,6 +45,15 @@ const Header = () => {
onSelect,
clearSelection,
} = useSelection();

const { data: taskInstance } = useTaskInstance({
dagId,
dagRunId: runId || "",
taskId: taskId || "",
mapIndex,
enabled: mapIndex !== undefined,
});

const dagRun = dagRuns.find((r) => r.runId === runId);

const group = getTask({ taskId, task: groups });
Expand Down Expand Up @@ -131,7 +140,10 @@ const Header = () => {
<BreadcrumbLink
_hover={isMappedTaskDetails ? { cursor: "default" } : undefined}
>
<BreadcrumbText label="Map Index" value={mapIndex} />
<BreadcrumbText
label="Map Index"
value={taskInstance?.renderedMapIndex || mapIndex}
/>
</BreadcrumbLink>
</BreadcrumbItem>
)}
Expand Down
6 changes: 6 additions & 0 deletions airflow/www/static/js/dag/details/taskInstance/Details.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ const Details = ({ gridInstance, taskInstance, group }: Props) => {
<Td>{taskInstance.mapIndex}</Td>
</Tr>
)}
{taskInstance?.renderedMapIndex !== undefined && (
<Tr>
<Td>Rendered Map Index</Td>
<Td>{taskInstance.renderedMapIndex}</Td>
</Tr>
)}
{!!taskInstance?.tryNumber && (
<Tr>
<Td>Try Number</Td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { StatusWithNotes } from "src/dag/StatusBox";
import { Table } from "src/components/Table";
import Time from "src/components/Time";
import { useOffsetTop } from "src/utils";
import type { CellProps } from "src/components/Table";

interface Props {
dagId: string;
Expand Down Expand Up @@ -69,6 +70,7 @@ const MappedInstances = ({ dagId, runId, taskId, onRowClicked }: Props) => {
() =>
taskInstances.map((mi) => ({
...mi,
renderedMapIndex: mi.renderedMapIndex,
state: (
<Flex alignItems="center">
<StatusWithNotes
Expand All @@ -94,6 +96,8 @@ const MappedInstances = ({ dagId, runId, taskId, onRowClicked }: Props) => {
{
Header: "Map Index",
accessor: "mapIndex",
Cell: ({ cell: { row } }: CellProps) =>
row.original.renderedMapIndex || row.original.mapIndex,
},
{
Header: "State",
Expand Down
6 changes: 6 additions & 0 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,12 @@ export interface components {
pid?: number | null;
executor_config?: string;
sla_miss?: components["schemas"]["SLAMiss"];
/**
* @description Rendered name of an expanded task instance, if the task is mapped.
*
* *New in version 2.9.0*
*/
rendered_map_index?: string | null;
/**
* @description JSON object describing rendered fields.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,44 @@ As well as a single parameter it is possible to pass multiple parameters to expa
This would result in the add task being called 6 times. Please note, however, that the order of expansion is not guaranteed.

Named mapping
-------------

By default, mapped tasks are assigned an integer index. It is possible to override the integer index shown in the Airflow UI for each mapped task with a name based on the task's input. This is done by providing a Jinja template for the task with ``map_index_template``. The template is rendered with the task context after each expanded task has executed. This means you can reference attributes on the task like this:

.. code-block:: python

    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    # The two expanded task instances will be named "2024-01-01" and "2024-01-02".
    SQLExecuteQueryOperator.partial(
        ...,
        sql="SELECT * FROM data WHERE date = %(date)s",
        map_index_template="""{{ task.parameters['date'] }}""",
    ).expand(
        parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
    )

In the above example, the expanded task instances will be named "2024-01-01" and "2024-01-02". The names show up in the Airflow UI instead of "0" and "1", respectively.

Since the template is rendered after the main execution block, it is also possible to dynamically inject values into the rendering context. This is useful when the logic to render a desirable name is difficult to express in the Jinja template syntax, particularly in a TaskFlow function. For example:

.. code-block:: python

    from airflow.operators.python import get_current_context


    @task(map_index_template="{{ my_variable }}")
    def my_task(my_value: str):
        context = get_current_context()
        context["my_variable"] = my_value * 3
        ...  # Normal execution...


    # The task instances will be named "aaa" and "bbb".
    my_task.expand(my_value=["a", "b"])

Mapping with non-TaskFlow operators
===================================

Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/templates-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ Variable Type Description
``{{ run_id }}`` str The currently running :class:`~airflow.models.dagrun.DagRun` run ID.
``{{ dag_run }}`` DagRun The currently running :class:`~airflow.models.dagrun.DagRun`.
``{{ test_mode }}`` bool Whether the task instance was run by the ``airflow test`` CLI.
``{{ map_index_template }}`` None | str Template used to render the name of an expanded task instance of a mapped task. Changing this value in the context changes the rendered result.
``{{ expanded_ti_count }}`` int | ``None`` | Number of task instances that a mapped task was expanded into. If
| the current task is not mapped, this should be ``None``.
| Added in version 2.5.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def test_mapped_task_instances(self, one_task_with_mapped_tis, session):
"queue": "default",
"queued_when": None,
"rendered_fields": {},
"rendered_map_index": None,
"sla_miss": None,
"start_date": "2020-01-01T00:00:00+00:00",
"state": "success",
Expand Down

0 comments on commit a603294

Please sign in to comment.