# BackwardChainingStore.py
# -*- coding: utf-8 -*-
import sys
from pprint import pprint
from rdflib import RDF, URIRef, Variable
from rdflib.util import first
from rdflib.store import Store
from rdflib.plugins.stores.regexmatching import NATIVE_REGEX
# from rdflib.plugins.sparql.algebra import RenderSPARQLAlgebra
from rdflib.plugins.sparql.algebra import translateQuery as RenderSPARQLAlgebra
# from rdflib.plugins.sparql.algebra import NonSymmetricBinaryOperator
# from rdflib.plugins.sparql.graph import BasicGraphPattern
from FuXi.Rete.Magic import SetupDDLAndAdornProgram
from FuXi.Rete.Magic import DerivedPredicateIterator
from FuXi.Rete.RuleStore import SetupRuleStore
from FuXi.Rete.TopDown import PrepareSipCollection
from FuXi.Rete.TopDown import RDFTuplesToSPARQL
from FuXi.Rete.TopDown import mergeMappings1To2
from FuXi.Rete.SidewaysInformationPassing import GetOp
from FuXi.Rete.SidewaysInformationPassing import SIPRepresentation
from FuXi.Rete.Util import LOG
from FuXi.LP.BackwardFixpointProcedure import BackwardFixpointProcedure
from FuXi.LP import IdentifyHybridPredicates
from FuXi.Horn.PositiveConditions import BuildUnitermFromTuple
from FuXi.SPARQL import EDBQuery
# for docstring/test purposes
from rdflib import Graph
assert Graph
from rdflib import RDFS
assert RDFS
from rdflib.plugins.sparql.parserutils import Expr as AlgebraExpression
assert AlgebraExpression
from rdflib.plugins.sparql.sparql import Query
assert Query
# Decision-procedure selectors accepted by TopDownSPARQLEntailingStore's
# ``decisionProcedure`` keyword argument.
TOP_DOWN_METHOD = 0
BFP_METHOD = 1

# Default mapping from LOG builtin predicates to SPARQL FILTER expression
# templates (filled in via %-interpolation with the two argument terms).
DEFAULT_BUILTIN_MAP = {
    LOG.equal: "%s = %s",
    LOG.notEqualTo: "%s != %s"
}
class NonSymmetricBinaryOperator(AlgebraExpression):
    """
    A binary SPARQL algebra expression whose operands are not
    interchangeable (``left`` / ``right``), e.g. OPTIONAL-style joins.
    """

    def fetchTerminalExpression(self):
        """
        Generator over the terminal BGP expression(s) reachable through
        the right-hand operand, recursing until a BGP node is found.
        """
        rhs = self.right
        if rhs.name == 'BGP':
            yield rhs
        else:
            for terminal in rhs.fetchTerminalExpression():
                yield terminal
