Skip to content

Commit

Permalink
Implement Azure service bus subscription Operators (#24625)
Browse files Browse the repository at this point in the history
Implement Azure service bus subscription Operators

- Added AzureServiceBusSubscriptionCreateOperator
- Added AzureServiceBusSubscriptionDeleteOperator
-example DAG
- Added hooks for creating subscription and delete subscription
- Added unit Test case
  • Loading branch information
bharanidharan14 committed Jul 12, 2022
1 parent 50531f3 commit aa8bf2c
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 5 deletions.
19 changes: 18 additions & 1 deletion airflow/providers/microsoft/azure/hooks/asb.py
Expand Up @@ -114,6 +114,22 @@ def delete_queue(self, queue_name: str) -> None:
with self.get_conn() as service_mgmt_conn:
service_mgmt_conn.delete_queue(queue_name)

def delete_subscription(self, subscription_name: str, topic_name: str) -> None:
"""
Delete a topic subscription entities under a ServiceBus Namespace
:param subscription_name: The subscription name that will own the rule in topic
:param topic_name: The topic that will own the subscription rule.
"""
if subscription_name is None:
raise TypeError("Subscription name cannot be None.")
if topic_name is None:
raise TypeError("Topic name cannot be None.")

with self.get_conn() as service_mgmt_conn:
self.log.info("Deleting Subscription %s", subscription_name)
service_mgmt_conn.delete_subscription(topic_name, subscription_name)


class MessageHook(BaseAzureServiceBusHook):
"""
Expand All @@ -138,7 +154,8 @@ def send_message(
:param queue_name: The name of the queue or a QueueProperties with name.
:param messages: Message which needs to be sent to the queue. It can be string or list of string.
:param batch_message_flag: bool flag, can be set to True if message needs to be sent as batch message.
:param batch_message_flag: bool flag, can be set to True if message needs to be
sent as batch message.
"""
if queue_name is None:
raise TypeError("Queue name cannot be None.")
Expand Down
151 changes: 149 additions & 2 deletions airflow/providers/microsoft/azure/operators/asb.py
Expand Up @@ -14,8 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import TYPE_CHECKING, List, Sequence, Union
import datetime
from typing import TYPE_CHECKING, List, Optional, Sequence, Union

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook
Expand Down Expand Up @@ -204,3 +204,150 @@ def execute(self, context: "Context") -> None:

# delete queue with name
hook.delete_queue(self.queue_name)


class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
"""
Create an Azure Service Bus Topic Subscription under a Service Bus Namespace
by using ServiceBusAdministrationClient
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureServiceBusSubscriptionCreateOperator`
:param topic_name: The topic that will own the to-be-created subscription.
:param subscription_name: Name of the subscription that need to be created
:param lock_duration: ISO 8601 time span duration of a peek-lock; that is, the amount of time that
the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the
default value is 1 minute. Input value of either type ~datetime.timedelta or string in ISO 8601
duration format like "PT300S" is accepted.
:param requires_session: A value that indicates whether the queue supports the concept of sessions.
:param default_message_time_to_live: ISO 8601 default message time span to live value. This is the
duration after which the message expires, starting from when the message is sent to
Service Bus. This is the default value used when TimeToLive is not set on a message itself.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration
format like "PT300S" is accepted.
:param dead_lettering_on_message_expiration: A value that indicates whether this subscription has
dead letter support when a message expires.
:param dead_lettering_on_filter_evaluation_exceptions: A value that indicates whether this
subscription has dead letter support when a message expires.
:param max_delivery_count: The maximum delivery count. A message is automatically dead lettered
after this number of deliveries. Default value is 10.
:param enable_batched_operations: Value that indicates whether server-side batched
operations are enabled.
:param forward_to: The name of the recipient entity to which all the messages sent to the
subscription are forwarded to.
:param user_metadata: Metadata associated with the subscription. Maximum number of characters is 1024.
:param forward_dead_lettered_messages_to: The name of the recipient entity to which all the
messages sent to the subscription are forwarded to.
:param auto_delete_on_idle: ISO 8601 time Span idle interval after which the subscription is
automatically deleted. The minimum duration is 5 minutes. Input value of either
type ~datetime.timedelta or string in ISO 8601 duration format like "PT300S" is accepted.
:param azure_service_bus_conn_id: Reference to the
:ref:`Azure Service Bus connection<howto/connection:azure_service_bus>`.
"""

template_fields: Sequence[str] = ("topic_name", "subscription_name")
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
subscription_name: str,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
lock_duration: Optional[Union[datetime.timedelta, str]] = None,
requires_session: Optional[bool] = None,
default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
dead_lettering_on_message_expiration: Optional[bool] = True,
dead_lettering_on_filter_evaluation_exceptions: Optional[bool] = None,
max_delivery_count: Optional[int] = 10,
enable_batched_operations: Optional[bool] = True,
forward_to: Optional[str] = None,
user_metadata: Optional[str] = None,
forward_dead_lettered_messages_to: Optional[str] = None,
auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.subscription_name = subscription_name
self.lock_duration = lock_duration
self.requires_session = requires_session
self.default_message_time_to_live = default_message_time_to_live
self.dl_on_message_expiration = dead_lettering_on_message_expiration
self.dl_on_filter_evaluation_exceptions = dead_lettering_on_filter_evaluation_exceptions
self.max_delivery_count = max_delivery_count
self.enable_batched_operations = enable_batched_operations
self.forward_to = forward_to
self.user_metadata = user_metadata
self.forward_dead_lettered_messages_to = forward_dead_lettered_messages_to
self.auto_delete_on_idle = auto_delete_on_idle
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: "Context") -> None:
"""Creates Subscription in Service Bus namespace, by connecting to Service Bus Admin client"""
if self.subscription_name is None:
raise TypeError("Subscription name cannot be None.")
if self.topic_name is None:
raise TypeError("Topic name cannot be None.")
# Create the hook
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

