Skip to content

Commit

Permalink
Sammyt/detect alerts in drift summary (#308)
Browse files Browse the repository at this point in the history
* feature: return a context instead of a summary

* feature: compute alert and print it

* feature: get configuration from user defined handler

* chore: bump version

* feature: use default handler

* feature: fix CLI usage

* chore: deploy

* chore: ignore deploy files
  • Loading branch information
Samox committed Feb 1, 2024
1 parent 2b373b0 commit e2f318b
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 28 deletions.
1 change: 1 addition & 0 deletions tools/driftdb/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
/driftdb-*

# C extensions
*.so
Expand Down
6 changes: 3 additions & 3 deletions tools/driftdb/driftdb/cli/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
import pandas as pd
import pytz
import typer
from driftdb.cli.server import start_server
from driftdb.connectors.github_connector import GithubConnector
from driftdb.connectors.local_connector import LocalConnector
from github.MainClass import Github

from ..cli.server import start_server
from ..connectors.github_connector import GithubConnector
from ..connectors.local_connector import LocalConnector
from ..logger import get_logger
from .common import dbt_adapter_query, prompt_from_list

Expand Down
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/cli/seed.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import typer
from driftdb.connectors.local_connector import LocalConnector
from ..connectors.local_connector import LocalConnector

from ..dataframe.seed import generate_dataframe, insert_drift
from .common import prompt_from_list
Expand Down
37 changes: 29 additions & 8 deletions tools/driftdb/driftdb/cli/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import base64
import json
import os
import webbrowser

import inquirer
import pkg_resources
import typer
from driftdb.dbt.snapshot import get_snapshot_dates, get_snapshot_diff, get_snapshot_nodes

from ..alerting.handlers import alert_drift_handler
from ..dbt.snapshot import get_snapshot_dates, get_snapshot_diff, get_snapshot_nodes
from ..dbt.snapshot_to_drift import convert_snapshot_to_drift_summary
from ..logger import get_logger
from ..user_defined_function import import_user_defined_function
from .common import get_user_date_selection

app = typer.Typer()

logger = get_logger(__name__)


@app.command()
def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
Expand Down Expand Up @@ -47,21 +53,36 @@ def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
@app.command()
def check(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
snapshot_node = get_or_prompt_snapshot_node(snapshot_id, get_snapshot_nodes())
snapshot_dates = get_snapshot_dates(snapshot_node)
handler = get_snapshot_handler(snapshot_node)
snapshot_date = get_user_date_selection(get_snapshot_dates(snapshot_node))

snapshot_date = get_user_date_selection(snapshot_dates)
print(f"Getting {snapshot_node['unique_id']} for {snapshot_date}.")
diff = get_snapshot_diff(snapshot_node, snapshot_date)

print("Check drift for snapshot: " + snapshot_node["unique_id"])

drift_summary = convert_snapshot_to_drift_summary(snapshot_diff=diff, id_column="month", date_column="month")

diff = get_snapshot_diff(snapshot_node, snapshot_date)
context = convert_snapshot_to_drift_summary(snapshot_diff=diff, id_column="month", date_column="month")
alert = handler(context)
drift_summary = context.summary
print("added_rows \n", drift_summary["added_rows"].to_markdown())
print("deleted_rows \n", drift_summary["deleted_rows"].to_markdown())
print("modified_patterns \n", drift_summary["modified_patterns"].to_markdown())
print("modified_rows_unique_keys \n", drift_summary["modified_rows_unique_keys"])

print("should alert \n", alert.should_alert)
print("alert message \n", alert.message)


def get_snapshot_handler(snapshot_node):
snapshot_file_path = snapshot_node["original_file_path"]
directory_path = os.path.dirname(snapshot_file_path)
user_defined_file_path = os.path.join(directory_path, "datadrift.py")

try:
handler = import_user_defined_function(user_defined_file_path, "my_handler")
return handler
except:
logger.warn("No user defined handler found. Using default handler.")
return alert_drift_handler


def get_or_prompt_snapshot_node(snapshot_id, snapshot_nodes):
if not snapshot_id:
Expand Down
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/cli/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import pandas as pd
import typer
from driftdb.connectors.local_connector import LocalConnector
from ..connectors.local_connector import LocalConnector

from .common import prompt_from_list

Expand Down
Empty file.
11 changes: 7 additions & 4 deletions tools/driftdb/driftdb/dbt/snapshot_to_drift.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import traceback

from driftdb.alerting.interface import DriftSummary
from driftdb.logger import get_logger
from pandas import DataFrame, Index

from ..alerting.interface import DriftEvaluatorContext, DriftSummary
from ..logger import get_logger

logger = get_logger("summarize_dataframe_updates")


def convert_snapshot_to_drift_summary(snapshot_diff: DataFrame, id_column="id", date_column="date") -> DriftSummary:
def convert_snapshot_to_drift_summary(
snapshot_diff: DataFrame, id_column="id", date_column="date"
) -> DriftEvaluatorContext:
required_columns = [id_column, date_column, "record_status"]
for column in required_columns:
if column not in snapshot_diff.columns:
Expand Down Expand Up @@ -69,4 +72,4 @@ def convert_snapshot_to_drift_summary(snapshot_diff: DataFrame, id_column="id",
modified_rows_unique_keys=Index(common_ids),
modified_patterns=DataFrame(patterns_df),
)
return driftSummary
return DriftEvaluatorContext(initial_data, final_data, driftSummary)
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import typer
from driftdb.cli.server import start_server

from . import version
from .cli.dbt import app as dbt
from .cli.seed import app as seed
from .cli.server import start_server
from .cli.snapshot import app as snapshot
from .cli.store import app as store

Expand Down
22 changes: 22 additions & 0 deletions tools/driftdb/driftdb/user_defined_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import importlib.util
import os
import sys


def import_user_defined_function(file_path, function_name):
file_path = os.path.abspath(file_path)

module_name = os.path.splitext(os.path.basename(file_path))[0]

print(f"Importing {module_name} from {file_path}")

spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)

user_function = getattr(module, function_name, None)
if not user_function:
raise ImportError(f"Function {function_name} not found in {file_path}")

return user_function
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.1.1"
version = "0.1.3-a.9"
6 changes: 5 additions & 1 deletion tools/driftdb/setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from driftdb.version import version
from setuptools import find_packages, setup

with open("requirements.txt") as f:
requirements = f.read().splitlines()


setup(
name="driftdb",
version=version,
Expand All @@ -26,5 +30,5 @@
],
},
python_requires=">=3.6",
install_requires=["pandas", "PyGithub", "click", "GitPython", "Faker", "typer"],
install_requires=requirements,
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def test_convert_snapshot_with_1_modification(self):
}

