Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINTENANCE] Migrate OpsgenieNotificationAction #9716

Merged
merged 18 commits into from
Apr 8, 2024
95 changes: 48 additions & 47 deletions great_expectations/checkpoint/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
json_encoders = {Renderer: lambda r: r.serialize()}

type: str
notify_on: Literal["all", "failure", "success"] = "all"

@property
def _using_cloud_context(self) -> bool:
Expand Down Expand Up @@ -164,6 +165,15 @@
def v1_run(self, checkpoint_result: CheckpointResult) -> str | dict:
raise NotImplementedError

def _is_enabled(self, success: bool) -> bool:
return (
self.notify_on == "all"
or self.notify_on == "success"
and success
or self.notify_on == "failure"
and not success
)


class DataDocsAction(ValidationAction):
def _build_data_docs(
Expand Down Expand Up @@ -326,13 +336,7 @@
payload["store_validation_result"]["validation_result_url"]
)
result = {"slack_notification_result": "none required"}
if (
self.notify_on == "all"
or self.notify_on == "success"
and validation_success
or self.notify_on == "failure"
and not validation_success
):
if self._is_enabled(success=validation_success):
query: Dict = self.renderer.render(
validation_result_suite,
data_docs_pages,
Expand Down Expand Up @@ -451,13 +455,7 @@
)

