-
Notifications
You must be signed in to change notification settings - Fork 4
/
check_consistency_with_ontology.py
142 lines (117 loc) · 5.7 KB
/
check_consistency_with_ontology.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# sourcery skip: use-fstring-for-concatenation
from datetime import datetime
from pathlib import Path
import regex
from lxml import etree
from regex import Pattern
from dsp_tools.commands.xmlupload.models.ontology_diagnose_models import InvalidOntologyElements, OntoCheckInformation
from dsp_tools.commands.xmlupload.ontology_client import OntologyClient
from dsp_tools.models.exceptions import UserError
# Matches a name with a leading colon and no prefix (e.g. ":Name");
# such names are resolved against the default ontology of the project.
defaultOntologyColon: Pattern[str] = regex.compile(r"^:\w+$")
# Matches a bare name without prefix or colon (e.g. "Name");
# such names are resolved against "knora-api".
knoraUndeclared: Pattern[str] = regex.compile(r"^\w+$")
# Matches a name of the form "prefix:Name", where the prefix may contain hyphens.
genericPrefixedOntology: Pattern[str] = regex.compile(r"^[\w\-]+:\w+$")
def do_xml_consistency_check(onto_client: OntologyClient, root: etree._Element) -> None:
    """
    Check that the classes and properties used in an XML exist in the ontologies on the server.

    The ontologies are retrieved through the client,
    the resource types and property names are collected from the XML root,
    and both are then validated against the retrieved ontologies.

    Args:
        onto_client: client for the ontology retrieval
        root: root of the XML

    Raises:
        UserError: if there are any invalid properties or classes
    """
    default_prefix = onto_client.default_ontology
    ontologies_on_server = onto_client.get_all_ontologies_from_server()
    lookup_info = OntoCheckInformation(
        default_ontology_prefix=default_prefix,
        onto_lookup=ontologies_on_server,
    )
    used_classes, used_properties = _get_all_classes_and_properties(root)
    _find_problems_in_classes_and_properties(used_classes, used_properties, lookup_info)
def _find_problems_in_classes_and_properties(
    classes: dict[str, list[str]], properties: dict[str, list[str]], onto_check_info: OntoCheckInformation
) -> None:
    """
    Diagnose all classes and properties and raise an error if any of them is problematic.

    If the problem protocol yields a dataframe,
    it is written as a timestamped CSV file into the current working directory,
    and the location of that file is appended to the error message.

    Args:
        classes: class types mapped to the IDs of the resources that use them
        properties: property names mapped to the IDs of the resources that use them
        onto_check_info: default ontology prefix and the ontologies to check against

    Raises:
        UserError: if at least one class or property has a problem
    """
    invalid_classes = _diagnose_all_classes(classes, onto_check_info)
    invalid_properties = _diagnose_all_properties(properties, onto_check_info)
    if not (invalid_classes or invalid_properties):
        return None

    known_ontologies = list(onto_check_info.onto_lookup.keys())
    problems = InvalidOntologyElements(
        classes=invalid_classes,
        properties=invalid_properties,
        ontos_on_server=known_ontologies,
    )
    msg, df = problems.execute_problem_protocol()
    if df is not None:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        csv_file = f"XML_syntax_errors_{timestamp}.csv"
        df.to_csv(path_or_buf=Path(Path.cwd(), csv_file), index=False)
        msg += (
            "\n\n---------------------------------------\n\n"
            f"\nAll the problems are listed in the file: '{Path.cwd()}/{csv_file}'"
        )
    raise UserError(msg)
def _get_all_classes_and_properties(root: etree._Element) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
    """
    Collect the class types and the property names that are used in the XML.

    Args:
        root: root of the XML

    Returns:
        The class dictionary and the property dictionary, in that order
    """
    resource_ids_by_class = _get_all_class_types_and_ids(root)
    resource_ids_by_property: dict[str, list[str]] = {}
    for res in root.iterchildren(tag="resource"):
        # the helper extends the dictionary it receives and hands it back
        resource_ids_by_property = _get_all_property_names_and_resource_ids_one_resource(
            res, resource_ids_by_property
        )
    return resource_ids_by_class, resource_ids_by_property
def _get_all_class_types_and_ids(root: etree._Element) -> dict[str, list[str]]:
    """
    Group the IDs of all <resource> children of the root by their "restype" attribute.

    Args:
        root: root of the XML

    Returns:
        A dictionary that maps each resource type to the IDs of the resources of that type
    """
    ids_by_restype: dict[str, list[str]] = {}
    for res in root.iterchildren(tag="resource"):
        # read both attributes before touching the dictionary
        res_type = res.attrib["restype"]
        res_id = res.attrib["id"]
        ids_by_restype.setdefault(res_type, []).append(res_id)
    return ids_by_restype