with hook.get_conn() as service_mgmt_conn:
# create subscription with name
subscription = service_mgmt_conn.create_subscription(
topic_name=self.topic_name,
subscription_name=self.subscription_name,
lock_duration=self.lock_duration,
requires_session=self.requires_session,
default_message_time_to_live=self.default_message_time_to_live,
dead_lettering_on_message_expiration=self.dl_on_message_expiration,
dead_lettering_on_filter_evaluation_exceptions=self.dl_on_filter_evaluation_exceptions,
max_delivery_count=self.max_delivery_count,
enable_batched_operations=self.enable_batched_operations,
forward_to=self.forward_to,
user_metadata=self.user_metadata,
forward_dead_lettered_messages_to=self.forward_dead_lettered_messages_to,
auto_delete_on_idle=self.auto_delete_on_idle,
)
self.log.info("Created subscription %s", subscription.name)


class AzureServiceBusSubscriptionDeleteOperator(BaseOperator):
"""
Deletes the topic subscription in the Azure ServiceBus namespace
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureServiceBusSubscriptionDeleteOperator`
:param topic_name: The topic that will own the to-be-created subscription.
:param subscription_name: Name of the subscription that need to be created
:param azure_service_bus_conn_id: Reference to the
:ref:`Azure Service Bus connection <howto/connection:azure_service_bus>`.
"""

template_fields: Sequence[str] = ("topic_name", "subscription_name")
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
subscription_name: str,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.subscription_name = subscription_name
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: "Context") -> None:
"""Delete topic subscription in Service Bus namespace, by connecting to Service Bus Admin client"""
# Create the hook
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

# delete subscription with name
hook.delete_subscription(self.subscription_name, self.topic_name)
38 changes: 38 additions & 0 deletions docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
Expand Up @@ -98,6 +98,44 @@ Below is an example of using this operator to execute an Azure Service Bus Delet
:start-after: [START howto_operator_delete_service_bus_queue]
:end-before: [END howto_operator_delete_service_bus_queue]

Azure Service Bus Subscription Operators
-----------------------------------------
Azure Service Bus Subscription based Operators helps to interact topic Subscription in service bus namespace
and it helps to Create, Delete operation for subscription under topic.

.. _howto/operator:AzureServiceBusSubscriptionCreateOperator:

Create Azure Service Bus Subscription
======================================

To create Azure service bus topic Subscription with specific Parameter you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionCreateOperator`.

Below is an example of using this operator to execute an Azure Service Bus Create Subscription.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_service_bus_subscription]
:end-before: [END howto_operator_create_service_bus_subscription]

.. _howto/operator:AzureServiceBusSubscriptionDeleteOperator:

Delete Azure Service Bus Subscription
======================================

To Delete the Azure service bus topic Subscription you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusSubscriptionDeleteOperator`.

Below is an example of using this operator to execute an Azure Service Bus Delete Subscription under topic.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_service_bus_subscription]
:end-before: [END howto_operator_delete_service_bus_subscription]



Reference
---------
Expand Down
29 changes: 29 additions & 0 deletions tests/providers/microsoft/azure/hooks/test_asb.py
Expand Up @@ -201,3 +201,32 @@ def test_receive_message_exception(self, mock_sb_client):
hook = MessageHook(azure_service_bus_conn_id=self.conn_id)
with pytest.raises(TypeError):
hook.receive_message(None)

@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn')
def test_delete_subscription(self, mock_sb_admin_client):
"""
Test Delete subscription functionality by passing subscription name and topic name,
assert the function with values, mock the azure service bus function `delete_subscription`
"""
subscription_name = "test_subscription_name"
topic_name = "test_topic_name"
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
hook.delete_subscription(subscription_name, topic_name)
expected_calls = [mock.call().__enter__().delete_subscription(topic_name, subscription_name)]
mock_sb_admin_client.assert_has_calls(expected_calls)

