diff --git a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
index bf346aad5b0b8..c10b8cbae4142 100644
--- a/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
+++ b/docs/apache-airflow-providers-amazon/notifications/chime_notifier_howto_guide.rst
@@ -39,10 +39,13 @@ Example Code:
from airflow.providers.amazon.aws.notifications.chime import send_chime_notification
with DAG(
+ dag_id="mydag",
+ schedule="@once",
start_date=datetime(2023, 6, 27),
on_success_callback=[
send_chime_notification(chime_conn_id="my_chime_conn", message="The DAG {{ dag.dag_id }} succeeded")
],
+ catchup=False,
):
BashOperator(
task_id="mytask",
diff --git a/docs/apache-airflow-providers-amazon/notifications/sns.rst b/docs/apache-airflow-providers-amazon/notifications/sns.rst
index a0765023d7bc5..337e82cf62eb4 100644
--- a/docs/apache-airflow-providers-amazon/notifications/sns.rst
+++ b/docs/apache-airflow-providers-amazon/notifications/sns.rst
@@ -17,12 +17,12 @@
.. _howto/notifier:SnsNotifier:
-How-to Guide for SNS notifications
-==================================
+How-to Guide for Amazon Simple Notification Service (Amazon SNS) notifications
+==============================================================================
Introduction
------------
-`AWS SNS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier`
+`Amazon SNS `__ notifier :class:`~airflow.providers.amazon.aws.notifications.sns.SnsNotifier`
allows users to push messages to a SNS Topic using the various ``on_*_callbacks`` at both the DAG level and Task level.
You can also use a notifier with ``sla_miss_callback``.
@@ -38,7 +38,7 @@ Example Code:
from datetime import datetime
from airflow import DAG
- from airflow.operators.python import PythonOperator
+ from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.notifications.sns import send_sns_notification
dag_failure_sns_notification = send_sns_notification(
@@ -54,7 +54,13 @@ Example Code:
target_arn="arn:aws:sns:us-west-2:123456789098:AnotherTopicName",
)
- with DAG(start_date=datetime(2023, 1, 1), on_failure_callback=[dag_failure_sns_notification]):
+ with DAG(
+ dag_id="mydag",
+ schedule="@once",
+ start_date=datetime(2023, 1, 1),
+ on_failure_callback=[dag_failure_sns_notification],
+ catchup=False,
+ ):
BashOperator(
task_id="mytask",
on_failure_callback=[task_failure_sns_notification],
diff --git a/tests/providers/amazon/aws/notifications/test_sns.py b/tests/providers/amazon/aws/notifications/test_sns.py
index 3666d025d2ca6..fd70d3bb1d61b 100644
--- a/tests/providers/amazon/aws/notifications/test_sns.py
+++ b/tests/providers/amazon/aws/notifications/test_sns.py
@@ -20,6 +20,7 @@
import pytest
+from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.notifications.sns import SnsNotifier, send_sns_notification
from airflow.utils.types import NOTSET
@@ -34,16 +35,19 @@ def test_class_and_notifier_are_same(self):
@pytest.mark.parametrize("region_name", ["eu-west-2", None, PARAM_DEFAULT_VALUE])
def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
"""Test notifier attributes propagate to SnsHook."""
- notifier_kwargs = {
+ publish_kwargs = {
"target_arn": "arn:aws:sns:us-west-2:123456789098:TopicName",
"message": "foo-bar",
+ "subject": "spam-egg",
+ "message_attributes": {},
}
+ notifier_kwargs = {}
if aws_conn_id is not NOTSET:
notifier_kwargs["aws_conn_id"] = aws_conn_id
if region_name is not NOTSET:
notifier_kwargs["region_name"] = region_name
- notifier = SnsNotifier(**notifier_kwargs)
+ notifier = SnsNotifier(**notifier_kwargs, **publish_kwargs)
with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as mock_hook:
hook = notifier.hook
assert hook is notifier.hook, "Hook property not cached"
@@ -51,3 +55,40 @@ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
aws_conn_id=(aws_conn_id if aws_conn_id is not NOTSET else "aws_default"),
region_name=(region_name if region_name is not NOTSET else None),
)
+
+ # Basic check for notifier
+ notifier.notify({})
+ mock_hook.return_value.publish_to_target.assert_called_once_with(**publish_kwargs)
+
+ def test_sns_notifier_templated(self, dag_maker):
+ with dag_maker("test_sns_notifier_templated") as dag:
+ EmptyOperator(task_id="task1")
+
+ notifier = SnsNotifier(
+ aws_conn_id="{{ dag.dag_id }}",
+ target_arn="arn:aws:sns:{{ var_region }}:{{ var_account }}:{{ var_topic }}",
+ message="I, {{ var_username }}",
+ subject="{{ var_subject }}",
+ message_attributes={"foo": "{{ dag.dag_id }}"},
+ region_name="{{ var_region }}",
+ )
+ with mock.patch("airflow.providers.amazon.aws.notifications.sns.SnsHook") as m:
+ notifier(
+ {
+ "dag": dag,
+ "var_username": "Robot",
+ "var_region": "us-west-1",
+ "var_account": "000000000000",
+ "var_topic": "AwesomeTopic",
+ "var_subject": "spam-egg",
+ }
+ )
+ # Hook initialisation
+ m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="us-west-1")
+ # Publish message
+ m.return_value.publish_to_target.assert_called_once_with(
+ target_arn="arn:aws:sns:us-west-1:000000000000:AwesomeTopic",
+ message="I, Robot",
+ subject="spam-egg",
+ message_attributes={"foo": "test_sns_notifier_templated"},
+ )
diff --git a/tests/providers/amazon/aws/notifications/test_sqs.py b/tests/providers/amazon/aws/notifications/test_sqs.py
index 356e46428a154..c3071e559e0e7 100644
--- a/tests/providers/amazon/aws/notifications/test_sqs.py
+++ b/tests/providers/amazon/aws/notifications/test_sqs.py
@@ -20,6 +20,7 @@
import pytest
+from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.notifications.sqs import SqsNotifier, send_sqs_notification
from airflow.utils.types import NOTSET
@@ -59,3 +60,37 @@ def test_parameters_propagate_to_hook(self, aws_conn_id, region_name):
# Basic check for notifier
notifier.notify({})
mock_hook.return_value.send_message.assert_called_once_with(**send_message_kwargs)
+
+ def test_sqs_notifier_templated(self, dag_maker):
+ with dag_maker("test_sns_notifier_templated") as dag:
+ EmptyOperator(task_id="task1")
+
+ notifier = SqsNotifier(
+ aws_conn_id="{{ dag.dag_id }}",
+ queue_url="https://sqs.{{ var_region }}.amazonaws.com/{{ var_account }}/{{ var_queue }}",
+ message_body="The {{ var_username|capitalize }} Show",
+ message_attributes={"bar": "{{ dag.dag_id }}"},
+ message_group_id="{{ var_group_id }}",
+ region_name="{{ var_region }}",
+ )
+ with mock.patch("airflow.providers.amazon.aws.notifications.sqs.SqsHook") as m:
+ notifier(
+ {
+ "dag": dag,
+ "var_username": "truman",
+ "var_region": "ca-central-1",
+ "var_account": "123321123321",
+ "var_queue": "AwesomeQueue",
+ "var_group_id": "spam",
+ }
+ )
+ # Hook initialisation
+ m.assert_called_once_with(aws_conn_id="test_sns_notifier_templated", region_name="ca-central-1")
+ # Send message
+ m.return_value.send_message.assert_called_once_with(
+ queue_url="https://sqs.ca-central-1.amazonaws.com/123321123321/AwesomeQueue",
+ message_body="The Truman Show",
+ message_group_id="spam",
+ message_attributes={"bar": "test_sns_notifier_templated"},
+ delay_seconds=0,
+ )