Unable to access operator attrs within Jinja context for mapped tasks #24388

Closed
1 of 2 tasks
josh-fell opened this issue Jun 11, 2022 · 5 comments · Fixed by #26702
Labels
affected_version:2.3 Issues Reported for 2.3 area:core area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

Comments

@josh-fell
Contributor

josh-fell commented Jun 11, 2022

Apache Airflow version

2.3.2 (latest released)

What happened

When attempting to generate mapped SQL tasks using a Jinja-templated query that accesses operator attributes, an exception like the following is thrown:

jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute '<operator attribute>'

For example, when mapping SQLValueCheckOperator tasks over database with the query SELECT COUNT(*) FROM {{ task.database }}.tbl;, the following is raised:
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'database'

Or, when mapping SnowflakeOperator tasks over parameters with a query like SELECT * FROM {{ task.parameters.tbl }};, the error is:
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'parameters'
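Jinja resolves {{ task.database }} by looking up the attribute on whatever object is bound to task in the context (it tries a plain getattr first and falls back to item lookup). The pure-Python sketch below keeps only the getattr part. ClassicOperatorSketch and MappedOperatorSketch, with their partial_kwargs/expand_kwargs fields, are hypothetical stand-ins rather than Airflow's real classes; they only illustrate why the lookup fails when a mapped object keeps the operator arguments in containers instead of as its own attributes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ClassicOperatorSketch:
    # A regular operator stores its constructor arguments as attributes.
    task_id: str
    database: str


@dataclass
class MappedOperatorSketch:
    # Hypothetical stand-in: arguments live in dicts, not on the object itself.
    task_id: str
    partial_kwargs: dict[str, Any] = field(default_factory=dict)
    expand_kwargs: dict[str, list[Any]] = field(default_factory=dict)


def lookup(task: object, attr: str) -> Any:
    # Roughly what `{{ task.<attr> }}` does first: a plain getattr on the object.
    return getattr(task, attr)


classic = ClassicOperatorSketch(task_id="classic_sql_value_check", database="dev")
mapped = MappedOperatorSketch(
    task_id="check_row_count",
    partial_kwargs={"pass_value": 20000},
    expand_kwargs={"database": ["dev", "production"]},
)

print(lookup(classic, "database"))  # dev
try:
    lookup(mapped, "database")
except AttributeError as exc:
    # Jinja turns this kind of miss into an Undefined, which then raises UndefinedError.
    print(f"AttributeError: {exc}")
```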

What you think should happen instead

When using Jinja-templated SQL queries, the attribute used for the mapping should be accessible via {{ task.<operator attribute> }}. Executing the same SQL query with classic, non-mapped tasks allows this access to operator attributes through the task context object.

Ideally, the same interface should apply to both non-mapped and mapped tasks. Also, since parameters is preferred over params in SQL-type operators, being able to map over parameters will help folks move from params to parameters.

How to reproduce

Consider the following DAG:

from pendulum import datetime

from airflow.decorators import dag
from airflow.operators.sql import SQLValueCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


CORE_SQL = "SELECT COUNT(*) FROM {{ task.database }}.tbl;"
SNOWFLAKE_SQL = """SELECT * FROM {{ task.parameters.tbl }};"""


@dag(dag_id="map-city", start_date=datetime(2022, 6, 7), schedule_interval=None)
def map_city():
    classic_sql_value_check = SQLValueCheckOperator(
        task_id="classic_sql_value_check",
        conn_id="snowflake",
        sql=CORE_SQL,
        database="dev",
        pass_value=20000,
    )

    mapped_value_check = SQLValueCheckOperator.partial(
        task_id="check_row_count",
        conn_id="snowflake",
        sql=CORE_SQL,
        pass_value=20000,
    ).expand(database=["dev", "production"])

    classic_snowflake_task = SnowflakeOperator(
        task_id="classic_snowflake_task",
        snowflake_conn_id="snowflake",
        sql=SNOWFLAKE_SQL,
        parameters={"tbl": "foo"},
    )

    mapped_snowflake_task = SnowflakeOperator.partial(
        task_id="mapped_snowflake_task", snowflake_conn_id="snowflake", sql=SNOWFLAKE_SQL
    ).expand(
        parameters=[
            {"tbl": "foo"},
            {"tbl": "bar"},
        ]
    )


_ = map_city()
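Each .expand() call in the DAG above produces one task instance per element of the expanded list, identified by its map_index (visible as '--map-index', '0' in the logs below); when several keyword arguments are expanded together, Airflow maps over their cross product. The sketch below is a simplified, hypothetical model of that indexing: expand_to_kwargs is an illustrative helper, XCom-backed inputs are ignored, and the index ordering produced by itertools.product is an assumption of the sketch, not a guarantee about Airflow's implementation.

```python
from itertools import product
from typing import Any, Dict, List


def expand_to_kwargs(
    partial_kwargs: Dict[str, Any], expand_kwargs: Dict[str, List[Any]]
) -> List[Dict[str, Any]]:
    """Return the full kwargs for each map_index (hypothetical simplification)."""
    keys = list(expand_kwargs)
    combos = product(*(expand_kwargs[k] for k in keys))
    # Each combination of expanded values, merged over the shared partial kwargs,
    # stands for one mapped task instance; its list position is its map_index.
    return [{**partial_kwargs, **dict(zip(keys, combo))} for combo in combos]


per_index = expand_to_kwargs(
    {"task_id": "check_row_count", "conn_id": "snowflake", "pass_value": 20000},
    {"database": ["dev", "production"]},
)
for map_index, kwargs in enumerate(per_index):
    print(map_index, kwargs["database"])
```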

SQLValueCheckOperator tasks
The logs for the non-mapped "classic_sql_value_check" task show the query executing as expected:
[2022-06-11, 02:01:03 UTC] {sql.py:204} INFO - Executing SQL check: SELECT COUNT(*) FROM dev.tbl;
while the mapped "check_row_count" task fails with the following exception:

[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'map-city', 'check_row_count', 'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '350', '--raw', '--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmpm5bg9mt5', '--map-index', '0', '--error-file', '/tmp/tmp2kbilt2l']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 350: Subtask check_row_count
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running <TaskInstance: map-city.check_row_count manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 726, in render_template_fields
    self._do_render_template_fields(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 391, in render_template
    return render_template_to_string(template, context)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 296, in render_template_to_string
    return render_template(template, context, native=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 291, in render_template
    return "".join(nodes)
  File "<template>", line 13, in root
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'database'

SnowflakeOperator tasks
Similarly, the non-mapped "classic_snowflake_task" task executes the SQL query as expected:
[2022-06-11, 02:01:04 UTC] {snowflake.py:324} INFO - Running statement: SELECT * FROM foo;, parameters: {'tbl': 'foo'}
while the mapped "mapped_snowflake_task" task fails to execute the query:

[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'map-city', 'mapped_snowflake_task', 'manual__2022-06-11T02:01:01.831761+00:00', '--job-id', '347', '--raw', '--subdir', 'DAGS_FOLDER/map_city.py', '--cfg-path', '/tmp/tmp6kmqs5ew', '--map-index', '0', '--error-file', '/tmp/tmpkufg9xqx']
[2022-06-11, 02:01:03 UTC] {standard_task_runner.py:80} INFO - Job 347: Subtask mapped_snowflake_task
[2022-06-11, 02:01:03 UTC] {task_command.py:370} INFO - Running <TaskInstance: map-city.mapped_snowflake_task manual__2022-06-11T02:01:01.831761+00:00 map_index=0 [running]> on host 569596df5be5
[2022-06-11, 02:01:03 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1451, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1555, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2212, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 726, in render_template_fields
    self._do_render_template_fields(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 344, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/usr/local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 391, in render_template
    return render_template_to_string(template, context)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 296, in render_template_to_string
    return render_template(template, context, native=False)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 291, in render_template
    return "".join(nodes)
  File "<template>", line 13, in root
  File "/usr/local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326, in getattr
    value = getattr(obj, attribute)
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 910, in __getattr__
    return self._fail_with_undefined_error()
  File "/usr/local/lib/python3.9/site-packages/jinja2/runtime.py", line 903, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'airflow.models.mappedoperator.MappedOperator object' has no attribute 'parameters'

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

apache-airflow-providers-snowflake==2.7.0

Deployment

Astronomer

Deployment details

Astronomer Runtime 5.0.3

Anything else

Even though the {{ task.<operator attr> }} approach does not work for mapped tasks, there is a workaround. Given the SnowflakeOperator example above, which attempts to execute the query SELECT * FROM {{ task.parameters.tbl }};, users can change the templated query to SELECT * FROM {{ task.mapped_kwargs.parameters[ti.map_index].tbl }}; to execute it successfully. This workaround isn't very obvious, though, and requires some solid digging into the new 2.3.0 code.
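The workaround indexes the raw expand argument by the running instance's map_index. The pure-Python sketch below mirrors that lookup, with a plain dict standing in for task.mapped_kwargs and a loop variable standing in for ti.map_index (both stand-ins are assumptions for illustration). It also makes the trick's limits visible: it only picks the right element when parameters is the sole expanded argument and was passed as a literal list, since a cross product or an XCom-backed input would leave no list that map_index indexes directly.

```python
# Hypothetical stand-in for `task.mapped_kwargs` in Airflow 2.3: the literal list
# passed to .expand(parameters=...), not yet split per task instance.
mapped_kwargs = {"parameters": [{"tbl": "foo"}, {"tbl": "bar"}]}


def render_query(map_index: int) -> str:
    # Mirrors `{{ task.mapped_kwargs.parameters[ti.map_index].tbl }}`: pick this
    # instance's element by map_index, then read "tbl" (Jinja's `.tbl` on a dict
    # falls back to an item lookup).
    params = mapped_kwargs["parameters"][map_index]
    return f"SELECT * FROM {params['tbl']};"


for map_index in range(len(mapped_kwargs["parameters"])):
    print(render_query(map_index))
```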

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@josh-fell josh-fell added kind:bug This is a clearly a bug area:core labels Jun 11, 2022
@josh-fell josh-fell added the affected_version:2.3 Issues Reported for 2.3 label Jun 14, 2022
@mrn-aglic

The proposed workaround, SELECT * FROM {{ task.mapped_kwargs.parameters[ti.map_index].tbl }};, doesn't seem to work on Airflow 2.4.0.
I get the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 410, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 474, in render_template
    return render_template_to_string(template, context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 288, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 283, in render_template
    return "".join(nodes)
  File "<template>", line 14, in root
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326, in getattr
    value = getattr(obj, attribute)
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/runtime.py", line 859, in __getattr__
    return self._fail_with_undefined_error()
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/runtime.py", line 852, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: '***.models.mappedoperator.MappedOperator object' has no attribute 'mapped_kwargs'
[2022-09-26, 11:23:52 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1457, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1576, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2193, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 764, in render_template_fields
    self._do_render_template_fields(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 410, in _do_render_template_fields
    rendered_content = self.render_template(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/abstractoperator.py", line 474, in render_template
    return render_template_to_string(template, context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 288, in render_template_to_string
    return render_template(template, cast(MutableMapping[str, Any], context), native=False)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/helpers.py", line 283, in render_template
    return "".join(nodes)
  File "<template>", line 14, in root
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/sandbox.py", line 326, in getattr
    value = getattr(obj, attribute)
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/runtime.py", line 859, in __getattr__
    return self._fail_with_undefined_error()
  File "/home/airflow/.local/lib/python3.9/site-packages/jinja2/runtime.py", line 852, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: '***.models.mappedoperator.MappedOperator object' has no attribute 'mapped_kwargs'

@uranusjr
Member

The mapped_kwargs attribute is intended to be private and is subject to breakages. The equivalent in 2.4 is task.expand_input.value.

The fix has been included in #26100. Once that’s merged (to main and released in 2.5), I plan to write up a patch to backport the behaviour to 2.4, so that task points to the unmapped operator and users can simply access the attribute directly. If this means a lot to you, feel free to do the backporting yourself; the relevant part is adding context_update_for_unmapped and calling it at the correct place.
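The idea behind the fix is that task in the template context should refer to an unmapped operator built for the running map_index. A minimal sketch of that idea follows. unmap_for_index and context_with_unmapped_task are hypothetical helpers written for illustration, not the actual context_update_for_unmapped from #26100 (its signature is not shown in this thread), and SimpleNamespace stands in for a real operator object.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def unmap_for_index(
    partial_kwargs: Dict[str, Any],
    expand_kwargs: Dict[str, List[Any]],
    map_index: int,
) -> SimpleNamespace:
    # Hypothetical helper. Assumes exactly one expanded argument given as a
    # literal list, so map_index can index it directly.
    if len(expand_kwargs) != 1:
        raise ValueError("this sketch only handles a single expanded argument")
    name, values = next(iter(expand_kwargs.items()))
    # Build an object whose attributes look like a classic operator's.
    return SimpleNamespace(**partial_kwargs, **{name: values[map_index]})


def context_with_unmapped_task(
    context: Dict[str, Any], unmapped: SimpleNamespace
) -> Dict[str, Any]:
    # Copy the template context and rebind `task` to the unmapped object, so a
    # template such as `{{ task.parameters.tbl }}` resolves as it would for a
    # non-mapped task.
    return {**context, "task": unmapped}


base_context = {"ds": "2022-06-11"}
unmapped = unmap_for_index(
    {"task_id": "mapped_snowflake_task", "snowflake_conn_id": "snowflake"},
    {"parameters": [{"tbl": "foo"}, {"tbl": "bar"}]},
    map_index=1,
)
context = context_with_unmapped_task(base_context, unmapped)
print(context["task"].parameters["tbl"])  # bar
```

Returning a new dict rather than mutating the original keeps the sketch side-effect free; the real patch may well update the context in place.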

@uranusjr uranusjr self-assigned this Sep 26, 2022
@mrn-aglic

Thanks for the response. I tried to do it a couple of ways, including this:
task.expand_input.value.parameters.batch_start

But I get the following error:


[2022-09-27, 07:36:38 UTC] {crypto.py:84} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-09-27, 07:36:38 UTC] {base.py:71} INFO - Using connection ID 'src' for task execution.
[2022-09-27, 07:36:38 UTC] {sql.py:315} INFO - Running statement: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull(task_ids='get_batch_starts', dag_id='task_mapping_classic_operators', key='batch_start') }} LIMIT 80, parameters: {'batch_start': '80', 'batch_end': '160'}
[2022-09-27, 07:36:38 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/postgres/operators/postgres.py", line 92, in execute
    self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 295, in run
    self._run_command(cur, sql_statement, parameters)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/providers/common/sql/hooks/sql.py", line 318, in _run_command
    cur.execute(sql_statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "{"
LINE 1: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull...
                                          ^

[2022-09-27, 07:36:38 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=task_mapping_classic_operators, task_id=batch_select, map_index=1, execution_date=20220927T040000, start_date=20220927T073637, end_date=20220927T073638
[2022-09-27, 07:36:38 UTC] {standard_task_runner.py:102} ERROR - Failed to execute job 171 for task batch_select (syntax error at or near "{"
LINE 1: SELECT * FROM match_scores OFFSET {{ task_instance.xcom_pull...
                                          ^

Here is the full operator definition:

batch_select_op = PostgresOperator.partial(
    task_id="batch_select",
    postgres_conn_id="src",
    sql=f"SELECT * FROM match_scores OFFSET {{{{ task.expand_input.value.parameters.batch_start }}}} LIMIT {BATCH_SIZE}",
).expand(parameters=get_batch_starts.output)
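Because the sql argument above is built with an f-string, each literal Jinja brace has to be doubled, so {{{{ ... }}}} in the source becomes {{ ... }} in the string handed to the operator. A quick check in plain Python; the value of BATCH_SIZE is an assumption (the thread does not show it), chosen to match the LIMIT 80 in the log above.

```python
BATCH_SIZE = 80  # assumption: matches the LIMIT 80 shown in the log above

# In an f-string, "{{" and "}}" each produce one literal brace, so "{{{{" yields
# the Jinja opener "{{", while {BATCH_SIZE} is substituted by Python immediately.
sql = f"SELECT * FROM match_scores OFFSET {{{{ task.expand_input.value.parameters.batch_start }}}} LIMIT {BATCH_SIZE}"
print(sql)
```

The doubled braces therefore reach Airflow as an ordinary Jinja expression; the escaping itself is not what breaks the query.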

@uranusjr
Member

Unfortunately there’s currently no way to access the resolved value of a mapped operator.

@ashb
Member

ashb commented Sep 27, 2022

> Unfortunately there’s currently no way to access the resolved value of a mapped operator.

No way currently. We'll add it in 2.4.1


4 participants