Skip to content

Commit

Permalink
Sammyt/detect issue in snapshots (#307)
Browse files Browse the repository at this point in the history
* feature: rename run with sync

* feature: add new command snapshot check

* feature: in snapshot check get date and display snapshot in markdown

* feature: implement modified line of summary

* feature: compute diff patterns

* feature: return the diff pattern

* feature: fix empty dataframe logging

* chore: add pytest

* feature: fix test
  • Loading branch information
Samox committed Jan 23, 2024
1 parent 3708f18 commit 2b373b0
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 18 deletions.
2 changes: 1 addition & 1 deletion tools/driftdb/driftdb/cli/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@


@app.command()
def run(
def sync(
token: str = typer.Option(
os.environ.get("DATADRIFT_GITHUB_TOKEN", ""),
help="Token to access your repo. With PR and Content read and write rights",
Expand Down
57 changes: 41 additions & 16 deletions tools/driftdb/driftdb/cli/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import typer
from driftdb.dbt.snapshot import get_snapshot_dates, get_snapshot_diff, get_snapshot_nodes

from ..dbt.snapshot_to_drift import convert_snapshot_to_drift_summary
from .common import get_user_date_selection

# Typer application object; the snapshot commands in this module register
# themselves on it through the ``@app.command()`` decorator.
app = typer.Typer()
Expand All @@ -15,22 +16,7 @@
@app.command()
def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
snapshot_nodes = get_snapshot_nodes()
if not snapshot_id:
questions = [
inquirer.List(
"choice",
message="Please choose a snapshot to show",
choices=[node["unique_id"] for node in snapshot_nodes],
),
]
answers = inquirer.prompt(questions)
if answers is None:
typer.echo("No choice selected. Exiting.")
raise typer.Exit(code=1)

snapshot_id = answers["choice"]

snapshot_node = [node for node in snapshot_nodes if node["unique_id"] == snapshot_id][0]
snapshot_node = get_or_prompt_snapshot_node(snapshot_id, snapshot_nodes)
snapshot_dates = get_snapshot_dates(snapshot_node)

snapshot_date = get_user_date_selection(snapshot_dates)
Expand Down Expand Up @@ -58,5 +44,44 @@ def show(snapshot_id: str = typer.Option(None, help="id of your snapshot")):
webbrowser.open("file://" + html_file_path)


@app.command()
def check(
    snapshot_id: str = typer.Option(None, help="id of your snapshot"),
    id_column: str = typer.Option("month", help="Name of the column identifying a row in the snapshot"),
    date_column: str = typer.Option("month", help="Name of the date column expected in the snapshot"),
):
    """Print the drift summary (added, deleted and modified rows) of a snapshot."""
    # Resolve the snapshot node from the given id, or ask the user to pick one.
    snapshot_node = get_or_prompt_snapshot_node(snapshot_id, get_snapshot_nodes())
    snapshot_dates = get_snapshot_dates(snapshot_node)

    snapshot_date = get_user_date_selection(snapshot_dates)
    print(f"Getting {snapshot_node['unique_id']} for {snapshot_date}.")
    diff = get_snapshot_diff(snapshot_node, snapshot_date)

    print("Check drift for snapshot: " + snapshot_node["unique_id"])

    try:
        drift_summary = convert_snapshot_to_drift_summary(
            snapshot_diff=diff,
            id_column=id_column,
            date_column=date_column,
        )
    except ValueError as error:
        # The conversion raises ValueError when a required column is missing
        # from the diff; report it as a CLI error instead of a traceback.
        typer.echo(str(error))
        raise typer.Exit(code=1) from error

    # DataFrame.to_markdown() relies on the optional "tabulate" package.
    print("added_rows \n", drift_summary["added_rows"].to_markdown())
    print("deleted_rows \n", drift_summary["deleted_rows"].to_markdown())
    print("modified_patterns \n", drift_summary["modified_patterns"].to_markdown())
    print("modified_rows_unique_keys \n", drift_summary["modified_rows_unique_keys"])


def get_or_prompt_snapshot_node(snapshot_id, snapshot_nodes):
    """Return the snapshot node whose ``unique_id`` equals ``snapshot_id``.

    When ``snapshot_id`` is falsy, the user is prompted interactively to pick
    one of the ``unique_id`` values found in ``snapshot_nodes``.

    Args:
        snapshot_id: ``unique_id`` of the wanted snapshot, or a falsy value to
            trigger the interactive prompt.
        snapshot_nodes: Iterable of mappings exposing a ``"unique_id"`` key.

    Returns:
        The first node of ``snapshot_nodes`` whose ``unique_id`` matches.

    Raises:
        typer.Exit: With code 1 when the prompt yields no answer, or when no
            node matches the requested id.
    """
    if not snapshot_id:
        questions = [
            inquirer.List(
                "choice",
                message="Please choose a snapshot to show",
                choices=[node["unique_id"] for node in snapshot_nodes],
            ),
        ]
        answers = inquirer.prompt(questions)
        if answers is None:
            typer.echo("No choice selected. Exiting.")
            raise typer.Exit(code=1)

        snapshot_id = answers["choice"]

    # Take the first match; an unknown id ends the command with a readable
    # message rather than an IndexError from indexing an empty list.
    snapshot_node = next((node for node in snapshot_nodes if node["unique_id"] == snapshot_id), None)
    if snapshot_node is None:
        typer.echo(f"Snapshot not found: {snapshot_id}. Exiting.")
        raise typer.Exit(code=1)
    return snapshot_node


# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()
3 changes: 2 additions & 1 deletion tools/driftdb/driftdb/dbt/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json

from pandas import DataFrame
from typing_extensions import List, TypedDict

from ..cli.common import dbt_adapter_query
Expand Down Expand Up @@ -49,7 +50,7 @@ def get_snapshot_dates(snapshot_node: SnapshotNode) -> List[str]:
return date_strings


def get_snapshot_diff(snapshot_node: SnapshotNode, snapshot_date: str):
def get_snapshot_diff(snapshot_node: SnapshotNode, snapshot_date: str) -> DataFrame:
from dbt.adapters.factory import get_adapter
from dbt.cli.main import dbtRunner
from dbt.config.runtime import RuntimeConfig, load_profile, load_project
Expand Down
72 changes: 72 additions & 0 deletions tools/driftdb/driftdb/dbt/snapshot_to_drift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import traceback

from driftdb.alerting.interface import DriftSummary
from driftdb.logger import get_logger
from pandas import DataFrame, Index

# Module-level logger used by the snapshot-to-drift conversion below.
logger = get_logger("summarize_dataframe_updates")


def _values_differ(old_value, new_value) -> bool:
    """Return True when two cell values represent an actual change.

    Two missing values (NaN, NaT, None, ...) are treated as equal, because a
    plain ``!=`` reports ``nan != nan`` as a difference.
    """
    from pandas import isna  # local import: the module only imports DataFrame and Index

    try:
        old_missing = bool(isna(old_value))
        new_missing = bool(isna(new_value))
    except (TypeError, ValueError):
        # isna() gives an array for list-like cells, which has no single truth
        # value; fall back to the plain comparison for those.
        old_missing = new_missing = False

    if old_missing or new_missing:
        return old_missing != new_missing
    return bool(old_value != new_value)


def convert_snapshot_to_drift_summary(snapshot_diff: DataFrame, id_column="id", date_column="date") -> DriftSummary:
    """Summarize a snapshot diff as added, deleted and modified rows.

    Rows with ``record_status == "before"`` are the initial state and rows with
    ``record_status == "after"`` are the final state. Rows are matched between
    the two states on ``id_column``.

    Args:
        snapshot_diff: Diff of a snapshot. It must contain ``id_column``,
            ``date_column`` and ``record_status``. The ``dbt_scd_id``,
            ``dbt_updated_at``, ``dbt_valid_from`` and ``dbt_valid_to`` columns
            are dropped unconditionally, so they are expected to be present.
        id_column: Name of the column identifying a row.
        date_column: Name of the date column; only its presence is checked here.

    Returns:
        A ``DriftSummary`` with:
        - ``added_rows``: final rows whose id is absent from the initial state;
        - ``deleted_rows``: initial rows whose id is absent from the final state;
        - ``modified_rows_unique_keys``: ids present in both states, whether or
          not any value change was detected for them;
        - ``modified_patterns``: one row per distinct ``(column, old, new)``
          change, with the list of ids it applies to.

    Raises:
        ValueError: If a required column is missing from ``snapshot_diff``.
    """
    required_columns = [id_column, date_column, "record_status"]
    for column in required_columns:
        if column not in snapshot_diff.columns:
            logger.warning(
                f"The snapshot_diff DataFrame does not have the required column: {column}. Select a different id_column or date_column from {snapshot_diff.columns}"
            )
            raise ValueError(f"The snapshot_diff DataFrame does not have the required column: {column}")

    # Bookkeeping columns that are not part of the compared data.
    metadata_columns = ["dbt_scd_id", "dbt_updated_at", "dbt_valid_from", "dbt_valid_to", "record_status"]
    initial_data = snapshot_diff[snapshot_diff["record_status"] == "before"].drop(columns=metadata_columns)
    final_data = snapshot_diff[snapshot_diff["record_status"] == "after"].drop(columns=metadata_columns)

    common_ids = initial_data[id_column][initial_data[id_column].isin(final_data[id_column])]
    added_rows = final_data[~final_data[id_column].isin(common_ids)]
    deleted_rows = initial_data[~initial_data[id_column].isin(common_ids)]

    # Index by id for the cell-by-cell comparison; added_rows and deleted_rows
    # keep the id as a regular column.
    initial_by_id = initial_data.set_index(id_column)
    final_by_id = final_data.set_index(id_column)

    # Maps a (column, old_value, new_value) pattern to the ids it applies to.
    pattern_changes = {}
    for key in common_ids:
        for col in initial_by_id.columns:
            try:
                old_value = initial_by_id.at[key, col]
                new_value = final_by_id.at[key, col]
                if _values_differ(old_value, new_value):
                    pattern_changes.setdefault((col, old_value, new_value), []).append(key)
            except Exception:
                # Best effort: a cell that cannot be compared (e.g. duplicated
                # ids yielding a Series) is logged and skipped.
                logger.warning(
                    f"Error while processing pattern change in row {key} and column {col} \n {traceback.format_exc()}"
                )

    # NOTE(review): hash() of tuples containing str is salted per process, so
    # pattern_id is not stable across runs - confirm nothing persists it.
    patterns_list = [
        {
            "unique_keys": keys,
            "column": col,
            "old_value": old,
            "new_value": new,
            "pattern_id": hash((col, old, new)),
        }
        for (col, old, new), keys in pattern_changes.items()
    ]

    # Explicit columns keep the schema identical when no pattern was found.
    patterns_df = DataFrame(
        patterns_list,
        columns=["unique_keys", "column", "old_value", "new_value", "pattern_id"],
    )

    return DriftSummary(
        added_rows=added_rows,
        deleted_rows=deleted_rows,
        modified_rows_unique_keys=Index(common_ids),
        modified_patterns=patterns_df,
    )
1 change: 1 addition & 0 deletions tools/driftdb/requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pytest
1 change: 1 addition & 0 deletions tools/driftdb/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ isort
pandas
pandas-stubs
PyGithub
tabulate
typing_extensions==4.7.1
typer[all]
76 changes: 76 additions & 0 deletions tools/driftdb/tests/test_convert_snapshot_diff_to_drift_summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import unittest

import pandas as pd
import pytest
from driftdb.dbt.snapshot_to_drift import convert_snapshot_to_drift_summary


class TestConvertSnapshotDiffToDriftSummary(unittest.TestCase):
    """Checks for ``convert_snapshot_to_drift_summary`` on small snapshot diffs."""

    @staticmethod
    def _single_row_snapshot(record_status):
        """Build a one-row snapshot diff carrying the given ``record_status``."""
        captured_at = pd.Timestamp("2023-12-26 09:23:18.569704")
        return pd.DataFrame(
            {
                "month": [pd.Timestamp("2022-11-01")],
                "monthly_metric": [4007.95],
                "dbt_scd_id": ["98353a44eca48d39bbb466c5351255b9"],
                "dbt_updated_at": [captured_at],
                "dbt_valid_from": [captured_at],
                "dbt_valid_to": [pd.NaT],
                "record_status": [record_status],
            }
        )

    @staticmethod
    def _expected_single_row():
        """Frame expected once the dbt bookkeeping columns are removed."""
        return pd.DataFrame(
            {
                "month": [pd.Timestamp("2022-11-01")],
                "monthly_metric": [4007.95],
            }
        )

    def test_convert_empty_snapshot(self):
        # An empty frame has none of the required columns.
        self.assertRaises(ValueError, convert_snapshot_to_drift_summary, pd.DataFrame())

    def test_convert_snapshot_with_1_modification(self):
        # Same id before and after: neither an addition nor a deletion.
        month = pd.Timestamp("2023-02-01")
        first_capture = pd.Timestamp("2023-12-26 09:23:18.569704")
        second_capture = pd.Timestamp("2024-01-22 16:01:31.618521")
        snapshot = pd.DataFrame(
            {
                "month": [month, month],
                "monthly_metric": [22547.9, 22547.9],
                "dbt_scd_id": ["534dc75b6f6e75f74e2f6358959a3582", "db8cd259cd772f56da85969038a4267f"],
                "dbt_updated_at": [first_capture, second_capture],
                "dbt_valid_from": [first_capture, second_capture],
                "dbt_valid_to": [second_capture, pd.NaT],
                "record_status": ["before", "after"],
            }
        )

        summary = convert_snapshot_to_drift_summary(snapshot_diff=snapshot, id_column="month", date_column="month")

        self.assertTrue(summary["added_rows"].empty)
        self.assertTrue(summary["deleted_rows"].empty)
        self.assertTrue(summary["modified_rows_unique_keys"].equals(pd.Index([month])))

    def test_convert_snapshot_with_1_addition(self):
        # A lone "after" row has no counterpart, so it is reported as added.
        snapshot = self._single_row_snapshot("after")

        summary = convert_snapshot_to_drift_summary(snapshot_diff=snapshot, id_column="month", date_column="month")

        self.assertTrue(summary["added_rows"].equals(self._expected_single_row()))

    def test_convert_snapshot_with_1_deletion(self):
        # A lone "before" row has no counterpart, so it is reported as deleted.
        snapshot = self._single_row_snapshot("before")

        summary = convert_snapshot_to_drift_summary(snapshot_diff=snapshot, id_column="month", date_column="month")

        self.assertTrue(summary["deleted_rows"].equals(self._expected_single_row()))

0 comments on commit 2b373b0

Please sign in to comment.