Skip to content

Commit

Permalink
Implement Azure Service Bus Topic Create, Delete Operators (#25436)
Browse files Browse the repository at this point in the history
- Added Create Topic Operator
- Added Test case
- Added Example DAG
- Added Doc for the operator

Implemented Azure service bus Topic Delete operator

- Added Operator for Delete Topic Operator
- Added example DAG
  • Loading branch information
bharanidharan14 committed Aug 16, 2022
1 parent 9033839 commit 5c7c518
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 1 deletion.
165 changes: 164 additions & 1 deletion airflow/providers/microsoft/azure/operators/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# specific language governing permissions and limitations
# under the License.
import datetime
from typing import TYPE_CHECKING, List, Optional, Sequence, Union
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Union

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

if TYPE_CHECKING:
from azure.servicebus.management._models import AuthorizationRule

from airflow.utils.context import Context


Expand Down Expand Up @@ -206,6 +208,125 @@ def execute(self, context: "Context") -> None:
hook.delete_queue(self.queue_name)


class AzureServiceBusTopicCreateOperator(BaseOperator):
"""
Create an Azure Service Bus Topic under a Service Bus Namespace by using ServiceBusAdministrationClient
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureServiceBusTopicCreateOperator`
:param topic_name: Name of the topic.
:param default_message_time_to_live: ISO 8601 default message time span to live value. This is
the duration after which the message expires, starting from when the message is sent to Service
Bus. This is the default value used when TimeToLive is not set on a message itself.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
memory allocated for the topic.
:param requires_duplicate_detection: A value indicating if this topic requires duplicate
detection.
:param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
duration of the duplicate detection history. The default value is 10 minutes.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param enable_batched_operations: Value that indicates whether server-side batched operations
are enabled.
:param size_in_bytes: The size of the topic, in bytes.
:param filtering_messages_before_publishing: Filter messages before publishing.
:param authorization_rules: List of Authorization rules for resource.
:param support_ordering: A value that indicates whether the topic supports ordering.
:param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
automatically deleted. The minimum duration is 5 minutes.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param enable_partitioning: A value that indicates whether the topic is to be partitioned
across multiple message brokers.
:param enable_express: A value that indicates whether Express Entities are enabled. An express
queue holds a message in memory temporarily before writing it to persistent storage.
:param user_metadata: Metadata associated with the topic.
:param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
can be accepted by the queue. This feature is only available when using a Premium namespace
and Service Bus API version "2021-05" or higher.
The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
"""

template_fields: Sequence[str] = ("topic_name",)
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
default_message_time_to_live: Optional[Union[datetime.timedelta, str]] = None,
max_size_in_megabytes: Optional[int] = None,
requires_duplicate_detection: Optional[bool] = None,
duplicate_detection_history_time_window: Optional[Union[datetime.timedelta, str]] = None,
enable_batched_operations: Optional[bool] = None,
size_in_bytes: Optional[int] = None,
filtering_messages_before_publishing: Optional[bool] = None,
authorization_rules: Optional[List["AuthorizationRule"]] = None,
support_ordering: Optional[bool] = None,
auto_delete_on_idle: Optional[Union[datetime.timedelta, str]] = None,
enable_partitioning: Optional[bool] = None,
enable_express: Optional[bool] = None,
user_metadata: Optional[str] = None,
max_message_size_in_kilobytes: Optional[int] = None,
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.azure_service_bus_conn_id = azure_service_bus_conn_id
self.default_message_time_to_live = default_message_time_to_live
self.max_size_in_megabytes = max_size_in_megabytes
self.requires_duplicate_detection = requires_duplicate_detection
self.duplicate_detection_history_time_window = duplicate_detection_history_time_window
self.enable_batched_operations = enable_batched_operations
self.size_in_bytes = size_in_bytes
self.filtering_messages_before_publishing = filtering_messages_before_publishing
self.authorization_rules = authorization_rules
self.support_ordering = support_ordering
self.auto_delete_on_idle = auto_delete_on_idle
self.enable_partitioning = enable_partitioning
self.enable_express = enable_express
self.user_metadata = user_metadata
self.max_message_size_in_kilobytes = max_message_size_in_kilobytes

def execute(self, context: "Context") -> str:
"""Creates Topic in Service Bus namespace, by connecting to Service Bus Admin client"""
if self.topic_name is None:
raise TypeError("Topic name cannot be None.")

# Create the hook
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

with hook.get_conn() as service_mgmt_conn:
topic_properties = service_mgmt_conn.get_topic(self.topic_name)
if topic_properties and topic_properties.name == self.topic_name:
self.log.info("Topic name already exists")
return topic_properties.name
topic = service_mgmt_conn.create_topic(
topic_name=self.topic_name,
default_message_time_to_live=self.default_message_time_to_live,
max_size_in_megabytes=self.max_size_in_megabytes,
requires_duplicate_detection=self.requires_duplicate_detection,
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
enable_batched_operations=self.enable_batched_operations,
size_in_bytes=self.size_in_bytes,
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
authorization_rules=self.authorization_rules,
support_ordering=self.support_ordering,
auto_delete_on_idle=self.auto_delete_on_idle,
enable_partitioning=self.enable_partitioning,
enable_express=self.enable_express,
user_metadata=self.user_metadata,
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
)
self.log.info("Created Topic %s", topic.name)
return topic.name


class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
"""
Create an Azure Service Bus Topic Subscription under a Service Bus Namespace
Expand Down Expand Up @@ -467,3 +588,45 @@ def execute(self, context: "Context") -> None:

# delete subscription with name
hook.delete_subscription(self.subscription_name, self.topic_name)


class AzureServiceBusTopicDeleteOperator(BaseOperator):
"""
Deletes the topic in the Azure Service Bus namespace
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:AzureServiceBusTopicDeleteOperator`
:param topic_name: Name of the topic to be deleted.
:param azure_service_bus_conn_id: Reference to the
:ref:`Azure Service Bus connection <howto/connection:azure_service_bus>`.
"""

template_fields: Sequence[str] = ("topic_name",)
ui_color = "#e4f0e8"

def __init__(
self,
*,
topic_name: str,
azure_service_bus_conn_id: str = 'azure_service_bus_default',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.topic_name = topic_name
self.azure_service_bus_conn_id = azure_service_bus_conn_id

def execute(self, context: "Context") -> None:
"""Delete topic in Service Bus namespace, by connecting to Service Bus Admin client"""
if self.topic_name is None:
raise TypeError("Topic name cannot be None.")
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

with hook.get_conn() as service_mgmt_conn:
topic_properties = service_mgmt_conn.get_topic(self.topic_name)
if topic_properties and topic_properties.name == self.topic_name:
service_mgmt_conn.delete_topic(self.topic_name)
self.log.info("Topic %s deleted.", self.topic_name)
else:
self.log.info("Topic %s does not exist.", self.topic_name)
37 changes: 37 additions & 0 deletions docs/apache-airflow-providers-microsoft-azure/operators/asb.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,43 @@ Below is an example of using this operator to execute an Azure Service Bus Delet
:start-after: [START howto_operator_delete_service_bus_queue]
:end-before: [END howto_operator_delete_service_bus_queue]

Azure Service Bus Topic Operators
-----------------------------------------
Azure Service Bus Topic based Operators helps to interact with topic in service bus namespace
and it helps to Create, Delete operation for topic.

.. _howto/operator:AzureServiceBusTopicCreateOperator:

Create Azure Service Bus Topic
======================================

To create Azure service bus topic with specific Parameter you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicCreateOperator`.

Below is an example of using this operator to execute an Azure Service Bus Create Topic.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_create_service_bus_topic]
:end-before: [END howto_operator_create_service_bus_topic]