class TopDownSPARQLEntailingStore(Store):
    """
    A Store which uses FuXi's magic set "sip strategies" and the in-memory
    SPARQL Algebra implementation as a store-agnostic, top-down decision
    procedure for semantic web SPARQL (OWL2-RL/RIF/N3) entailment regimes.
    Exposed as a rdflib / layercake-python API for SPARQL datasets with
    entailment regimes.

    Queries are mediated over the SPARQL protocol using global schemas
    captured as SW theories which describe and distinguish their predicate
    symbols.
    """
    context_aware = True
    formula_aware = True
    transaction_aware = True
    regex_matching = NATIVE_REGEX
    batch_unification = True

    def getDerivedPredicates(self, expr, prologue):
        """
        Walk a SPARQL algebra expression and yield every derived predicate
        referenced from one of its basic graph patterns.

        Raises NotImplementedError for algebra nodes other than BGPs and
        non-symmetric binary operators.
        """
        if expr.name == 'BGP':
            for s, p, o, func in expr.patterns:
                derivedPred = self.derivedPredicateFromTriple((s, p, o))
                if derivedPred is not None:
                    yield derivedPred
        elif isinstance(expr, NonSymmetricBinaryOperator):
            for term in self.getDerivedPredicates(expr.left, prologue):
                yield term
            for term in self.getDerivedPredicates(expr.right, prologue):
                yield term
        else:
            raise NotImplementedError(expr)

    def isaBaseQuery(self, queryString, queryObj=None):
        """
        If the given SPARQL query involves purely base predicates
        it returns it (as a parsed string), otherwise it returns a SPARQL
        algebra instance for top-down evaluation using this store

        >>> graph=Graph()
        >>> topDownStore = TopDownSPARQLEntailingStore(graph.store, graph, derivedPredicates=[RDFS.seeAlso], nsBindings={u"rdfs": str(RDFS)})
        >>> rt=topDownStore.isaBaseQuery("SELECT * { [] rdfs:seeAlso [] }")
        >>> isinstance(rt,(BasicGraphPattern, AlgebraExpression)) #doctest: +SKIP
        True
        >>> rt=topDownStore.isaBaseQuery("SELECT * { [] a [] }")
        >>> isinstance(rt,(Query, basestring)) #doctest: +SKIP
        True
        >>> rt=topDownStore.isaBaseQuery("SELECT * { [] a [] OPTIONAL { [] rdfs:seeAlso [] } }")
        >>> isinstance(rt,(BasicGraphPattern, AlgebraExpression)) #doctest: +SKIP
        True
        """
        from rdflib.plugins.sparql.query import Prologue
        from rdflib.plugins.sparql.parser import parseQuery
        from rdflib.plugins.sparql import sparql as sparqlModule
        if queryObj:
            query = queryObj
        else:
            query = parseQuery(queryString)
        # Merge this store's namespace bindings into the query prologue
        # without clobbering prefixes the query already declares.
        if not query.prologue:
            query.prologue = Prologue(None, [])
            query.prologue.prefixBindings.update(self.nsBindings)
        else:
            for prefix, nsInst in list(self.nsBindings.items()):
                if prefix not in query.prologue.prefixBindings:
                    query.prologue.prefixBindings[prefix] = nsInst
        sparqlModule.prologue = query.prologue
        algebra = RenderSPARQLAlgebra(query, nsMappings=self.nsBindings)
        # If any derived predicate is referenced, hand back the algebra for
        # top-down evaluation; otherwise the parsed query is usable directly.
        return first(self.getDerivedPredicates(
            algebra, sparqlModule.prologue)) and algebra or query

    def __init__(self,
                 store,
                 edb,
                 derivedPredicates=None,
                 idb=None,
                 DEBUG=False,
                 nsBindings=None,
                 decisionProcedure=BFP_METHOD,
                 templateMap=None,
                 identifyHybridPredicates=False,
                 hybridPredicates=None):
        """
        :param store: the underlying rdflib Store holding the base (EDB) facts
        :param edb: the extensional database (fact graph)
        :param derivedPredicates: collection of predicates to solve top-down;
            computed automatically from edb/idb when None
        :param idb: the intensional database (set of rules), empty set if None
        :param DEBUG: emit diagnostic output when True
        :param nsBindings: prefix -> namespace URI mapping used when rendering
            generated queries
        :param decisionProcedure: TOP_DOWN_METHOD or BFP_METHOD (default)
        :param templateMap: builtin predicate -> SPARQL expression template
            mapping; DEFAULT_BUILTIN_MAP when None
        :param identifyHybridPredicates: detect hybrid predicates from the EDB
        :param hybridPredicates: explicit list of predicates that are both
            asserted and derived
        """
        # Normalize mutable defaults here rather than in the signature so a
        # single shared dict/list is never mutated across instances.
        nsBindings = {} if nsBindings is None else nsBindings
        hybridPredicates = [] if hybridPredicates is None else hybridPredicates
        self.dataset = store
        if hasattr(store, '_db'):
            self._db = store._db
        self.idb = idb if idb else set()
        self.edb = edb
        if derivedPredicates is None:
            self.derivedPredicates = list(
                DerivedPredicateIterator(self.edb, self.idb))
        else:
            self.derivedPredicates = derivedPredicates
        self.DEBUG = DEBUG
        self.nsBindings = nsBindings
        self.edb.templateMap = DEFAULT_BUILTIN_MAP if templateMap is None \
            else templateMap
        self.queryNetworks = []
        self.edbQueries = set()
        if identifyHybridPredicates:
            self.hybridPredicates = IdentifyHybridPredicates(
                edb, self.derivedPredicates)
        else:
            self.hybridPredicates = hybridPredicates if hybridPredicates else []
        # Update derived predicate list for synchrony with hybrid predicate
        # rules
        for hybridPred in self.hybridPredicates:
            self.derivedPredicates.remove(hybridPred)
            if isinstance(self.derivedPredicates, list):
                self.derivedPredicates.append(URIRef(hybridPred + u'_derived'))
            elif isinstance(self.derivedPredicates, set):
                self.derivedPredicates.add(URIRef(hybridPred + u'_derived'))
            else:
                import warnings
                warnings.warn(
                    "Collection of derived predicates is neither a list or a set.",
                    RuntimeWarning)
        # Add a cache of the namespace bindings to use later in coining Qnames
        # in generated queries
        self.edb.revNsMap = {}
        self.edb.nsMap = {}
        for k, v in list(nsBindings.items()):
            self.edb.revNsMap[v] = k
            self.edb.nsMap[k] = v
        for key, uri in self.edb.namespaces():
            self.edb.revNsMap[uri] = key
            self.edb.nsMap[key] = uri

    def invokeDecisionProcedure(self, tp, factGraph, bindings, debug, sipCollection):
        """
        Solve a single (possibly non-ground) triple pattern goal using the
        Backward Fixpoint Procedure over a freshly-built RETE network.

        Yields ``(solution, None)`` pairs: solution is a bindings dict per
        answer when the goal contains variables, or True when a ground goal
        was successfully proved.
        """
        isNotGround = first(filter(
            lambda i: isinstance(i, Variable), tp))
        rule_store, rule_graph, network = SetupRuleStore(makeNetwork=True)
        bfp = BackwardFixpointProcedure(
            factGraph,
            network,
            self.derivedPredicates,
            tp,
            sipCollection,
            hybridPredicates=self.hybridPredicates,
            debug=self.DEBUG)
        bfp.createTopDownReteNetwork(self.DEBUG)
        # rt = bfp.answers(debug=self.DEBUG)
        # Keep the network and dispatched EDB queries around for inspection.
        self.queryNetworks.append((bfp.metaInterpNetwork, tp))
        self.edbQueries.update(bfp.edbQueries)
        if self.DEBUG:
            print("Goal/Query: ", tp)
            print("Query was not ground"
                  if isNotGround is not None else "Query was ground")
        if isNotGround is not None:
            for item in bfp.goalSolutions:
                yield item, None
        else:
            yield True, None
        if debug:
            print(bfp.metaInterpNetwork)
            bfp.metaInterpNetwork.reportConflictSet(True, sys.stderr)
            for query in self.edbQueries:
                print("Dispatched query against dataset: ", query.asSPARQL())

    def conjunctiveSipStrategy(self, goalsRemaining, factGraph, bindings=None):
        """
        Given a conjunctive set of triples, invoke sip-strategy passing
        on intermediate solutions to facilitate 'join' behavior
        """
        bindings = bindings if bindings else {}
        try:
            tp = next(goalsRemaining)
            assert isinstance(bindings, dict)
            dPred = self.derivedPredicateFromTriple(tp)
            if dPred is None:
                # Base (EDB) predicate: evaluate directly against the fact
                # graph, fold the solutions into the current bindings and
                # recurse on the remaining goals.
                baseEDBQuery = EDBQuery([BuildUnitermFromTuple(tp)],
                                        self.edb,
                                        bindings=bindings)
                if self.DEBUG:
                    print("Evaluating TP against EDB:%s" %
                          baseEDBQuery.asSPARQL())
                query, rt = baseEDBQuery.evaluate()
                # _vars = baseEDBQuery.returnVars
                for item in rt:
                    bindings.update(item)
                for ansDict in self.conjunctiveSipStrategy(
                        goalsRemaining,
                        factGraph,
                        bindings):
                    yield ansDict
            else:
                # Derived (IDB) predicate: adorn the program for this goal
                # and solve it with the top-down decision procedure.
                queryLit = BuildUnitermFromTuple(tp)
                currentOp = GetOp(queryLit)
                queryLit.setOperator(currentOp)
                query = EDBQuery(
                    [queryLit], self.edb, bindings=bindings)
                if bindings:
                    tp = first(query.formulae).toRDFTuple()
                if self.DEBUG:
                    print("Goal/Query: ", query.asSPARQL())
                SetupDDLAndAdornProgram(
                    self.edb,
                    self.idb,
                    [tp],
                    derivedPreds=self.derivedPredicates,
                    ignoreUnboundDPreds=True,
                    hybridPreds2Replace=self.hybridPredicates)
                if self.hybridPredicates:
                    # Rewrite hybrid predicates to their '_derived' alias so
                    # the adorned program resolves them top-down.
                    lit = BuildUnitermFromTuple(tp)
                    op = GetOp(lit)
                    if op in self.hybridPredicates:
                        lit.setOperator(URIRef(op + u'_derived'))
                        tp = lit.toRDFTuple()
                sipCollection = PrepareSipCollection(self.edb.adornedProgram)
                if self.DEBUG and sipCollection:
                    for sip in SIPRepresentation(sipCollection):
                        print(sip)
                    pprint(list(self.edb.adornedProgram), sys.stderr)
                elif self.DEBUG:
                    print("No SIP graph.")
                for nextAnswer, ns in self.invokeDecisionProcedure(
                        tp,
                        factGraph,
                        bindings,
                        self.DEBUG,
                        sipCollection):
                    nonGroundGoal = isinstance(nextAnswer, dict)
                    if nonGroundGoal or nextAnswer:
                        # Either we received bindings from top-down evaluation
                        # or we (successfully) proved a ground query
                        if not nonGroundGoal:
                            # Attempted to prove a ground query, return the
                            # response
                            rt = nextAnswer
                        else:
                            # Received solutions to 'open' query, merge with
                            # given bindings and continue
                            rt = mergeMappings1To2(bindings, nextAnswer)
                        # either answers were provided (the goal wasn't
                        # grounded) or the goal was ground and successfully
                        # proved
                        for ansDict in self.conjunctiveSipStrategy(
                                goalsRemaining,
                                factGraph,
                                rt):
                            yield ansDict
        except StopIteration:
            # No goals remain: the accumulated bindings are a full solution.
            yield bindings

    def derivedPredicateFromTriple(self, triple):
        """
        Given a triple, return its predicate (if derived)
        or None otherwise
        """
        (s, p, o) = triple
        if p in self.derivedPredicates or p in self.hybridPredicates:
            return p
        elif p == RDF.type and o != p and (
                o in self.derivedPredicates or o in self.hybridPredicates):
            # rdf:type assertions are derived via their class (object) term.
            return o
        else:
            return None

    def sparql_query(self,
                     queryString,
                     queryObj,
                     graph,
                     dataSetBase,
                     extensionFunctions,
                     initBindings=None,
                     initNs=None,
                     DEBUG=False):
        """
        The default 'native' SPARQL implementation is based on sparql-p's
        expansion trees layered on top of the read-only RDF APIs of the
        underlying store.

        NOTE(review): the local imports below (``rdflib.sparql.Algebra``,
        ``rdflib.QueryResult``, ``rdflib.sparql.bison.Query``) only exist in
        the legacy rdflib / layercake-python distribution — confirm before
        relying on this method with a modern rdflib.
        """
        # Normalized here to avoid mutable default arguments.
        initBindings = {} if initBindings is None else initBindings
        initNs = {} if initNs is None else initNs
        from rdflib.sparql.Algebra import TopEvaluate
        from rdflib.QueryResult import QueryResult
        from rdflib import plugin
        from rdflib.sparql.bison.Query import AskQuery
        _expr = self.isaBaseQuery(None, queryObj)
        if isinstance(queryObj.query, AskQuery) and \
                _expr.name == 'BGP':
            # This is a ground, BGP, involving IDB and can be solved directly
            # using top-down decision procedure
            # First separate out conjunct into EDB and IDB predicates
            # (solving the former first)
            groundConjunct = []
            derivedConjunct = []
            for s, p, o, func in _expr.patterns:
                if self.derivedPredicateFromTriple((s, p, o)) is None:
                    groundConjunct.append(BuildUnitermFromTuple((s, p, o)))
                else:
                    derivedConjunct.append(BuildUnitermFromTuple((s, p, o)))
            if groundConjunct:
                baseEDBQuery = EDBQuery(groundConjunct, self.edb)
                subQuery, ans = baseEDBQuery.evaluate(DEBUG)
                assert isinstance(ans, bool), ans
            if groundConjunct and not ans:
                askResult = False
            else:
                askResult = True
                for derivedLiteral in derivedConjunct:
                    goal = derivedLiteral.toRDFTuple()
                    # Solve ground, derived goal directly
                    SetupDDLAndAdornProgram(
                        self.edb,
                        self.idb,
                        [goal],
                        derivedPreds=self.derivedPredicates,
                        ignoreUnboundDPreds=True,
                        hybridPreds2Replace=self.hybridPredicates)
                    if self.hybridPredicates:
                        lit = BuildUnitermFromTuple(goal)
                        op = GetOp(lit)
                        if op in self.hybridPredicates:
                            lit.setOperator(URIRef(op + u'_derived'))
                            goal = lit.toRDFTuple()
                    sipCollection = PrepareSipCollection(
                        self.edb.adornedProgram)
                    if self.DEBUG and sipCollection:
                        for sip in SIPRepresentation(sipCollection):
                            print(sip)
                        pprint(list(self.edb.adornedProgram))
                    elif self.DEBUG:
                        print("No SIP graph.")
                    rt, node = first(self.invokeDecisionProcedure(
                        goal,
                        self.edb,
                        {},
                        self.DEBUG,
                        sipCollection))
                    if not rt:
                        askResult = False
                        break
            return plugin.get('SPARQLQueryResult', QueryResult)(askResult)
        else:
            rt = TopEvaluate(queryObj,
                             graph,
                             initBindings,
                             DEBUG=self.DEBUG,
                             dataSetBase=dataSetBase,
                             extensionFunctions=extensionFunctions)
            return plugin.get('SPARQLQueryResult', QueryResult)(rt)

    def batch_unify(self, patterns):
        """
        Perform RDF triple store-level unification of a list of triple
        patterns (4-item tuples which correspond to a SPARQL triple pattern
        with an additional constraint for the graph name).

        Uses a SW sip-strategy implementation to solve the conjunctive goal
        and yield unified bindings

        :Parameters:
        - `patterns`: a list of 4-item tuples where any of the items can be
           one of: Variable, URIRef, BNode, or Literal.

        Returns a generator over dictionaries of solutions to the list of
        triple patterns that are entailed by the regime.
        """
        dPreds = set()
        goals = []
        for s, p, o, g in patterns:
            goals.append((s, p, o))
            dPred = o if p == RDF.type else p
            if dPred in self.hybridPredicates:
                dPreds.add(URIRef(dPred + u'_derived'))
            else:
                # Was ``p == RDF.type and o or p``: the and/or idiom silently
                # falls back to ``p`` for a falsy object term; reuse the
                # conditional computed above for consistency.
                dPreds.add(dPred)
        if dPreds.intersection(self.derivedPredicates):
            # Patterns involve derived predicates
            self.batch_unification = False
            for ansDict in self.conjunctiveSipStrategy(
                    iter(goals),
                    self.edb):
                yield ansDict
            self.batch_unification = True
        else:
            # Conjunctive query involving EDB predicates only
            queryVars = []
            triples = []
            for pat in patterns:
                triples.append(BuildUnitermFromTuple(pat[:3]))
                queryVars.extend([term for term in pat[:3]
                                  if isinstance(term, Variable)])
            query = RDFTuplesToSPARQL(triples, self.edb, vars=queryVars)
            if self.DEBUG:
                print("Batch unify resolved against EDB")
                print(query)
            rt = self.edb.query(query, initNs=self.nsBindings)
            # Normalize result rows into per-solution bindings dicts.
            if len(queryVars) > 1:
                rt = (dict([(queryVars[idx], term)
                            for idx, term in enumerate(row)])
                      for row in rt)
            else:
                rt = (dict([(queryVars[0], row)]) for row in rt)
            for item in rt:
                yield item

    def close(self, commit_pending_transaction=False):
        """
        This closes the database connection. The commit_pending_transaction
        parameter specifies whether to commit all pending transactions before
        closing (if the store is transactional).
        """
        return self.dataset.close(commit_pending_transaction)

    def destroy(self, configuration):
        """
        This destroys the instance of the store identified by the
        configuration string.
        """
        return self.dataset.destroy(configuration)

    def triples_choices(self, triple, context=None):
        """
        A variant of triples that can take a list of terms instead of a single
        term in any slot. Stores can implement this to optimize the response
        time from the default 'fallback' implementation, which will iterate
        over each term in the list and dispatch to triples
        """
        for rt in self.dataset.triples_choices(triple, context):
            yield rt

    def triples(self, triple, context=None):
        """
        A generator over all the triples matching the pattern. Pattern can
        include any objects for used for comparing against nodes in the store,
        for example, REGEXTerm, URIRef, Literal, BNode, Variable, Graph,
        QuotedGraph, Date? DateRange?

        A conjunctive query can be indicated by either providing a value of
        None for the context or the identifier associated with the
        Conjunctive Graph (if it's context aware).
        """
        for rt in self.dataset.triples(triple, context):
            yield rt

    def __len__(self, context=None):
        """
        Number of statements in the store. This should only account for
        non-quoted (asserted) statements if the context is not specified,
        otherwise it should return the number of statements in the formula or
        context given.
        """
        return len(self.dataset)

    def contexts(self, triple=None):
        """
        Generator over all contexts in the graph. If triple is specified, a
        generator over all contexts the triple is in.
        """
        for ctx in self.dataset.contexts(triple):
            yield ctx

    # Optional Namespace methods

    def bind(self, prefix, namespace):
        # Record the binding locally; the underlying graph is not updated.
        self.nsBindings[prefix] = namespace
        # self.targetGraph.bind(prefix, namespace)

    def prefix(self, namespace):
        # Reverse lookup: namespace URI -> prefix (None if unbound).
        revDict = dict([(v, k) for k, v in list(self.nsBindings.items())])
        return revDict.get(namespace)

    def namespace(self, prefix):
        return self.nsBindings.get(prefix)

    def namespaces(self):
        for prefix, nsUri in list(self.nsBindings.items()):
            yield prefix, nsUri

    # Optional Transactional methods

    def commit(self):
        self.dataset.commit()

    def rollback(self):
        self.dataset.rollback()
def test():
    """Run the doctests embedded in this module."""
    import doctest
    doctest.testmod()
# Run the module's doctests when executed as a script.
if __name__ == '__main__':
    test()

# from FuXi.SPARQL.BackwardChainingStore import TopDownSPARQLEntailingStore