/
shape.py
593 lines (546 loc) · 23.1 KB
/
shape.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
# -*- coding: utf-8 -*-
#
import logging
import sys
from decimal import Decimal
from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Type, Union
from rdflib import RDF, BNode, Literal, URIRef
from .consts import (
RDF_type,
RDFS_Class,
RDFS_subClassOf,
SH_alternativePath,
SH_deactivated,
SH_description,
SH_inversePath,
SH_jsFunctionName,
SH_JSTarget,
SH_JSTargetType,
SH_message,
SH_name,
SH_oneOrMorePath,
SH_order,
SH_property,
SH_select,
SH_severity,
SH_SPARQLTarget,
SH_SPARQLTargetType,
SH_target,
SH_targetClass,
SH_targetNode,
SH_targetObjectsOf,
SH_targetSubjectsOf,
SH_Violation,
SH_zeroOrMorePath,
SH_zeroOrOnePath,
)
from .errors import ConstraintLoadError, ConstraintLoadWarning, ReportableRuntimeError, ShapeLoadError
from .helper import get_query_helper_cls
from .pytypes import GraphLike
if TYPE_CHECKING:
    # Only needed for the 'ShapesGraph' string annotation on Shape.__init__;
    # presumably kept out of the runtime import path to avoid an import cycle
    # (TODO confirm against pyshacl.shapes_graph).
    from pyshacl.shapes_graph import ShapesGraph

