diff --git a/README.md b/README.md index 364d8a8..359bc82 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ It supports gracefully stopping the pipeline. * `waitAfterEmpty` - Delay in milliseconds if there is (temporarily) no data available. Default is `5000`. * `waitAfterLow` - Delay in milliseconds if there is (temporarily) less data available than chunk size. Default is `1000`. * `waitAfterError` - Delay in milliseconds if there is a (temporary) reading problem. Default is `10000`. +* `autoStop` - The intention of this package was to provide easy streaming without stopping the stream when the source is (temporarily) empty. But sometimes, however, this is exactly what you want. Default is `false`. Set to `true` to automatically and gracefully stop streaming when `readData()` returns no data or less than requested. #### Methods diff --git a/package-lock.json b/package-lock.json index e6170ff..cd5e9a7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "continuous-streams", - "version": "1.0.7", + "version": "1.1.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "continuous-streams", - "version": "1.0.7", + "version": "1.1.0", "license": "MIT", "dependencies": { "delay": "^5.0.0", diff --git a/package.json b/package.json index c0e4550..ae7aab3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "continuous-streams", - "version": "1.0.7", + "version": "1.1.0", "description": "Special purpose Node streams", "main": "src/index.js", "types": "src/index.d.ts", diff --git a/src/ContinuousReader.js b/src/ContinuousReader.js index 3730161..1746874 100644 --- a/src/ContinuousReader.js +++ b/src/ContinuousReader.js @@ -9,6 +9,7 @@ class ContinuousReader extends Readable { waitAfterEmpty = 5000, // wait for new items in database (limit load on database) waitAfterLow = 1000, // wait if the chunk size is smaller than requested (limit load on db) waitAfterError = 10000, // wait for database to recover from error (limit load on database) + autoStop = false, } = opts; super({ objectMode: true, @@ -18,13 +19,17 @@ class ContinuousReader extends Readable { this.waitAfterEmpty = waitAfterEmpty; this.waitAfterLow = waitAfterLow; this.waitAfterError = waitAfterError; + this.autoStop = autoStop; this.total = 0; // counter this.stopping = false; this.stopped = false; } - stop() { - if (this.readableLength === 0) { // stop immediately + stop(immediately = false) { + if (this.stopped) { + return; + } + if (immediately || this.readableLength === 0) { // stop immediately this.stopped = true; this.push(null); // -> 'end' event -> 'close' event return; @@ -53,14 +58,16 @@ class ContinuousReader extends Readable { total: this.total, elapsed: endTime - startTime, }); - if (items.length > 0) { - items.forEach((item) => this.push(item)); - if (items.length < count) { - await delay(this.waitAfterLow); - } - } else { // currently no new items in database + items.forEach((item) => this.push(item)); + if (items.length < count && this.autoStop) { + this.stop(true); + return; + } + if (items.length === 0) { // no new items in database await delay(this.waitAfterEmpty); this.push(); // continue reading !! + } else if (items.length < count) { // too few items in database + await delay(this.waitAfterLow); } } catch (error) { if (this.skipOnError) { diff --git a/src/index.d.ts b/src/index.d.ts index 36b38b3..6341362 100644 --- a/src/index.d.ts +++ b/src/index.d.ts @@ -12,6 +12,7 @@ export interface ContinuousReaderOptions { waitAfterEmpty?: number; waitAfterLow?: number; waitAfterError?: number; + autoStop?: boolean; } export interface ContinuousReaderEventDebug { diff --git a/test/unit/pipeline.spec.js b/test/unit/pipeline.spec.js index 44ce44d..7d2533a 100644 --- a/test/unit/pipeline.spec.js +++ b/test/unit/pipeline.spec.js @@ -766,4 +766,54 @@ describe('Continuous pipeline', () => { }, 1000); }); }); + + it('should terminate gracefully when reader data is low and autoStop=true', async () => { + const data = ['a', 'b', 'c']; + let readerIndex = 0; + return new Promise((resolve) => { + const reader = new ContinuousReader({ chunkSize: 2, autoStop: true }); + reader.readData = async (count) => { + await wait(100); + const chunk = data.slice(readerIndex, readerIndex + count); + readerIndex += chunk.length; + return chunk; + }; + const writerSpy = sinon.spy(); + const writer = new ContinuousWriter(); + writer.writeData = async (d) => { + await wait(100); + writerSpy(d); + }; + writer.once('finish', () => { + expect(writerSpy).to.have.been.calledThrice; + resolve(); + }); + reader.pipe(writer); + }); + }); + + it('should terminate gracefully when reader data is empty and autoStop=true', async () => { + const data = ['a', 'b', 'c', 'd']; + let readerIndex = 0; + return new Promise((resolve) => { + const reader = new ContinuousReader({ chunkSize: 2, autoStop: true }); + reader.readData = async (count) => { + await wait(100); + const chunk = data.slice(readerIndex, readerIndex + count); + readerIndex += chunk.length; + return chunk; + }; + const writerSpy = sinon.spy(); + const writer = new ContinuousWriter(); + writer.writeData = async (d) => { + await wait(100); + writerSpy(d); + }; + writer.once('finish', () => { + expect(writerSpy.callCount).to.be.equals(4); + resolve(); + }); + reader.pipe(writer); + }); + }); });