Skip to content

Commit

Permalink
Fix examples for Amazon notifiers (#34009)
Browse files Browse the repository at this point in the history
  • Loading branch information
Taragolis committed Sep 1, 2023
1 parent 5595075 commit 9144308
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 7 deletions.
Expand Up @@ -39,10 +39,13 @@ Example Code:
from airflow.providers.amazon.aws.notifications.chime import send_chime_notification
with DAG(
dag_id="mydag",
schedule="@once",
start_date=datetime(2023, 6, 27),
on_success_callback=[
send_chime_notification(chime_conn_id="my_chime_conn", message="The DAG {{ dag.dag_id }} succeeded")
],
catchup=False,
):
BashOperator(
task_id="mytask",
Expand Down
16 changes: 11 additions & 5 deletions docs/apache-airflow-providers-amazon/notifications/sns.rst
Expand Up @@ -17,12 +17,12 @@
.. _howto/notifier:SnsNotifier:

How-to Guide for SNS notifications
==================================
How-to Guide for Amazon Simple Notification Service (Amazon SNS) notifications
==============================================================================

Introduction
------------
`AWS SNS <https://aws.amazon.com/sns/>`__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier`
`Amazon SNS <https://aws.amazon.com/sns/>`__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier`
allows users to push messages to a SNS Topic using the various ``on_*_callbacks`` at both the DAG level and Task level.

You can also use a notifier with ``sla_miss_callback``.
Expand All @@ -38,7 +38,7 @@ Example Code:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.notifications.sns import send_sns_notification
dag_failure_sns_notification = send_sns_notification(
Expand All @@ -54,7 +54,13 @@ Example Code:
target_arn="arn:aws:sns:us-west-2:123456789098:AnotherTopicName",
)
with DAG(start_date=datetime(2023, 1, 1), on_failure_callback=[dag_failure_sns_notification]):
with DAG(
dag_id="mydag",
schedule="@once",
start_date=datetime(2023, 1, 1),
on_failure_callback=[dag_failure_sns_notification],
catchup=False,
):
BashOperator(
task_id="mytask",
on_failure_callback=[task_failure_sns_notification],
Expand Down
45 changes: 43 additions & 2 deletions tests/providers/amazon/aws/notifications/test_sns.py
Expand Up @@ -20,6 +20,7 @@

import pytest

from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier, send_sns_notification
from airflow.utils.types import NOTSET

Expand All @@ -34,20 +35,60 @@ def test_class_and_notifier_are_same(self):
@pytest.mark.parametrize("region_name", ["eu-west-2", None, PARAM_DEFAULT_VALUE])
def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
"""Test notifier attributes propagate to SnsHook."""
notifier_kwargs = {
publish_kwargs = {
"target_arn": "arn:aws:sns:us-west-2:123456789098:TopicName",
"message": "foo-bar",
"subject": "spam-egg",
"message_attributes": {},
}
notifier_kwargs = {}
if aws_conn_id is not NOTSET:
notifier_kwargs["aws_conn_id"] = aws_conn_id
if region_name is not NOTSET:
notifier_kwargs["region_name"] = region_name

notifier = SnsNotifier(**notifier_kwargs)
notifier = SnsNotifier(**notifier_kwargs, **publish_kwargs)
with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as mock_hook:
hook = notifier.hook
assert hook is notifier.hook, "Hook property not cached"
mock_hook.assert_called_once_with(
aws_conn_id=(aws_conn_id if aws_conn_id is not NOTSET else "aws_default"),
region_name=(region_name if region_name is not NOTSET else None),
)

# Basic check for notifier
notifier.notify({})
mock_hook.return_value.publish_to_target.assert_called_once_with(**publish_kwargs)

def test_sns_notifier_templated(self, dag_maker):
with dag_maker("test_sns_notifier_templated") as dag:
EmptyOperator(task_id="task1")

notifier = SnsNotifier(
aws_conn_id="{{ dag.dag_id }}",
target_arn="arn:aws:sns:{{ var_region }}:{{ var_account }}:{{ var_topic }}",
message="I, {{ var_username }}",
subject="{{ var_subject }}",
message_attributes={"foo": "{{ dag.dag_id }}"},
region_name="{{ var_region }}",
)
with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as m:
notifier(
{
"dag": dag,
"var_username": "Robot",
"var_region": "us-west-1",
"var_account": "000000000000",
"var_topic": "AwesomeTopic",
"var_subject": "spam-egg",
}
)
# Hook initialisation
m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="us-west-1")
# Publish message
m.return_value.publish_to_target.assert_called_once_with(
target_arn="arn:aws:sns:us-west-1:000000000000:AwesomeTopic",
message="I, Robot",
subject="spam-egg",
message_attributes={"foo": "test_sns_notifier_templated"},
)
35 changes: 35 additions & 0 deletions tests/providers/amazon/aws/notifications/test_sqs.py
Expand Up @@ -20,6 +20,7 @@

import pytest

from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier, send_sqs_notification
from airflow.utils.types import NOTSET

Expand Down Expand Up @@ -59,3 +60,37 @@ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
# Basic check for notifier
notifier.notify({})
mock_hook.return_value.send_message.assert_called_once_with(**send_message_kwargs)

def test_sqs_notifier_templated(self, dag_maker):
with dag_maker("test_sns_notifier_templated") as dag:
EmptyOperator(task_id="task1")

notifier = SqsNotifier(
aws_conn_id="{{ dag.dag_id }}",
queue_url="https://sqs.{{ var_region }}.amazonaws.com/{{ var_account }}/{{ var_queue }}",
message_body="The {{ var_username|capitalize }} Show",
message_attributes={"bar": "{{ dag.dag_id }}"},
message_group_id="{{ var_group_id }}",
region_name="{{ var_region }}",
)
with mock.patch("airflow.providers.amazon.aws.notifications.sqs.SqsHook") as m:
notifier(
{
"dag": dag,
"var_username": "truman",
"var_region": "ca-central-1",
"var_account": "123321123321",
"var_queue": "AwesomeQueue",
"var_group_id": "spam",
}
)
# Hook initialisation
m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="ca-central-1")
# Send message
m.return_value.send_message.assert_called_once_with(
queue_url="https://sqs.ca-central-1.amazonaws.com/123321123321/AwesomeQueue",
message_body="The Truman Show",
message_group_id="spam",
message_attributes={"bar": "test_sns_notifier_templated"},
delay_seconds=0,
)

0 comments on commit 9144308

Please sign in to comment.