2 changes: 1 addition & 1 deletion Pipfile
@@ -14,8 +14,8 @@ pygit2 = "*"
types-docker = "*"
types-pygit2 = "*"
flask = "*"
jsondiff = "*"
boto3 = "*"
deepdiff = "*"

[dev-packages]
black = "*"
409 changes: 185 additions & 224 deletions Pipfile.lock

Large diffs are not rendered by default.

57 changes: 44 additions & 13 deletions abdiff/core/calc_ab_diffs.py
@@ -1,11 +1,12 @@
import json
import logging
import time
from collections.abc import Generator
from pathlib import Path

import pyarrow as pa
import pyarrow.dataset as ds
from jsondiff import diff
from deepdiff import DeepDiff

from abdiff.core.utils import update_or_create_run_json, write_to_dataset

@@ -22,6 +23,7 @@
pa.field("record_a", pa.binary()),
pa.field("record_b", pa.binary()),
pa.field("ab_diff", pa.string()),
pa.field("modified_timdex_fields", pa.list_(pa.string())),
pa.field("has_diff", pa.string()),
)
)
@@ -61,23 +63,52 @@ def get_diffed_batches_iter(

# convert batch to pandas dataframe and calc values for new columns
df = batch.to_pandas() # noqa: PD901
df["ab_diff"] = df.apply(

# calculate all diffs and unpack into separate columns
diff_results = df.apply(
lambda row: calc_record_diff(row["record_a"], row["record_b"]), axis=1
)
df["has_diff"] = df["ab_diff"].apply(lambda diff_value: diff_value != "{}")
df["ab_diff"] = diff_results.apply(lambda x: x[0])
df["modified_timdex_fields"] = diff_results.apply(
lambda x: list(x[1]) if x[1] else []
)
df["has_diff"] = diff_results.apply(lambda x: x[2])

yield pa.RecordBatch.from_pandas(df) # type: ignore[attr-defined]


def calc_record_diff(record_a: bytes | None, record_b: bytes | None) -> str | None:
"""Calculate symmetric diff from two JSON strings."""
def calc_record_diff(
record_a: str | bytes | dict | None,
record_b: str | bytes | dict | None,
*,
ignore_order: bool = True,
report_repetition: bool = True,
) -> tuple[str | None, list[str] | None, bool]:
"""Calculate diff from two JSON byte strings.

The DeepDiff library has the property 'affected_root_keys' on the produced diff object
that is very useful for our purposes. At this time, we simply want to know if
anything about a particular root level TIMDEX field (e.g. 'dates' or 'title') has
changed which this method provides explicitly. We also serialize the full diff to
JSON via the to_json() method for storage and possible further analysis.

This method returns a tuple:
- ab_diff: [str] - full diff as JSON
- modified_timdex_fields: list[str] - list of modified root keys (TIMDEX fields)
- has_diff: bool - True/False if any diff present
"""
if record_a is None or record_b is None:
return None

return diff(
record_a.decode(),
record_b.decode(),
syntax="symmetric",
load=True,
dump=True,
return None, None, False

diff = DeepDiff(
json.loads(record_a) if isinstance(record_a, str | bytes) else record_a,
json.loads(record_b) if isinstance(record_b, str | bytes) else record_b,
ignore_order=ignore_order,
report_repetition=report_repetition,
)

ab_diff = diff.to_json()
modified_timdex_fields = diff.affected_root_keys
has_diff = bool(modified_timdex_fields)

return ab_diff, modified_timdex_fields, has_diff
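
For context, here is a minimal standalone sketch of the DeepDiff behavior the new calc_record_diff relies on (the record values are made up; assumes a deepdiff version where affected_root_keys is available, roughly 6.2+):

from deepdiff import DeepDiff

record_a = {"title": "A title", "dates": ["2001"], "color": "green"}
record_b = {"title": "A title", "dates": ["2001", "2002"], "color": "red"}

# ignore_order treats re-ordered arrays as equal; report_repetition still flags
# changes in how many times a value repeats within an array.
diff = DeepDiff(record_a, record_b, ignore_order=True, report_repetition=True)

print(diff.affected_root_keys)        # modified root keys, e.g. 'dates' and 'color'
print(diff.to_json())                 # full diff serialized for the 'ab_diff' column
print(bool(diff.affected_root_keys))  # True -> becomes 'has_diff'
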
57 changes: 14 additions & 43 deletions abdiff/core/calc_ab_metrics.py
@@ -1,6 +1,5 @@
# ruff: noqa: S608

import json
import logging
import os
import time
@@ -48,12 +47,11 @@ def create_record_diff_matrix_dataset(
) -> str:
"""Create a boolean sparse matrix of modified fields for all records.

This writes a single parquet file with rows for each record, and columns for each
TIMDEX field, and a value of integer 1 if that field has a diff and 0 if not. This
provides a handy way to calculate aggregate metrics for a given field or source in
later steps. The column "has_diff" is also carried over from the diffs dataset to
provide a single column to check if ANY of the field columns indicate a diff for a
record row.
This writes a single parquet file with rows for each record, columns for each TIMDEX
field, and a value of integer 1 if that field has a diff and 0 if not. This provides
a handy way to calculate aggregate metrics for a given field or source in later steps.
The column "has_diff" is also carried over from the diffs dataset to provide a single
column to check if ANY of the field columns indicate a diff for a record row.

This code momentarily creates a single dataframe in memory for all rows. This is safe
given the nature of the dataframe: there may be 10m rows, and potentially 20-30
@@ -66,32 +64,31 @@
for i, batch in enumerate(
diffs_ds.to_batches(
batch_size=batch_size,
columns=["timdex_record_id", "source", "ab_diff", "has_diff"],
columns=["timdex_record_id", "source", "modified_timdex_fields", "has_diff"],
)
):
start_time = time.time()
batch_df = batch.to_pandas()

# parse diff JSON to dictionary for batch
batch_df["ab_diff"] = batch_df["ab_diff"].apply(
lambda diff_json: json.loads(diff_json)
)

batch_metrics = []
for _, row in batch_df.iterrows():
record_metrics = {
"timdex_record_id": row["timdex_record_id"],
"source": row["source"],
"has_diff": 1 if row["has_diff"] == "true" else 0,
"has_diff": (1 if row["has_diff"] == "true" else 0),
}
diff_data = row["ab_diff"]
record_metrics.update(generate_field_diff_bools_for_record(diff_data))

# for each modified field (root key in diff), set column and value = 1 (True)
if row["modified_timdex_fields"] is not None:
for field in row["modified_timdex_fields"]:
record_metrics[field] = 1

batch_metrics.append(record_metrics)

# build dataframe for batch
batch_metrics_df = pd.DataFrame(batch_metrics)
batch_metrics_dfs.append(batch_metrics_df)
logger.info(f"batch: {i+1}, elapsed: {time.time()-start_time}")
logger.info(f"batch: {i + 1}, elapsed: {time.time() - start_time}")

# concatenate all dataframes into single dataframe for writing and replace None with 0
metrics_df = pd.concat(batch_metrics_dfs)
@@ -107,32 +104,6 @@ def create_record_diff_matrix_dataset(
return metrics_dataset


def generate_field_diff_bools_for_record(diff_data: dict) -> dict:
"""Function to return dictionary of fields that have a diff.

Determining if a field had a diff is as straight-forward as looking to see if it shows
up in the parsed diff JSON. The fields may be at the root of the diff, or they could
be nested under "$insert" or "$delete" nodes in the diff.

If a field from the original A/B records are not in the diff at all, then they did not
have changes, and therefore will not receive a 1 here to indicate a diff.
"""
fields_with_diffs = {}

for key in diff_data:

# identify modified fields nested in $insert or $delete blocks
if key in ("$insert", "$delete"):
for subfield in diff_data[key]:
fields_with_diffs[subfield] = 1

# identified modified fields at root of diff
else:
fields_with_diffs[key] = 1

return fields_with_diffs


Comment on lines -110 to -135 (Contributor): Woohoo! ✨

def calculate_metrics_data(field_matrix_parquet: str) -> dict:
"""Create a dictionary of metrics via DuckDB queries."""
summary: dict = {}
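Since calculate_metrics_data is only partially shown above, here is a hedged sketch of the kind of DuckDB aggregation the 1/0 field matrix enables (the parquet path and the 'dates' column are hypothetical, and this is not necessarily how the function is implemented):

import duckdb

# Hypothetical path to the record diff matrix parquet written above.
matrix_path = "output/run/metrics/record_diff_matrix.parquet"

# Per-source counts of records with any diff, and with a diff in the 'dates' field.
results = duckdb.sql(
    f"""
    SELECT
        source,
        count(*) AS total_records,
        sum(has_diff) AS records_with_diff,
        sum(dates) AS records_with_dates_diff
    FROM read_parquet('{matrix_path}')
    GROUP BY source
    """
).df()
print(results)
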
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -11,7 +11,7 @@ disallow_untyped_defs = true
exclude = ["tests/", "output/"]

[[tool.mypy.overrides]]
module = ["docker", "docker.models.containers", "duckdb", "duckdb.duckdb", "jsondiff", "pandas", "ijson"]
module = ["docker", "docker.models.containers", "duckdb", "duckdb.duckdb", "deepdiff", "pandas", "ijson"]
ignore_missing_imports = true

[tool.pytest.ini_options]
61 changes: 54 additions & 7 deletions tests/test_calc_ab_diffs.py
@@ -17,22 +17,66 @@
def test_calc_record_diff_has_diff():
a = {"color": "green"}
b = {"color": "red"}
assert calc_record_diff(json.dumps(a).encode(), json.dumps(b).encode()) == json.dumps(
{"color": ["green", "red"]}
ab_diff, modified_timdex_fields, has_diff = calc_record_diff(a, b)
assert ab_diff == json.dumps(
{"values_changed": {"root['color']": {"new_value": "red", "old_value": "green"}}}
)
assert modified_timdex_fields == {"color"}
assert has_diff


def test_calc_record_diff_no_diff():
a = {"color": "green"}
b = a
assert calc_record_diff(json.dumps(a).encode(), json.dumps(b).encode()) == json.dumps(
{} # no diff
)
ab_diff, modified_timdex_fields, has_diff = calc_record_diff(a, b)
assert ab_diff == json.dumps({}) # no diff
assert not modified_timdex_fields
assert not has_diff


def test_calc_record_diff_one_input_is_none():
a = {"color": "green"}
assert calc_record_diff(json.dumps(a).encode(), None) is None
assert calc_record_diff(a, None) == (None, None, False)


def test_calc_record_diff_array_by_default_order_not_a_diff():
"""Arrays with the same values, but differently ordered, not considered a diff."""
a = {"colors": ["green", "red"]}
b = {"colors": ["red", "green"]}
ab_diff, modified_timdex_fields, has_diff = calc_record_diff(a, b)
assert ab_diff == json.dumps({}) # no diff
assert not modified_timdex_fields
assert not has_diff


def test_calc_record_diff_array_set_flag_order_is_a_diff():
"""Arrays with the same values, but differently ordered, not considered a diff."""
a = {"colors": ["green", "red"]}
b = {"colors": ["red", "green"]}
_, _, has_diff = calc_record_diff(a, b, ignore_order=False)
assert has_diff


def test_calc_record_diff_array_repetition_is_reported_when_diff():
"""Same array values, but different in repetition, is considered a diff."""
a = {"colors": ["red", "green"]}
b = {"colors": ["red", "green", "green"]}
ab_diff, modified_timdex_fields, has_diff = calc_record_diff(a, b)
assert ab_diff == json.dumps(
{
"repetition_change": {
"root['colors'][1]": {
"old_repeat": 1,
"new_repeat": 2,
"old_indexes": [1],
"new_indexes": [1, 2],
"value": "green",
}
}
}
)
assert modified_timdex_fields == {"colors"}
assert has_diff


def test_diffed_batches_yields_pyarrow_record_batch(collated_dataset):
@@ -46,7 +90,9 @@ def test_diffed_batches_first_batch_has_diff(collated_dataset):
batch_one = next(batch_iter).to_pandas()
row = batch_one.iloc[0]

assert row.ab_diff == json.dumps({"color": ["green", "red"]})
assert row.ab_diff == json.dumps(
{"values_changed": {"root['color']": {"new_value": "red", "old_value": "green"}}}
)
assert row.has_diff


@@ -73,6 +119,7 @@ def test_calc_ab_diffs_writes_dataset(caplog, run_directory, collated_dataset_di
pa.field("record_a", pa.binary()),
pa.field("record_b", pa.binary()),
pa.field("ab_diff", pa.string()),
pa.field("modified_timdex_fields", pa.list_(pa.string())),
pa.field("has_diff", pa.string()),
)
)
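
As a side note on the schema change covered by this test, here is a minimal sketch (values are made up) of how a per-row list of strings round-trips from pandas into a pyarrow list<string> column, which is what the new modified_timdex_fields field relies on:

import pandas as pd
import pyarrow as pa

df = pd.DataFrame(
    {
        "timdex_record_id": ["rec-1", "rec-2"],
        "modified_timdex_fields": [["dates", "title"], []],  # python lists per row
    }
)

batch = pa.RecordBatch.from_pandas(
    df,
    schema=pa.schema(
        [
            pa.field("timdex_record_id", pa.string()),
            pa.field("modified_timdex_fields", pa.list_(pa.string())),
        ]
    ),
)
print(batch.schema)  # modified_timdex_fields: list<item: string>
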
22 changes: 0 additions & 22 deletions tests/test_calc_ab_metrics.py
@@ -12,32 +12,10 @@
calc_ab_metrics,
calculate_metrics_data,
create_record_diff_matrix_dataset,
generate_field_diff_bools_for_record,
)
from abdiff.core.utils import load_dataset, read_run_json


def test_record_field_diffs_no_diffs():
diff_data = {}
assert generate_field_diff_bools_for_record(diff_data) == {}


def test_record_field_diffs_one_diff():
diff_data = {"color": "green"}
assert generate_field_diff_bools_for_record(diff_data) == {"color": 1}


def test_record_field_diffs_diff_from_inserts_and_deletes_counted_only_once():
diff_data = {
"$insert": {"fruits": "strawberry"},
"$delete": {"vegetables": "onion"},
}
assert generate_field_diff_bools_for_record(diff_data) == {
"fruits": 1,
"vegetables": 1,
}


def test_sparse_matrix_dataset_created_success(run_directory, diffs_dataset_directory):
diff_matrix_dataset_filepath = create_record_diff_matrix_dataset(
run_directory, diffs_dataset_directory