# Handle to this very module object. Shape uses getattr/setattr on it to cache
# lazily imported names (the JSTarget class and the constraint parameter tables).
module = sys.modules[__name__]
class Shape(object):
    """
    One shape node taken from a SHACL shapes graph.

    A Shape wraps the shape's identifying node together with the ShapesGraph it
    lives in, and offers:

    * access to the shape's own metadata (deactivated, severity, messages,
      names, descriptions, order),
    * discovery of focus nodes in a data graph from the shape's targets,
    * evaluation of SHACL property paths to obtain value nodes,
    * evaluation of all constraint components declared on the shape.
    """

    __slots__ = (
        'logger',
        'sg',
        'node',
        '_p',
        '_path',
        '_advanced',
        '_deactivated',
        '_severity',
        '_messages',
        '_names',
        '_descriptions',
    )

    def __init__(
        self,
        sg: 'ShapesGraph',
        node: Union[URIRef, BNode],
        p=False,
        path: Optional[Union[URIRef, BNode]] = None,
        logger=None,
    ):
        """
        Shape

        Reads sh:deactivated, sh:severity, sh:message, sh:name and
        sh:description of `node` from the shapes graph and caches them.

        :type sg: ShapesGraph
        :type node: URIRef | BNode
        :type p: bool
        :type path: URIRef | BNode | None
        :type logger: logging.Logger
        :raises ShapeLoadError: if the shape has more than one sh:deactivated
            value, or that value is not a Literal.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.sg = sg
        self.node = node
        self._p = p
        self._path = path
        self._advanced = False

        deactivated_vals = set(self.objects(SH_deactivated))
        if len(deactivated_vals) > 1:
            # TODO:coverage: we don't have any tests for invalid shapes
            raise ShapeLoadError(
                "A SHACL Shape cannot have more than one sh:deactivated predicate.",
                "https://www.w3.org/TR/shacl/#deactivated",
            )
        if not deactivated_vals:
            self._deactivated = False  # type: bool
        else:
            d = next(iter(deactivated_vals))
            if not isinstance(d, Literal):
                # TODO:coverage: we don't have any tests for invalid shapes
                raise ShapeLoadError(
                    "The value of sh:deactivated predicate on a SHACL Shape must be a Literal.",
                    "https://www.w3.org/TR/shacl/#deactivated",
                )
            self._deactivated = bool(d.value)

        # If several severities are declared, an arbitrary one is used
        # (set iteration order); with none declared the default is sh:Violation.
        severity = set(self.objects(SH_severity))
        self._severity = (
            next(iter(severity)) if severity else SH_Violation
        )  # type: Union[URIRef, BNode, Literal]

        self._messages = set(self.objects(SH_message))  # type: Set
        self._names = set(self.objects(SH_name))  # type: Set
        self._descriptions = set(self.objects(SH_description))  # type: Set

    def set_advanced(self, val):
        """Switch evaluation of sh:target (advanced targets) in focus_nodes on or off."""
        self._advanced = bool(val)

    def get_other_shape(self, shape_node):
        """
        Look up another shape of the same shapes graph by its node.

        :return: whatever `sg.lookup_shape_from_node` returns, or None when
            that lookup raises KeyError or AttributeError.
        """
        try:
            return self.sg.lookup_shape_from_node(shape_node)
        except (KeyError, AttributeError):
            # TODO:coverage: we never hit this during a successful test run
            return None

    @property
    def is_property_shape(self):
        """True when this shape was constructed with a truthy `p` flag."""
        return bool(self._p)

    def property_shapes(self):
        """Return the objects of sh:property on this shape's node."""
        # TODO:coverage: this is never used?
        return self.sg.graph.objects(self.node, SH_property)

    @property
    def deactivated(self):
        """The cached boolean value of sh:deactivated (False when absent)."""
        return self._deactivated

    @property
    def severity(self):
        """The cached sh:severity value (sh:Violation when absent)."""
        return self._severity

    @property
    def message(self):
        """Generator over the cached sh:message values."""
        if self._messages is None:
            return
        for m in self._messages:
            yield m

    @property
    def name(self):
        """Generator over the cached sh:name values."""
        if self._names is None:
            return
        for n in self._names:
            yield n

    def __str__(self):
        try:
            name = next(iter(self.name))
        except (StopIteration, AttributeError):
            # No sh:name is recorded (or the names were never initialised):
            # fall back to the shape's node identifier.
            name = str(self.node)
        return "<Shape {}>".format(name)

    @property
    def description(self):
        """Generator over the cached sh:description values."""
        # TODO:coverage: this is never used?
        if self._descriptions is None:
            return
        for d in self._descriptions:
            yield d

    def objects(self, predicate=None):
        """Return the objects of `predicate` on this shape's node in the shapes graph."""
        return self.sg.graph.objects(self.node, predicate)

    @property
    def order(self):
        """
        The sh:order of this shape as a Decimal.

        :return: Decimal("0.0") when no sh:order is declared, otherwise the
            declared value converted to Decimal.
        :raises ShapeLoadError: if there is more than one sh:order value, or
            the value is not a Literal holding a Decimal, int or float.
        """
        order_nodes = list(self.objects(SH_order))
        if not order_nodes:
            return Decimal("0.0")
        if len(order_nodes) > 1:
            raise ShapeLoadError(
                "A SHACL Shape can have only one sh:order property.", "https://www.w3.org/TR/shacl-af/#rules-order"
            )
        order_node = order_nodes[0]
        if not isinstance(order_node, Literal):
            raise ShapeLoadError(
                "The value of sh:order on a SHACL Shape must be a numeric literal.",
                "https://www.w3.org/TR/shacl-af/#rules-order",
            )
        value = order_node.value
        if isinstance(value, Decimal):
            return value
        if isinstance(value, int):
            return Decimal(value)
        if isinstance(value, float):
            # Go through str() so the Decimal holds the printed float value
            # rather than its full binary expansion.
            return Decimal(str(value))
        raise ShapeLoadError(
            "The value of sh:order on a SHACL Shape must be a numeric literal.",
            "https://www.w3.org/TR/shacl-af/#rules-order",
        )

    def target_nodes(self):
        """Return the objects of sh:targetNode on this shape."""
        return self.sg.graph.objects(self.node, SH_targetNode)

    def target_classes(self):
        """Return the objects of sh:targetClass on this shape."""
        return self.sg.graph.objects(self.node, SH_targetClass)

    def implicit_class_targets(self):
        """
        Return `[self.node]` when the shape node itself is typed as a class.

        The node counts as a class when one of its rdf:type values is
        rdfs:Class or is declared `rdfs:subClassOf rdfs:Class` in the shapes
        graph; otherwise an empty list is returned.
        """
        types = set(self.sg.graph.objects(self.node, RDF_type))
        class_types = set(self.sg.graph.subjects(RDFS_subClassOf, RDFS_Class))
        class_types.add(RDFS_Class)
        if types.isdisjoint(class_types):
            return []
        return [self.node]

    def target_objects_of(self):
        """Return the objects of sh:targetObjectsOf on this shape."""
        return self.sg.graph.objects(self.node, SH_targetObjectsOf)

    def target_subjects_of(self):
        """Return the objects of sh:targetSubjectsOf on this shape."""
        return self.sg.graph.objects(self.node, SH_targetSubjectsOf)

    def path(self):
        """
        Return the path given at construction time for a property shape.

        :return: None for a non-property shape, otherwise the stored path.
        :raises RuntimeError: if this is a property shape without a path.
        """
        if not self.is_property_shape:
            return None
        if self._path is not None:
            return self._path
        raise RuntimeError("property shape has no _path!")  # pragma: no cover

    def target(self):
        """
        Collect the five core target declarations of this shape.

        :return: tuple of (target nodes, target classes, implicit class
            targets, targetObjectsOf predicates, targetSubjectsOf predicates).
        """
        return (
            self.target_nodes(),
            self.target_classes(),
            self.implicit_class_targets(),
            self.target_objects_of(),
            self.target_subjects_of(),
        )

    def advanced_target(self):
        """
        Resolve every sh:target value of this shape into a target descriptor.

        :return: dict mapping each usable custom target node to a dict with a
            'type' key plus one of 'qh' (SPARQL query helper), 'targeter'
            (JS target) or 'qt' (bound SPARQL target type). JS targets are
            left out of the result when JS support is disabled on the shapes
            graph.
        :raises ShapeLoadError: if a SPARQL target has no sh:select value, or
            if none of a target's rdf:types resolves to a known target type.
        """
        custom_targets = set(self.sg.objects(self.node, SH_target))
        result_set = dict()
        js_targets_enabled = bool(self.sg.js_enabled)
        for c in custom_targets:
            ct = dict()
            selects = list(self.sg.objects(c, SH_select))
            fn_names = list(self.sg.objects(c, SH_jsFunctionName))
            is_types = set(self.sg.objects(c, RDF_type))
            if selects or (SH_SPARQLTarget in is_types):
                if not selects:
                    # Typed as sh:SPARQLTarget but carries no query text.
                    raise ShapeLoadError(
                        "A sh:SPARQLTarget must have a sh:select value.",
                        "https://www.w3.org/TR/shacl-af/#SPARQLTarget",
                    )
                ct['type'] = SH_SPARQLTarget
                SPARQLQueryHelper = get_query_helper_cls()
                qh = SPARQLQueryHelper(self, c, selects[0], deactivated=self._deactivated)
                qh.collect_prefixes()
                ct['qh'] = qh
            elif fn_names or (SH_JSTarget in is_types):
                if not js_targets_enabled:
                    # Found JSTarget, but JS is not enabled in PySHACL. Ignore this target.
                    # It is left out of the result so consumers never see a
                    # descriptor without a 'type' key.
                    continue
                JST = getattr(module, "JSTarget", None)
                if not JST:
                    # Lazy-import JS-Target to prevent RDFLib import error
                    from pyshacl.extras.js.target import JSTarget as JST

                    setattr(module, "JSTarget", JST)
                ct['type'] = SH_JSTarget
                ct['targeter'] = JST(self.sg, c)
            else:
                found_tt = None
                for t in is_types:
                    try:
                        found_tt = self.sg.get_shacl_target_type(t)
                        break
                    except LookupError:
                        continue
                if not found_tt:
                    msg = "None of these types match a TargetType: {}".format(" ".join(is_types))
                    raise ShapeLoadError(msg, "https://www.w3.org/TR/shacl-af/#SPARQLTargetType")
                bound_tt = found_tt.bind(self, c)
                ct['type'] = bound_tt.shacl_constraint_class()
                if ct['type'] == SH_SPARQLTargetType:
                    ct['qt'] = bound_tt
                elif ct['type'] == SH_JSTargetType:
                    ct['targeter'] = bound_tt
            result_set[c] = ct
        return result_set

    def focus_nodes(self, data_graph):
        """
        The set of focus nodes for a shape may be identified as follows:

        specified in a shape using target declarations
        specified in any constraint that references a shape in parameters of shape-expecting constraint parameters (e.g. sh:node)
        specified as explicit input to the SHACL processor for validating a specific RDF term against a shape

        This method covers the first case: it evaluates the shape's target
        declarations (and, when enabled through set_advanced, its sh:target
        values) against `data_graph`.

        :param data_graph: the graph to search for focus nodes
        :return: set of focus nodes
        """
        (target_nodes, target_classes, implicit_classes, target_objects_of, target_subjects_of) = self.target()
        advanced_targets = self.advanced_target() if self._advanced else None

        found_node_targets = set()
        # Just add _all_ target_nodes to the set,
        # they don't need to actually exist in the graph
        found_node_targets.update(target_nodes)

        target_classes = set(target_classes)
        target_classes.update(implicit_classes)
        for tc in target_classes:
            found_node_targets.update(data_graph.subjects(RDF_type, tc))
            # NOTE(review): only direct rdfs:subClassOf links are followed
            # here, not a transitive closure - confirm this is intended.
            for subclass in data_graph.subjects(RDFS_subClassOf, tc):
                if subclass == tc:
                    continue
                found_node_targets.update(data_graph.subjects(RDF_type, subclass))

        for s_of in target_subjects_of:
            found_node_targets.update(s for s, _o in data_graph.subject_objects(s_of))

        for o_of in target_objects_of:
            found_node_targets.update(o for _s, o in data_graph.subject_objects(o_of))

        if advanced_targets:
            for at in advanced_targets.values():
                if at['type'] == SH_SPARQLTarget:
                    qh = at['qh']
                    select = qh.apply_prefixes(qh.select_text)
                    results = data_graph.query(select, initBindings=None)
                    if not results or len(results.bindings) < 1:
                        continue
                    for r in results:
                        found_node_targets.add(r['this'])
                elif at['type'] in (SH_JSTarget, SH_JSTargetType):
                    results = at['targeter'].find_targets(data_graph)
                    for r in results:
                        found_node_targets.add(r)
                else:
                    results = at['qt'].find_targets(data_graph)
                    if not results or len(results.bindings) < 1:
                        continue
                    for r in results:
                        found_node_targets.add(r['this'])
        return found_node_targets

    @classmethod
    def _collect_repeated_path(cls, sg, start_nodes, collected, path, target_graph, recursion):
        """
        Follow `path` repeatedly from `start_nodes` until no new node appears.

        Every node reached is added to `collected` (mutated in place and
        returned). Nodes already in `collected` are not expanded again, which
        also terminates cycles in the data.
        """
        pending = set(start_nodes)
        while pending:
            current_node = pending.pop()
            if current_node in collected:
                continue
            collected.add(current_node)
            pending.update(cls.value_nodes_from_path(sg, current_node, path, target_graph, recursion=recursion))
        return collected

    @classmethod
    def value_nodes_from_path(cls, sg, focus, path_val, target_graph, recursion=0):
        """
        Evaluate a SHACL property path starting at `focus`.

        :param sg: shapes graph wrapper; `sg.graph` holds the path definition
        :param focus: node in `target_graph` to start from
        :param path_val: the path; a URIRef predicate or a BNode describing a
            sequence, inverse, alternative, zero-or-more, one-or-more or
            zero-or-one path
        :param target_graph: graph in which the path is followed
        :param recursion: current nesting depth (internal)
        :return: set of value nodes
        :raises ReportableRuntimeError: for a Literal path, a nesting depth of
            10 or more, or a sequence/alternative list with fewer than two items
        :raises NotImplementedError: if the path node matches no known path kind
        """
        # Link: https://www.w3.org/TR/shacl/#property-paths
        if isinstance(path_val, URIRef):
            return set(target_graph.objects(focus, path_val))
        elif isinstance(path_val, Literal):
            raise ReportableRuntimeError("Values of a property path cannot be a Literal.")
        # At this point, path_val _must_ be a BNode
        # TODO, the path_val BNode must be value of exactly one sh:path subject in the SG.
        if recursion >= 10:
            raise ReportableRuntimeError("Path traversal depth is too much!")
        deeper = recursion + 1

        # Sequence path: path_val is the head of an RDF list
        find_list = set(sg.graph.objects(path_val, RDF.first))
        if len(find_list) > 0:
            first_node = next(iter(find_list))
            rest_nodes = set(sg.graph.objects(path_val, RDF.rest))
            # A missing rdf:rest is treated like rdf:nil (end of list).
            rest_node = next(iter(rest_nodes), None)
            is_last_item = rest_node is None or rest_node == RDF.nil
            if is_last_item and recursion == 0:
                raise ReportableRuntimeError("A list of SHACL Paths must contain at least two path items.")
            this_level_nodes = cls.value_nodes_from_path(sg, focus, first_node, target_graph, recursion=deeper)
            if is_last_item:
                return this_level_nodes
            found_value_nodes = set()
            for tln in this_level_nodes:
                found_value_nodes.update(
                    cls.value_nodes_from_path(sg, tln, rest_node, target_graph, recursion=deeper)
                )
            return found_value_nodes

        # Inverse path: the inner path is used directly as a predicate
        find_inverse = set(sg.graph.objects(path_val, SH_inversePath))
        if len(find_inverse) > 0:
            inverse_path = next(iter(find_inverse))
            return set(target_graph.subjects(inverse_path, focus))

        # Alternative path: union of the results of every listed path
        find_alternatives = set(sg.graph.objects(path_val, SH_alternativePath))
        if len(find_alternatives) > 0:
            alternatives_list = next(iter(find_alternatives))
            all_collected = set()
            visited_alternatives = 0
            for a in sg.graph.items(alternatives_list):
                all_collected.update(cls.value_nodes_from_path(sg, focus, a, target_graph, recursion=deeper))
                visited_alternatives += 1
            if visited_alternatives < 2:
                raise ReportableRuntimeError("List of SHACL alternate paths must have at least two path items.")
            return all_collected

        find_zero_or_more = set(sg.graph.objects(path_val, SH_zeroOrMorePath))
        if len(find_zero_or_more) > 0:
            zm_path = next(iter(find_zero_or_more))
            # Note, the zero-or-more path always includes the current subject too!
            collection_set = {focus}
            found_nodes = cls.value_nodes_from_path(sg, focus, zm_path, target_graph, recursion=deeper)
            return cls._collect_repeated_path(sg, found_nodes, collection_set, zm_path, target_graph, deeper)

        find_one_or_more = set(sg.graph.objects(path_val, SH_oneOrMorePath))
        if len(find_one_or_more) > 0:
            one_or_more_path = next(iter(find_one_or_more))
            # Note, the one-or-more path should _not_ include the current focus
            # (unless it is reached again by following the path)
            found_nodes = cls.value_nodes_from_path(sg, focus, one_or_more_path, target_graph, recursion=deeper)
            return cls._collect_repeated_path(sg, found_nodes, set(), one_or_more_path, target_graph, deeper)

        find_zero_or_one = set(sg.graph.objects(path_val, SH_zeroOrOnePath))
        if len(find_zero_or_one) > 0:
            zero_or_one_path = next(iter(find_zero_or_one))
            # Note, the zero-or-one path always includes the current subject too!
            collection_set = {focus}
            collection_set.update(
                cls.value_nodes_from_path(sg, focus, zero_or_one_path, target_graph, recursion=deeper)
            )
            return collection_set

        raise NotImplementedError("That path method to get value nodes of property shapes is not yet implemented.")

    def value_nodes(self, target_graph, focus):
        """
        For each focus node, you can get a set of value nodes.
        For a Node Shape, each focus node has just one value node,
        which is just the focus_node

        :param target_graph: graph in which property paths are followed
        :param focus: a single focus node, or a tuple/list/set of them
        :return: dict mapping each focus node to its set of value nodes
        """
        if not isinstance(focus, (tuple, list, set)):
            focus = [focus]
        if not self.is_property_shape:
            return {f: {f} for f in focus}
        path_val = self.path()
        return {f: self.value_nodes_from_path(self.sg, f, path_val, target_graph) for f in focus}

    def find_custom_constraints(self):
        """
        Select the custom constraints of the shapes graph that apply here.

        A custom constraint applies when every one of its non-optional
        parameters has at least one value on this shape's node.

        :return: set of applicable custom constraints
        """
        applicable_custom_constraints = set()
        for c in self.sg.custom_constraints:
            found_all_mandatory = True
            for mandatory_param in (p for p in c.parameters if not p.optional):
                path = mandatory_param.path()
                assert isinstance(path, URIRef)
                if not any(True for _ in self.sg.objects(self.node, path)):
                    # One missing mandatory parameter rules the constraint out
                    found_all_mandatory = False
                    break
            if found_all_mandatory:
                applicable_custom_constraints.add(c)
        return applicable_custom_constraints

    def validate(
        self,
        target_graph: GraphLike,
        focus: Optional[
            Union[
                Tuple[Union[URIRef, BNode]],
                List[Union[URIRef, BNode]],
                Set[Union[URIRef, BNode]],
                Union[URIRef, BNode],
            ]
        ] = None,
        bail_on_error: Optional[bool] = False,
        _evaluation_path: Optional[List] = None,
    ):
        """
        Evaluate every constraint of this shape against its focus nodes.

        :param target_graph: the data graph to validate
        :param focus: explicit focus node(s); when None the focus nodes are
            derived from the shape's targets via focus_nodes
        :param bail_on_error: stop evaluating further constraints as soon as
            one reports non-conformance
        :param _evaluation_path: chain of shapes/constraints that led here
            (internal); this shape is appended to the list that is passed in
        :return: tuple of (conforms, list of reports)
        :raises ReportableRuntimeError: if the evaluation path already holds
            30 or more entries
        :raises ConstraintLoadError: re-raised after logging when a constraint
            component cannot be constructed for this shape
        """
        if self.deactivated:
            return True, []
        if focus is not None:
            if not isinstance(focus, (tuple, list, set)):
                focus = [focus]
        else:
            focus = self.focus_nodes(target_graph)
        if len(focus) < 1:
            # Its possible for shapes to have _no_ focus nodes
            # (they are called in other ways)
            return True, []
        if _evaluation_path is None:
            _evaluation_path = []
        elif len(_evaluation_path) >= 30:
            # 27 is the depth required to successfully do the meta-shacl test on shacl.ttl
            path_str = "->".join((str(e) for e in _evaluation_path))
            raise ReportableRuntimeError("Evaluation path too deep!\n{}".format(path_str))

        # Lazy import here to avoid an import loop
        CONSTRAINT_PARAMETERS, PARAMETER_MAP = getattr(module, 'CONSTRAINT_PARAMS', (None, None))
        if not CONSTRAINT_PARAMETERS:
            from .constraints import ALL_CONSTRAINT_PARAMETERS, CONSTRAINT_PARAMETERS_MAP

            setattr(module, 'CONSTRAINT_PARAMS', (ALL_CONSTRAINT_PARAMETERS, CONSTRAINT_PARAMETERS_MAP))
            CONSTRAINT_PARAMETERS = ALL_CONSTRAINT_PARAMETERS
            PARAMETER_MAP = CONSTRAINT_PARAMETERS_MAP

        if self.sg.js_enabled:
            # Work on copies so the shared tables never gain the JS entries
            search_parameters = CONSTRAINT_PARAMETERS.copy()
            constraint_map = PARAMETER_MAP.copy()
            from pyshacl.extras.js.constraint import JSConstraint, SH_js

            search_parameters.append(SH_js)
            constraint_map[SH_js] = JSConstraint
        else:
            search_parameters = CONSTRAINT_PARAMETERS
            constraint_map = PARAMETER_MAP

        reports = []
        focus_value_nodes = self.value_nodes(target_graph, focus)
        non_conformant = False
        done_constraints = set()
        _evaluation_path.append(self)
        constraint_components = [
            constraint_map[p] for p, _v in self.sg.predicate_objects(self.node) if p in search_parameters
        ]
        for constraint_component in constraint_components:
            # Several parameters can map to the same component; run it once.
            if constraint_component in done_constraints:
                continue
            try:
                c = constraint_component(self)
            except ConstraintLoadWarning as w:
                self.logger.warning(repr(w))
                continue
            except ConstraintLoadError as e:
                self.logger.error(repr(e))
                raise
            _e_p = _evaluation_path[:]
            _e_p.append(c)
            _is_conform, _r = c.evaluate(target_graph, focus_value_nodes, _e_p)
            non_conformant = non_conformant or (not _is_conform)
            reports.extend(_r)
            done_constraints.add(constraint_component)
            if non_conformant and bail_on_error:
                break

        applicable_custom_constraints = self.find_custom_constraints()
        for a in applicable_custom_constraints:
            if non_conformant and bail_on_error:
                break
            _e_p = _evaluation_path[:]
            validator = a.make_validator_for_shape(self)
            _e_p.append(validator)
            _is_conform, _r = validator.evaluate(target_graph, focus_value_nodes, _e_p)
            non_conformant = non_conformant or (not _is_conform)
            reports.extend(_r)
        return (not non_conformant), reports