qc: rework to remove SQLite saving
Browse files Browse the repository at this point in the history
Do not save to DB as data is corrupted on insert
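The rework keeps failure details as plain strings from the moment a rule runs, instead of round-tripping them through SQLite. A minimal sketch of the serialisation the reworked wrapper applies (see the `__init__.py` hunk below), using an illustrative DataFrame and failing-row index list:

```python
import json

import pandas as pd

# Illustrative inputs: a checked frame, the rows that failed, the checked columns
df = pd.DataFrame({"age": [25, -3, 40], "site": ["a", "a", "b"]})
rows_fail_idx = [1]
columns = ["age"]

# Serialise at the source, as the reworked wrapper now does
rows_fail_idx_str = ",".join(map(str, rows_fail_idx))  # -> "1"
fail_data = json.dumps(
    df.loc[rows_fail_idx][columns].to_dict(orient="records"),
    sort_keys=True,
)  # -> '[{"age": -3}]'

# The report can recover the failing rows without a database round-trip
recovered = pd.DataFrame(json.loads(fail_data))
```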
abhidg committed Oct 19, 2023
1 parent 7a1c6aa commit 1cd6338
Showing 4 changed files with 75 additions and 163 deletions.
28 changes: 12 additions & 16 deletions adtl/qc/__init__.py
@@ -89,15 +89,22 @@ def wrapper(df, **kwargs):
ratio_success=ratio_success,
success=bool(ratio_success >= mostly),
mostly=mostly,
rows_fail_idx=rows_fail_idx,
fail_data=df.loc[rows_fail_idx][columns],
rows_fail_idx=",".join(map(str, rows_fail_idx)),
fail_data=json.dumps(
df.loc[rows_fail_idx][columns].to_dict(orient="records"),
sort_keys=True,
),
)
elif isinstance(series, bool):
return dict(
rows_success=None,
rows_fail=None,
rows=None,
ratio_success=None,
success=series,
mostly=mostly,
rows_fail_idx=None,
fail_data=None,
)

return wrapper
@@ -108,7 +115,7 @@ def wrapper(df, **kwargs):
def schema(
schema_path: str, pattern: str = "*.csv", mostly: float = 0.95
) -> Callable[[pd.DataFrame], List[WorkUnitResult]]:
schema_path = Path(schema_path)
schema_path = Path(schema_path) # type: ignore
with schema_path.open() as fp:
schema = json.load(fp)
validator = fastjsonschema.compile(schema)
@@ -142,10 +149,10 @@ def rule_schema(df: pd.DataFrame):
res.append(
dict(
rows=count,
rows_success=None,
rows_success=0,
rows_fail=count,
ratio_success=0,
success=0,
success=False,
mostly=0,
rows_fail_idx=list(
valid_data.loc[valid_data.reason == reason].index
@@ -162,17 +169,6 @@ def rule_schema(df: pd.DataFrame):
return rule_schema


def get_result_from_insertion(data: Dict[str, Any]) -> WorkUnitResult:
result: Dict[str, Any] = copy.deepcopy(data) # type: ignore
if result.get("fail_data"):
result["fail_data"] = pd.DataFrame(json.loads(result["fail_data"]))
if result.get("rows_fail_idx"):
result["rows_fail_idx"] = [
int(float(x)) for x in str(result["rows_fail_idx"]).split(",")
]
return result


def main(args=None):
from .runner import _main

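Downstream, `report.py` (next file) now filters a results DataFrame and renders plain records, so the strings serialised above are parsed with `json.loads` rather than via the deleted `get_result_from_insertion`. A sketch of that consumption path, assuming a results frame with the columns the report uses:

```python
import json

import pandas as pd

# Hypothetical results frame; the runner is assumed to attach rule/dataset/file
results = pd.DataFrame(
    [
        dict(
            rule="max_age", dataset="site-a", file="site-a.csv",
            rows=3, rows_fail=1, success=False, mostly=0.95, reason="",
            rows_fail_idx="1", fail_data='[{"age": -3}]',
        )
    ]
)

# As in render_results_by_rule: one rule's results as plain dicts
records = results[results.rule == "max_age"].to_dict(orient="records")
for result in records:
    if result.get("fail_data") and json.loads(result["fail_data"]):
        fail_table = pd.DataFrame(json.loads(result["fail_data"]))
```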
98 changes: 36 additions & 62 deletions adtl/qc/report.py
@@ -3,15 +3,14 @@
"""

import json
import sqlite3
from string import Template
from pathlib import Path
from typing import List, Any, Dict
from functools import partial

import pandas as pd

from . import get_result_from_insertion, WorkUnitResult, Rule
from . import Rule

RULES_SUBFOLDER = "r"
DATASET_SUBFOLDER = "d"
@@ -21,22 +20,28 @@


def render_result(result: Dict[str, Any], show_rule: bool = False) -> str:
result = get_result_from_insertion(result) # type: ignore
result["reason"] = result.get("reason", "")
result["reason_str"] = f" ({result['reason']}) " if result["reason"] else ""
result["rule_str"] = (
f"""<a href="../r/{result["rule"]}.html">{result["rule"]}</a>, """
if show_rule
else ""
)
tmpl = (
"<li><tt>[{rows_fail} / {rows}]</tt> {rule_str}{reason_str}{dataset} / {file}".format(
**result
)
if result["success"] != 1
else "<li>✔ {rule_str}{reason_str}{dataset} / {file}</li>".format(**result)
)
if result.get("fail_data"):
if not result["success"]:
if result["reason_str"] and result["mostly"] == 0: # schema reasons
tmpl = (
"<li><tt>[{rows_fail}]</tt> {rule_str} {reason_str}<strong>{dataset}</strong>: {file}"
).format(**result)
else:
tmpl = "<li><tt>[{rows_fail} / {rows}]</tt> {rule_str}{reason_str}<strong>{dataset}</strong>: {file}".format(
**result
)
else:
tmpl = (
"<li>✔ {rule_str} {reason_str}<strong>{dataset}</strong>: {file}</li>"
).format(**result)

if result.get("fail_data") and json.loads(result["fail_data"]):
fail_data = pd.DataFrame(json.loads(result["fail_data"]))
tmpl += """
<details>
@@ -50,14 +55,17 @@ def render_result(result: Dict[str, Any], show_rule: bool = False) -> str:
return tmpl


def render_results_by_rule(
results: List[WorkUnitResult], rules: List[Rule]
) -> Dict[str, str]:
def render_results_by_rule(results: pd.DataFrame, rules: List[Rule]) -> Dict[str, str]:
def results_for_rule(rule_name: str) -> str:
return "\n".join(render_result(r) for r in results if r["rule"] == rule_name) # type: ignore
return "\n".join(
map(
render_result,
results[results.rule == rule_name].to_dict(orient="records"),
)
) # type: ignore

out = {}
for rule_name in set(r["rule"] for r in results):
for rule_name in results.rule.unique():
rule = [r for r in rules if r["name"] == rule_name][0]
out[rule_name] = Template((TEMPLATES / "rule.html").read_text()).substitute(
dict(
@@ -75,24 +83,24 @@ def results_for_rule(rule_name: str) -> str:


def render_results_by_dataset(
results: List[WorkUnitResult], datasets: List[str]
results: pd.DataFrame, datasets: List[str]
) -> Dict[str, str]:
def filter_dataset(dataset: str) -> List[WorkUnitResult]:
return [r for r in results if r["dataset"] == dataset]

out = {}

for dataset in datasets:
result_data = "\n".join(
map(partial(render_result, show_rule=True), filter_dataset(dataset))
)
map(
partial(render_result, show_rule=True),
results[results.dataset == dataset].to_dict(orient="records"),
)
) # type: ignore
out[dataset] = Template((TEMPLATES / "dataset.html").read_text()).substitute(
dataset=dataset, results=result_data
)
return out


def render_index(rules: List[Dict[str, Any]], datasets: List[str]) -> str:
def render_index(rules: List[Rule], datasets: List[str]) -> str:
dataset_index = "\n".join(
f"""<li><a href="d/{dataset}.html">{dataset}</a></li>""" for dataset in datasets
)
@@ -105,49 +113,19 @@ def render_index(rules: List[Dict[str, Any]], datasets: List[str]) -> str:
)


def read_sql(
conn: sqlite3.Connection, sql: str, columns: List[str]
) -> List[Dict[str, Any]]:
cur = conn.cursor()
res = cur.execute(sql)
return [dict(zip(columns, r)) for r in res.fetchall()]


def make_report(store_database: str, output_folder: Path = Path("qc_report")):
def make_report(
results: pd.DataFrame, rules: List[Rule], output_folder: Path = Path("qc_report")
):
"Makes report from results database"

output_folder.mkdir(exist_ok=True)
(output_folder / "r").mkdir(exist_ok=True)
(output_folder / "d").mkdir(exist_ok=True)

conn = sqlite3.connect(store_database)
datasets = read_sql(conn, "SELECT DISTINCT dataset FROM results", ["dataset"])
datasets = [n["dataset"] if n["dataset"] else "_unlabelled" for n in datasets]
rules = read_sql(
conn,
"SELECT name, description, long_description FROM rules",
["name", "description", "long_description"],
)
datasets = list(results.dataset.unique())
(output_folder / "style.css").write_text(STYLE.read_text())
(output_folder / INDEX).write_text(render_index(rules, datasets))
results = read_sql(
conn,
"SELECT * from results",
[
"rule",
"dataset",
"file",
"rows_success",
"rows_fail",
"rows",
"ratio_success",
"rows_fail_idx",
"success",
"mostly",
"reason",
"fail_data",
],
)

results_by_rule = render_results_by_rule(results, rules)
results_by_dataset = render_results_by_dataset(results, datasets)
for rule in results_by_rule:
@@ -158,7 +136,3 @@ def make_report(store_database: str, output_folder: Path = Path("qc_report")):
results_by_dataset[dataset]
)
print(f"wrote d/{dataset}.html")


if __name__ == "__main__":
make_report("adtl-qc.db")