From 1726926972753ddc06cfd1e2ba2c5cf9169ee44a Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Mon, 1 Apr 2024 23:23:23 +0100 Subject: [PATCH] chore: upgrade to comunica v3 --- src/ComunicaEngine.ts | 78 ++++------------------------------ test/ComunicaEngine-test.ts | 85 ++++++++----------------------------- 2 files changed, 26 insertions(+), 137 deletions(-) diff --git a/src/ComunicaEngine.ts b/src/ComunicaEngine.ts index 4db118e..c4332e4 100644 --- a/src/ComunicaEngine.ts +++ b/src/ComunicaEngine.ts @@ -1,9 +1,8 @@ import { QueryEngine } from '@comunica/query-sparql-solid'; import type * as RDF from '@rdfjs/types'; -import type { BindingsStream, Bindings, DataSources, IDataSource } from '@comunica/types'; - -export type DataSource = URL | RDF.NamedNode | IDataSource; +import type { Bindings, SourceType } from '@comunica/types'; +export type DataSource = URL | RDF.NamedNode | SourceType; // Change over to this type once https://github.com/microsoft/TypeScript/issues/47208#issuecomment-1014128087 is resolved // export type RawDataSources = DataSource | Promise | RawDataSources[]; export type MaybePromiseArray = T | T[] | Promise; @@ -39,13 +38,13 @@ export interface EngineSettings { * Asynchronous iterator wrapper for the Comunica SPARQL query engine. */ export default class ComunicaEngine { - private sources: Promise; + private sources: Promise; private engine: QueryEngine; private options: any; - private destination: Promise | undefined; + private destination: Promise | undefined; /** * Create a ComunicaEngine to query the given default source. @@ -79,7 +78,7 @@ export default class ComunicaEngine { } else if (sources.length !== 0) { // Execute the query and yield the results - for await (const binding of this.streamToAsyncIterable(await this.engine.queryBindings(sparql, { sources, ...this.options }))) { + for await (const binding of await this.engine.queryBindings(sparql, { sources, ...this.options })) { yield new Proxy(binding, { get(target, name) { if (name === 'values') @@ -96,7 +95,7 @@ export default class ComunicaEngine { * Creates an asynchronous iterable with the results of the SPARQL UPDATE query. */ async* executeUpdate(sparql: string, source?: RawDataSources): AsyncIterableIterator { - let sources: DataSources; + let sources: SourceType[]; // Need to await the destination const destination = await this.destination; @@ -124,7 +123,7 @@ export default class ComunicaEngine { /** * Parses the source(s) into an array of Comunica sources. */ - private async parseSources(source?: RawDataSources, defaultSources: Promise | DataSources = []): Promise { + private async parseSources(source?: RawDataSources, defaultSources: Promise | SourceType[] = []): Promise { const sources = await source; if (!sources) return defaultSources; @@ -145,7 +144,7 @@ export default class ComunicaEngine { // Needs to be after the string check since those also have a match functions if ('match' in sources && typeof sources.match === 'function') - return [{ value: sources, type: 'rdfjsSource' }]; + return [sources]; // Wrap a single source in an array if ('value' in sources && typeof sources.value === 'string') @@ -155,70 +154,11 @@ export default class ComunicaEngine { throw new Error(`Unsupported source: ${sources}`); } - /** - * Transforms the readable into an asynchronously iterable object - */ - private streamToAsyncIterable(readable: BindingsStream): AsyncIterableIterator { - let done = false; - let pendingError: Error | undefined; - let pendingPromise: { resolve: (bindings: IteratorResult) => void, reject: (err: Error) => void } | null; - - readable.on('readable', settlePromise); - readable.on('error', finish); - readable.on('end', finish); - - return { - next: () => new Promise(trackPromise), - [Symbol.asyncIterator]() { return this; }, - }; - - function trackPromise(resolve: (bindings: IteratorResult) => void, reject: (err: Error) => void) { - pendingPromise = { resolve, reject }; - settlePromise(); - } - - function settlePromise() { - // Finish if the stream errored or ended - if (done || pendingError) { - finish(); - } - // Try to resolve the promise with a value - else if (pendingPromise) { - const value = readable.read(); - if (value !== null) { - pendingPromise.resolve({ value }); - pendingPromise = null; - } - } - } - - function finish(error?: Error) { - // Finish with or without an error - if (!pendingError) { - done = true; - pendingError = error; - } - // Try to emit the result - if (pendingPromise) { - if (!pendingError) - // @ts-ignore - pendingPromise.resolve({ done }); - else - pendingPromise.reject(pendingError); - pendingPromise = null; - } - // Detach listeners - readable.on('readable', settlePromise); - readable.on('error', finish); - readable.on('end', finish); - } - } - /** * Removes the given document (or all, if not specified) from the cache, * such that fresh results are obtained next time. */ - async clearCache(document: string) { + async clearCache(document?: string) { await this.engine.invalidateHttpCache(document); } } diff --git a/test/ComunicaEngine-test.ts b/test/ComunicaEngine-test.ts index 86333bc..4f29b94 100644 --- a/test/ComunicaEngine-test.ts +++ b/test/ComunicaEngine-test.ts @@ -5,8 +5,7 @@ import ComunicaEngine from '../src/ComunicaEngine'; import { mockHttp, readAll } from './util'; import { Store, DataFactory } from 'n3'; -const { namedNode, defaultGraph, quad } = DataFactory; -import { Readable } from 'stream'; +const { namedNode, quad } = DataFactory; const SELECT_TYPES = ` SELECT ?subject ?type WHERE { @@ -76,22 +75,20 @@ describe('An ComunicaEngine instance without default source', () => { }); it('yields results for a SELECT query with an RDF/JS source', async () => { - // Create source with specific result stream - const stream = new Readable({ objectMode: true }); - stream.push({ - subject: namedNode(PROFILE_URL), - predicate: namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'), - object: namedNode('http://xmlns.com/foaf/0.1/Person'), - graph: defaultGraph, - }); - stream.push({ - subject: namedNode(PROFILE_URL), - predicate: namedNode('http://example.org/#custom'), - object: namedNode('http://xmlns.com/foaf/0.1/Agent'), - graph: defaultGraph, - }); - stream.push(null); - const source = { match: jest.fn(() => stream) }; + const source = new Store([ + quad( + namedNode(PROFILE_URL), + namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'), + namedNode('http://xmlns.com/foaf/0.1/Person'), + ), + quad( + namedNode(PROFILE_URL), + namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'), + namedNode('http://xmlns.com/foaf/0.1/Agent'), + ), + ]); + + jest.spyOn(source, 'match'); // Count query results const result = engine.execute(SELECT_TYPES, source); @@ -99,20 +96,8 @@ describe('An ComunicaEngine instance without default source', () => { expect(items).toHaveLength(2); // Verify correct usage of source - expect(source.match).toHaveBeenCalled(); - expect(source.match.mock.calls[0]).toHaveLength(4); - // @ts-ignore - expect(source.match.mock.calls[0][0]).toBe(undefined); - // @ts-ignore - expect(source.match.mock.calls[0][1] - .equals(namedNode('http://www.w3.org/1999/02/22-rdf-syntax-ns#type'))) - .toBe(true); - // @ts-ignore - expect(source.match.mock.calls[0][2]).toBe(undefined); - // @ts-ignore - expect(source.match.mock.calls[0][3] - .equals(defaultGraph())) - .toBe(true); + expect(source.match).toHaveBeenCalledWith(undefined, expect.objectContaining({ termType: 'NamedNode', value: 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type' }), undefined, expect.objectContaining({ termType: 'DefaultGraph', value: '' })); + expect(source.match).toHaveBeenCalledTimes(1); }); it('throws an error with an unsupported source', async () => { @@ -128,42 +113,6 @@ describe('An ComunicaEngine instance without default source', () => { await expect(readAll(result)).rejects.toThrow('Parse error'); }); - it('reads an ended stream', async () => { - const stream = new Readable(); - stream.push(null); - // @ts-expect-error - const result = engine.streamToAsyncIterable(stream); - expect(await readAll(result)).toHaveLength(0); - }); - - it('reads a stream that ends immediately', async () => { - const stream = new Readable(); - // @ts-expect-error - const result = engine.streamToAsyncIterable(stream); - stream.push(null); - await new Promise(resolve => setImmediate(resolve)); - expect(await readAll(result)).toHaveLength(0); - }); - - it('throws an error when the stream errors before reading starts', async () => { - const stream = new Readable(); - stream._read = () => {}; - // @ts-expect-error - const result = engine.streamToAsyncIterable(stream); - stream.emit('error', new Error('my error')); - stream.emit('error', new Error('my other error')); - await expect(readAll(result)).rejects.toThrow('my error'); - }); - - it('throws an error when the stream errors after reading has started', async () => { - const stream = new Readable(); - stream._read = () => {}; - // @ts-expect-error - const result = engine.streamToAsyncIterable(stream); - setImmediate(() => stream.emit('error', new Error('my error'))); - await expect(readAll(result)).rejects.toThrow('my error'); - }); - it('clears the cache for a given document', async () => { // @ts-ignore set up mock const internalEngine = engine.engine;