Skip to content

Commit

Permalink
qc: add schema rules
Browse files Browse the repository at this point in the history
Schema rules validate dataframes against a JSON Schema
  • Loading branch information
abhidg committed Oct 18, 2023
1 parent d7e28be commit dbe9603
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
39 changes: 36 additions & 3 deletions adtl/qc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import json
import functools
from pathlib import Path
from typing import List, Union, TypedDict, Any, Optional, Dict
from typing import List, Union, TypedDict, Any, Optional, Dict, Callable, Tuple

import pandas as pd
import numpy as np
import fastjsonschema


class Rule(TypedDict):
Expand Down Expand Up @@ -104,8 +105,40 @@ def wrapper(df, **kwargs):
return decorator_rule


def schema(
    schema_path: Union[str, Path], pattern: str = "*.csv", mostly: float = 0.95
) -> Callable[[pd.DataFrame], List[WorkUnitResult]]:
    """Build a QC rule that validates every row of a dataframe against a JSON Schema.

    The schema file is read and compiled once, when this factory is called;
    the returned function only runs the compiled validator on each row.

    Args:
        schema_path: Location of the JSON Schema file (string or ``Path``).
        pattern: Glob pattern stored on the returned rule as its ``pattern``
            attribute.
        mostly: Minimum fraction of valid rows for the rule to report success.

    Returns:
        A function taking a dataframe and returning a dict with the keys
        ``rows_success``, ``rows_fail``, ``rows``, ``ratio_success``,
        ``success``, ``mostly``, ``rows_fail_idx`` and ``fail_data``.

    NOTE(review): the return annotation says the rule yields a list of
    ``WorkUnitResult``, but the closure returns a single dict -- confirm which
    one callers expect before tightening the annotation.
    """
    schema_path = Path(schema_path)
    # JSON documents are UTF-8 by specification; do not depend on the locale.
    with schema_path.open(encoding="utf-8") as fp:
        # Named schema_def so it does not shadow the enclosing function.
        schema_def = json.load(fp)
    validator = fastjsonschema.compile(schema_def)

    def rule_schema(df: pd.DataFrame):
        # One (is_valid, message, path) entry per row, in row order.
        valids: List[Tuple[bool, str, str]] = []
        for row in df.to_dict(orient="records"):
            try:
                validator(row)
            except fastjsonschema.exceptions.JsonSchemaValueException as e:
                # The first path element is dropped, as in the original code.
                valids.append((False, e.message, ";".join(e.path[1:])))
            else:
                valids.append((True, "", ""))

        rows = len(valids)
        rows_fail_idx = [i for i, v in enumerate(valids) if not v[0]]
        rows_fail = len(rows_fail_idx)
        rows_success = rows - rows_fail
        # An empty dataframe has no failing rows: treat it as fully valid
        # instead of dividing by zero.
        ratio_success = rows_success / rows if rows else 1.0
        return dict(
            rows_success=rows_success,
            rows_fail=rows_fail,
            rows=rows,
            ratio_success=ratio_success,
            success=bool(ratio_success >= mostly),
            mostly=mostly,
            rows_fail_idx=rows_fail_idx,
            fail_data=None,
        )

    # Metadata set on the rule: description from the schema title (falling
    # back to the file path), a "schema_"-prefixed name, and the file pattern.
    rule_schema.__doc__ = f"{schema_def.get('title', schema_path)} schema"
    rule_schema.__name__ = "schema_" + schema_path.stem.split(".")[0]
    rule_schema.pattern = pattern
    return rule_schema


def get_result_from_insertion(data: Dict[str, Any]) -> WorkUnitResult:
Expand Down
8 changes: 4 additions & 4 deletions adtl/qc/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ def render_result(result: Dict[str, Any], show_rule: bool = False) -> str:
def render_results_by_rule(
results: List[WorkUnitResult], rules: List[Rule]
) -> Dict[str, str]:
result_data = "\n".join(map(render_result, results))
rules_in_results = [r["rule"] for r in results]
def results_for_rule(rule_name: str) -> str:
return "\n".join(render_result(r) for r in results if r["rule"] == rule_name) # type: ignore

out = {}
for rule_name in rules_in_results:
for rule_name in set(r["rule"] for r in results):
rule = [r for r in rules if r["name"] == rule_name][0]
out[rule_name] = Template((TEMPLATES / "rule.html").read_text()).substitute(
dict(
Expand All @@ -66,7 +66,7 @@ def render_results_by_rule(
+ "</p>"
if rule["long_description"]
else "",
results=result_data,
results=results_for_rule(rule_name),
)
)
return out
Expand Down
4 changes: 2 additions & 2 deletions adtl/qc/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def collect_rules(root: Path = Path("qc")) -> List[Rule]:
module = importlib.import_module(
str(rule_file).replace(".py", "").replace("/", ".")
)
rules = [x for x in dir(module) if x.startswith("rule_")]
rules = [x for x in dir(module) if x.startswith("rule_") or x.startswith("schema_")]
all_rules.extend([make_rule(module, r) for r in rules])
return all_rules

Expand Down Expand Up @@ -118,7 +118,7 @@ def prepare_result_for_insertion(work_unit_result: WorkUnitResult) -> Dict[str,
result: Dict[str, Any] = copy.deepcopy(work_unit_result) # type: ignore
result["fail_data"] = (
""
if result["fail_data"].empty
if result.get("fail_data") is None or result["fail_data"].empty
else json.dumps(result["fail_data"].to_dict(orient="records"))
)
result["rows_fail_idx"] = (
Expand Down

0 comments on commit dbe9603

Please sign in to comment.