From 4d0d540389d021136095a671de90163cc9f0e0d5 Mon Sep 17 00:00:00 2001 From: Maarten Vandenbrande <44063787+maartyman@users.noreply.github.com> Date: Tue, 6 Feb 2024 08:40:21 +0100 Subject: [PATCH] Fix duplicate triples being emitted multiple times Closes #6 --- lib/StreamingStore.ts | 20 +++++++++++--- test/StreamingStore-test.ts | 53 ++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/lib/StreamingStore.ts b/lib/StreamingStore.ts index 779fefb..a161a38 100644 --- a/lib/StreamingStore.ts +++ b/lib/StreamingStore.ts @@ -5,6 +5,15 @@ import type { Readable } from 'readable-stream'; import { PassThrough } from 'readable-stream'; import { PendingStreamsIndex } from './PendingStreamsIndex'; +interface ILocalStore extends RDF.Store { + countQuads: ( + subject: RDF.Term | null, + predicate: RDF.Term | null, + object: RDF.Term | null, + graph: RDF.Term | null, + ) => number; +} + /** * A StreamingStore allows data lookup and insertion to happen in parallel. * Concretely, this means that `match()` calls happening before `import()` calls, will still consider those triples that @@ -14,7 +23,7 @@ import { PendingStreamsIndex } from './PendingStreamsIndex'; * * WARNING: `end()` MUST be called at some point, otherwise all `match` streams will remain unended. */ -export class StreamingStore = Store> +export class StreamingStore = Store> implements RDF.Source, RDF.Sink, EventEmitter> { protected readonly store: S; protected readonly pendingStreams: PendingStreamsIndex = new PendingStreamsIndex(); @@ -42,8 +51,13 @@ implements RDF.Source, RDF.Sink, EventEmitter> { protected importToListeners(stream: RDF.Stream): void { stream.on('data', (quad: Q) => { - for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) { - if (!this.ended) { + if (!this.ended && !this.store.countQuads( + quad.subject, + quad.predicate, + quad.object, + quad.graph, + )) { + for (const pendingStream of this.pendingStreams.getPendingStreamsForQuad(quad)) { pendingStream.push(quad); pendingStream.emit('quad', quad); } diff --git a/test/StreamingStore-test.ts b/test/StreamingStore-test.ts index a10b7bc..bca1fbc 100644 --- a/test/StreamingStore-test.ts +++ b/test/StreamingStore-test.ts @@ -3,7 +3,7 @@ import arrayifyStream from 'arrayify-stream'; import { promisifyEventEmitter } from 'event-emitter-promisify/dist'; import { Store } from 'n3'; import { DataFactory } from 'rdf-data-factory'; -import { Readable } from 'readable-stream'; +import { Readable, PassThrough } from 'readable-stream'; import { StreamingStore } from '../lib/StreamingStore'; const quad = require('rdf-quad'); const streamifyArray = require('streamify-array'); @@ -465,4 +465,55 @@ describe('StreamingStore', () => { quad('s4', 'p4', 'o', 'g'), ]); }); + + it('handles duplicates in import (set-semantics)', async() => { + const match = store.match(); + await promisifyEventEmitter(store.import(streamifyArray([ + quad('s1', 'p1', 'o1'), + quad('s1', 'p1', 'o1'), + ]))); + store.end(); + + expect(await arrayifyStream(match)).toEqualRdfQuadArray( + [ quad('s1', 'p1', 'o1') ], + ); + }); + + it('handles duplicates in import (set-semantics) during slow import', async() => { + const match = store.match(); + + const importStream = new Readable({ objectMode: true }); + importStream._read = () => { + setImmediate(() => { + importStream.push(quad('s1', 'p1', 'o1')); + }); + setImmediate(() => { + importStream.push(quad('s1', 'p1', 'o1')); + }); + setImmediate(() => { + importStream.push(null); + }); + }; + store.import(importStream); + await new Promise(resolve => importStream.on('end', resolve)); + store.end(); + + expect(await arrayifyStream(match)).toEqualRdfQuadArray( + [ quad('s1', 'p1', 'o1') ], + ); + }); + + it('handles errors in import', async() => { + const importStream = new PassThrough({ objectMode: true }); + const returnStream = store.import(importStream); + const error = new Error('myError'); + const callback = jest.fn(); + + returnStream.on('error', callback); + importStream.emit('error', error); + + expect(callback).toHaveBeenCalled(); + expect(callback).toHaveBeenCalledWith(error); + store.end(); + }); });