/
repository.clj
600 lines (495 loc) · 20.3 KB
/
repository.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
(ns ^{:added "0.12.1"}
grafter-2.rdf4j.repository
"Functions for constructing and working with various RDF4j repositories."
(:require [clojure.java.io :as io]
[grafter-2.rdf.protocols :as pr]
[grafter-2.rdf4j.formats :as format]
[grafter-2.rdf4j.io :as rio]
[me.raynes.fs :as fs])
(:import [org.eclipse.rdf4j.model Resource Statement URI IRI Value]
[org.eclipse.rdf4j.query BindingSet BooleanQuery GraphQuery Query QueryLanguage TupleQuery Update]
[org.eclipse.rdf4j.repository Repository RepositoryConnection]
[org.eclipse.rdf4j.sail.inferencer.fc CustomGraphQueryInferencer DirectTypeHierarchyInferencer ForwardChainingRDFSInferencer]
[org.eclipse.rdf4j.common.iteration CloseableIteration Iteration])
(:import grafter_2.rdf.SPARQLRepository
grafter_2.rdf.protocols.IStatement
org.eclipse.rdf4j.model.impl.URIImpl
org.eclipse.rdf4j.query.impl.DatasetImpl
org.eclipse.rdf4j.repository.event.base.NotifyingRepositoryWrapper
org.eclipse.rdf4j.repository.http.HTTPRepository
org.eclipse.rdf4j.repository.sail.SailRepository
org.eclipse.rdf4j.sail.memory.MemoryStore
org.eclipse.rdf4j.sail.nativerdf.NativeStore))
(defprotocol ToConnection
(->connection [repo] "Given an RDF4j repository return a connection to it.
->connection is designed to be used with the macro with-open"))
(defn- resource-array #^"[Lorg.eclipse.rdf4j.model.Resource;" [& rs]
(into-array Resource rs))
(defn- statements? [coll]
(or (nil? coll)
(sequential? coll)
(set? coll)
(instance? IStatement coll)))
(extend-type RepositoryConnection
pr/ITripleWriteable
(pr/add-statement
([this statement]
{:pre [(instance? IStatement statement)]}
(let [^Statement sesame-statement (rio/quad->backend-quad statement)
resources (if-let [graph (pr/context statement)] (resource-array (rio/->rdf4j-uri graph)) (resource-array))]
(doto this (.add sesame-statement resources))))
([this graph statement]
{:pre [(instance? IStatement statement)]}
(let [^Statement stm (rio/quad->backend-quad statement)
resources (resource-array (rio/->rdf4j-uri graph))]
(doto this
(.add stm resources)))))
(pr/add
([this triples]
{:pre [(statements? triples)]}
(if (not (instance? IStatement triples))
(when (seq triples)
(let [^Iterable stmts (map rio/quad->backend-quad triples)]
(.add this stmts (resource-array))))
(pr/add-statement this triples))
this)
([this graph triples]
{:pre [(statements? triples)]}
(if (not (instance? IStatement triples))
(when (seq triples)
(let [^Iterable stmts (map rio/quad->backend-quad triples)]
(.add this stmts (resource-array (rio/->rdf4j-uri graph)))))
(pr/add-statement this graph triples))
this)
([this graph format triple-stream]
(doto this
(.add triple-stream nil format (resource-array (rio/->rdf4j-uri graph)))))
([this graph base-uri format triple-stream]
(doto this
(.add triple-stream base-uri format (resource-array (rio/->rdf4j-uri graph)))))))
(definline throw-deprecated-exception!
"Throw a more helpful error message alerting people to the need to
change code.
This is technically a breaking change, but it should indicate sites
which have a bug in them anyway."
[]
;; Use a definline to remove extra stack frame from output so
;; exception is closer to call site.
`(throw (ex-info "This function is no longer extended to Repository. You will need to update your code to call it on a repository connection instead."
{:error :deprecated-function})))
(extend-type Repository
pr/ITripleWriteable
(pr/add-statement
([this statement]
(throw-deprecated-exception!))
([this graph statement]
(throw-deprecated-exception!)))
(pr/add
([this triples]
(throw-deprecated-exception!))
([this graph triples]
(throw-deprecated-exception!))
([this graph format triple-stream]
(throw-deprecated-exception!))
([this graph base-uri format triple-stream]
(throw-deprecated-exception!)))
pr/ITripleDeleteable
(pr/delete
([this quads]
(throw-deprecated-exception!))
([this graph quads]
(throw-deprecated-exception!))))
(defn memory-store
"Instantiate a sesame RDF MemoryStore."
[]
(MemoryStore.))
(defn native-store
"Instantiate a sesame RDF NativeStore."
([datadir]
(native-store datadir "spoc,posc,cosp"))
([datadir indexes]
(NativeStore. (io/file datadir) indexes)))
(defn http-repo
"Given a URL as a String return a Sesame HTTPRepository for e.g.
interacting with the OpenRDF Workbench."
[repo-url]
(doto (HTTPRepository. repo-url)
(.initialize)))
(defn sparql-repo
"Given a query-url (String or IURI) and an optional update-url String
or IURI, return a Sesame SPARQLRepository for communicating with
remote repositories."
([query-url]
(doto (SPARQLRepository. (str query-url))
(.initialize)))
([query-url update-url]
(doto (SPARQLRepository. (str query-url)
(str update-url))
(.initialize))))
(defn notifying-repo
"Wrap the given repository in an RDF4j NotifyingRepositoryWrapper.
Once wrapped you can capture events on the underlying repository.
Supports two arities:
- Takes just a repo to wrap.
- Takes a repo to wrap and a boolean indicating whether to report
deltas on operations."
([^Repository repo]
(NotifyingRepositoryWrapper. repo))
([repo report-deltas]
(NotifyingRepositoryWrapper. repo report-deltas)))
(defn rdfs-inferencer
"Returns a Sesame ForwardChainingRDFSInferencer using the rules from
the RDF Semantics Recommendation (10 February 2004).
You can instantiate a repository with a memory store or a native
store or with any SAIL that returns InferencerConnections. e.g. to
instantiate a repository with a memory-store:
`(repo (rdfs-inferencer (memory-store)))`"
([]
(ForwardChainingRDFSInferencer.))
([notifying-sail]
(ForwardChainingRDFSInferencer. notifying-sail)))
(defn direct-type-inferencer
"A forward-chaining inferencer that infers the direct-type hierarchy
relations sesame:directSubClassOf, sesame:directSubPropertyOf and
sesame:directType."
([]
(DirectTypeHierarchyInferencer.))
([notifying-sail]
(DirectTypeHierarchyInferencer. notifying-sail)))
(defn custom-query-inferencer
"A forward-chaining inferencer that infers new statements using a
SPARQL graph query."
([]
(CustomGraphQueryInferencer.))
([query-text matcher-text]
(CustomGraphQueryInferencer. QueryLanguage/SPARQL query-text matcher-text))
([notifying-sail query-text matcher-text]
(CustomGraphQueryInferencer. notifying-sail QueryLanguage/SPARQL query-text matcher-text)))
(defn sail-repo
"Given a sesame Sail of some type, return a sesame SailRepository.
Finally you can also optionally supply a sesame sail to wrap the
repository, which can be used to configure a sesame NativeStore.
By default this function will return a repository initialised with a
Sesame MemoryStore."
([] (sail-repo (MemoryStore.)))
([sail]
(doto (SailRepository. sail)
(.initialize))))
(defn add->repo [repo]
(fn
([] (->connection repo))
([^RepositoryConnection acc] (.close acc) repo)
([^RepositoryConnection acc v]
(try (if (reduced? acc)
acc
(pr/add acc v))
(catch Throwable ex
(.close acc)
(throw (ex-info "Exception when adding to repository" {} ex)))))))
(defn- statements-with-inferred-format [res]
(if (seq? res)
res
(rio/statements res :format (format/->rdf-format (fs/extension (str res))))))
(defn fixture-repo
"adds the specified data to a sparql repository. if the first
argument is a repository that object is used, otherwise the first
and remaining arguments are assumed to be
grafter-2.core/ITripleWriteable and are loaded into a sesame
memorystore sail-repo.
this function is most useful for loading fixture data from files e.g.
(fixture-repo \"test-data.trig\" \"more-test-data.trig\")"
([] (sail-repo))
([repo-or-data & data]
(let [repo (if (instance? Repository repo-or-data)
repo-or-data
(let [repo (sail-repo)]
(with-open [conn (->connection repo)]
(pr/add conn (statements-with-inferred-format repo-or-data))
repo)))]
(let [xf (mapcat (fn [d]
(cond
(satisfies? pr/ITripleReadable d) (statements-with-inferred-format d)
(seq d) d)))]
(transduce xf (add->repo repo) data)))))
(defn resource-repo
"Like fixture repo but assumes all supplied data is on the java
resource path. For example:
(repo/resource-repo \"grafter/rdf/sparql/sparql-data.trig\"
\"grafter/rdf/rdf-types.trig\")
Will load the supplied RDF files from the resource path into a
single memory repository for testing.
If you want to use a custom repository the first argument can be a
repository."
([] (sail-repo))
([repo-or-data & data]
(let [repo (let [repo (sail-repo)]
(if (instance? Repository repo-or-data)
repo-or-data
(with-open [conn (->connection repo)]
(pr/add conn (statements-with-inferred-format (io/resource repo-or-data)))
repo)))]
(apply fixture-repo repo (map io/resource data)))))
(defn- query-bindings->map [^BindingSet qbs]
(let [boundvars (.getBindingNames qbs)]
(->> boundvars
(mapcat (fn [k]
[(keyword k) (some-> qbs (.getBinding k) .getValue pr/->grafter-type)]))
(apply hash-map))))
(extend-protocol pr/ITransactable
Repository
(begin [repo]
(throw-deprecated-exception!))
(commit [repo]
(throw-deprecated-exception!))
(rollback [repo]
(throw-deprecated-exception!))
RepositoryConnection
(begin [repo]
(-> repo .begin))
(commit [repo]
(-> repo .commit))
(rollback [repo]
(-> repo .rollback)))
(defmacro with-transaction
"Wraps the given forms in a transaction on the supplied repository.
Exceptions are rolled back on failure."
[repo & forms]
`(try
(pr/begin ~repo)
(let [return# (do ~@forms)]
(pr/commit ~repo)
return#)
(catch Exception e#
(pr/rollback ~repo)
(throw e#))))
(extend-type Repository
clojure.core.protocols/CollReduce
(coll-reduce
([this f]
(clojure.core.protocols/coll-reduce this f (f)))
([this f val]
(with-open [c (.getConnection this)]
(reduce f val c))))
pr/ISPARQLable
(pr/query-dataset
([this query-str model] (throw-deprecated-exception!))
([this query-str model opts] (throw-deprecated-exception!)))
pr/ISPARQLUpdateable
(pr/update! [this query-str]
(throw-deprecated-exception!))
pr/ITripleReadable
(pr/to-statements [this options]
(throw-deprecated-exception!)))
(defn ^:no-doc sesame-results->seq
"Convert a sesame results object into a lazy sequence of results."
([prepared-query] (sesame-results->seq prepared-query identity))
([^Query prepared-query converter-f]
(let [^CloseableIteration results (.evaluate prepared-query)
run-query (fn pull-query []
(try
(if (.hasNext results)
(let [current-result (try
(converter-f (.next results))
(catch Exception e
(.close results)
(throw (ex-info "Error reading results" {:prepared-query prepared-query} e))))]
(lazy-cat
[current-result]
(pull-query)))
(.close results))
(catch Exception e
(throw (ex-info "Error waiting on results" {:prepared-query prepared-query} e)))))]
(run-query))))
(defprotocol IQueryEvaluator
(evaluate [this] "Low level protocol to evaluate a sesame RDF Query
object, and convert the results into a grafter representation."))
(extend-protocol IQueryEvaluator
BooleanQuery
(evaluate [this]
(.evaluate this))
TupleQuery
(evaluate [this]
(sesame-results->seq this query-bindings->map))
GraphQuery
(evaluate [this]
(sesame-results->seq this rio/backend-quad->grafter-quad))
Update
(evaluate [this]
(.execute this)))
(defprotocol IPrepareQuery
(prepare-query* ^Query [this sparql-string restriction]
"Low level function to prepare (parse, but not process) a sesame RDF
query. Takes a repository a query string and an optional sesame
Dataset to act as a query restriction.
Prepared queries still need to be evaluated with evaluate."))
(extend-protocol IPrepareQuery
Repository
(prepare-query* [repo sparql-string restriction]
(println "WARNING: prepare-query* was called on a repository not a connection. This usage is deprecated and will be removed.")
(let [conn (->connection repo)]
(prepare-query* conn sparql-string restriction)))
RepositoryConnection
(prepare-query* [repo sparql-string restriction]
(let [conn (->connection repo)
pq (.prepareQuery conn
QueryLanguage/SPARQL
sparql-string)]
(when restriction (.setDataset pq restriction))
pq)))
(defn prepare-query
"Low level function to prepare (parse, but not process) a sesame RDF
query. Takes a repository a query string and an optional sesame
Dataset to act as a query restriction.
Prepared queries still need to be evaluated with evaluate."
(^Query [repo sparql-string] (prepare-query repo sparql-string nil))
(^Query [repo sparql-string restriction]
(prepare-query repo sparql-string restriction nil))
(^Query [repo sparql-string restriction {:keys [reasoning?] :as opts}]
(doto (prepare-query* repo sparql-string restriction)
(.setIncludeInferred (or reasoning? false)))))
(defn prepare-update
"Prepare (parse but don't process) a SPARQL update request.
Prepared updates still need to be evaluated with evaluate."
([repo sparql-update-str] (prepare-update repo sparql-update-str nil))
([repo sparql-update-str dataset]
(let [conn (->connection repo)]
(let [pu (.prepareUpdate conn
QueryLanguage/SPARQL
sparql-update-str)]
(when dataset
(.setDataset pu dataset))
pu))))
(extend-type RepositoryConnection
clojure.core.protocols/CollReduce
(coll-reduce
([this f]
(reduce f (f) (pr/to-statements this {})))
([this f val]
(reduce f val (pr/to-statements this {}))))
pr/ISPARQLable
(pr/query-dataset
([this sparql-string dataset]
(pr/query-dataset this sparql-string dataset {}))
([this sparql-string dataset opts]
(let [preped-query (prepare-query this sparql-string dataset opts)]
(evaluate preped-query))))
pr/ISPARQLUpdateable
(pr/update! [this sparql-string]
(let [prepared-query (.prepareUpdate this
QueryLanguage/SPARQL
sparql-string)]
(.execute prepared-query)))
pr/ITripleDeleteable
(pr/delete-statement
([this statement]
{:pre [(instance? IStatement statement)]}
(let [^Statement sesame-statement (rio/quad->backend-quad statement)
resources (if-let [graph (pr/context statement)] (resource-array (rio/->rdf4j-uri graph)) (resource-array))]
(doto this (.remove sesame-statement resources))))
([this graph statement]
{:pre [(instance? IStatement statement)]}
(let [^Statement stm (rio/quad->backend-quad statement)
resources (resource-array (rio/->rdf4j-uri graph))]
(doto this
(.remove stm resources)))))
(pr/delete
([this quads]
{:pre [(statements? quads)]}
(if (not (instance? IStatement quads))
(when (seq quads)
(let [^Iterable stmts
(map rio/quad->backend-quad quads)]
(.remove this stmts (resource-array))))
(pr/delete-statement this quads)))
([this graph triples]
{:pre [(statements? triples)]}
(if (not (instance? IStatement triples))
(when (seq triples)
(let [^Iterable stmts (map rio/quad->backend-quad triples)]
(.remove this stmts (resource-array (rio/->rdf4j-uri graph)))))
(pr/delete-statement this graph triples)))))
(extend-protocol ToConnection
RepositoryConnection
(->connection [conn]
conn)
Repository
(->connection [^Repository repo]
(.getConnection repo)))
(defn make-restricted-dataset
"Build a dataset to act as a graph restriction. You can specify for
both `:default-graph` and `:named-graphs`. Both of which take sequences
of URI strings."
[& {:as options}]
(let [->uri (fn [graph]
(if (instance? URI graph)
graph
(URIImpl. graph)))]
(when (or (:named-graphs options) (:default-graph options))
(let [{:keys [default-graph named-graphs]
:or {default-graph [] named-graphs []}} options
private-graph "urn:private-graph-to-force-restrictions-when-no-graphs-are-listed"
dataset (DatasetImpl.)]
(if (string? default-graph)
(.addDefaultGraph dataset (->uri default-graph))
(doseq [graph (conj default-graph private-graph)]
(.addDefaultGraph dataset (->uri graph))))
(if (string? named-graphs)
(.addNamedGraph dataset (->uri named-graphs))
(doseq [graph named-graphs]
(.addNamedGraph dataset (->uri graph))))
dataset))))
(defn- mapply [f & args]
(apply f (apply concat (butlast args) (last args))))
(defn- build-sparql-prefixes-block [prefix-map]
(str (reduce (fn [^StringBuffer sb [prefix uri]]
(.append sb (str "PREFIX " prefix ": <" uri ">\n")))
(StringBuffer.) prefix-map)))
(defn query
"Run an arbitrary SPARQL query. Works with ASK, DESCRIBE, CONSTRUCT
and SELECT queries.
You can call this on a Repository however if you do you may in some
cases cause a resource leak, for example if the sequence of results
isn't fully consumed.
To use this without leaking resources it is recommended that you
call ->connection on your repository, inside a with-open; and then
consume all your results inside of a nested doseq/dorun/etc...
e.g.
````
(with-open [conn (->connection repo)]
(doseq [res (query conn \"SELECT * WHERE { ?s ?p ?o .}\")]
(println res)))
````
Takes a repo and sparql string and an optional set of k/v argument
pairs, and executes the sparql query on the repository.
Options are:
- `:default-graph` a seq of URI strings representing named graphs to be set as
the default union graph for the query.
- `:named-graphs` a seq of URI strings representing the named graphs to be
used in the query.
- `:reasoning?` `true|false` whether or not reasoning/inference should be used
in the query. DEFAULT: `false`
If no options are passed then we use the default of no graph
restrictions whilst the union graph is the union of all graphs."
[repo sparql & {:as options :keys [prefixes]}]
;; we could call .setNamespace on the connection, but
;; connection/namespaces are mutable so better to prepend the
;; prefixes onto the SPARQL string ourselves.
(let [sparql (str (build-sparql-prefixes-block prefixes) sparql)
dataset (mapply make-restricted-dataset (or options {}))]
(pr/query-dataset repo sparql dataset options)))
(extend-type RepositoryConnection
pr/ITripleReadable
(pr/to-statements [this {:keys [:grafter-2.repository/infer] :or {infer true}}]
(let [f (fn next-item [^Iteration i]
(when (.hasNext i)
(let [v (.next i)]
(lazy-seq (cons (rio/backend-quad->grafter-quad v) (next-item i))))))
^Resource subj nil
^IRI pred nil
^Value obj nil
iter (.getStatements this subj pred obj (boolean infer) (resource-array))]
(f iter))))
(defn shutdown
"Cleanly shutsdown the repository."
[^Repository repo]
(.shutDown repo))