Allow email field to be templated #35546

Merged
merged 4 commits into from Nov 9, 2023

gdavoian
Contributor

@gdavoian gdavoian commented Nov 9, 2023

The current serialization logic treats all BaseOperator fields as non-templateable by default. However, the email field may be considered an exception to that rule and may be allowed to be templated.

For example, our team heavily relies on templating this field (e.g., "{{ var.value.some_email_list }}") as the default_args["email"] for each and every DAG we have, and we don't want to use Variable.get("some_email_list") on the module level of our scripts instead because it's actually an anti-pattern (opening a DB connection and making a SQL query under the hood at parsing time and not at execution time).

See the discussion under #29821 for more details.
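The parse-time versus execution-time difference described above can be illustrated without Airflow. The following is only a minimal simulation: `FakeVariableStore`, `parse_eager`, `parse_templated`, and `render` are hypothetical stand-ins invented for this sketch, not Airflow APIs, and the regex is a crude substitute for Jinja rendering.

```python
import re


class FakeVariableStore:
    """Hypothetical stand-in for a database-backed variable store."""

    def __init__(self, values):
        self._values = values
        self.lookups = 0  # counts simulated database round trips

    def get(self, key):
        self.lookups += 1
        return self._values[key]


store = FakeVariableStore({"some_email_list": "a@example.com,b@example.com"})


def parse_eager():
    # Mimics a module-level Variable.get(...): one lookup on every parse.
    return {"email": store.get("some_email_list")}


def parse_templated():
    # Mimics a templated default_args entry: parsing touches no storage.
    return {"email": "{{ var.value.some_email_list }}"}


_TEMPLATE = re.compile(r"\{\{\s*var\.value\.(\w+)\s*\}\}")


def render(value):
    # Crude stand-in for template rendering at task execution time.
    return _TEMPLATE.sub(lambda m: store.get(m.group(1)), value)


for _ in range(100):  # DAG files are re-parsed repeatedly
    parse_templated()
lookups_after_templated_parsing = store.lookups

for _ in range(100):
    parse_eager()
lookups_after_eager_parsing = store.lookups

rendered = render(parse_templated()["email"])
```

In this simulation, parsing the templated version never touches the store, while the eager version performs one lookup per parse; the lookup for the templated value happens only when `render` is called.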



boring-cyborg bot commented Nov 9, 2023

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@utkarsharma2
Contributor

utkarsharma2 commented Nov 9, 2023

@gdavoian Thanks for the PR! :)

I just wanted to understand this better, why email field? What benefits does it offer for all the operators?

@gdavoian
Contributor Author

gdavoian commented Nov 9, 2023

@utkarsharma2 I've just updated the description of the PR with the use case of our team.

@potiuk
Member

potiuk commented Nov 9, 2023

Can you also update/add a test case? (See the test case in #29821 as an example.)

@utkarsharma2
Contributor

utkarsharma2 commented Nov 9, 2023

> For example, our team heavily relies on templating this field (e.g., "{{ var.value.some_email_list }}") as the default_args["email"] for each and every DAG we have, and we don't want to use Variable.get("some_email_list") on the module level of our scripts instead because it's actually an anti-pattern (opening a DB connection and making a SQL query under the hood at parsing time and not at execution time).

@gdavoian No strong objections here, but I'm a little concerned that if it is a specific case of your team then the downside of this is we add an overhead of rendering this field for all operators in airflow for everyone who uses it.

Also, I checked the code, and Variable.get() is using a cache so it might not be a bad idea to use it, considering the downside. WDYT?

@gdavoian gdavoian force-pushed the allow_email_field_to_be_templated branch from 9be4e1b to e87ca0e Compare November 9, 2023 09:53
@gdavoian
Contributor Author

gdavoian commented Nov 9, 2023

@utkarsharma2 But the email field isn't added to template_fields by default (BaseOperator.template_fields is empty), it's up to developers of third-party/custom operators to add it to their operators. So if they really want email to be templated and are aware of that overhead, then why not? :)

Honestly, I don't know how that caching works, but we use templating heavily (not only for email lists but also for things like the links to docker images, etc.), and currently, the only thing we can't really template is the email field coming from BaseOperator (e.g., we can easily template the image field of KubernetesOperator etc.).

@potiuk
Member

potiuk commented Nov 9, 2023

> For example, our team heavily relies on templating this field (e.g., "{{ var.value.some_email_list }}") as the default_args["email"] for each and every DAG we have, and we don't want to use Variable.get("some_email_list") on the module level of our scripts instead because it's actually an anti-pattern (opening a DB connection and making a SQL query under the hood at parsing time and not at execution time).
>
> @gdavoian No strong objections here, but I'm a little concerned that if it is a specific case of your team then the downside of this is we add an overhead of rendering this field for all operators in airflow for everyone who uses it.
>
> Also, I checked the code, and Variable.get() is using a cache so it might not be a bad idea to use it, considering the downside. WDYT?

Yeah. It looks like a useful case - mostly because of the specific place where it is used.

Actually, I think this case is largely superseded by Notifiers, but having this specific exception for email does not seem to have a side effect, and I can easily imagine how it might be useful. We've been discussing the reasoning for that with @gdavoian and @hussein-awala in #29821 (comment), and it seems like it would not hurt to add it.

@potiuk
Member

potiuk commented Nov 9, 2023

Nice!

@potiuk potiuk merged commit addbd58 into apache:main Nov 9, 2023
46 checks passed

boring-cyborg bot commented Nov 9, 2023

Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.

@gdavoian gdavoian deleted the allow_email_field_to_be_templated branch November 9, 2023 18:15
@hussein-awala
Member

Since email is the only field we can exclude, I wonder if the user really needs it.

We have an alternative way to achieve that without using the email, email_on_retry, and email_on_failure params: the user can use on_failure_callback and on_retry_callback with a SmtpNotifier, which supports templated email.

WDYT?

@gdavoian
Contributor Author

gdavoian commented Nov 9, 2023

I think that having an alternative (and more customizable) way of sending email notifications via callbacks is a good thing, but in most cases the basic mechanism implemented via the email field is more than enough: it's very straightforward, easy to set up, and exactly what a data engineering team usually needs (i.e., a quick way to know that something has failed and must be fixed).

Speaking about if the user needs it, I think yes. For example, we have multiple separate Airflow environments, with hundreds of DAGs overall, and we heavily rely on the email field and the ability to template it.

romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Nov 10, 2023
@potiuk
Member

potiuk commented Nov 10, 2023

> Since email is the only field we can exclude, I wonder if the user really needs it.
>
> We have an alternative way to achieve that without using the email, email_on_retry, and email_on_failure params, where the user can use on_failure_callback and on_retry_callback with a SmtpNotifier which supports templated email.
>
> WDYT?

@hussein-awala Crossed my mind too, actually ... I'd even say maybe a better overall solution would be to convert the email into SMTPNotifier under the hood (and treat it as syntactic sugar rather than a field in the operator). Maybe then we could get rid of it being a field and get rid of the exclusion.

This does look like a special case and the notifiers are nicely addressing the problem.

> Speaking about if the user needs it, I think yes. For example, we have multiple separate Airflow environments, with hundreds of DAGs overall, and we heavily rely on the email field and the ability to template it.

I see the need, but likely in this case it does not have to be a FIELD in the operator. If it is set in the constructor, we could configure notifiers automatically under the hood. That would not change anything in the API: you'd still set the templated email as email=, but under the hood the operator could set the email_on_* to use the SMTPNotifier with that email.
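The "syntactic sugar" idea can be sketched in plain Python. This is only an illustration of the proposal as described in this thread, not something Airflow implements: `FakeSmtpNotifier` and `SketchOperator` are hypothetical stand-ins, and the notifier here merely records what it would send instead of rendering the template and sending mail.

```python
class FakeSmtpNotifier:
    """Hypothetical stand-in for a notifier; it only records calls."""

    def __init__(self, to):
        self.to = to  # may be a template string, rendered later by a real notifier
        self.sent = []

    def __call__(self, context):
        self.sent.append((self.to, context["event"]))


class SketchOperator:
    """Hypothetical operator where email= is translated into callbacks."""

    def __init__(self, email=None, email_on_failure=True, email_on_retry=True):
        self.on_failure_callback = None
        self.on_retry_callback = None
        # The public arguments stay the same; only the wiring changes.
        if email and email_on_failure:
            self.on_failure_callback = FakeSmtpNotifier(to=email)
        if email and email_on_retry:
            self.on_retry_callback = FakeSmtpNotifier(to=email)


task = SketchOperator(
    email="{{ var.value.some_email_list }}",
    email_on_retry=False,
)
task.on_failure_callback({"event": "failure"})
```

With this wiring, the operator would not need email as a templated field at all, because the notifier would own the rendering of its recipient.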

@ephraimbuddy ephraimbuddy added the type:improvement Changelog: Improvements label Nov 20, 2023
@ephraimbuddy ephraimbuddy added this to the Airflow 2.8.0 milestone Nov 20, 2023
@tlochner95

tlochner95 commented Jan 3, 2024

@gdavoian I just wanted to thank you for this contribution. I spent a couple of days on and off trying to figure out if this is possible. I've upgraded to v2.8.0 and am trying this functionality now, but am not able to get it working so far. Maybe I am doing something wrong, but I have my default_args listed as so:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': "{{ var.json.EMAILS_TO_RECEIVE_FAILURE_ALERTS }}",
    'email_on_failure': True,
    'email_on_retry': True
}

However, this is resulting in this error:

Traceback (most recent call last):
  File "/home/tlochner/my-local-airflow/.venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1000, in _email_alert
    send_email(task.email, subject, html_content)
  File "/home/tlochner/my-local-airflow/.venv/lib/python3.8/site-packages/airflow/utils/email.py", line 80, in send_email
    return backend(
  File "/home/tlochner/my-local-airflow/.venv/lib/python3.8/site-packages/airflow/utils/email.py", line 154, in send_email_smtp
    send_mime_email(e_from=mail_from, e_to=recipients, mime_msg=msg, conn_id=conn_id, dryrun=dryrun)
  File "/home/tlochner/my-local-airflow/.venv/lib/python3.8/site-packages/airflow/utils/email.py", line 282, in send_mime_email
    smtp_conn.sendmail(e_from, e_to, mime_msg.as_string())
  File "/usr/lib/python3.8/smtplib.py", line 894, in sendmail
    raise SMTPRecipientsRefused(senderrs)
smtplib.SMTPRecipientsRefused: {'{{ var.json.EMAILS_TO_RECEIVE_FAILURE_ALERTS }}': (501, b'5.1.3 Invalid address')}

Am I doing something wrong here? I have my Variable defined in the Airflow UI, and the value looks like this:
['myemail@email.com']

Any help is greatly appreciated.
Thank you!

@gdavoian
Contributor Author

gdavoian commented Jan 4, 2024

@tlochner95 it's nice to hear that I'm not alone :)

First of all, I would join the items of your email list with commas, as it's rendered as a string, not as a list (unless you set render_template_as_native_obj=True on your DAG object):

'email': "{{ ','.join( var.json.EMAILS_TO_RECEIVE_FAILURE_ALERTS ) }}",
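To see why the join matters, it helps to compare what a list looks like once it is converted to text with what the join produces. The snippet below uses plain Python only, with made-up addresses; it does not call Airflow or Jinja, and it assumes the default (non-native) rendering mode mentioned above, where the template result ends up as a string.

```python
emails = ["a@example.com", "b@example.com"]

# Converted to text as-is, the list becomes its Python repr,
# which is not a usable recipient string.
as_repr = str(emails)

# Joining first produces a plain comma-separated string of addresses.
as_joined = ",".join(emails)

print(as_repr)    # ['a@example.com', 'b@example.com']
print(as_joined)  # a@example.com,b@example.com
```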

Secondly, the fact that email is allowed to be templated doesn't mean it's templated by default. Unfortunately, that's not the case. So you must explicitly add email to template_fields of your custom operators.

from airflow.models.baseoperator import BaseOperator

class MyOperator(BaseOperator):
    template_fields = ('email',)

If you mostly use built-in or third-party operators, another option is to implement a utility function that monkey-patches operators in the current scope on the fly, which is exactly what we're doing. I can't share the source code, but here's a hint on how you could implement it yourself: start with BaseOperator, modify template_fields by adding email for all of its subclasses (BaseOperator.__subclasses__()), and then recursively do the same for each subclass, etc. Don't worry about infinite recursion; only the subclasses defined within the imported modules will be visible through __subclasses__(), and nothing more, typically a handful of classes in total. Finally, just call the function once per DAG file and enjoy the convenience of enabled templating for emails, which is disabled in Airflow by default.
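The hint above can be sketched roughly as follows. This is not the author's actual utility (which was not shared); it is a minimal guess at the described approach. The classes here are stand-ins so the sketch runs on its own, and `enable_email_templating` is a hypothetical name. In a real DAG file the function would presumably be called with Airflow's own BaseOperator after the operators have been imported.

```python
class BaseOperator:
    """Stand-in for Airflow's BaseOperator so the sketch is self-contained."""

    template_fields = ()


class BashLikeOperator(BaseOperator):
    template_fields = ("bash_command",)


class CustomBashOperator(BashLikeOperator):
    pass  # inherits template_fields from its parent


def enable_email_templating(base):
    """Add 'email' to template_fields of every subclass of `base`, recursively."""
    for subclass in base.__subclasses__():
        fields = tuple(subclass.template_fields)
        if "email" not in fields:
            # Extend rather than replace, so existing templated fields survive.
            subclass.template_fields = (*fields, "email")
        enable_email_templating(subclass)


enable_email_templating(BaseOperator)

print(BashLikeOperator.template_fields)    # ('bash_command', 'email')
print(CustomBashOperator.template_fields)  # ('bash_command', 'email')
```

Because the function checks for 'email' before adding it, calling it more than once (for example, once per DAG file) does not duplicate the entry.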

@potiuk
Member

potiuk commented Jan 4, 2024

Just to add to it: it's actually rather easy to update template_fields in your DAG file; you do not need to recursively update BaseOperator.

from .... import AnyOperator

AnyOperator.template_fields = ('email',)

task = AnyOperator(....) 

That one should do the job.
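One caveat worth noting: plain assignment replaces whatever fields the operator already templates. A possible variant, sketched here with a hypothetical stand-in class and made-up field names rather than a real Airflow operator, extends the existing tuple instead.

```python
class AnyOperator:
    """Hypothetical stand-in for an imported operator class."""

    template_fields = ("image", "arguments")


# Plain assignment would leave only 'email' templated,
# dropping the operator's own fields.
overwritten = ("email",)

# Extending the tuple keeps the existing fields and adds 'email'.
AnyOperator.template_fields = (*AnyOperator.template_fields, "email")

print(AnyOperator.template_fields)  # ('image', 'arguments', 'email')
```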

@tlochner95

Thank you both very much for your responses. I was able to get a working example using both of your help. Unfortunately for us, we are using AWS MWAA to host Airflow and will need to wait until they support v2.8.0 to be able to use the feature, but until then thank you!
