Skip to content

Commit

Permalink
feature(providers): added OpsgenieNotifier (#35530)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Utkarsh Sharma <utkarsharma2@gmail.com>
Co-authored-by: Andrey Anshin <Andrey.Anshin@taragol.is>
  • Loading branch information
3 people committed Nov 20, 2023
1 parent 9207e7d commit 2d01cfd
Show file tree
Hide file tree
Showing 13 changed files with 461 additions and 0 deletions.
16 changes: 16 additions & 0 deletions airflow/providers/opsgenie/notifications/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
81 changes: 81 additions & 0 deletions airflow/providers/opsgenie/notifications/opsgenie.py
@@ -0,0 +1,81 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from airflow.exceptions import AirflowOptionalProviderFeatureException

try:
from airflow.notifications.basenotifier import BaseNotifier
except ImportError:
raise AirflowOptionalProviderFeatureException(
"Failed to import BaseNotifier. This feature is only available in Airflow versions >= 2.6.0"
)

from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook

if TYPE_CHECKING:
from airflow.providers.opsgenie.typing.opsgenie import CreateAlertPayload
from airflow.utils.context import Context


class OpsgenieNotifier(BaseNotifier):
"""
This notifier allows you to post alerts to Opsgenie.
Accepts a connection that has an Opsgenie API key as the connection's password.
This notifier sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
Each Opsgenie API key can be pre-configured to a team integration.
You can override these defaults in this notifier.
.. seealso::
For more information on how to use this notifier, take a look at the guide:
:ref:`howto/notifier:OpsgenieNotifier`
:param payload: The payload necessary for creating an alert.
:param opsgenie_conn_id: Optional. The name of the Opsgenie connection to use. Default conn_id is opsgenie_default
"""

template_fields: Sequence[str] = ("payload",)

def __init__(
self,
*,
payload: CreateAlertPayload,
opsgenie_conn_id: str = "opsgenie_default",
) -> None:
super().__init__()

self.payload = payload
self.opsgenie_conn_id = opsgenie_conn_id

@cached_property
def hook(self) -> OpsgenieAlertHook:
"""Opsgenie alert Hook."""
return OpsgenieAlertHook(self.opsgenie_conn_id)

def notify(self, context: Context) -> None:
"""Call the OpsgenieAlertHook to post message."""
self.hook.get_conn().create_alert(self.payload)


send_opsgenie_notification = OpsgenieNotifier
3 changes: 3 additions & 0 deletions airflow/providers/opsgenie/provider.yaml
Expand Up @@ -65,3 +65,6 @@ hooks:
connection-types:
- hook-class-name: airflow.providers.opsgenie.hooks.opsgenie.OpsgenieAlertHook
connection-type: opsgenie

notifications:
- airflow.providers.opsgenie.notifications.opsgenie.OpsgenieNotifier
16 changes: 16 additions & 0 deletions airflow/providers/opsgenie/typing/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
59 changes: 59 additions & 0 deletions airflow/providers/opsgenie/typing/opsgenie.py
@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TypedDict

from typing_extensions import NotRequired, Required # For compat with Python < 3.11


class CreateAlertPayload(TypedDict):
"""
Payload schema for creating an Opsgenie alert.
:param message: The Message of the Opsgenie alert.
:param alias: Client-defined identifier of the alert.
:param description: Description field of the alert.
:param responders: Teams, users, escalations and schedules that
the alert will be routed to send notifications.
:param visible_to: Teams and users that the alert will become visible
to without sending any notification.
:param actions: Custom actions that will be available for the alert.
:param tags: Tags of the alert.
:param details: Map of key-value pairs to use as custom properties of the alert.
:param entity: Entity field of the alert that is
generally used to specify which domain alert is related to.
:param source: Source field of the alert. Default value is
IP address of the incoming request.
:param priority: Priority level of the alert. Default value is P3.
:param user: Display name of the request owner.
:param note: Additional note that will be added while creating the alert.
"""

message: Required[str]
alias: NotRequired[str | None]
description: NotRequired[str | None]
responders: NotRequired[list[dict] | None]
visible_to: NotRequired[list[dict] | None]
actions: NotRequired[list[str] | None]
tags: NotRequired[list[str] | None]
details: NotRequired[dict | None]
entity: NotRequired[str | None]
source: NotRequired[str | None]
priority: NotRequired[str | None]
user: NotRequired[str | None]
note: NotRequired[str | None]
1 change: 1 addition & 0 deletions docs/apache-airflow-providers-opsgenie/index.rst
Expand Up @@ -35,6 +35,7 @@
:caption: Guides

Operators <operators/index>
Notifications <notifications/index>

.. toctree::
:hidden:
Expand Down
30 changes: 30 additions & 0 deletions docs/apache-airflow-providers-opsgenie/notifications/index.rst
@@ -0,0 +1,30 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Opsgenie Notifiers
====================

.. important:: This feature is only available in Airflow versions >= 2.6.0


.. toctree::
:maxdepth: 1
:glob:

*
@@ -0,0 +1,33 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
.. _howto/notifier:OpsgenieNotifier:

Opsgenie Alert Notifier
=======================

Use the :class:`~airflow.providers.opsgenie.notifications.opsgenie.OpsgenieNotifier` to send an alert to opsgenie.


Using the Notifier
^^^^^^^^^^^^^^^^^^
Send an alert to Opsgenie with a specific message.

.. exampleinclude:: /../../tests/system/providers/opsgenie/example_opsgenie_notifier.py
:language: python
:start-after: [START howto_notifier_opsgenie]
:end-before: [END howto_notifier_opsgenie]
16 changes: 16 additions & 0 deletions tests/providers/opsgenie/notifications/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
119 changes: 119 additions & 0 deletions tests/providers/opsgenie/notifications/test_opsgenie.py
@@ -0,0 +1,119 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest import mock

import pytest

from airflow.operators.empty import EmptyOperator
from airflow.providers.opsgenie.hooks.opsgenie import OpsgenieAlertHook
from airflow.providers.opsgenie.notifications.opsgenie import OpsgenieNotifier, send_opsgenie_notification

pytestmark = pytest.mark.db_test


class TestOpsgenieNotifier:
_config = {
"message": "An example alert message",
"alias": "Life is too short for no alias",
"description": "Every alert needs a description",
"responders": [
{"id": "4513b7ea-3b91-438f-b7e4-e3e54af9147c", "type": "team"},
{"name": "NOC", "type": "team"},
{"id": "bb4d9938-c3c2-455d-aaab-727aa701c0d8", "type": "user"},
{"username": "trinity@opsgenie.com", "type": "user"},
{"id": "aee8a0de-c80f-4515-a232-501c0bc9d715", "type": "escalation"},
{"name": "Nightwatch Escalation", "type": "escalation"},
{"id": "80564037-1984-4f38-b98e-8a1f662df552", "type": "schedule"},
{"name": "First Responders Schedule", "type": "schedule"},
],
"visible_to": [
{"id": "4513b7ea-3b91-438f-b7e4-e3e54af9147c", "type": "team"},
{"name": "rocket_team", "type": "team"},
{"id": "bb4d9938-c3c2-455d-aaab-727aa701c0d8", "type": "user"},
{"username": "trinity@opsgenie.com", "type": "user"},
],
"actions": ["Restart", "AnExampleAction"],
"tags": ["OverwriteQuietHours", "Critical"],
"details": {"key1": "value1", "key2": "value2"},
"entity": "An example entity",
"source": "Airflow",
"priority": "P1",
"user": "Jesse",
"note": "Write this down",
}

expected_payload_dict = {
"message": _config["message"],
"alias": _config["alias"],
"description": _config["description"],
"responders": _config["responders"],
"visible_to": _config["visible_to"],
"actions": _config["actions"],
"tags": _config["tags"],
"details": _config["details"],
"entity": _config["entity"],
"source": _config["source"],
"priority": _config["priority"],
"user": _config["user"],
"note": _config["note"],
}

@mock.patch.object(OpsgenieAlertHook, "get_conn")
def test_notifier(self, mock_opsgenie_alert_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
notifier = send_opsgenie_notification(payload=self._config)
notifier({"dag": dag})
mock_opsgenie_alert_hook.return_value.create_alert.assert_called_once_with(self.expected_payload_dict)

@mock.patch.object(OpsgenieAlertHook, "get_conn")
def test_notifier_with_notifier_class(self, mock_opsgenie_alert_hook, dag_maker):
with dag_maker("test_notifier") as dag:
EmptyOperator(task_id="task1")
notifier = OpsgenieNotifier(payload=self._config)
notifier({"dag": dag})
mock_opsgenie_alert_hook.return_value.create_alert.assert_called_once_with(self.expected_payload_dict)

@mock.patch.object(OpsgenieAlertHook, "get_conn")
def test_notifier_templated(self, mock_opsgenie_alert_hook, dag_maker):
dag_id = "test_notifier"
with dag_maker(dag_id) as dag:
EmptyOperator(task_id="task1")

template_fields = ("message", "alias", "description", "entity", "priority", "note")
templated_config = {}
for key, value in self._config.items():
if key in template_fields:
templated_config[key] = value + " {{dag.dag_id}}"
else:
templated_config[key] = value

templated_expected_payload_dict = {}
for key, value in self.expected_payload_dict.items():
if key in template_fields:
templated_expected_payload_dict[key] = value + f" {dag_id}"
else:
templated_expected_payload_dict[key] = value

notifier = OpsgenieNotifier(payload=templated_config)
notifier({"dag": dag})
mock_opsgenie_alert_hook.return_value.create_alert.assert_called_once_with(
templated_expected_payload_dict
)

0 comments on commit 2d01cfd

Please sign in to comment.