From 9144308b3d6202bcf180eb2126b603d66be9fb38 Mon Sep 17 00:00:00 2001 From: Andrey Anshin Date: Fri, 1 Sep 2023 18:33:18 +0400 Subject: [PATCH] Fix examples for Amazon notifiers (#34009) --- .../chime_notifier_howto_guide.rst | 3 ++ .../notifications/sns.rst | 16 ++++--- .../amazon/aws/notifications/test_sns.py | 45 ++++++++++++++++++- .../amazon/aws/notifications/test_sqs.py | 35 +++++++++++++++ 4 files changed, 92 insertions(+), 7 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst index bf346aad5b0b8..c10b8cbae4142 100644 --- a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst +++ b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst @@ -39,10 +39,13 @@ Example Code: from airflow.providers.amazon.aws.notifications.chime import send_chime_notification with DAG( + dag_id="mydag", + schedule="@once", start_date=datetime(2023, 6, 27), on_success_callback=[ send_chime_notification(chime_conn_id="my_chime_conn", message="The DAG {{ dag.dag_id }} succeeded") ], + catchup=False, ): BashOperator( task_id="mytask", diff --git a/docs/apache-airflow-providers-amazon/notifications/sns.rst b/docs/apache-airflow-providers-amazon/notifications/sns.rst index a0765023d7bc5..337e82cf62eb4 100644 --- a/docs/apache-airflow-providers-amazon/notifications/sns.rst +++ b/docs/apache-airflow-providers-amazon/notifications/sns.rst @@ -17,12 +17,12 @@ .. _howto/notifier:SnsNotifier: -How-to Guide for SNS notifications -================================== +How-to Guide for Amazon Simple Notification Service (Amazon SNS) notifications +============================================================================== Introduction ------------ -`AWS SNS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier` +`Amazon SNS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier` allows users to push messages to a SNS Topic using the various ``on_*_callbacks`` at both the DAG level and Task level. You can also use a notifier with ``sla_miss_callback``. @@ -38,7 +38,7 @@ Example Code: from datetime import datetime from airflow import DAG - from airflow.operators.python import PythonOperator + from airflow.operators.bash import BashOperator from airflow.providers.amazon.aws.notifications.sns import send_sns_notification dag_failure_sns_notification = send_sns_notification( @@ -54,7 +54,13 @@ Example Code: target_arn="arn:aws:sns:us-west-2:123456789098:AnotherTopicName", ) - with DAG(start_date=datetime(2023, 1, 1), on_failure_callback=[dag_failure_sns_notification]): + with DAG( + dag_id="mydag", + schedule="@once", + start_date=datetime(2023, 1, 1), + on_failure_callback=[dag_failure_sns_notification], + catchup=False, + ): BashOperator( task_id="mytask", on_failure_callback=[task_failure_sns_notification], diff --git a/tests/providers/amazon/aws/notifications/test_sns.py b/tests/providers/amazon/aws/notifications/test_sns.py index 3666d025d2ca6..fd70d3bb1d61b 100644 --- a/tests/providers/amazon/aws/notifications/test_sns.py +++ b/tests/providers/amazon/aws/notifications/test_sns.py @@ -20,6 +20,7 @@ import pytest +from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.notifications.sns import SnsNotifier, send_sns_notification from airflow.utils.types import NOTSET @@ -34,16 +35,19 @@ def test_class_and_notifier_are_same(self): @pytest.mark.parametrize("region_name", ["eu-west-2", None, PARAM_DEFAULT_VALUE]) def test_parameters_propagate_to_hook(self, aws_conn_id, region_name): """Test notifier attributes propagate to SnsHook.""" - notifier_kwargs = { + publish_kwargs = { "target_arn": "arn:aws:sns:us-west-2:123456789098:TopicName", "message": "foo-bar", + "subject": "spam-egg", + "message_attributes": {}, } + notifier_kwargs = {} if aws_conn_id is not NOTSET: notifier_kwargs["aws_conn_id"] = aws_conn_id if region_name is not NOTSET: notifier_kwargs["region_name"] = region_name - notifier = SnsNotifier(**notifier_kwargs) + notifier = SnsNotifier(**notifier_kwargs, **publish_kwargs) with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as mock_hook: hook = notifier.hook assert hook is notifier.hook, "Hook property not cached" @@ -51,3 +55,40 @@ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name): aws_conn_id=(aws_conn_id if aws_conn_id is not NOTSET else "aws_default"), region_name=(region_name if region_name is not NOTSET else None), ) + + # Basic check for notifier + notifier.notify({}) + mock_hook.return_value.publish_to_target.assert_called_once_with(**publish_kwargs) + + def test_sns_notifier_templated(self, dag_maker): + with dag_maker("test_sns_notifier_templated") as dag: + EmptyOperator(task_id="task1") + + notifier = SnsNotifier( + aws_conn_id="{{ dag.dag_id }}", + target_arn="arn:aws:sns:{{ var_region }}:{{ var_account }}:{{ var_topic }}", + message="I, {{ var_username }}", + subject="{{ var_subject }}", + message_attributes={"foo": "{{ dag.dag_id }}"}, + region_name="{{ var_region }}", + ) + with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as m: + notifier( + { + "dag": dag, + "var_username": "Robot", + "var_region": "us-west-1", + "var_account": "000000000000", + "var_topic": "AwesomeTopic", + "var_subject": "spam-egg", + } + ) + # Hook initialisation + m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="us-west-1") + # Publish message + m.return_value.publish_to_target.assert_called_once_with( + target_arn="arn:aws:sns:us-west-1:000000000000:AwesomeTopic", + message="I, Robot", + subject="spam-egg", + message_attributes={"foo": "test_sns_notifier_templated"}, + ) diff --git a/tests/providers/amazon/aws/notifications/test_sqs.py b/tests/providers/amazon/aws/notifications/test_sqs.py index 356e46428a154..c3071e559e0e7 100644 --- a/tests/providers/amazon/aws/notifications/test_sqs.py +++ b/tests/providers/amazon/aws/notifications/test_sqs.py @@ -20,6 +20,7 @@ import pytest +from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier, send_sqs_notification from airflow.utils.types import NOTSET @@ -59,3 +60,37 @@ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name): # Basic check for notifier notifier.notify({}) mock_hook.return_value.send_message.assert_called_once_with(**send_message_kwargs) + + def test_sqs_notifier_templated(self, dag_maker): + with dag_maker("test_sns_notifier_templated") as dag: + EmptyOperator(task_id="task1") + + notifier = SqsNotifier( + aws_conn_id="{{ dag.dag_id }}", + queue_url="https://sqs.{{ var_region }}.amazonaws.com/{{ var_account }}/{{ var_queue }}", + message_body="The {{ var_username|capitalize }} Show", + message_attributes={"bar": "{{ dag.dag_id }}"}, + message_group_id="{{ var_group_id }}", + region_name="{{ var_region }}", + ) + with mock.patch("airflow.providers.amazon.aws.notifications.sqs.SqsHook") as m: + notifier( + { + "dag": dag, + "var_username": "truman", + "var_region": "ca-central-1", + "var_account": "123321123321", + "var_queue": "AwesomeQueue", + "var_group_id": "spam", + } + ) + # Hook initialisation + m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="ca-central-1") + # Send message + m.return_value.send_message.assert_called_once_with( + queue_url="https://sqs.ca-central-1.amazonaws.com/123321123321/AwesomeQueue", + message_body="The Truman Show", + message_group_id="spam", + message_attributes={"bar": "test_sns_notifier_templated"}, + delay_seconds=0, + )