Skip to content

Commit

Permalink
Add Amazon EventBridge PutRule hook and operator (#32869)
Browse files Browse the repository at this point in the history
  • Loading branch information
mahammi committed Jul 31, 2023
1 parent 357d35c commit 196d336
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 13 deletions.
65 changes: 64 additions & 1 deletion airflow/providers/amazon/aws/hooks/eventbridge.py
Expand Up @@ -16,12 +16,75 @@
# under the License.
from __future__ import annotations

import json

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.utils import trim_none_values


def _validate_json(pattern: str) -> None:
try:
json.loads(pattern)
except ValueError:
raise ValueError("`event_pattern` must be a valid JSON string.")


class EventBridgeHook(AwsBaseHook):
"""Amazon EventBridge Hook."""

def __init__(self, *args, **kwargs):
"""Creating object."""
super().__init__(client_type="events", *args, **kwargs)

def put_rule(
self,
name: str,
description: str | None = None,
event_bus_name: str | None = None,
event_pattern: str | None = None,
role_arn: str | None = None,
schedule_expression: str | None = None,
state: str | None = None,
tags: list[dict] | None = None,
**kwargs,
):
"""
Create or update an EventBridge rule.
:param name: name of the rule to create or update (required)
:param description: description of the rule
:param event_bus_name: name or ARN of the event bus to associate with this rule
:param event_pattern: pattern of events to be matched to this rule
:param role_arn: the Amazon Resource Name of the IAM role associated with the rule
:param schedule_expression: the scheduling expression (for example, a cron or rate expression)
:param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
:param tags: list of key-value pairs to associate with the rule
"""
if not (event_pattern or schedule_expression):
raise ValueError(
"One of `event_pattern` or `schedule_expression` are required in order to "
"put or update your rule."
)

if state and state not in ["ENABLED", "DISABLED"]:
raise ValueError("`state` must be specified as ENABLED or DISABLED.")

if event_pattern:
_validate_json(event_pattern)

put_rule_kwargs: dict[str, str | list] = {
**trim_none_values(
{
"Name": name,
"Description": description,
"EventBusName": event_bus_name,
"EventPattern": event_pattern,
"RoleArn": role_arn,
"ScheduleExpression": schedule_expression,
"State": state,
"Tags": tags,
}
)
}

return self.conn.put_rule(**put_rule_kwargs)
85 changes: 85 additions & 0 deletions airflow/providers/amazon/aws/operators/eventbridge.py
Expand Up @@ -32,6 +32,10 @@ class EventBridgePutEventsOperator(BaseOperator):
"""
Put Events onto Amazon EventBridge.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:EventBridgePutEventsOperator`
:param entries: the list of events to be put onto EventBridge, each event is a dict (required)
:param endpoint_id: the URL subdomain of the endpoint
:param aws_conn_id: the AWS connection to use
Expand Down Expand Up @@ -85,3 +89,84 @@ def execute(self, context: Context):

if self.do_xcom_push:
return [e["EventId"] for e in response["Entries"]]


class EventBridgePutRuleOperator(BaseOperator):
"""
Create or update a specified EventBridge rule.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:EventBridgePutRuleOperator`
:param name: name of the rule to create or update (required)
:param description: description of the rule
:param event_bus_name: name or ARN of the event bus to associate with this rule
:param event_pattern: pattern of events to be matched to this rule
:param role_arn: the Amazon Resource Name of the IAM role associated with the rule
:param schedule_expression: the scheduling expression (for example, a cron or rate expression)
:param state: indicates whether rule is set to be "ENABLED" or "DISABLED"
:param tags: list of key-value pairs to associate with the rule
:param region: the region where rule is to be created or updated
"""

template_fields: Sequence[str] = (
"aws_conn_id",
"name",
"description",
"event_bus_name",
"event_pattern",
"role_arn",
"schedule_expression",
"state",
"tags",
"region_name",
)

def __init__(
self,
*,
name: str,
description: str | None = None,
event_bus_name: str | None = None,
event_pattern: str | None = None,
role_arn: str | None = None,
schedule_expression: str | None = None,
state: str | None = None,
tags: list | None = None,
region_name: str | None = None,
aws_conn_id: str = "aws_default",
**kwargs,
):
super().__init__(**kwargs)
self.name = name
self.description = description
self.event_bus_name = event_bus_name
self.event_pattern = event_pattern
self.role_arn = role_arn
self.region_name = region_name
self.schedule_expression = schedule_expression
self.state = state
self.tags = tags
self.aws_conn_id = aws_conn_id

@cached_property
def hook(self) -> EventBridgeHook:
"""Create and return an EventBridgeHook."""
return EventBridgeHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

def execute(self, context: Context):

self.log.info('Sending rule "%s" to EventBridge.', self.name)

return self.hook.put_rule(
name=self.name,
description=self.description,
event_bus_name=self.event_bus_name,
event_pattern=self.event_pattern,
role_arn=self.role_arn,
schedule_expression=self.schedule_expression,
state=self.state,
tags=self.tags,
)
24 changes: 21 additions & 3 deletions docs/apache-airflow-providers-amazon/operators/eventbridge.rst
Expand Up @@ -15,9 +15,9 @@
specific language governing permissions and limitations
under the License.
========================================
==================
Amazon EventBridge
========================================
==================

`Amazon Eventbridge <https://docs.aws.amazon.com/eventbridge/>`__ is a serverless event bus service that makes it easy
to connect your applications with data from a variety of sources. EventBridge delivers a stream of real-time data from
Expand All @@ -34,8 +34,11 @@ Prerequisite Tasks
Operators
---------

.. _howto/operator:EventBridgePutEventsOperator:


Send events to Amazon EventBridge
==========================================
=================================

To send custom events to Amazon EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.
Expand All @@ -46,6 +49,21 @@ To send custom events to Amazon EventBridge, use
:start-after: [START howto_operator_eventbridge_put_events]
:end-before: [END howto_operator_eventbridge_put_events]

.. _howto/operator:EventBridgePutRuleOperator:


Create or update a rule on Amazon EventBridge
======================================================

To create or update a rule on EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutRuleOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eventbridge.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eventbridge_put_rule]
:end-before: [END howto_operator_eventbridge_put_rule]