def _run_pypd_alert(self, dedup_key: str, message: str, success: bool):
if (
self.notify_on == "all"
or self.notify_on == "success"
and success
or self.notify_on == "failure"
and not success
):
if self._is_enabled(success=success):
pypd.api_key = self.api_key
pypd.EventV2.create(
data={
Expand Down Expand Up @@ -560,13 +558,7 @@
if payload[action_names]["class"] == "UpdateDataDocsAction":
data_docs_pages = payload[action_names]

if (
self.notify_on == "all"
or self.notify_on == "success"
and validation_success
or self.notify_on == "failure"
and not validation_success
):
if self._is_enabled(success=validation_success):
query = self.renderer.render(
validation_result_suite,
validation_result_suite_identifier,
Expand Down Expand Up @@ -625,6 +617,35 @@
renderer = _renderer
return renderer

@override
def v1_run(self, checkpoint_result: CheckpointResult) -> dict:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots of overlap with _run. I might DRY this up, but if we can remove _run in the near term, I'm less stressed about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Legacy _run will be removed shortly!

validation_success = checkpoint_result.success or False
checkpoint_name = checkpoint_result.checkpoint_config.name

if self._is_enabled(success=validation_success):
settings = {
"api_key": self.api_key,
"region": self.region,
"priority": self.priority,
"tags": self.tags,
}

description = self.renderer.v1_render(checkpoint_result=checkpoint_result)

message = f"Great Expectations Checkpoint {checkpoint_name} "
if checkpoint_result.success:
message += "succeeded!"
else:
message += "failed!"

alert_result = send_opsgenie_alert(
query=description, message=message, settings=settings
)

return {"opsgenie_alert_result": alert_result}
else:
return {"opsgenie_alert_result": "No alert sent"}

Check warning on line 647 in great_expectations/checkpoint/actions.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/checkpoint/actions.py#L647

Added line #L647 was not covered by tests

@override
def _run( # type: ignore[override] # signature does not match parent # noqa: PLR0913
self,
Expand Down Expand Up @@ -653,13 +674,7 @@

validation_success = validation_result_suite.success

if (
self.notify_on == "all"
or self.notify_on == "success"
and validation_success
or self.notify_on == "failure"
and not validation_success
):
if self._is_enabled(success=validation_success):
expectation_suite_name = validation_result_suite.meta.get(
"expectation_suite_name", "__no_expectation_suite_name__"
)
Expand All @@ -673,7 +688,8 @@

description = self.renderer.render(validation_result_suite, None, None)

alert_result = send_opsgenie_alert(description, expectation_suite_name, settings)
message = f"Great Expectations suite {expectation_suite_name} failed"
alert_result = send_opsgenie_alert(description, message, settings)

return {"opsgenie_alert_result": alert_result}
else:
Expand Down Expand Up @@ -806,11 +822,7 @@
if payload[action_names]["class"] == "UpdateDataDocsAction":
data_docs_pages = payload[action_names]

if (
(self.notify_on == "all")
or (self.notify_on == "success" and validation_success)
or (self.notify_on == "failure" and not validation_success)
):
if self._is_enabled(success=validation_success):
title, html = self.renderer.render(
validation_result_suite, data_docs_pages, self.notify_with
)
Expand Down Expand Up @@ -1116,19 +1128,8 @@
checkpoint_identifier=None,
**kwargs,
):
suite_name: str = validation_result_suite.meta["expectation_suite_name"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused on this change. This is a separate action, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup but mypy kept flagging it as an issue

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably because of the change to meta below

if "batch_kwargs" in validation_result_suite.meta:
data_asset_name = validation_result_suite.meta["batch_kwargs"].get(
"data_asset_name", "__no_data_asset_name__"
)
elif "active_batch_definition" in validation_result_suite.meta:
data_asset_name = (
validation_result_suite.meta["active_batch_definition"].data_asset_name
if validation_result_suite.meta["active_batch_definition"].data_asset_name
else "__no_data_asset_name__"
)
else:
data_asset_name = "__no_data_asset_name__"
suite_name: str = validation_result_suite.suite_name
data_asset_name: str = validation_result_suite.asset_name or "__no_data_asset_name__"

validation_results: list = validation_result_suite.results
validation_results_serializable: list = convert_to_json_serializable(validation_results)
Expand Down
4 changes: 2 additions & 2 deletions great_expectations/checkpoint/types/checkpoint_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@
"expectation_suite_names": [expectation_suite_name],
}
else:
assets_validated_by_batch_id[batch_id]["validation_results"].append(
assets_validated_by_batch_id[batch_id]["validation_results"].append( # type: ignore[union-attr]

Check warning on line 291 in great_expectations/checkpoint/types/checkpoint_result.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/checkpoint/types/checkpoint_result.py#L291

Added line #L291 was not covered by tests
validation_result
)
assets_validated_by_batch_id[batch_id]["expectation_suite_names"].append(
assets_validated_by_batch_id[batch_id]["expectation_suite_names"].append( # type: ignore[union-attr]

Check warning on line 294 in great_expectations/checkpoint/types/checkpoint_result.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/checkpoint/types/checkpoint_result.py#L294

Added line #L294 was not covered by tests
Comment on lines +291 to +294
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the old checkpoint result (not sure why mypy is mad about it now). We're using a new Pydantic type so I think we can ignore these?

expectation_suite_name
)
self._data_assets_validated_by_batch_id = assets_validated_by_batch_id
Expand Down
24 changes: 10 additions & 14 deletions great_expectations/checkpoint/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@


# noinspection SpellCheckingInspection
def send_opsgenie_alert(query, suite_name, settings):
def send_opsgenie_alert(query: str, message: str, settings: dict) -> bool:
"""Creates an alert in Opsgenie."""
if settings["region"] is not None:
url = (
Expand All @@ -75,7 +75,7 @@

headers = {"Authorization": f"GenieKey {settings['api_key']}"}
payload = {
"message": f"Great Expectations suite {suite_name} failed",
"message": message,
"description": query,
"priority": settings["priority"], # allow this to be modified in settings
"tags": settings["tags"],
Expand All @@ -85,18 +85,14 @@

try:
response = session.post(url, headers=headers, json=payload)
except requests.ConnectionError:
logger.warning("Failed to connect to Opsgenie")
except Exception as e:
logger.error(str(e)) # noqa: TRY400
else:
if response.status_code != 202: # noqa: PLR2004
logger.warning(
"Request to Opsgenie API " f"returned error {response.status_code}: {response.text}"
)
else:
return "success"
return "error"
response.raise_for_status()
except requests.ConnectionError as e:
logger.warning(f"Failed to connect to Opsgenie: {e}")
return False

Check warning on line 91 in great_expectations/checkpoint/util.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/checkpoint/util.py#L90-L91

Added lines #L90 - L91 were not covered by tests
except requests.HTTPError as e:
logger.warning(f"Request to Opsgenie API returned error {response.status_code}: {e}")
return False
return True


def send_microsoft_teams_notifications(query, microsoft_teams_webhook):
Expand Down
3 changes: 2 additions & 1 deletion great_expectations/checkpoint/v1_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ def _root_validate_result(cls, values: dict) -> dict:
if len(run_results) == 0:
raise ValueError("CheckpointResult must contain at least one run result") # noqa: TRY003

values["success"] = all(result.success for result in run_results.values())
if values["success"] is None:
values["success"] = all(result.success for result in run_results.values())
return values

@property
Expand Down
8 changes: 7 additions & 1 deletion great_expectations/core/expectation_validation_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@
suite_name: str,
evaluation_parameters: Optional[dict] = None,
statistics: Optional[dict] = None,
meta: Optional[ExpectationSuiteValidationResult | dict] = None,
meta: Optional[ExpectationSuiteValidationResultMeta | dict] = None,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been wrong for a while (thank you @tyler-hoffman for pointing it out). Some minor changes in this PR to adhere to the proper type

batch_id: Optional[str] = None,
result_url: Optional[str] = None,
id: Optional[str] = None,
Expand All @@ -532,6 +532,12 @@
self.id = id
self._metrics: dict = {}

@property
def asset_name(self) -> str | None:
if "active_batch_definition" in self.meta:
return self.meta["active_batch_definition"].data_asset_name

Check warning on line 538 in great_expectations/core/expectation_validation_result.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/core/expectation_validation_result.py#L538

Added line #L538 was not covered by tests
return None
Comment on lines +535 to +539
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to make this a top-level property (might write a ticket around it) but not pressing


def __eq__(self, other):
"""ExpectationSuiteValidationResult equality ignores instance identity, relying only on properties.""" # noqa: E501
if not isinstance(other, self.__class__):
Expand Down
49 changes: 49 additions & 0 deletions great_expectations/render/renderer/opsgenie_renderer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,63 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

logger = logging.getLogger(__name__)

from great_expectations.core.id_dict import BatchKwargs
from great_expectations.render.renderer.renderer import Renderer

if TYPE_CHECKING:
from great_expectations.checkpoint.v1_checkpoint import CheckpointResult
from great_expectations.core.expectation_validation_result import (

Check warning on line 13 in great_expectations/render/renderer/opsgenie_renderer.py

View check run for this annotation

Codecov / codecov/patch

great_expectations/render/renderer/opsgenie_renderer.py#L12-L13

Added lines #L12 - L13 were not covered by tests
ExpectationSuiteValidationResult,
)


class OpsgenieRenderer(Renderer):
def v1_render(self, checkpoint_result: CheckpointResult):
text_blocks: list[str] = []
for run_result in checkpoint_result.run_results.values():
text_block = self._render_validation_result(result=run_result)
text_blocks.append(text_block)

return self._concatenate_text_blocks(
checkpoint_result=checkpoint_result, text_blocks=text_blocks
)

def _render_validation_result(self, result: ExpectationSuiteValidationResult) -> str:
suite_name = result.suite_name
data_asset_name = result.asset_name or "__no_data_asset_name__"
n_checks_succeeded = result.statistics["successful_expectations"]
n_checks = result.statistics["evaluated_expectations"]
run_id = result.meta.get("run_id", "__no_run_id__")
batch_id = result.batch_id or "__no_batch_id__"
check_details_text = f"{n_checks_succeeded} of {n_checks} expectations were met"

if result.success:
status = "Success 🎉"
else:
status = "Failed ❌"

return f"""Batch Validation Status: {status}
Expectation Suite Name: {suite_name}
Data Asset Name: {data_asset_name}
Run ID: {run_id}
Batch ID: {batch_id}
Summary: {check_details_text}"""

def _concatenate_text_blocks(
self, checkpoint_result: CheckpointResult, text_blocks: list[str]
) -> str:
checkpoint_name = checkpoint_result.checkpoint_config.name
success = checkpoint_result.success
run_id = checkpoint_result.run_id.run_time

title = f"Checkpoint: {checkpoint_name} - Run ID: {run_id}"
status = "Status: Failed ❌" if not success else "Status: Success 🎉"
return f"{title}\n{status}\n\n" + "\n\n".join(text_blocks)

def render(
self,
validation_result=None,
Expand Down