@pytest.mark.parametrize(
"mock_subscription_name, mock_topic_name",
[("subscription_1", None), (None, "topic_1")],
)
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_delete_subscription_exception(
self, mock_sb_admin_client, mock_subscription_name, mock_topic_name
):
"""
Test `delete_subscription` functionality to raise AirflowException,
by passing subscription name and topic name as None and pytest raise Airflow Exception
"""
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
with pytest.raises(TypeError):
hook.delete_subscription(mock_subscription_name, mock_topic_name)
97 changes: 97 additions & 0 deletions tests/providers/microsoft/azure/operators/test_asb.py
Expand Up @@ -25,12 +25,19 @@
AzureServiceBusDeleteQueueOperator,
AzureServiceBusReceiveMessageOperator,
AzureServiceBusSendMessageOperator,
AzureServiceBusSubscriptionCreateOperator,
AzureServiceBusSubscriptionDeleteOperator,
)

QUEUE_NAME = "test_queue"
MESSAGE = "Test Message"
MESSAGE_LIST = [MESSAGE + " " + str(n) for n in range(0, 10)]

OWNER_NAME = "airflow"
DAG_ID = "test_azure_service_bus_subscription"
TOPIC_NAME = "sb_mgmt_topic_test"
SUBSCRIPTION_NAME = "sb_mgmt_subscription"


class TestAzureServiceBusCreateQueueOperator:
@pytest.mark.parametrize(
Expand Down Expand Up @@ -193,3 +200,93 @@ def test_receive_message_queue(self, mock_get_conn):
.__exit__
]
mock_get_conn.assert_has_calls(expected_calls)


class TestASBCreateSubscriptionOperator:
def test_init(self):
"""
Test init by creating ASBCreateSubscriptionOperator with task id, subscription name, topic name and
asserting with value
"""
asb_create_subscription = AzureServiceBusSubscriptionCreateOperator(
task_id="asb_create_subscription",
topic_name=TOPIC_NAME,
subscription_name=SUBSCRIPTION_NAME,
)
assert asb_create_subscription.task_id == "asb_create_subscription"
assert asb_create_subscription.subscription_name == SUBSCRIPTION_NAME
assert asb_create_subscription.topic_name == TOPIC_NAME

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
@mock.patch('azure.servicebus.management.SubscriptionProperties')
def test_create_subscription(self, mock_subscription_properties, mock_get_conn):
"""
Test AzureServiceBusSubscriptionCreateOperator passed with the subscription name, topic name
mocking the connection details, hook create_subscription function
"""
asb_create_subscription = AzureServiceBusSubscriptionCreateOperator(
task_id="create_service_bus_subscription",
topic_name=TOPIC_NAME,
subscription_name=SUBSCRIPTION_NAME,
)
mock_subscription_properties.name = SUBSCRIPTION_NAME
mock_subscription_properties.to = SUBSCRIPTION_NAME
mock_get_conn.return_value.__enter__.return_value.create_subscription.return_value = (
mock_subscription_properties
)

with mock.patch.object(asb_create_subscription.log, "info") as mock_log_info:
asb_create_subscription.execute(None)
mock_log_info.assert_called_with("Created subscription %s", SUBSCRIPTION_NAME)

@pytest.mark.parametrize(
"mock_subscription_name, mock_topic_name",
[("subscription_1", None), (None, "topic_1")],
)
@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_create_subscription_exception(
self, mock_sb_admin_client, mock_subscription_name, mock_topic_name
):
"""
Test `AzureServiceBusSubscriptionCreateOperator` functionality to raise AirflowException,
by passing subscription name and topic name as None and pytest raise Airflow Exception
"""
asb_create_subscription = AzureServiceBusSubscriptionCreateOperator(
task_id="create_service_bus_subscription",
topic_name=mock_topic_name,
subscription_name=mock_subscription_name,
)
with pytest.raises(TypeError):
asb_create_subscription.execute(None)


class TestASBDeleteSubscriptionOperator:
def test_init(self):
"""
Test init by creating AzureServiceBusSubscriptionDeleteOperator with task id, subscription name,
topic name and asserting with values
"""
asb_delete_subscription_operator = AzureServiceBusSubscriptionDeleteOperator(
task_id="asb_delete_subscription",
topic_name=TOPIC_NAME,
subscription_name=SUBSCRIPTION_NAME,
)
assert asb_delete_subscription_operator.task_id == "asb_delete_subscription"
assert asb_delete_subscription_operator.topic_name == TOPIC_NAME
assert asb_delete_subscription_operator.subscription_name == SUBSCRIPTION_NAME

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
def test_delete_subscription(self, mock_get_conn):
"""
Test AzureServiceBusSubscriptionDeleteOperator by mocking subscription name, topic name and
connection and hook delete_subscription
"""
asb_delete_subscription_operator = AzureServiceBusSubscriptionDeleteOperator(
task_id="asb_delete_subscription",
topic_name=TOPIC_NAME,
subscription_name=SUBSCRIPTION_NAME,
)
asb_delete_subscription_operator.execute(None)
mock_get_conn.return_value.__enter__.return_value.delete_subscription.assert_called_once_with(
TOPIC_NAME, SUBSCRIPTION_NAME
)

0 comments on commit aa8bf2c

Please sign in to comment.