.. _howto/operator:AzureServiceBusTopicDeleteOperator:

Delete Azure Service Bus Topic
======================================

To Delete the Azure service bus topic you can use
:class:`~airflow.providers.microsoft.azure.operators.asb.AzureServiceBusTopicDeleteOperator`.

Below is an example of using this operator to execute an Azure Service Bus Delete topic.

.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_azure_service_bus.py
:language: python
:dedent: 4
:start-after: [START howto_operator_delete_service_bus_topic]
:end-before: [END howto_operator_delete_service_bus_topic]

Azure Service Bus Subscription Operators
-----------------------------------------
Azure Service Bus Subscription based Operators helps to interact topic Subscription in service bus namespace
Expand Down
104 changes: 104 additions & 0 deletions tests/providers/microsoft/azure/operators/test_asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
AzureServiceBusSendMessageOperator,
AzureServiceBusSubscriptionCreateOperator,
AzureServiceBusSubscriptionDeleteOperator,
AzureServiceBusTopicCreateOperator,
AzureServiceBusTopicDeleteOperator,
AzureServiceBusUpdateSubscriptionOperator,
)

Expand Down Expand Up @@ -204,6 +206,51 @@ def test_receive_message_queue(self, mock_get_conn):
mock_get_conn.assert_has_calls(expected_calls)