Reference
---------
Expand Down
11 changes: 11 additions & 0 deletions tests/providers/amazon/aws/hooks/test_eventbridge.py
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import pytest
from moto import mock_events

from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
Expand All @@ -26,3 +27,13 @@ class TestEventBridgeHook:
def test_conn_returns_a_boto3_connection(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
assert hook.conn is not None

def test_put_rule(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
response = hook.put_rule(name="test", event_pattern='{"source": ["aws.s3"]}', state="ENABLED")
assert "RuleArn" in response

def test_put_rule_with_bad_json_fails(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
with pytest.raises(ValueError):
hook.put_rule(name="test", event_pattern="invalid json", state="ENABLED")
44 changes: 38 additions & 6 deletions tests/providers/amazon/aws/operators/test_eventbridge.py
Expand Up @@ -23,17 +23,20 @@

from airflow import AirflowException
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator
from airflow.providers.amazon.aws.operators.eventbridge import (
EventBridgePutEventsOperator,
EventBridgePutRuleOperator,
)

TASK_ID = "events_putevents_job"
ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType": "test-detail-type"}]
FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode": "test_code"}]
EVENT_PATTERN = '{"source": ["aws.s3"]}'


class TestEventBridgePutEventsOperator:
def test_init(self):
operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="put_events_job",
entries=ENTRIES,
)

Expand All @@ -46,7 +49,7 @@ def test_execute(self, mock_conn: MagicMock):
mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="put_events_job",
entries=ENTRIES,
)

Expand All @@ -56,7 +59,6 @@ def test_execute(self, mock_conn: MagicMock):

@mock.patch.object(EventBridgeHook, "conn")
def test_failed_to_send(self, mock_conn: MagicMock):

hook_response = {
"FailedEntryCount": 1,
"Entries": FAILED_ENTRIES_RESPONSE,
Expand All @@ -65,9 +67,39 @@ def test_failed_to_send(self, mock_conn: MagicMock):
mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
task_id="failed_put_events_job",
entries=ENTRIES,
)

with pytest.raises(AirflowException):
operator.execute(None)


class TestEventBridgePutRuleOperator:
def test_init(self):
operator = EventBridgePutRuleOperator(
task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
)

assert operator.event_pattern == EVENT_PATTERN

@mock.patch.object(EventBridgeHook, "conn")
def test_execute(self, mock_conn: MagicMock):
hook_response = {"RuleArn": "arn:aws:events:us-east-1:123456789012:rule/test"}
mock_conn.put_rule.return_value = hook_response

operator = EventBridgePutRuleOperator(
task_id="events_put_rule_job", name="match_s3_events", event_pattern=EVENT_PATTERN
)

result = operator.execute(None)

assert result == hook_response

def test_put_rule_with_bad_json_fails(self):
operator = EventBridgePutRuleOperator(
task_id="failed_put_rule_job", name="match_s3_events", event_pattern="invalid json"
)

with pytest.raises(ValueError):
operator.execute(None)
25 changes: 22 additions & 3 deletions tests/system/providers/amazon/aws/example_eventbridge.py
Expand Up @@ -19,7 +19,12 @@
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.eventbridge import (
EventBridgePutEventsOperator,
EventBridgePutRuleOperator,
)
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

DAG_ID = "example_eventbridge"
ENTRIES = [
Expand All @@ -31,20 +36,34 @@
}
]

sys_test_context_task = SystemTestContextBuilder().build()

with DAG(
dag_id=DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
tags=["example"],
catchup=False,
) as dag:
test_context = sys_test_context_task()

# [START howto_operator_eventbridge_put_events]
env_id = test_context[ENV_ID_KEY]

# [START howto_operator_eventbridge_put_events]
put_events = EventBridgePutEventsOperator(task_id="put_events_task", entries=ENTRIES)

# [END howto_operator_eventbridge_put_events]

# [START howto_operator_eventbridge_put_rule]
put_rule = EventBridgePutRuleOperator(
task_id="put_rule_task",
name="Example Rule",
event_pattern='{"source": ["example.myapp"]}',
description="This rule matches events from example.myapp.",
)
# [END howto_operator_eventbridge_put_rule]

chain(test_context, put_events, put_rule)


from tests.system.utils import get_test_run # noqa: E402

Expand Down

0 comments on commit 196d336

Please sign in to comment.