-
Notifications
You must be signed in to change notification settings - Fork 121
/
SparqlDataAccessor.ts
337 lines (302 loc) · 11.8 KB
/
SparqlDataAccessor.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import type { Readable } from 'stream';
import arrayifyStream from 'arrayify-stream';
import { SparqlEndpointFetcher } from 'fetch-sparql-endpoint';
import { DataFactory } from 'n3';
import type { NamedNode, Quad } from 'rdf-js';
import type {
ConstructQuery, GraphPattern,
GraphQuads,
InsertDeleteOperation,
SparqlGenerator,
Update,
UpdateOperation,
} from 'sparqljs';
import {
Generator,
} from 'sparqljs';
import type { Representation } from '../../ldp/representation/Representation';
import { RepresentationMetadata } from '../../ldp/representation/RepresentationMetadata';
import type { ResourceIdentifier } from '../../ldp/representation/ResourceIdentifier';
import { getLoggerFor } from '../../logging/LogUtil';
import { INTERNAL_QUADS } from '../../util/ContentTypes';
import { ConflictHttpError } from '../../util/errors/ConflictHttpError';
import { isNativeError } from '../../util/errors/ErrorUtil';
import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError';
import { NotImplementedHttpError } from '../../util/errors/NotImplementedHttpError';
import { UnsupportedMediaTypeHttpError } from '../../util/errors/UnsupportedMediaTypeHttpError';
import { guardStream } from '../../util/GuardedStream';
import type { Guarded } from '../../util/GuardedStream';
import type { IdentifierStrategy } from '../../util/identifiers/IdentifierStrategy';
import { isContainerIdentifier } from '../../util/PathUtil';
import { CONTENT_TYPE, LDP } from '../../util/Vocabularies';
import type { DataAccessor } from './DataAccessor';
const { defaultGraph, namedNode, quad, variable } = DataFactory;
/**
* Stores all data and metadata of resources in a SPARQL backend.
* Communication is done by sending SPARQL queries.
* Queries are constructed in such a way to keep everything consistent,
* such as updating containment triples and deleting old data when it is overwritten.
*
* Since metadata is hidden, no containment triples are stored for metadata files.
*
* All input container metadata is stored in its metadata identifier.
* The containment triples are stored in the graph corresponding to the actual identifier
* so those don't get overwritten.
*/
export class SparqlDataAccessor implements DataAccessor {
protected readonly logger = getLoggerFor(this);
private readonly endpoint: string;
private readonly identifierStrategy: IdentifierStrategy;
private readonly fetcher: SparqlEndpointFetcher;
private readonly generator: SparqlGenerator;
public constructor(endpoint: string, identifierStrategy: IdentifierStrategy) {
this.endpoint = endpoint;
this.identifierStrategy = identifierStrategy;
this.fetcher = new SparqlEndpointFetcher();
this.generator = new Generator();
}
/**
* Only Quad data streams are supported.
*/
public async canHandle(representation: Representation): Promise<void> {
if (representation.binary || representation.metadata.contentType !== INTERNAL_QUADS) {
throw new UnsupportedMediaTypeHttpError('Only Quad data is supported.');
}
}
/**
* Returns all triples stored for the corresponding identifier.
* Note that this will not throw a 404 if no results were found.
*/
public async getData(identifier: ResourceIdentifier): Promise<Guarded<Readable>> {
const name = namedNode(identifier.path);
return await this.sendSparqlConstruct(this.sparqlConstruct(name));
}
/**
* Returns the metadata for the corresponding identifier.
* Will throw 404 if no metadata was found.
*/
public async getMetadata(identifier: ResourceIdentifier): Promise<RepresentationMetadata> {
const name = namedNode(identifier.path);
const query = isContainerIdentifier(identifier) ?
this.sparqlConstructContainer(name) :
this.sparqlConstruct(this.getMetadataNode(name));
const stream = await this.sendSparqlConstruct(query);
const quads = await arrayifyStream(stream);
if (quads.length === 0) {
throw new NotFoundHttpError();
}
const metadata = new RepresentationMetadata(identifier).addQuads(quads);
if (!isContainerIdentifier(identifier)) {
metadata.contentType = INTERNAL_QUADS;
}
return metadata;
}
/**
* Writes the given metadata for the container.
*/
public async writeContainer(identifier: ResourceIdentifier, metadata: RepresentationMetadata): Promise<void> {
const { name, parent } = this.getRelatedNames(identifier);
return this.sendSparqlUpdate(this.sparqlInsert(name, metadata, parent));
}
/**
* Reads the given data stream and stores it together with the metadata.
*/
public async writeDocument(identifier: ResourceIdentifier, data: Guarded<Readable>, metadata: RepresentationMetadata):
Promise<void> {
if (this.isMetadataIdentifier(identifier)) {
throw new ConflictHttpError('Not allowed to create NamedNodes with the metadata extension.');
}
const { name, parent } = this.getRelatedNames(identifier);
const triples = await arrayifyStream(data) as Quad[];
const def = defaultGraph();
if (triples.some((triple): boolean => !def.equals(triple.graph))) {
throw new NotImplementedHttpError('Only triples in the default graph are supported.');
}
// Not relevant since all content is triples
metadata.removeAll(CONTENT_TYPE);
return this.sendSparqlUpdate(this.sparqlInsert(name, metadata, parent, triples));
}
/**
* Removes all graph data relevant to the given identifier.
*/
public async deleteResource(identifier: ResourceIdentifier): Promise<void> {
const { name, parent } = this.getRelatedNames(identifier);
return this.sendSparqlUpdate(this.sparqlDelete(name, parent));
}
/**
* Helper function to get named nodes corresponding to the identifier and its parent container.
* In case of a root container only the name will be returned.
*/
private getRelatedNames(identifier: ResourceIdentifier): { name: NamedNode; parent?: NamedNode } {
const name = namedNode(identifier.path);
// Root containers don't have a parent
if (this.identifierStrategy.isRootContainer(identifier)) {
return { name };
}
const parentIdentifier = this.identifierStrategy.getParentContainer(identifier);
const parent = namedNode(parentIdentifier.path);
return { name, parent };
}
/**
* Creates the name for the metadata of a resource.
* @param name - Name of the (non-metadata) resource.
*/
private getMetadataNode(name: NamedNode): NamedNode {
return namedNode(`meta:${name.value}`);
}
/**
* Checks if the given identifier corresponds to the names used for metadata identifiers.
*/
private isMetadataIdentifier(identifier: ResourceIdentifier): boolean {
return identifier.path.startsWith('meta:');
}
/**
* Creates a CONSTRUCT query that returns all quads contained within a single resource.
* @param name - Name of the resource to query.
*/
private sparqlConstruct(name: NamedNode): ConstructQuery {
const pattern = quad(variable('s'), variable('p'), variable('o'));
return {
queryType: 'CONSTRUCT',
template: [ pattern ],
where: [ this.sparqlSelectGraph(name, [ pattern ]) ],
type: 'query',
prefixes: {},
};
}
private sparqlConstructContainer(name: NamedNode): ConstructQuery {
const pattern = quad(variable('s'), variable('p'), variable('o'));
return {
queryType: 'CONSTRUCT',
template: [ pattern ],
where: [{
type: 'union',
patterns: [
this.sparqlSelectGraph(name, [ pattern ]),
this.sparqlSelectGraph(this.getMetadataNode(name), [ pattern ]),
],
}],
type: 'query',
prefixes: {},
};
}
private sparqlSelectGraph(name: NamedNode, triples: Quad[]): GraphPattern {
return {
type: 'graph',
name,
patterns: [{ type: 'bgp', triples }],
};
}
/**
* Creates an update query that overwrites the data and metadata of a resource.
* If there are no triples we assume it's a container (so don't overwrite the main graph with containment triples).
* @param name - Name of the resource to update.
* @param parent - Name of the parent to update the containment triples.
* @param metadata - New metadata of the resource.
* @param triples - New data of the resource.
*/
private sparqlInsert(name: NamedNode, metadata: RepresentationMetadata, parent?: NamedNode, triples?: Quad[]):
Update {
const metaName = this.getMetadataNode(name);
// Insert new metadata and containment triple
const insert: GraphQuads[] = [ this.sparqlUpdateGraph(metaName, metadata.quads()) ];
if (parent) {
insert.push(this.sparqlUpdateGraph(parent, [ quad(parent, LDP.terms.contains, name) ]));
}
// Necessary updates: delete metadata and insert new data
const updates: UpdateOperation[] = [
this.sparqlUpdateDeleteAll(metaName),
{
updateType: 'insert',
insert,
},
];
// Only overwrite data triples for documents
if (triples) {
// This needs to be first so it happens before the insert
updates.unshift(this.sparqlUpdateDeleteAll(name));
if (triples.length > 0) {
insert.push(this.sparqlUpdateGraph(name, triples));
}
}
return {
updates,
type: 'update',
prefixes: {},
};
}
/**
* Creates a query that deletes everything related to the given name.
* @param name - Name of resource to delete.
* @param parent - Parent of the resource to delete so the containment triple can be removed (unless root).
*/
private sparqlDelete(name: NamedNode, parent?: NamedNode): Update {
const update: Update = {
updates: [
this.sparqlUpdateDeleteAll(name),
this.sparqlUpdateDeleteAll(this.getMetadataNode(name)),
],
type: 'update',
prefixes: {},
};
if (parent) {
update.updates.push({
updateType: 'delete',
delete: [ this.sparqlUpdateGraph(parent, [ quad(parent, LDP.terms.contains, name) ]) ],
});
}
return update;
}
/**
* Helper function for creating SPARQL update queries.
* Creates an operation for deleting all triples in a graph.
* @param name - Name of the graph to delete.
*/
private sparqlUpdateDeleteAll(name: NamedNode): InsertDeleteOperation {
return {
updateType: 'deletewhere',
delete: [ this.sparqlUpdateGraph(name, [ quad(variable(`s`), variable(`p`), variable(`o`)) ]) ],
};
}
/**
* Helper function for creating SPARQL update queries.
* Creates a Graph selector with the given triples.
* @param name - Name of the graph.
* @param triples - Triples/triple patterns to select.
*/
private sparqlUpdateGraph(name: NamedNode, triples: Quad[]): GraphQuads {
return { type: 'graph', name, triples };
}
/**
* Sends a SPARQL CONSTRUCT query to the endpoint and returns a stream of quads.
* @param sparqlQuery - Query to execute.
*/
private async sendSparqlConstruct(sparqlQuery: ConstructQuery): Promise<Guarded<Readable>> {
const query = this.generator.stringify(sparqlQuery);
this.logger.info(`Sending SPARQL CONSTRUCT query to ${this.endpoint}: ${query}`);
try {
return guardStream(await this.fetcher.fetchTriples(this.endpoint, query));
} catch (error: unknown) {
if (isNativeError(error)) {
this.logger.error(`SPARQL endpoint ${this.endpoint} error: ${error.message}`);
}
throw error;
}
}
/**
* Sends a SPARQL update query to the stored endpoint.
* @param sparqlQuery - Query to send.
*/
private async sendSparqlUpdate(sparqlQuery: Update): Promise<void> {
const query = this.generator.stringify(sparqlQuery);
this.logger.info(`Sending SPARQL UPDATE query to ${this.endpoint}: ${query}`);
try {
return await this.fetcher.fetchUpdate(this.endpoint, query);
} catch (error: unknown) {
if (isNativeError(error)) {
this.logger.error(`SPARQL endpoint ${this.endpoint} error: ${error.message}`);
}
throw error;
}
}
}