-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
226 additions
and
286 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,209 +1,123 @@ | ||
import json | ||
import sys | ||
from collections import defaultdict, Counter | ||
from itertools import groupby | ||
from operator import itemgetter | ||
from ..utils import get_intersection | ||
|
||
|
||
def append_duplicates( | ||
category_check: dict, | ||
category: dict, | ||
def find_errors_in_questions( | ||
config: dict, | ||
duplicates: dict, | ||
types: str, | ||
ck: dict, | ||
errors: list, | ||
name: str, | ||
form: int, | ||
question=None, | ||
other=False, | ||
) -> None: | ||
if category_check["name"] != category["name"]: | ||
if ck["question"] == config["question"]: | ||
intersect = get_intersection(ck["options"], config["options"]) | ||
if len(intersect): | ||
duplicates.append( | ||
errors = [] | ||
if "name" not in config: | ||
errors.append( | ||
{ | ||
"name": name, | ||
"question": question, | ||
"form": form, | ||
"error": "name", | ||
"other": other, | ||
} | ||
) | ||
for q in config.get("questions"): | ||
for o in q.get("other") if "other" in q else []: | ||
find_errors_in_questions( | ||
config=o, | ||
errors=errors, | ||
name=name, | ||
form=form, | ||
question=q.get("id"), | ||
other=True, | ||
) | ||
if "else" in q: | ||
if not len( | ||
set.intersection( | ||
set(q["else"].keys()), set(["name", "ignore"]) | ||
) | ||
): | ||
ek = False if "ignore" in q["else"] else True | ||
errors.append( | ||
{ | ||
"question": ck["question"], | ||
"name": category_check["name"], | ||
"option": ck["options"], | ||
"type": types, | ||
"name": name, | ||
"question": q.get("id"), | ||
"form": form, | ||
"error": "name", | ||
"other_key": other, | ||
"else_key": ek, | ||
} | ||
) | ||
|
||
|
||
def loop_duplicates( | ||
category: dict, config: dict, category_check: dict, duplicates: list | ||
) -> None: | ||
for types in ["and", "or"]: | ||
if types in category_check: | ||
for ck in category_check[types]: | ||
append_duplicates( | ||
category_check=category_check, | ||
category=category, | ||
config=config, | ||
duplicates=duplicates, | ||
types=types, | ||
ck=ck, | ||
) | ||
|
||
|
||
def get_duplicates(category: dict, categories: list): | ||
total = 1 if "or" in category else 0 | ||
configs = [] | ||
if "or" in category: | ||
configs += category["or"] | ||
else: | ||
category["or"] = [] | ||
if "and" in category: | ||
configs += category["and"] | ||
total += len(category["and"]) | ||
|
||
duplicates = [] | ||
for config in configs: | ||
for category_check in categories: | ||
loop_duplicates( | ||
category=category, | ||
config=config, | ||
category_check=category_check, | ||
duplicates=duplicates, | ||
) | ||
return configs, duplicates, total | ||
|
||
|
||
def append_duplicates_dict( | ||
duplicate: dict, duplicates: list, duplicates_dict: dict | ||
): | ||
for types in ["and", "or"]: | ||
dp = len( | ||
list( | ||
filter( | ||
lambda x: x["type"] == types | ||
and x["name"] == duplicate["name"] | ||
and x["question"] == duplicate["question"], | ||
duplicates, | ||
) | ||
) | ||
) | ||
if dp: | ||
if duplicate["name"] in duplicates_dict: | ||
if types in duplicates_dict[duplicate["name"]]: | ||
duplicates_dict[duplicate["name"]][types] += 1 | ||
else: | ||
duplicates_dict[duplicate["name"]][types] = 1 | ||
duplicates_dict[duplicate["name"]]["total"] += 1 | ||
else: | ||
duplicates_dict[duplicate["name"]] = { | ||
"and": 1 if "and" == types else 0, | ||
"or": 1 if "or" == types else 0, | ||
"total": 1, | ||
} | ||
|
||
|
||
def merge_grouped_data(data: list) -> list: | ||
merged_data = [] | ||
for key, values in data.items(): | ||
merged = {"name": key} | ||
for value in values: | ||
merged.update(value) | ||
merged_data.append(merged) | ||
return merged_data | ||
|
||
|
||
def get_uniq_categories(categories: list) -> list: | ||
names = [o["name"] for o in categories] | ||
dn = [item for item, count in Counter(names).items() if count > 1] | ||
ld = list(filter(lambda d: d["name"] in dn, categories)) | ||
gd = { | ||
k: [i for i in g] | ||
for k, g in groupby( | ||
sorted(ld, key=itemgetter("name")), key=itemgetter("name") | ||
) | ||
} | ||
mgd = merge_grouped_data(data=gd) | ||
categories = mgd + list(filter(lambda d: d["name"] not in dn, categories)) | ||
return categories | ||
def find_errors_in_config(config: dict, name: str) -> list: | ||
errors = [] | ||
if "form" not in config: | ||
errors.append({"name": name, "error": "form"}) | ||
if "categories" not in config: | ||
errors.append({"name": name, "error": "categories"}) | ||
return errors | ||
|
||
|
||
def get_options(data: dict) -> list: | ||
if "categories" not in data: | ||
sys.exit(0) | ||
options = [] | ||
categories = [c for c in data["categories"]] | ||
categories = get_uniq_categories(categories=categories) | ||
for category in categories: | ||
duplicates_dict = defaultdict() | ||
configs, duplicates, total = get_duplicates( | ||
category=category, categories=categories | ||
def get_all_questions(config: dict, form: str, qs: list) -> list: | ||
for q in config.get("questions"): | ||
qs.append( | ||
{"form": form, "id": q.get("id"), "options": q.get("options")} | ||
) | ||
for duplicate in duplicates: | ||
append_duplicates_dict( | ||
duplicate=duplicate, | ||
duplicates=duplicates, | ||
duplicates_dict=duplicates_dict, | ||
) | ||
category.update( | ||
{ | ||
"configs": configs, | ||
"total": total, | ||
"duplicates": duplicates, | ||
"total_duplicate": dict(duplicates_dict), | ||
} | ||
) | ||
options.append(category) | ||
return options | ||
|
||
|
||
def loop_options( | ||
opt: dict, | ||
td: dict, | ||
printed: dict, | ||
form_id: str, | ||
title: str = None, | ||
info: bool = True, | ||
) -> None: | ||
if opt["total_duplicate"][td]["total"] >= opt["total"]: | ||
if opt["total_duplicate"][td]["or"] == len(opt["or"]) and opt[ | ||
"total_duplicate" | ||
][td]["and"] == len(opt["and"]): | ||
duplicate = list( | ||
filter(lambda x: x["name"] == td, opt["duplicates"]) | ||
) | ||
if info: | ||
print( | ||
title, | ||
f"POTENTIAL DUPLICATE: {opt['name']} WITH {td}, ", | ||
f"FORM ID: {form_id}" | ||
if q.get("other"): | ||
for o in q.get("other"): | ||
get_all_questions( | ||
config=o, | ||
form=form, | ||
qs=qs, | ||
) | ||
for d in duplicate: | ||
if info: | ||
print( | ||
f""" | ||
QUESTION: {d['question']} | ||
OPTIONS: {d['option']} | ||
""", | ||
"===================================", | ||
) | ||
printed.update({td: True}) | ||
return qs | ||
|
||
|
||
def get_error_messages(errors: list) -> list: | ||
messages = [] | ||
for error in errors: | ||
prefix = f"NAME: {error['name']} | " if "name" in error else "" | ||
if error.get("form"): | ||
msg = f"{prefix}FORM: {error['form']} | " | ||
if "question" in error: | ||
msg += f"QUESTION: {error['question']} | " | ||
key_name = ( | ||
"`other`" | ||
if error.get("other_key") | ||
else "`else`" | ||
if error.get("else_key") | ||
else "`categories`" | ||
) | ||
msg += f"CATEGORY NAME is required in {key_name}" | ||
if error["error"] == "form": | ||
msg = f"{prefix}FORM is required" | ||
if error["error"] == "categories": | ||
msg = f"{prefix}`categories` is typo or not present" | ||
messages.append(msg) | ||
return messages | ||
|
||
|
||
def check_config(file_config: str, info: bool = True) -> int: | ||
def check_config(file_config: str, info: bool = True): | ||
with open(file_config) as f: | ||
data = f.read() | ||
data = json.loads(data) | ||
counter = 0 | ||
for d in data: | ||
printed = {} | ||
options = get_options(data=d) | ||
for opt in options: | ||
if opt["name"] in printed: | ||
continue | ||
for td in opt["total_duplicate"]: | ||
title = d["name"] if "name" in d else None | ||
loop_options( | ||
opt=opt, | ||
td=td, | ||
printed=printed, | ||
title=title, | ||
info=info, | ||
form_id=d["form"], | ||
errors = [] | ||
questions = [] | ||
for config in data: | ||
qs = [] | ||
cname = config.get("name") | ||
errors += find_errors_in_config(config=config, name=cname) | ||
|
||
if "form" in config and "categories" in config: | ||
for c in config["categories"]: | ||
find_errors_in_questions( | ||
config=c, errors=errors, name=cname, form=config["form"] | ||
) | ||
if len(printed) > 0: | ||
counter += len(printed) | ||
return counter | ||
qs = get_all_questions(config=c, form=config["form"], qs=qs) | ||
questions.append(qs) | ||
errors = get_error_messages(errors=errors) | ||
if info: | ||
for error in errors: | ||
print(error) | ||
print("=============================") | ||
return errors, questions |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import itertools | ||
from ..db import get_option_by_questions | ||
|
||
|
||
def check_options(rows, questions: list, info: bool = True): | ||
errors = 0 | ||
ls = [{"question": row["qid"], "option": row["name"]} for row in rows] | ||
for key, group in itertools.groupby(ls, key=lambda x: x["question"]): | ||
options = [o["option"] for o in list(group) if o["option"] is not None] | ||
fq = list(filter(lambda x: x["id"] == key, questions)) | ||
if len(fq) and len(options): | ||
cg_items = set(fq[0]["options"]) | ||
db_items = set(options) | ||
isec = set.intersection(cg_items, db_items) | ||
diff = list(cg_items - isec) | ||
if len(diff): | ||
errors += 1 | ||
if info: | ||
qid = fq[0]["id"] | ||
print( | ||
f"QUESTION ID: {qid} | options not found:" | ||
f" {','.join(diff)}" | ||
) | ||
return errors == 0 | ||
|
||
|
||
def check_questions(connection, questions: list, info: bool = True) -> bool: | ||
errors = 0 | ||
ids = [q["id"] for q in questions] | ||
rq = get_option_by_questions(connection=connection, questions=ids) | ||
if rq: | ||
rows = set([row["qid"] for row in rq]) | ||
diff = list(set(ids) - rows) | ||
if len(diff): | ||
errors += 1 | ||
if info: | ||
qd = ", ".join([str(q) for q in diff]) | ||
print(f"QUESTION ID NOT FOUND: {qd}") | ||
option_exists = check_options(rows=rq, questions=questions) | ||
if not option_exists: | ||
errors += 1 | ||
return errors == 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.