Skip to content

Commit

Permalink
chore: upgrade to comunica v3
Browse files Browse the repository at this point in the history
  • Loading branch information
jeswr committed Apr 1, 2024
1 parent 25ef2a1 commit 1726926
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 137 deletions.
78 changes: 9 additions & 69 deletions src/ComunicaEngine.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { QueryEngine } from '@comunica/query-sparql-solid';
import type * as RDF from '@rdfjs/types';
import type { BindingsStream, Bindings, DataSources, IDataSource } from '@comunica/types';

export type DataSource = URL | RDF.NamedNode | IDataSource;
import type { Bindings, SourceType } from '@comunica/types';

export type DataSource = URL | RDF.NamedNode | SourceType;
// Change over to this type once https://github.com/microsoft/TypeScript/issues/47208#issuecomment-1014128087 is resolved
// export type RawDataSources = DataSource | Promise<RawDataSources> | RawDataSources[];
export type MaybePromiseArray<T> = T | T[] | Promise<T>;
Expand Down Expand Up @@ -39,13 +38,13 @@ export interface EngineSettings {
* Asynchronous iterator wrapper for the Comunica SPARQL query engine.
*/
export default class ComunicaEngine {
private sources: Promise<DataSources>;
private sources: Promise<SourceType[]>;

private engine: QueryEngine;

private options: any;

private destination: Promise<DataSources> | undefined;
private destination: Promise<SourceType[]> | undefined;

/**
* Create a ComunicaEngine to query the given default source.
Expand Down Expand Up @@ -79,7 +78,7 @@ export default class ComunicaEngine {
}
else if (sources.length !== 0) {
// Execute the query and yield the results
for await (const binding of this.streamToAsyncIterable(await this.engine.queryBindings(sparql, { sources, ...this.options }))) {
for await (const binding of await this.engine.queryBindings(sparql, { sources, ...this.options })) {
yield new Proxy(binding, {
get(target, name) {
if (name === 'values')
Expand All @@ -96,7 +95,7 @@ export default class ComunicaEngine {
* Creates an asynchronous iterable with the results of the SPARQL UPDATE query.
*/
async* executeUpdate(sparql: string, source?: RawDataSources): AsyncIterableIterator<never> {
let sources: DataSources;
let sources: SourceType[];
// Need to await the destination
const destination = await this.destination;

Expand Down Expand Up @@ -124,7 +123,7 @@ export default class ComunicaEngine {
/**
* Parses the source(s) into an array of Comunica sources.
*/
private async parseSources(source?: RawDataSources, defaultSources: Promise<DataSources> | DataSources = []): Promise<DataSources> {
private async parseSources(source?: RawDataSources, defaultSources: Promise<SourceType[]> | SourceType[] = []): Promise<SourceType[]> {
const sources = await source;
if (!sources)
return defaultSources;
Expand All @@ -145,7 +144,7 @@ export default class ComunicaEngine {

// Needs to be after the string check since those also have a match functions
if ('match' in sources && typeof sources.match === 'function')
return [{ value: sources, type: 'rdfjsSource' }];
return [sources];

// Wrap a single source in an array
if ('value' in sources && typeof sources.value === 'string')
Expand All @@ -155,70 +154,11 @@ export default class ComunicaEngine {
throw new Error(`Unsupported source: ${sources}`);
}

/**
* Transforms the readable into an asynchronously iterable object
*/
private streamToAsyncIterable(readable: BindingsStream): AsyncIterableIterator<Bindings> {
let done = false;
let pendingError: Error | undefined;
let pendingPromise: { resolve: (bindings: IteratorResult<Bindings>) => void, reject: (err: Error) => void } | null;

readable.on('readable', settlePromise);
readable.on('error', finish);
readable.on('end', finish);

return {
next: () => new Promise(trackPromise),
[Symbol.asyncIterator]() { return this; },
};

function trackPromise(resolve: (bindings: IteratorResult<Bindings>) => void, reject: (err: Error) => void) {
pendingPromise = { resolve, reject };
settlePromise();
}

function settlePromise() {
// Finish if the stream errored or ended
if (done || pendingError) {
finish();
}
// Try to resolve the promise with a value
else if (pendingPromise) {
const value = readable.read();
if (value !== null) {
pendingPromise.resolve({ value });
pendingPromise = null;
}
}
}

function finish(error?: Error) {
// Finish with or without an error
if (!pendingError) {
done = true;
pendingError = error;
}
// Try to emit the result
if (pendingPromise) {
if (!pendingError)
// @ts-ignore
pendingPromise.resolve({ done });
else
pendingPromise.reject(pendingError);
pendingPromise = null;
}
// Detach listeners
readable.on('readable', settlePromise);
readable.on('error', finish);
readable.on('end', finish);
}
}

/**
* Removes the given document (or all, if not specified) from the cache,
* such that fresh results are obtained next time.
*/
async clearCache(document: string) {
async clearCache(document?: string) {
await this.engine.invalidateHttpCache(document);
}
}
Expand Down
85 changes: 17 additions & 68 deletions test/ComunicaEngine-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import ComunicaEngine from '../src/ComunicaEngine';

import { mockHttp, readAll } from './util';
import { Store, DataFactory } from 'n3';
const { namedNode, defaultGraph, quad } = DataFactory;
import { Readable } from 'stream';
const { namedNode, quad } = DataFactory;

const SELECT_TYPES = `
SELECT ?subject ?type WHERE {
Expand Down Expand Up @@ -76,43 +75,29 @@ describe('An ComunicaEngine instance without default source', () => {
});

it('yields results for a SELECT query with an RDF/JS source', async () => {
// Create source with specific result stream
const stream = new Readable({ objectMode: true });
stream.push({
subject: namedNode(PROFILE_URL),
predicate: namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
object: namedNode('http://xmlns.com/foaf/0.1/Person'),
graph: defaultGraph,
});
stream.push({
subject: namedNode(PROFILE_URL),
predicate: namedNode('http://example.org/#custom'),
object: namedNode('http://xmlns.com/foaf/0.1/Agent'),
graph: defaultGraph,
});
stream.push(null);
const source = { match: jest.fn(() => stream) };
const source = new Store([
quad(
namedNode(PROFILE_URL),
namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
namedNode('http://xmlns.com/foaf/0.1/Person'),
),
quad(
namedNode(PROFILE_URL),
namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'),
namedNode('http://xmlns.com/foaf/0.1/Agent'),
),
]);

jest.spyOn(source, 'match');

// Count query results
const result = engine.execute(SELECT_TYPES, source);
const items = await readAll(result);
expect(items).toHaveLength(2);

// Verify correct usage of source
expect(source.match).toHaveBeenCalled();
expect(source.match.mock.calls[0]).toHaveLength(4);
// @ts-ignore
expect(source.match.mock.calls[0][0]).toBe(undefined);
// @ts-ignore
expect(source.match.mock.calls[0][1]
.equals(namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type')))
.toBe(true);
// @ts-ignore
expect(source.match.mock.calls[0][2]).toBe(undefined);
// @ts-ignore
expect(source.match.mock.calls[0][3]
.equals(defaultGraph()))
.toBe(true);
expect(source.match).toHaveBeenCalledWith(undefined, expect.objectContaining({ termType: 'NamedNode', value: 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type' }), undefined, expect.objectContaining({ termType: 'DefaultGraph', value: '' }));
expect(source.match).toHaveBeenCalledTimes(1);
});

it('throws an error with an unsupported source', async () => {
Expand All @@ -128,42 +113,6 @@ describe('An ComunicaEngine instance without default source', () => {
await expect(readAll(result)).rejects.toThrow('Parse error');
});

it('reads an ended stream', async () => {
const stream = new Readable();
stream.push(null);
// @ts-expect-error
const result = engine.streamToAsyncIterable(stream);
expect(await readAll(result)).toHaveLength(0);
});

it('reads a stream that ends immediately', async () => {
const stream = new Readable();
// @ts-expect-error
const result = engine.streamToAsyncIterable(stream);
stream.push(null);
await new Promise(resolve => setImmediate(resolve));
expect(await readAll(result)).toHaveLength(0);
});

it('throws an error when the stream errors before reading starts', async () => {
const stream = new Readable();
stream._read = () => {};
// @ts-expect-error
const result = engine.streamToAsyncIterable(stream);
stream.emit('error', new Error('my error'));
stream.emit('error', new Error('my other error'));
await expect(readAll(result)).rejects.toThrow('my error');
});

it('throws an error when the stream errors after reading has started', async () => {
const stream = new Readable();
stream._read = () => {};
// @ts-expect-error
const result = engine.streamToAsyncIterable(stream);
setImmediate(() => stream.emit('error', new Error('my error')));
await expect(readAll(result)).rejects.toThrow('my error');
});

it('clears the cache for a given document', async () => {
// @ts-ignore set up mock
const internalEngine = engine.engine;
Expand Down

0 comments on commit 1726926

Please sign in to comment.