Skip to content

Commit

Permalink
Add Eventbridge PutEvents operator and hook (#32498)
Browse files Browse the repository at this point in the history
* Add Eventbridge PutEvents operator and hook

---------

Co-authored-by: Maham Ali <maahaam@amazon.com>
Co-authored-by: Raphaël Vandon <vandonr@amazon.com>
  • Loading branch information
3 people committed Jul 20, 2023
1 parent dff360e commit eea53a2
Show file tree
Hide file tree
Showing 8 changed files with 332 additions and 0 deletions.
27 changes: 27 additions & 0 deletions airflow/providers/amazon/aws/hooks/eventbridge.py
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class EventBridgeHook(AwsBaseHook):
"""Amazon EventBridge Hook."""

def __init__(self, *args, **kwargs):
"""Creating object."""
super().__init__(client_type="events", *args, **kwargs)
87 changes: 87 additions & 0 deletions airflow/providers/amazon/aws/operators/eventbridge.py
@@ -0,0 +1,87 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
from airflow.providers.amazon.aws.utils import trim_none_values

if TYPE_CHECKING:
from airflow.utils.context import Context


class EventBridgePutEventsOperator(BaseOperator):
"""
Put Events onto Amazon EventBridge.
:param entries: the list of events to be put onto EventBridge, each event is a dict (required)
:param endpoint_id: the URL subdomain of the endpoint
:param aws_conn_id: the AWS connection to use
:param region_name: the region where events are to be sent
"""

template_fields: Sequence[str] = ("entries", "endpoint_id", "aws_conn_id", "region_name")

def __init__(
self,
*,
entries: list[dict],
endpoint_id: str | None = None,
aws_conn_id: str = "aws_default",
region_name: str | None = None,
**kwargs,
):
super().__init__(**kwargs)
self.entries = entries
self.endpoint_id = endpoint_id
self.aws_conn_id = aws_conn_id
self.region_name = region_name

@cached_property
def hook(self) -> EventBridgeHook:
"""Create and return an EventBridgeHook."""
return EventBridgeHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name)

def execute(self, context: Context):

response = self.hook.conn.put_events(
**trim_none_values(
{
"Entries": self.entries,
"EndpointId": self.endpoint_id,
}
)
)

self.log.info("Sent %d events to EventBridge.", len(self.entries))

if response.get("FailedEntryCount"):
for event in response["Entries"]:
if "ErrorCode" in event:
self.log.error(event)

raise AirflowException(
f"{response['FailedEntryCount']} entries in this request have failed to send."
)

if self.do_xcom_push:
return [e["EventId"] for e in response["Entries"]]
12 changes: 12 additions & 0 deletions airflow/providers/amazon/provider.yaml
Expand Up @@ -154,6 +154,12 @@ integrations:
- /docs/apache-airflow-providers-amazon/operators/emr/emr_serverless.rst
logo: /integration-logos/aws/Amazon-EMR_light-bg@4x.png
tags: [aws]
- integration-name: Amazon EventBridge
external-doc-url: https://docs.aws.amazon.com/eventbridge/latest/APIReference/Welcome.html
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/eventbridge.rst
logo: /integration-logos/aws/Amazon-EventBridge_64.png
tags: [aws]
- integration-name: Amazon Glacier
external-doc-url: https://aws.amazon.com/glacier/
logo: /integration-logos/aws/Amazon-S3-Glacier_light-bg@4x.png
Expand Down Expand Up @@ -307,6 +313,9 @@ operators:
- integration-name: Amazon EMR on EKS
python-modules:
- airflow.providers.amazon.aws.operators.emr
- integration-name: Amazon EventBridge
python-modules:
- airflow.providers.amazon.aws.operators.eventbridge
- integration-name: Amazon Glacier
python-modules:
- airflow.providers.amazon.aws.operators.glacier
Expand Down Expand Up @@ -457,6 +466,9 @@ hooks:
- integration-name: Amazon EMR on EKS
python-modules:
- airflow.providers.amazon.aws.hooks.emr
- integration-name: Amazon EventBridge
python-modules:
- airflow.providers.amazon.aws.hooks.eventbridge
- integration-name: Amazon Glacier
python-modules:
- airflow.providers.amazon.aws.hooks.glacier
Expand Down
53 changes: 53 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/eventbridge.rst
@@ -0,0 +1,53 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
========================================
Amazon EventBridge
========================================

