Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ It supports gracefully stopping the pipeline.
* `waitAfterEmpty` - Delay in milliseconds if there is (temporarily) no data available. Default is `5000`.
* `waitAfterLow` - Delay in milliseconds if there is (temporarily) less data available than chunk size. Default is `1000`.
* `waitAfterError` - Delay in milliseconds if there is a (temporary) reading problem. Default is `10000`.
* `autoStop` - The intention of this package was to provide easy streaming without stopping the stream when the source is (temporarily) empty. But sometimes, however, this is exactly what you want. Default is `false`. Set to `true` to automatically and gracefully stop streaming when `readData()` returns no data or less than requested.

#### Methods

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "continuous-streams",
"version": "1.0.7",
"version": "1.1.0",
"description": "Special purpose Node streams",
"main": "src/index.js",
"types": "src/index.d.ts",
Expand Down
23 changes: 15 additions & 8 deletions src/ContinuousReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ContinuousReader extends Readable {
waitAfterEmpty = 5000, // wait for new items in database (limit load on database)
waitAfterLow = 1000, // wait if the chunk size is smaller than requested (limit load on db)
waitAfterError = 10000, // wait for database to recover from error (limit load on database)
autoStop = false,
} = opts;
super({
objectMode: true,
Expand All @@ -18,13 +19,17 @@ class ContinuousReader extends Readable {
this.waitAfterEmpty = waitAfterEmpty;
this.waitAfterLow = waitAfterLow;
this.waitAfterError = waitAfterError;
this.autoStop = autoStop;
this.total = 0; // counter
this.stopping = false;
this.stopped = false;
}

stop() {
if (this.readableLength === 0) { // stop immediately
stop(immediately = false) {
if (this.stopped) {
return;
}
if (immediately || this.readableLength === 0) { // stop immediately
this.stopped = true;
this.push(null); // -> 'end' event -> 'close' event
return;
Expand Down Expand Up @@ -53,14 +58,16 @@ class ContinuousReader extends Readable {
total: this.total,
elapsed: endTime - startTime,
});
if (items.length > 0) {
items.forEach((item) => this.push(item));
if (items.length < count) {
await delay(this.waitAfterLow);
}
} else { // currently no new items in database
items.forEach((item) => this.push(item));
if (items.length < count && this.autoStop) {
this.stop(true);
return;
}
if (items.length === 0) { // no new items in database
await delay(this.waitAfterEmpty);
this.push(); // continue reading !!
} else if (items.length < count) { // too few items in database
await delay(this.waitAfterLow);
}
} catch (error) {
if (this.skipOnError) {
Expand Down
1 change: 1 addition & 0 deletions src/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface ContinuousReaderOptions {
waitAfterEmpty?: number;
waitAfterLow?: number;
waitAfterError?: number;
autoStop?: boolean;
}

export interface ContinuousReaderEventDebug {
Expand Down
50 changes: 50 additions & 0 deletions test/unit/pipeline.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -766,4 +766,54 @@ describe('Continuous pipeline', () => {
}, 1000);
});
});

it('should terminate gracefully when reader data is low and autoStop=true', async () => {
const data = ['a', 'b', 'c'];
let readerIndex = 0;
return new Promise((resolve) => {
const reader = new ContinuousReader({ chunkSize: 2, autoStop: true });
reader.readData = async (count) => {
await wait(100);
const chunk = data.slice(readerIndex, readerIndex + count);
readerIndex += chunk.length;
return chunk;
};
const writerSpy = sinon.spy();
const writer = new ContinuousWriter();
writer.writeData = async (d) => {
await wait(100);
writerSpy(d);
};
writer.once('finish', () => {
expect(writerSpy).to.have.been.calledThrice;
resolve();
});
reader.pipe(writer);
});
});

it('should terminate gracefully when reader data is empty and autoStop=true', async () => {
const data = ['a', 'b', 'c', 'd'];
let readerIndex = 0;
return new Promise((resolve) => {
const reader = new ContinuousReader({ chunkSize: 2, autoStop: true });
reader.readData = async (count) => {
await wait(100);
const chunk = data.slice(readerIndex, readerIndex + count);
readerIndex += chunk.length;
return chunk;
};
const writerSpy = sinon.spy();
const writer = new ContinuousWriter();
writer.writeData = async (d) => {
await wait(100);
writerSpy(d);
};
writer.once('finish', () => {
expect(writerSpy.callCount).to.be.equals(4);
resolve();
});
reader.pipe(writer);
});
});
});