class TestABSTopicCreateOperator:
def test_init(self):
"""
Test init by creating AzureServiceBusTopicCreateOperator with task id and topic name,
by asserting the value
"""
asb_create_topic = AzureServiceBusTopicCreateOperator(
task_id="asb_create_topic",
topic_name=TOPIC_NAME,
)
assert asb_create_topic.task_id == "asb_create_topic"
assert asb_create_topic.topic_name == TOPIC_NAME

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
@mock.patch('azure.servicebus.management.TopicProperties')
def test_create_topic(self, mock_topic_properties, mock_get_conn):
"""
Test AzureServiceBusTopicCreateOperator passed with the topic name
mocking the connection details, hook create_topic function
"""
asb_create_topic = AzureServiceBusTopicCreateOperator(
task_id="asb_create_topic",
topic_name=TOPIC_NAME,
)
mock_topic_properties.name = TOPIC_NAME
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = mock_topic_properties

with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
asb_create_topic.execute(None)
mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)

@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_create_subscription_exception(self, mock_sb_admin_client):
"""
Test `AzureServiceBusTopicCreateOperator` functionality to raise AirflowException,
by passing topic name as None and pytest raise Airflow Exception
"""
asb_create_topic_exception = AzureServiceBusTopicCreateOperator(
task_id="create_service_bus_subscription",
topic_name=None,
)
with pytest.raises(TypeError):
asb_create_topic_exception.execute(None)


class TestASBCreateSubscriptionOperator:
def test_init(self):
"""
Expand Down Expand Up @@ -377,3 +424,60 @@ def test_receive_message_queue(self, mock_get_conn):
.__exit__
]
mock_get_conn.assert_has_calls(expected_calls)


class TestASBTopicDeleteOperator:
def test_init(self):
"""
Test init by creating AzureServiceBusTopicDeleteOperator with task id, topic name and asserting
with values
"""
asb_delete_topic_operator = AzureServiceBusTopicDeleteOperator(
task_id="asb_delete_topic",
topic_name=TOPIC_NAME,
)
assert asb_delete_topic_operator.task_id == "asb_delete_topic"
assert asb_delete_topic_operator.topic_name == TOPIC_NAME

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
@mock.patch('azure.servicebus.management.TopicProperties')
def test_delete_topic(self, mock_topic_properties, mock_get_conn):
"""
Test AzureServiceBusTopicDeleteOperator by mocking topic name, connection
"""
asb_delete_topic = AzureServiceBusTopicDeleteOperator(
task_id="asb_delete_topic",
topic_name=TOPIC_NAME,
)
mock_topic_properties.name = TOPIC_NAME
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = mock_topic_properties
with mock.patch.object(asb_delete_topic.log, "info") as mock_log_info:
asb_delete_topic.execute(None)
mock_log_info.assert_called_with("Topic %s deleted.", TOPIC_NAME)

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook.get_conn")
def test_delete_topic_not_exists(self, mock_get_conn):
"""
Test AzureServiceBusTopicDeleteOperator by mocking topic name, connection
"""
asb_delete_topic_not_exists = AzureServiceBusTopicDeleteOperator(
task_id="asb_delete_topic_not_exists",
topic_name=TOPIC_NAME,
)
mock_get_conn.return_value.__enter__.return_value.get_topic.return_value = None
with mock.patch.object(asb_delete_topic_not_exists.log, "info") as mock_log_info:
asb_delete_topic_not_exists.execute(None)
mock_log_info.assert_called_with("Topic %s does not exist.", TOPIC_NAME)

@mock.patch('airflow.providers.microsoft.azure.hooks.asb.AdminClientHook')
def test_delete_topic_exception(self, mock_sb_admin_client):
"""
Test `delete_topic` functionality to raise AirflowException,
by passing topic name as None and pytest raise Airflow Exception
"""
asb_delete_topic_exception = AzureServiceBusTopicDeleteOperator(
task_id="delete_service_bus_subscription",
topic_name=None,
)
with pytest.raises(TypeError):
asb_delete_topic_exception.execute(None)
Loading

0 comments on commit 5c7c518

Please sign in to comment.