Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Airflow logging for operator Jinja template processing #25452

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import datetime
import inspect
import logging
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -77,6 +78,9 @@
)


logger = logging.getLogger("airflow.models.abstractoperator.AbstractOperator")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logger instance, following the same pattern as baseoperator.py.



class AbstractOperator(LoggingMixin, DAGNode):
"""Common implementation for operators, including unmapped and mapped.

Expand Down Expand Up @@ -345,13 +349,22 @@ def _do_render_template_fields(
)
if not value:
continue
rendered_content = self.render_template(
value,
context,
jinja_env,
seen_oids,
)
setattr(parent, attr_name, rendered_content)
try:
rendered_content = self.render_template(
value,
context,
jinja_env,
seen_oids,
)
except Exception:
logger.exception(
f"Exception rendering Jinja template for in "
f"task '{self.task_id}', field "
f"'{attr_name}'. Template: {value!r}"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think self.log.exception is better here; baseoperator needed an additional logger for free functions outside a class, but this inherits LoggingMixin and does not need it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

raise
else:
setattr(parent, attr_name, rendered_content)

def render_template(
self,
Expand Down Expand Up @@ -391,8 +404,8 @@ def render_template(
template = jinja_env.from_string(value)
dag = self.get_dag()
if dag and dag.render_template_as_native_obj:
return render_template_as_native(template, context)
return render_template_to_string(template, context)
return render_template_as_native(template, context) # type: ignore[arg-type]
return render_template_to_string(template, context) # type: ignore[arg-type]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Work around mypy failures. These issues seemed to be unrelated to my changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the errors? We do not want to introduce necessary ignores into the code base; if the errors are indeed unrelated, they need to be fixed first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the errors. When you say:

they need to be fixed first.

Are you requesting that I fix them, or someone else will fix them?

airflow/models/abstractoperator.py:402: error: Argument 2 to "render_template_as_native" has incompatible type "Context"; expected "MutableMapping[str, Any]"  [arg-type]
                    return render_template_as_native(template, context)
                                                               ^
airflow/models/abstractoperator.py:403: error: Argument 2 to "render_template_to_string" has incompatible type "Context"; expected "MutableMapping[str, Any]"  [arg-type]
                return render_template_to_string(template, context)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the # type: ignore comments. We'll see if the CI requires them. Local mypy didn't like those lines.

For what it's worth, these are the versions of mypy and related packages in my Breeze Docker environment (newly set up yesterday):

mypy==0.950
mypy-boto3-appflow==1.24.36.post1
mypy-boto3-rds==1.24.38
mypy-boto3-redshift-data==1.24.36.post1
mypy-extensions==0.4.3


if isinstance(value, (DagParam, XComArg)):
return value.resolve(context)
Expand Down
54 changes: 54 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,3 +778,57 @@ def task0():

MockOperator(task_id="task1", arg1=task0())
copy.deepcopy(dag)


@pytest.mark.parametrize(
("task", "context", "expected_exception", "expected_rendering", "expected_log", "not_expected_log"),
[
# Simple success case.
(
MockOperator(task_id="op1", arg1="{{ foo }}"),
dict(foo="footemplated"),
None,
dict(arg1="footemplated"),
None,
"Exception rendering Jinja template",
),
# Jinja syntax error.
(
MockOperator(task_id="op1", arg1="{{ foo"),
dict(),
jinja2.TemplateSyntaxError,
None,
"Exception rendering Jinja template for in task 'op1', field 'arg1'. Template: '{{ foo'",
None,
),
# Type error
(
MockOperator(task_id="op1", arg1="{{ foo + 1 }}"),
dict(foo="footemplated"),
TypeError,
None,
"Exception rendering Jinja template for in task 'op1', field 'arg1'. Template: '{{ foo + 1 }}'",
None,
),
],
)
def test_render_template_fields_logging(
caplog, task, context, expected_exception, expected_rendering, expected_log, not_expected_log
):
"""Verify if operator attributes are correctly templated."""
# Trigger templating and verify results
def _do_render():
task.render_template_fields(context=context)

with caplog.at_level(logging.ERROR, "airflow.models.abstractoperator.AbstractOperator"):
if expected_exception:
with pytest.raises(expected_exception):
_do_render()
else:
_do_render()
for k, v in expected_rendering.items():
assert getattr(task, k) == v
if expected_log:
assert expected_log in caplog.text
if not_expected_log:
assert not_expected_log not in caplog.text