diff --git a/adtl/qc/__init__.py b/adtl/qc/__init__.py
index d5adbcf..bf3a677 100644
--- a/adtl/qc/__init__.py
+++ b/adtl/qc/__init__.py
@@ -1,10 +1,11 @@
"""
Quality Control module for ADTL
"""
+import copy
+import json
import functools
-from typing import List, Union
from pathlib import Path
-from typing import TypedDict, Dict, List, Any, Optional
+from typing import List, Union, TypedDict, Any, Optional, Dict
import pandas as pd
import numpy as np
@@ -35,6 +36,7 @@ class WorkUnitResult(TypedDict):
file: str
rows_success: int
rows_fail: int
+ rows: int
ratio_success: float
rows_fail_idx: List[int]
success: bool
@@ -81,6 +83,7 @@ def wrapper(df, **kwargs):
return dict(
rows_success=int(rows_success),
rows_fail=int(rows_fail),
+ rows=int(rows_success) + int(rows_fail),
ratio_success=ratio_success,
success=bool(ratio_success >= mostly),
mostly=mostly,
@@ -104,6 +107,17 @@ def schema(schema_path: Union[str, Path], pattern: str = "*.csv"):
pass
+def get_result_from_insertion(data: Dict[str, Any]) -> WorkUnitResult:
+ result: Dict[str, Any] = copy.deepcopy(data) # type: ignore
+ if result["fail_data"]:
+ result["fail_data"] = pd.DataFrame(json.loads(result["fail_data"]))
+ if result["rows_fail_idx"]:
+ result["rows_fail_idx"] = [
+ int(float(x)) for x in str(result["rows_fail_idx"]).split(",")
+ ]
+ return result
+
+
def main(args=None):
from .runner import _main
diff --git a/adtl/qc/report.py b/adtl/qc/report.py
new file mode 100644
index 0000000..5bfa272
--- /dev/null
+++ b/adtl/qc/report.py
@@ -0,0 +1,161 @@
+"""
+Quality Control module for ADTL, report submodule
+"""
+
+import json
+import sqlite3
+from string import Template
+from pathlib import Path
+from typing import List, Any, Dict
+from functools import partial
+
+import pandas as pd
+
+from . import get_result_from_insertion, WorkUnitResult, Rule
+
+RULES_SUBFOLDER = "r"
+DATASET_SUBFOLDER = "d"
+TEMPLATES = Path(__file__).parent / "templates"
+STYLE = TEMPLATES / "style.css"
+INDEX = "index.html"
+
+
+def render_result(result: Dict[str, Any], show_rule: bool = False) -> str:
+ result = get_result_from_insertion(result) # type: ignore
+    result["rule_str"] = (
+        f"""<a href="../r/{result["rule"]}.html">{result["rule"]}</a>, """
+        if show_rule
+        else ""
+    )
+    tmpl = (
+        "<li>[{rows_fail} / {rows}] {rule_str}{dataset} / {file}".format(
+            **result
+        )
+        if result["success"] != 1
+        else "<li>✔ {rule_str}{dataset} / {file}".format(**result)
+    )
+    if result["fail_data"]:
+        fail_data = pd.DataFrame(json.loads(result["fail_data"]))
+        tmpl += """
+<details>
+<summary>Failed rows</summary>
+<pre>{log}</pre>
+</details></li>""".format(
+            log=str(fail_data)
+        )
+    else:
+        tmpl += "</li>"
+    return tmpl
+
+
+def render_results_by_rule(
+ results: List[WorkUnitResult], rules: List[Rule]
+) -> Dict[str, str]:
+ result_data = "\n".join(map(render_result, results))
+ rules_in_results = [r["rule"] for r in results]
+
+ out = {}
+ for rule_name in rules_in_results:
+ rule = [r for r in rules if r["name"] == rule_name][0]
+ out[rule_name] = Template((TEMPLATES / "rule.html").read_text()).substitute(
+ dict(
+ name=rule["name"],
+ description=rule["description"],
+                long_description='<p>\n'
+                + rule["long_description"]
+                + "</p>"
+ if rule["long_description"]
+ else "",
+ results=result_data,
+ )
+ )
+ return out
+
+
+def render_results_by_dataset(
+ results: List[WorkUnitResult], datasets: List[str]
+) -> Dict[str, str]:
+ def filter_dataset(dataset: str) -> List[WorkUnitResult]:
+ return [r for r in results if r["dataset"] == dataset]
+
+ out = {}
+
+ for dataset in datasets:
+ result_data = "\n".join(
+ map(partial(render_result, show_rule=True), filter_dataset(dataset))
+ )
+ out[dataset] = Template((TEMPLATES / "dataset.html").read_text()).substitute(
+ dataset=dataset, results=result_data
+ )
+ return out
+
+
+def render_index(rules: List[Dict[str, Any]], datasets: List[str]) -> str:
+ dataset_index = "\n".join(
+        f"""<li><a href="d/{dataset}.html">{dataset}</a></li>""" for dataset in datasets
+ )
+ rule_index = "\n".join(
+        f"""<li><a href="r/{r["name"]}.html">{r["description"]}</a></li>"""
+ for r in rules
+ )
+ return Template((TEMPLATES / "index.html").read_text()).substitute(
+ dict(dataset_index=dataset_index, rule_index=rule_index)
+ )
+
+
+def read_sql(
+ conn: sqlite3.Connection, sql: str, columns: List[str]
+) -> List[Dict[str, Any]]:
+ cur = conn.cursor()
+ res = cur.execute(sql)
+ return [dict(zip(columns, r)) for r in res.fetchall()]
+
+
+def make_report(store_database: str, output_folder: Path = Path("qc_report")):
+ "Makes report from results database"
+
+ output_folder.mkdir(exist_ok=True)
+ (output_folder / "r").mkdir(exist_ok=True)
+ (output_folder / "d").mkdir(exist_ok=True)
+
+ conn = sqlite3.connect(store_database)
+ datasets = read_sql(conn, "SELECT DISTINCT dataset FROM results", ["dataset"])
+ datasets = [n["dataset"] if n["dataset"] else "_unlabelled" for n in datasets]
+ rules = read_sql(
+ conn,
+ "SELECT name, description, long_description FROM rules",
+ ["name", "description", "long_description"],
+ )
+ (output_folder / "style.css").write_text(STYLE.read_text())
+ (output_folder / INDEX).write_text(render_index(rules, datasets))
+ results = read_sql(
+ conn,
+ "SELECT * from results",
+ [
+ "rule",
+ "dataset",
+ "file",
+ "rows_success",
+ "rows_fail",
+ "rows",
+ "ratio_success",
+ "rows_fail_idx",
+ "success",
+ "mostly",
+ "fail_data",
+ ],
+ )
+ results_by_rule = render_results_by_rule(results, rules)
+ results_by_dataset = render_results_by_dataset(results, datasets)
+ for rule in results_by_rule:
+ (output_folder / "r" / (rule + ".html")).write_text(results_by_rule[rule])
+ print(f"wrote r/{rule}.html")
+ for dataset in datasets:
+ (output_folder / "d" / (dataset + ".html")).write_text(
+ results_by_dataset[dataset]
+ )
+ print(f"wrote d/{dataset}.html")
+
+
+if __name__ == "__main__":
+ make_report("adtl-qc.db")
diff --git a/adtl/qc/runner.py b/adtl/qc/runner.py
index c624e92..352380b 100644
--- a/adtl/qc/runner.py
+++ b/adtl/qc/runner.py
@@ -17,6 +17,7 @@
import pandas as pd
from . import Dataset, Rule, WorkUnit, WorkUnitResult
+from .report import make_report
DEFAULT_PATTERN = "*.csv"
@@ -26,6 +27,7 @@
file TEXT,
rows_success INTEGER,
rows_fail INTEGER,
+ rows INTEGER,
ratio_success REAL,
rows_fail_idx TEXT,
success INTEGER,
@@ -34,14 +36,14 @@
)"""
DDL_RULES = """CREATE TABLE IF NOT EXISTS rules (
- rule TEXT,
+ name TEXT,
description TEXT,
long_description TEXT
)"""
INSERT_RESULTS = """INSERT INTO results VALUES (
:rule, :dataset, :file, :rows_success,
- :rows_fail, :ratio_success, :rows_fail_idx,
+ :rows_fail, :rows, :ratio_success, :rows_fail_idx,
:success, :mostly, :fail_data
)"""
@@ -59,7 +61,10 @@ def collect_datasets(
folders = defaultdict(list)
for f in files:
folders[f.parent.stem].append(f)
- return [Dataset(folder=folder, files=folders[folder]) for folder in folders]
+ return [
+ Dataset(folder=folder if folder else "_unlabelled", files=folders[folder])
+ for folder in folders
+ ]
def collect_rules(root: Path = Path("qc")) -> List[Rule]:
@@ -111,12 +116,12 @@ def collect_work_units(datasets: List[Dataset], rules: List[Rule]) -> List[WorkU
def prepare_result_for_insertion(work_unit_result: WorkUnitResult) -> Dict[str, Any]:
result: Dict[str, Any] = copy.deepcopy(work_unit_result) # type: ignore
result["fail_data"] = (
- None
+ ""
if result["fail_data"].empty
else json.dumps(result["fail_data"].to_dict(orient="records"))
)
result["rows_fail_idx"] = (
- None
+ ""
if not result["rows_fail_idx"]
else ",".join(map(str, result["rows_fail_idx"]))
)
@@ -125,15 +130,6 @@ def prepare_result_for_insertion(work_unit_result: WorkUnitResult) -> Dict[str,
return result
-def get_result_from_insertion(data: Dict[str, Any]) -> WorkUnitResult:
- result: Dict[str, Any] = copy.deepcopy(data) # type: ignore
- if result["fail_data"]:
- result["fail_data"] = pd.DataFrame(json.loads(result["fail_data"]))
- if result["rows_fail_idx"]:
- result["rows_fail_idx"] = [int(x) for x in result["rows_fail_idx"].split(",")]
- return result
-
-
def process_work_unit(unit: WorkUnit, save_db: Optional[str] = None) -> WorkUnitResult:
rule = unit["rule"]
module = importlib.import_module(rule["module"])
@@ -158,6 +154,7 @@ def start(
rules_path: Path = Path("qc"),
data_file_formats: List[str] = ["csv"],
store_database: Optional[str] = None,
+ disable_report: bool = False,
) -> List[WorkUnitResult]:
rules = collect_rules(rules_path)
datasets = collect_datasets(data_path, data_file_formats)
@@ -174,7 +171,10 @@ def start(
pool = multiprocessing.Pool()
process_work_unit_db = functools.partial(process_work_unit, save_db=store_database)
- return pool.map(process_work_unit_db, work_units)
+ res = pool.map(process_work_unit_db, work_units)
+ if store_database and not disable_report:
+ make_report(store_database)
+ return res
def _main(args=None):
@@ -198,4 +198,5 @@ def _main(args=None):
Path(args.rule_root),
data_file_formats=args.format.split(","),
store_database=args.database,
+ disable_report=args.no_report,
)
diff --git a/adtl/qc/templates/dataset.html b/adtl/qc/templates/dataset.html
index 8336c15..939f489 100644
--- a/adtl/qc/templates/dataset.html
+++ b/adtl/qc/templates/dataset.html
@@ -3,45 +3,28 @@
- ✱ dataset - {{ dataset }}
-
+ ✱ dataset - $dataset
+
- {{ dataset }}
- Files
-
- {{ #files }}
- - {{ hash }} {{ file }}
- {{ /files }}
-
-
+ $dataset
Triggered rules
-
- {{#rules}}
- - [{{ success }} / {{ total }}] {{ rule }} - {{ file }}
-
- {{ #log }}
-
- Failed rows
- {{ log }}
-
- {{ /log }}
-
- {{/rules}}
+
-