`Amazon Eventbridge <https://docs.aws.amazon.com/eventbridge/>`__ is a serverless event bus service that makes it easy
to connect your applications with data from a variety of sources. EventBridge delivers a stream of real-time data from
your own applications, software-as-a-service (SaaS) applications, and AWS services and routes that data to targets such
as AWS Lambda. You can set up routing rules to determine where to send your data to build application architectures
that react in real time to all of your data sources. EventBridge enables you to build event-driven architectures
that are loosely coupled and distributed.

Prerequisite Tasks
------------------

.. include:: ../_partials/prerequisite_tasks.rst

Operators
---------

Send events to Amazon EventBridge
==========================================

To send custom events to Amazon EventBridge, use
:class:`~airflow.providers.amazon.aws.operators.eventbridge.EventBridgePutEventsOperator`.

.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eventbridge.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eventbridge_put_events]
:end-before: [END howto_operator_eventbridge_put_events]


Reference
---------

* `AWS boto3 library documentation for EventBridge <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/events.html>`__
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
28 changes: 28 additions & 0 deletions tests/providers/amazon/aws/hooks/test_eventbridge.py
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from moto import mock_events

from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook


@mock_events
class TestEventBridgeHook:
def test_conn_returns_a_boto3_connection(self):
hook = EventBridgeHook(aws_conn_id="aws_default")
assert hook.conn is not None
73 changes: 73 additions & 0 deletions tests/providers/amazon/aws/operators/test_eventbridge.py
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock
from unittest.mock import MagicMock

import pytest

from airflow import AirflowException
from airflow.providers.amazon.aws.hooks.eventbridge import EventBridgeHook
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator

TASK_ID = "events_putevents_job"
ENTRIES = [{"Detail": "test-detail", "Source": "test-source", "DetailType": "test-detail-type"}]
FAILED_ENTRIES_RESPONSE = [{"ErrorCode": "test_code"}, {"ErrorCode": "test_code"}]


class TestEventBridgePutEventsOperator:
def test_init(self):
operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
entries=ENTRIES,
)

assert operator.entries == ENTRIES

@mock.patch.object(EventBridgeHook, "conn")
def test_execute(self, mock_conn: MagicMock):
hook_response = {"FailedEntryCount": 0, "Entries": [{"EventId": "foobar"}]}

mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
entries=ENTRIES,
)

result = operator.execute(None)

assert result == ["foobar"]

@mock.patch.object(EventBridgeHook, "conn")
def test_failed_to_send(self, mock_conn: MagicMock):

hook_response = {
"FailedEntryCount": 1,
"Entries": FAILED_ENTRIES_RESPONSE,
}

mock_conn.put_events.return_value = hook_response

operator = EventBridgePutEventsOperator(
task_id=TASK_ID,
entries=ENTRIES,
)

with pytest.raises(AirflowException):
operator.execute(None)
52 changes: 52 additions & 0 deletions tests/system/providers/amazon/aws/example_eventbridge.py
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.eventbridge import EventBridgePutEventsOperator

DAG_ID = "example_eventbridge"
ENTRIES = [
{
"Detail": '{"event-name": "custom-event"}',
"EventBusName": "custom-bus",
"Source": "example.myapp",
"DetailType": "Sample Custom Event",
}
]

with DAG(
dag_id=DAG_ID,
schedule="@once",
start_date=datetime(2021, 1, 1),
tags=["example"],
catchup=False,
) as dag:

# [START howto_operator_eventbridge_put_events]

put_events = EventBridgePutEventsOperator(task_id="put_events_task", entries=ENTRIES)

# [END howto_operator_eventbridge_put_events]


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

0 comments on commit eea53a2

Please sign in to comment.