Skip to content

Commit

Permalink
Sammyt/add alert connector (#311)
Browse files Browse the repository at this point in the history
* feature: handle null date in show snapshot

* feature: rename my_handler by drift handler

* feature: get transport from user defined file

* feature: function loader handles array of function

* feature: add alert transport

* feature: replace print by console alerttransport

* feature: implement github alert transport

* feature: move the datadrift config in a .datadrift.py

* feature: clean transport folder

* feature: make compatibility with 3.8

* feature: add some logs

* feature: add a gitlab transport
  • Loading branch information
Samox committed Feb 2, 2024
1 parent c9a9ee0 commit 3500977
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 26 deletions.
4 changes: 4 additions & 0 deletions tools/driftdb/driftdb/alerting/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .console import ConsoleAlertTransport
from .github import GithubAlertTransport
from .gitlab import GitlabAlertTransport
from .interface import AbstractAlertTransport
16 changes: 16 additions & 0 deletions tools/driftdb/driftdb/alerting/transport/console.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

from ..interface import DriftEvaluation, DriftEvaluatorContext
from .interface import AbstractAlertTransport


class ConsoleAlertTransport(AbstractAlertTransport):
    """Alert transport that writes the drift report to standard output."""

    def send(self, title: str, drift_evalutation: DriftEvaluation, drift_context: DriftEvaluatorContext) -> None:
        """Print the alert title, the drift summary and the evaluation result.

        Unlike transports that open an issue, this one prints on every call
        and does not look at ``should_alert`` to decide whether to print.

        Args:
            title: Headline printed first.
            drift_evalutation: Evaluation whose ``should_alert`` and
                ``message`` attributes are printed last.
            drift_context: Context whose ``summary`` mapping provides the
                sections printed in between.
        """
        print(title)
        summary = drift_context.summary

        # These three sections are rendered through ``to_markdown()``
        # (presumably pandas DataFrames - confirm against the summary builder).
        for section in ("added_rows", "deleted_rows", "modified_patterns"):
            rendered = summary[section].to_markdown()
            print(f"{section} \n", rendered)

        # The unique keys of modified rows are printed as-is.
        print("modified_rows_unique_keys \n", summary["modified_rows_unique_keys"])

        print("should alert \n", drift_evalutation.should_alert)
        print("alert message \n", drift_evalutation.message)
29 changes: 29 additions & 0 deletions tools/driftdb/driftdb/alerting/transport/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from github import GithubException, Repository
from github.MainClass import Github

from ..interface import DriftEvaluation, DriftEvaluatorContext
from .interface import AbstractAlertTransport


class GithubAlertTransport(AbstractAlertTransport):
    """Alert transport that opens a GitHub issue when a drift must be reported."""

    def __init__(self, github_client: Github, repository_name: str, assignees=None) -> None:
        """Resolve the target repository and remember who to assign issues to.

        Args:
            github_client: Authenticated PyGithub client.
            repository_name: Name passed to ``github_client.get_repo``.
            assignees: Optional list of assignees forwarded to
                ``create_issue``. Defaults to no assignee.
        """
        self.github_client = github_client
        self.repo = github_client.get_repo(repository_name)
        # A ``[]`` default in the signature would be a single list shared by
        # every instance; build a fresh one per instance instead.
        self.assignees = [] if assignees is None else assignees

    def send(self, title: str, drift_evalutation: DriftEvaluation, drift_context: DriftEvaluatorContext) -> None:
        """Create a GitHub issue for the drift, if the evaluation asks for one.

        Args:
            title: Title of the issue.
            drift_evalutation: Evaluation result; its ``message`` becomes the
                issue body and ``should_alert`` gates the creation.
            drift_context: Unused by this transport; kept for interface
                compatibility.

        Raises:
            GithubException: For any GitHub API error other than status 422.
        """
        if not drift_evalutation.should_alert:
            print("No alert needed")
            return

        try:
            issue = self.repo.create_issue(
                title=title,
                body=drift_evalutation.message,
                assignees=self.assignees,
            )
        except GithubException as e:
            # A 422 response is interpreted as "issue already exists".
            # NOTE(review): 422 is GitHub's generic validation-failure status;
            # confirm this interpretation is always correct.
            if e.status != 422:
                # Bare raise keeps the original traceback intact.
                raise
            print("Issue already exists. skipping...")
        else:
            print("Issue created: " + issue.html_url)
22 changes: 22 additions & 0 deletions tools/driftdb/driftdb/alerting/transport/gitlab.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from gitlab import Gitlab

from ..interface import DriftEvaluation, DriftEvaluatorContext
from .interface import AbstractAlertTransport


class GitlabAlertTransport(AbstractAlertTransport):
    """Alert transport that opens a GitLab issue when a drift must be reported."""

    def __init__(self, gitlab_client: Gitlab, project_id: str, assignees=None) -> None:
        """Resolve the target project and store the transport configuration.

        Args:
            gitlab_client: Authenticated python-gitlab client.
            project_id: Identifier passed to ``gitlab_client.projects.get``.
            assignees: Optional list of assignees. Defaults to an empty list.
                NOTE(review): this value is stored but is not included in the
                payload sent by ``send`` - confirm whether that is intended.
        """
        self.gitlab_client = gitlab_client
        self.project = gitlab_client.projects.get(project_id)
        # A ``[]`` default in the signature would be a single list shared by
        # every instance; build a fresh one per instance instead.
        self.assignees = [] if assignees is None else assignees

    def send(self, title: str, drift_evalutation: DriftEvaluation, drift_context: DriftEvaluatorContext) -> None:
        """Create a GitLab issue for the drift, if the evaluation asks for one.

        Args:
            title: Title of the issue.
            drift_evalutation: Evaluation result; its ``message`` becomes the
                issue description and ``should_alert`` gates the creation.
            drift_context: Unused by this transport; kept for interface
                compatibility.

        Raises:
            Exception: Any error raised while creating the issue is logged to
                stdout and re-raised unchanged.
        """
        if not drift_evalutation.should_alert:
            print("No alert needed")
            return

        try:
            issue = self.project.issues.create({'title': title, 'description': drift_evalutation.message})
            print("Issue created: " + issue.attributes['web_url'])
        except Exception:
            print("Error creating the issue")
            # Bare raise keeps the original traceback intact.
            raise
9 changes: 9 additions & 0 deletions tools/driftdb/driftdb/alerting/transport/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod

from ..interface import DriftEvaluation, DriftEvaluatorContext


class AbstractAlertTransport(ABC):
    """Interface implemented by every alert transport.

    A transport receives the outcome of a drift evaluation and delivers it to
    some destination; concrete subclasses must override ``send``.
    """

    @abstractmethod
    def send(self, title: str, drift_evalutation: DriftEvaluation, drift_context: DriftEvaluatorContext) -> None:
        """Deliver an alert.

        Args:
            title: Short headline for the alert.
            drift_evalutation: Result of the drift evaluation.
            drift_context: Context the evaluation was computed from.
        """
        ...
6 changes: 2 additions & 4 deletions tools/driftdb/driftdb/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing_extensions import List, Optional


def prompt_from_list(prompt: str, choices: list):
def prompt_from_list(prompt: str, choices: List):
for idx, choice in enumerate(choices, start=1):
typer.echo(f"{idx}: {choice}")

Expand All @@ -32,13 +32,11 @@ def find_date_starting_with(dates, input_date):
return date
return None

def get_user_date_selection(dates: List[str], input_date: str) -> Optional[str]:
def get_user_date_selection(dates: List[str], input_date: Optional[str] = None) -> Optional[str]:
if input_date is not None:
if input_date == "today":
input_date = datetime.today().date().strftime("%Y-%m-%d")


print("dates",input_date)
matching_date = find_date_starting_with(dates, input_date)
return matching_date

Expand Down
47 changes: 32 additions & 15 deletions tools/driftdb/driftdb/cli/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typer

from ..alerting.handlers import alert_drift_handler
from ..alerting.transport import AbstractAlertTransport, ConsoleAlertTransport
from ..dbt.snapshot import (get_snapshot_dates, get_snapshot_diff,
get_snapshot_nodes)
from ..dbt.snapshot_to_drift import convert_snapshot_to_drift_summary
Expand All @@ -26,6 +27,9 @@ def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
snapshot_dates = get_snapshot_dates(snapshot_node)

snapshot_date = get_user_date_selection(snapshot_dates)
if snapshot_date is None:
typer.echo("No snapshot data for selected date. Exiting.")
raise typer.Exit(code=1)

diff = get_snapshot_diff(snapshot_node, snapshot_date)

Expand Down Expand Up @@ -53,7 +57,6 @@ def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
@app.command()
def check(snapshot_id: str = typer.Option(None, help="id of your snapshot"), date: str = typer.Option(None, help="date of your snapshot")):
snapshot_node = get_or_prompt_snapshot_node(snapshot_id, get_snapshot_nodes())
handler = get_snapshot_handler(snapshot_node)
snapshot_date = get_user_date_selection(get_snapshot_dates(snapshot_node), date)

if snapshot_date is None:
Expand All @@ -62,30 +65,44 @@ def check(snapshot_id: str = typer.Option(None, help="id of your snapshot"), dat

print(f"Getting {snapshot_node['unique_id']} for {snapshot_date}.")

[drift_handler, alert_transport] = get_user_defined_handlers(snapshot_node)

if not isinstance(alert_transport, AbstractAlertTransport):
print("Alert transport is not an instance of AbstractAlertTransport, defaulting to ConsoleAlertTransport.")
alert_transport = ConsoleAlertTransport()


diff = get_snapshot_diff(snapshot_node, snapshot_date)
context = convert_snapshot_to_drift_summary(snapshot_diff=diff, id_column="month", date_column="month")
alert = handler(context)
drift_summary = context.summary
print("added_rows \n", drift_summary["added_rows"].to_markdown())
print("deleted_rows \n", drift_summary["deleted_rows"].to_markdown())
print("modified_patterns \n", drift_summary["modified_patterns"].to_markdown())
print("modified_rows_unique_keys \n", drift_summary["modified_rows_unique_keys"])
alert = drift_handler(context)
alert_title = f"Drift alert for {snapshot_node['unique_id']} on {snapshot_date}"
print("alert_title", alert_title)
try:
alert_transport.send(alert_title, alert, context)
except Exception as e:
logger.error(f"Error sending alert: {e}")
logger.error("Alert not sent.")
raise typer.Exit(code=1)

print("should alert \n", alert.should_alert)
print("alert message \n", alert.message)
logger.info("Done.")


def get_snapshot_handler(snapshot_node):
def get_user_defined_handlers(snapshot_node):
snapshot_file_path = snapshot_node["original_file_path"]
directory_path = os.path.dirname(snapshot_file_path)
user_defined_file_path = os.path.join(directory_path, "datadrift.py")
snapshot_file_name = os.path.basename(snapshot_file_path)
snapshot_file_name_without_extension, _ = os.path.splitext(snapshot_file_name)
user_defined_file_path = os.path.join(directory_path, f"{snapshot_file_name_without_extension}.datadrift.py")

print(f"Looking for user defined handlers in {user_defined_file_path}")

try:
handler = import_user_defined_function(user_defined_file_path, "my_handler")
return handler
except:
[drift_handler, alert_transport] = import_user_defined_function(user_defined_file_path,[ "drift_handler", "alert_transport"])
return [drift_handler, alert_transport]
except Exception as e:
logger.error(f"Error importing user defined handler: {e}")
logger.warn("No user defined handler found. Using default handler.")
return alert_drift_handler
return [alert_drift_handler, None]


def get_or_prompt_snapshot_node(snapshot_id, snapshot_nodes):
Expand Down
17 changes: 11 additions & 6 deletions tools/driftdb/driftdb/user_defined_function.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import importlib.util
import os
import sys
from typing import List


def import_user_defined_function(file_path, function_name):
def import_user_defined_function(file_path: str, function_names: List[str]):
file_path = os.path.abspath(file_path)

module_name = os.path.splitext(os.path.basename(file_path))[0]
Expand All @@ -14,9 +15,13 @@ def import_user_defined_function(file_path, function_name):
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
user_functions = []
for function_name in function_names:
user_function = getattr(module, function_name, None)
if not user_function:
print(f"Function {function_name} not found in {module_name}")
user_functions.append(None)
continue
user_functions.append(user_function)

user_function = getattr(module, function_name, None)
if not user_function:
raise ImportError(f"Function {function_name} not found in {file_path}")

return user_function
return user_functions
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.1.3-a.10"
version = "0.1.3-a.13"

0 comments on commit 3500977

Please sign in to comment.