df = pd.DataFrame(data)
drift_summary = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")
assert drift_summary["added_rows"].empty
assert drift_summary["deleted_rows"].empty
assert drift_summary["modified_rows_unique_keys"].equals(pd.Index([pd.Timestamp("2023-02-01")]))
context = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")
assert context.summary["added_rows"].empty
assert context.summary["deleted_rows"].empty
assert context.summary["modified_rows_unique_keys"].equals(pd.Index([pd.Timestamp("2023-02-01")]))

def test_convert_snapshot_with_1_addition(self):
data = {
Expand All @@ -40,9 +40,9 @@ def test_convert_snapshot_with_1_addition(self):

df = pd.DataFrame(data)

drift_summary = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")
context = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")

assert drift_summary["added_rows"].equals(
assert context.summary["added_rows"].equals(
pd.DataFrame(
{
"month": [pd.Timestamp("2022-11-01")],
Expand All @@ -64,9 +64,9 @@ def test_convert_snapshot_with_1_deletion(self):

df = pd.DataFrame(data)

drift_summary = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")
context = convert_snapshot_to_drift_summary(snapshot_diff=df, id_column="month", date_column="month")

assert drift_summary["deleted_rows"].equals(
assert context.summary["deleted_rows"].equals(
pd.DataFrame(
{
"month": [pd.Timestamp("2022-11-01")],
Expand Down

0 comments on commit e2f318b

Please sign in to comment.