def _get_all_property_names_and_resource_ids_one_resource(
    resource: etree._Element, prop_dict: dict[str, list[str]]
) -> dict[str, list[str]]:
    """
    Register the property names of one resource in the property dictionary.

    For every child of the resource, except <bitstream>,
    the ID of the resource is appended to the list stored under the child's "name" attribute.
    The dictionary that is passed in is modified in place.

    Args:
        resource: a single resource element of the XML
        prop_dict: property names mapped to the IDs of the resources that use them

    Returns:
        The same dictionary, extended with the properties of this resource
    """
    for child in resource.iterchildren():
        if child.tag == "bitstream":
            continue
        name = child.attrib["name"]
        res_id = resource.attrib["id"]
        prop_dict.setdefault(name, []).append(res_id)
    return prop_dict
def _diagnose_all_classes(
    classes: dict[str, list[str]], onto_check_info: OntoCheckInformation
) -> list[tuple[str, list[str], str]]:
    """
    Diagnose every class type and collect those that have a problem.

    Args:
        classes: class types mapped to the IDs of the resources that use them
        onto_check_info: default ontology prefix and the ontologies to check against

    Returns:
        One tuple per problematic class: the class type, the affected resource IDs, and the problem message
    """
    return [
        (class_name, resource_ids, diagnosis)
        for class_name, resource_ids in classes.items()
        if (diagnosis := _diagnose_class(class_name, onto_check_info))
    ]
def _diagnose_class(cls_type: str, onto_check_info: OntoCheckInformation) -> str | None:
    """
    Check whether a single class type exists in the ontology that its prefix refers to.

    Args:
        cls_type: class type as written in the XML
        onto_check_info: default ontology prefix and the ontologies to check against

    Returns:
        A message that describes the problem, or None if the class type is valid
    """
    prefix, cls_ = _get_prefix_and_prop_or_cls_identifier(cls_type, onto_check_info.default_ontology_prefix)
    if not prefix:
        # This function validates class types, so the message must not refer to a property name.
        return "Class type does not follow a known ontology pattern"
    if onto := onto_check_info.onto_lookup.get(prefix):
        return "Invalid Class Type" if cls_ not in onto.classes else None
    return "Unknown ontology prefix"
def _diagnose_all_properties(
    properties: dict[str, list[str]], onto_check_info: OntoCheckInformation
) -> list[tuple[str, list[str], str]]:
    """
    Diagnose every property name and collect those that have a problem.

    Args:
        properties: property names mapped to the IDs of the resources that use them
        onto_check_info: default ontology prefix and the ontologies to check against

    Returns:
        One tuple per problematic property: the property name, the affected resource IDs, and the problem message
    """
    return [
        (name, resource_ids, diagnosis)
        for name, resource_ids in properties.items()
        if (diagnosis := _diagnose_property(name, onto_check_info))
    ]
def _diagnose_property(prop_name: str, onto_check_info: OntoCheckInformation) -> str | None:
    """
    Check whether a single property exists in the ontology that its prefix refers to.

    Args:
        prop_name: property name as written in the XML
        onto_check_info: default ontology prefix and the ontologies to check against

    Returns:
        A message that describes the problem, or None if the property is valid
    """
    prefix, prop = _get_prefix_and_prop_or_cls_identifier(prop_name, onto_check_info.default_ontology_prefix)
    if not prefix:
        return "Property name does not follow a known ontology pattern"
    onto = onto_check_info.onto_lookup.get(prefix)
    if not onto:
        return "Unknown ontology prefix"
    if prop in onto.properties:
        return None
    return "Invalid Property"
def _get_prefix_and_prop_or_cls_identifier(
    prop_or_cls: str, default_ontology_prefix: str
) -> tuple[str, ...] | tuple[None, None]:
    """
    Split the name of a property or class into its ontology prefix and its identifier.

    Three notations are recognised:
        - ":Name" belongs to the default ontology
        - "Name" belongs to "knora-api"
        - "prefix:Name" belongs to the ontology with the given prefix

    Args:
        prop_or_cls: name of the property or class as written in the XML
        default_ontology_prefix: prefix to use for names in the ":Name" notation

    Returns:
        The prefix and the identifier, or (None, None) if the name matches none of the notations
    """
    if defaultOntologyColon.match(prop_or_cls):
        identifier = prop_or_cls.lstrip(":")
        return default_ontology_prefix, identifier
    if knoraUndeclared.match(prop_or_cls):
        return "knora-api", prop_or_cls
    if genericPrefixedOntology.match(prop_or_cls):
        parts = prop_or_cls.split(":")
        return tuple(parts)
    return None, None