Skip to content

Commit

Permalink
Add Amazon SES hook (#10004)
Browse files Browse the repository at this point in the history
- refactor airflow.utils.email and add typing
  • Loading branch information
ipeluffo committed Aug 10, 2020
1 parent a3386e5 commit f06fe61
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 11 deletions.
100 changes: 100 additions & 0 deletions airflow/providers/amazon/aws/hooks/ses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This module contains AWS SES Hook
"""
from typing import Any, Dict, Iterable, List, Optional, Union

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.utils.email import build_mime_message


class SESHook(AwsBaseHook):
"""
Interact with Amazon Simple Email Service.
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
:class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""

def __init__(self, *args, **kwargs) -> None:
kwargs['client_type'] = 'ses'
super().__init__(*args, **kwargs)

def send_email( # pylint: disable=too-many-arguments
self,
mail_from: str,
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
reply_to: Optional[str] = None,
return_path: Optional[str] = None,
custom_headers: Optional[Dict[str, Any]] = None
) -> dict:
"""
Send email using Amazon Simple Email Service
:param mail_from: Email address to set as email's from
:param to: List of email addresses to set as email's to
:param subject: Email's subject
:param html_content: Content of email in HTML format
:param files: List of paths of files to be attached
:param cc: List of email addresses to set as email's CC
:param bcc: List of email addresses to set as email's BCC
:param mime_subtype: Can be used to specify the subtype of the message. Default = mixed
:param mime_charset: Email's charset. Default = UTF-8.
:param return_path: The email address to which replies will be sent. By default, replies
are sent to the original sender's email address.
:param reply_to: The email address to which message bounces and complaints should be sent.
"Return-Path" is sometimes called "envelope from," "envelope sender," or "MAIL FROM."
:param custom_headers: Additional headers to add to the MIME message.
No validations are run on these values and they should be able to be encoded.
:return: Response from Amazon SES service with unique message identifier.
"""
ses_client = self.get_conn()

custom_headers = custom_headers or {}
if reply_to:
custom_headers['Reply-To'] = reply_to
if return_path:
custom_headers['Return-Path'] = return_path

message, recipients = build_mime_message(
mail_from=mail_from,
to=to,
subject=subject,
html_content=html_content,
files=files,
cc=cc,
bcc=bcc,
mime_subtype=mime_subtype,
mime_charset=mime_charset,
custom_headers=custom_headers,
)

return ses_client.send_raw_email(
Source=mail_from, Destinations=recipients, RawMessage={'Data': message.as_string()}
)
76 changes: 66 additions & 10 deletions airflow/utils/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate
from typing import Iterable, List, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
Expand All @@ -47,22 +47,74 @@ def send_email(to: Union[List[str], Iterable[str]], subject: str, html_content:
mime_subtype=mime_subtype, mime_charset=mime_charset, **kwargs)


def send_email_smtp(to, subject, html_content, files=None,
dryrun=False, cc=None, bcc=None,
mime_subtype='mixed', mime_charset='utf-8',
**kwargs):
def send_email_smtp(
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
dryrun: bool = False,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
**kwargs,
):
"""
Send an email with html content
>>> send_email('test@example.com', 'foo', '<b>Foo</b> bar', ['/dev/null'], dryrun=True)
"""
smtp_mail_from = conf.get('smtp', 'SMTP_MAIL_FROM')

msg, recipients = build_mime_message(
mail_from=smtp_mail_from,
to=to,
subject=subject,
html_content=html_content,
files=files,
cc=cc,
bcc=bcc,
mime_subtype=mime_subtype,
mime_charset=mime_charset,
)

send_mime_email(e_from=smtp_mail_from, e_to=recipients, mime_msg=msg, dryrun=dryrun)


def build_mime_message(
mail_from: str,
to: Union[str, Iterable[str]],
subject: str,
html_content: str,
files: Optional[List[str]] = None,
cc: Optional[Union[str, Iterable[str]]] = None,
bcc: Optional[Union[str, Iterable[str]]] = None,
mime_subtype: str = 'mixed',
mime_charset: str = 'utf-8',
custom_headers: Optional[Dict[str, Any]] = None,
) -> Tuple[MIMEMultipart, List[str]]:
"""
Build a MIME message that can be used to send an email and
returns full list of recipients.
:param mail_from: Email address to set as email's from
:param to: List of email addresses to set as email's to
:param subject: Email's subject
:param html_content: Content of email in HTML format
:param files: List of paths of files to be attached
:param cc: List of email addresses to set as email's CC
:param bcc: List of email addresses to set as email's BCC
:param mime_subtype: Can be used to specify the subtype of the message. Default = mixed
:param mime_charset: Email's charset. Default = UTF-8.
:param custom_headers: Additional headers to add to the MIME message.
No validations are run on these values and they should be able to be encoded.
:return: Email as MIMEMultipart and list of recipients' addresses.
"""
to = get_email_address_list(to)

msg = MIMEMultipart(mime_subtype)
msg['Subject'] = subject
msg['From'] = smtp_mail_from
msg['From'] = mail_from
msg['To'] = ", ".join(to)
recipients = to
if cc:
Expand All @@ -86,14 +138,18 @@ def send_email_smtp(to, subject, html_content, files=None,
file.read(),
Name=basename
)
part['Content-Disposition'] = 'attachment; filename="%s"' % basename
part['Content-ID'] = '<%s>' % basename
part['Content-Disposition'] = f'attachment; filename="{basename}"'
part['Content-ID'] = f'<{basename}>'
msg.attach(part)

send_mime_email(smtp_mail_from, recipients, msg, dryrun)
if custom_headers:
for header_key, header_value in custom_headers.items():
msg[header_key] = header_value

return msg, recipients


def send_mime_email(e_from, e_to, mime_msg, dryrun=False):
def send_mime_email(e_from: str, e_to: List[str], mime_msg: MIMEMultipart, dryrun: bool = False) -> None:
"""
Send MIME email.
"""
Expand Down
6 changes: 6 additions & 0 deletions docs/operators-and-hooks-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,12 @@ These integrations allow you to perform various operations within the Amazon Web
:mod:`airflow.providers.amazon.aws.sensors.sagemaker_transform`,
:mod:`airflow.providers.amazon.aws.sensors.sagemaker_tuning`

* - `Amazon Simple Email Service (SES) <https://aws.amazon.com/ses/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.ses`
-
-

* - `Amazon Simple Notification Service (SNS) <https://aws.amazon.com/sns/>`__
-
- :mod:`airflow.providers.amazon.aws.hooks.sns`
Expand Down
78 changes: 78 additions & 0 deletions tests/providers/amazon/aws/hooks/test_ses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import boto3
import pytest
from moto import mock_ses

from airflow.providers.amazon.aws.hooks.ses import SESHook

boto3.setup_default_session()


@mock_ses
def test_get_conn():
hook = SESHook(aws_conn_id='aws_default')
assert hook.get_conn() is not None


@mock_ses
@pytest.mark.parametrize('to',
[
'to@domain.com',
['to1@domain.com', 'to2@domain.com'],
'to1@domain.com,to2@domain.com'
])
@pytest.mark.parametrize('cc',
[
'cc@domain.com',
['cc1@domain.com', 'cc2@domain.com'],
'cc1@domain.com,cc2@domain.com'
])
@pytest.mark.parametrize('bcc',
[
'bcc@domain.com',
['bcc1@domain.com', 'bcc2@domain.com'],
'bcc1@domain.com,bcc2@domain.com'
])
def test_send_email(to, cc, bcc):
# Given
hook = SESHook()
ses_client = hook.get_conn()
mail_from = 'test_from@domain.com'

# Amazon only allows to send emails from verified addresses,
# then we need to validate the from address before sending the email,
# otherwise this test would raise a `botocore.errorfactory.MessageRejected` exception
ses_client.verify_email_identity(EmailAddress=mail_from)

# When
response = hook.send_email(
mail_from=mail_from,
to=to,
subject='subject',
html_content='<html>Test</html>',
cc=cc,
bcc=bcc,
reply_to='reply_to@domain.com',
return_path='return_path@domain.com',
)

# Then
assert response is not None
assert isinstance(response, dict)
assert 'MessageId' in response
24 changes: 23 additions & 1 deletion tests/utils/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from airflow import utils
from airflow.configuration import conf
from airflow.utils.email import get_email_address_list
from airflow.utils.email import build_mime_message, get_email_address_list
from tests.test_utils.config import conf_vars

EMAILS = ['test1@example.com', 'test2@example.com']
Expand Down Expand Up @@ -96,6 +96,28 @@ def test_custom_backend(self, mock_send_email):
cc=None, bcc=None, mime_charset='utf-8', mime_subtype='mixed')
self.assertFalse(mock_send_email.called)

def test_build_mime_message(self):
mail_from = 'from@example.com'
mail_to = 'to@example.com'
subject = 'test subject'
html_content = '<html>Test</html>'
custom_headers = {'Reply-To': 'reply_to@example.com'}

msg, recipients = build_mime_message(
mail_from=mail_from,
to=mail_to,
subject=subject,
html_content=html_content,
custom_headers=custom_headers,
)

self.assertIn('From', msg)
self.assertIn('To', msg)
self.assertIn('Subject', msg)
self.assertIn('Reply-To', msg)
self.assertListEqual([mail_to], recipients)
self.assertEqual(msg['To'], ','.join(recipients))


@conf_vars({('smtp', 'SMTP_SSL'): 'False'})
class TestEmailSmtp(unittest.TestCase):
Expand Down

0 comments on commit f06fe61

Please sign in to comment.