From 1d73027abc20afa98d31bd997afbaef118459db3 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:06:07 +1000 Subject: [PATCH 01/10] feat: introduce maybeIterator --- asynciterator.ts | 51 ++++++++++++++++++++ test/maybeIterator-test.js | 95 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 test/maybeIterator-test.js diff --git a/asynciterator.ts b/asynciterator.ts index f1e7bc19..8ddf7ed1 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1966,3 +1966,54 @@ type SourceExpression = type InternalSource = AsyncIterator & { _destination: AsyncIterator }; + + +/** + * @param source An AsyncIterator + * @returns The AsyncIterator if it is not empty, otherwise undefined + */ + export async function maybeIterator(source: AsyncIterator): Promise> { + // Avoid creating a new iterator where possible + // if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer.length > 0) { + // return source + // } + // if (source instanceof IntegerIterator && (source as any).step >= 0 ? (source as any).next > (source as any).last : (source as any).next < (source as any).last) { + // return source; + // } + + let item; + do { + if ((item = source.read()) !== null) + return source.prepend([item]); + await awaitReadable(source); + } while (!source.done); + return null; +} + +function awaitReadable(source: AsyncIterator): Promise { + return new Promise((res, rej) => { + if (source.readable || source.done) + res(); + + function done() { + cleanup(); + res(); + } + + function err() { + cleanup(); + rej(); + } + + function cleanup() { + source.removeListener('readable', done); + source.removeListener('end', done); + source.removeListener('error', err); + } + + source.on('readable', done); + source.on('end', done); + source.on('error', err); + }); +} + diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js new file mode 100644 index 00000000..973aa9da --- /dev/null +++ b/test/maybeIterator-test.js @@ -0,0 +1,95 @@ +import { + AsyncIterator, + ArrayIterator, + fromArray, + maybeIterator, + range, + empty, + scheduleTask +} from '../dist/asynciterator.js'; + +class MyIterator extends AsyncIterator { + read() { + this.close(); + return null; + } +} + + +class MyBufferingIterator extends AsyncIterator { + i = 3; + + read() { + if (this.i-- < 0) { + this.close(); + } else { + scheduleTask(() => { + if (this.readable) + this.emit('readable') + else + this.readable = true + }) + } + return null; + } +} + + +class MyItemBufferingIterator extends AsyncIterator { + i = 10; + + read() { + this.i--; + if (this.i < 0) { + this.close(); + } else { + scheduleTask(() => { + if (this.readable) + this.emit('readable') + else + this.readable = true + }) + } + return this.i % 2 === 0 ? this.i : null; + } +} + +describe('maybeIterator', () => { + // TODO: + describe('Should return null on empty iterators', () => { + it('fromArray', async () => { + expect(await maybeIterator(fromArray([]))).to.be.null; + }); + it('range', async () => { + expect(await maybeIterator(range(0, -1))).to.be.null; + }); + it('MyIterator', async () => { + expect(await maybeIterator(new MyIterator())).to.be.null; + }); + it('empty', async () => { + expect(await maybeIterator(empty())).to.be.null; + }); + it('awaited empty', async () => { + const e = empty(); + // Add an await so that scheduleMacroTask will have run + await Promise.resolve(); + + expect(await maybeIterator(e)).to.be.null; + }); + it('MyBufferingIterator', async () => { + expect(await maybeIterator(new MyBufferingIterator())).to.be.null; + }); + }); + + describe('Should return an iterator with all elements if the iterator is not empty', () => { + it('fromArray', async () => { + expect(await (await maybeIterator(fromArray([1, 2, 3]))).toArray()).to.deep.equal([1, 2, 3]); + }); + it('range', async () => { + expect(await (await maybeIterator(range(1, 3))).toArray()).to.deep.equal([1, 2, 3]); + }); + it('range', async () => { + expect(await (await maybeIterator(new MyItemBufferingIterator())).toArray()).to.deep.equal([8, 6, 4, 2, 0]); + }); + }); +}); From 61148e8a5b946f82ca957e1592726cdd0e324cbf Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:07:19 +1000 Subject: [PATCH 02/10] feat: introduce maybeIterator --- asynciterator.ts | 6 +++--- test/maybeIterator-test.js | 27 ++++++++++++++------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 8ddf7ed1..e1759e20 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1972,7 +1972,7 @@ type InternalSource = * @param source An AsyncIterator * @returns The AsyncIterator if it is not empty, otherwise undefined */ - export async function maybeIterator(source: AsyncIterator): Promise> { +export async function maybeIterator(source: AsyncIterator): Promise> { // Avoid creating a new iterator where possible // if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer.length > 0) { // return source @@ -2000,9 +2000,9 @@ function awaitReadable(source: AsyncIterator): Promise { res(); } - function err() { + function err(e: Error) { cleanup(); - rej(); + rej(e); } function cleanup() { diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index 973aa9da..52a2b691 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -1,11 +1,10 @@ import { AsyncIterator, - ArrayIterator, fromArray, maybeIterator, range, empty, - scheduleTask + scheduleTask, } from '../dist/asynciterator.js'; class MyIterator extends AsyncIterator { @@ -18,17 +17,18 @@ class MyIterator extends AsyncIterator { class MyBufferingIterator extends AsyncIterator { i = 3; - + read() { if (this.i-- < 0) { this.close(); - } else { + } + else { scheduleTask(() => { if (this.readable) - this.emit('readable') + this.emit('readable'); else - this.readable = true - }) + this.readable = true; + }); } return null; } @@ -37,25 +37,26 @@ class MyBufferingIterator extends AsyncIterator { class MyItemBufferingIterator extends AsyncIterator { i = 10; - + read() { this.i--; if (this.i < 0) { this.close(); - } else { + } + else { scheduleTask(() => { if (this.readable) - this.emit('readable') + this.emit('readable'); else - this.readable = true - }) + this.readable = true; + }); } return this.i % 2 === 0 ? this.i : null; } } describe('maybeIterator', () => { - // TODO: + // TODO: describe('Should return null on empty iterators', () => { it('fromArray', async () => { expect(await maybeIterator(fromArray([]))).to.be.null; From 1f1cffeb50d321f1270a8e0ecc761898e597d304 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:25:11 +1000 Subject: [PATCH 03/10] chore: add test error coverage and IntegerIterator optimization --- asynciterator.ts | 6 +++--- test/maybeIterator-test.js | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index e1759e20..4bda7241 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1977,9 +1977,9 @@ export async function maybeIterator(source: AsyncIterator): Promise 0) { // return source // } - // if (source instanceof IntegerIterator && (source as any).step >= 0 ? (source as any).next > (source as any).last : (source as any).next < (source as any).last) { - // return source; - // } + if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) { + return source; + } let item; do { diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index 52a2b691..23f0c2cd 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -1,3 +1,4 @@ +import { expect } from 'chai'; import { AsyncIterator, fromArray, @@ -89,8 +90,29 @@ describe('maybeIterator', () => { it('range', async () => { expect(await (await maybeIterator(range(1, 3))).toArray()).to.deep.equal([1, 2, 3]); }); + it('range', async () => { + expect(await (await maybeIterator(range(1, 1))).toArray()).to.deep.equal([1]); + }); it('range', async () => { expect(await (await maybeIterator(new MyItemBufferingIterator())).toArray()).to.deep.equal([8, 6, 4, 2, 0]); }); }); + + // TODO: Add better error coverage - it is *possible* that there may be a bug + // that occurs when errors are thrown when we are *not* in the awaitReadable + // code section + it('Should reject on error before first element', async () => { + const iterator = new AsyncIterator(); + scheduleTask(() => { iterator.emit('error', new Error('myError')); }); + + let error = false; + + try { + await maybeIterator(iterator); + } catch (e) { + error = true + } + + expect(error).to.be.true; + }) }); From 70b1e694570ccb24c179b7d0af2eca9ab0c446df Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:26:24 +1000 Subject: [PATCH 04/10] chore: add ArrayIterator and BufferedIterator optimizations --- asynciterator.ts | 6 +++--- test/maybeIterator-test.js | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 4bda7241..56588fb9 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1977,9 +1977,9 @@ export async function maybeIterator(source: AsyncIterator): Promise 0) { // return source // } - if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) { - return source; - } + if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) + return source; + let item; do { diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index 23f0c2cd..664d313d 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -104,15 +104,16 @@ describe('maybeIterator', () => { it('Should reject on error before first element', async () => { const iterator = new AsyncIterator(); scheduleTask(() => { iterator.emit('error', new Error('myError')); }); - + let error = false; try { await maybeIterator(iterator); - } catch (e) { - error = true + } + catch (e) { + error = true; } expect(error).to.be.true; - }) + }); }); From 5bdd2bc1a030a1c677de6e0e7627f45e9d38ba77 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:29:24 +1000 Subject: [PATCH 05/10] chore: add ArrayIterator and BufferedIterator optimizations --- asynciterator.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 56588fb9..1b450883 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1974,9 +1974,9 @@ type InternalSource = */ export async function maybeIterator(source: AsyncIterator): Promise> { // Avoid creating a new iterator where possible - // if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer.length > 0) { - // return source - // } + if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer?.length > 0) { + return source + } if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) return source; From 0440aa141407eddd3da7d62175930fff50c96b4c Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:30:25 +1000 Subject: [PATCH 06/10] chore: remove redundant double break --- asynciterator.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 1b450883..64c3bae8 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1967,16 +1967,15 @@ type SourceExpression = type InternalSource = AsyncIterator & { _destination: AsyncIterator }; - /** * @param source An AsyncIterator * @returns The AsyncIterator if it is not empty, otherwise undefined */ export async function maybeIterator(source: AsyncIterator): Promise> { // Avoid creating a new iterator where possible - if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer?.length > 0) { - return source - } + if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer?.length > 0) + return source; + if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) return source; From d23b837d515215e57bcc2649b26c19a6d0755115 Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:31:09 +1000 Subject: [PATCH 07/10] chore: fix test naming --- test/maybeIterator-test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index 664d313d..f1fbd267 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -87,13 +87,13 @@ describe('maybeIterator', () => { it('fromArray', async () => { expect(await (await maybeIterator(fromArray([1, 2, 3]))).toArray()).to.deep.equal([1, 2, 3]); }); - it('range', async () => { + it('range 1-3', async () => { expect(await (await maybeIterator(range(1, 3))).toArray()).to.deep.equal([1, 2, 3]); }); - it('range', async () => { + it('range 1-1', async () => { expect(await (await maybeIterator(range(1, 1))).toArray()).to.deep.equal([1]); }); - it('range', async () => { + it('MyItemBufferingIterator', async () => { expect(await (await maybeIterator(new MyItemBufferingIterator())).toArray()).to.deep.equal([8, 6, 4, 2, 0]); }); }); From 35dd342013afd371c18cc246615ae447a4dd714e Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:32:14 +1000 Subject: [PATCH 08/10] chore: init i in constructore --- test/maybeIterator-test.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index f1fbd267..6dc0b534 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -37,7 +37,10 @@ class MyBufferingIterator extends AsyncIterator { class MyItemBufferingIterator extends AsyncIterator { - i = 10; + constructor() { + super(); + this.i = 10; + } read() { this.i--; From 2e581b8f1bc750d1ba19aa67e7bc3315adcce02f Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Thu, 28 Apr 2022 15:33:59 +1000 Subject: [PATCH 09/10] chore: init i in constructore --- test/maybeIterator-test.js | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/maybeIterator-test.js b/test/maybeIterator-test.js index 6dc0b534..49dc6fce 100644 --- a/test/maybeIterator-test.js +++ b/test/maybeIterator-test.js @@ -17,7 +17,10 @@ class MyIterator extends AsyncIterator { class MyBufferingIterator extends AsyncIterator { - i = 3; + constructor() { + super(); + this.i = 10; + } read() { if (this.i-- < 0) { From 18b17ce801fb5973a156ad2ae2cb318f2de7190a Mon Sep 17 00:00:00 2001 From: Jesse Wright Date: Fri, 29 Apr 2022 10:50:37 +1000 Subject: [PATCH 10/10] fix: only subscribe to events once --- asynciterator.ts | 54 ++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 64c3bae8..5e241a01 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1972,31 +1972,28 @@ type InternalSource = * @returns The AsyncIterator if it is not empty, otherwise undefined */ export async function maybeIterator(source: AsyncIterator): Promise> { - // Avoid creating a new iterator where possible - if ((source instanceof ArrayIterator || source instanceof BufferedIterator) && (source as any)._buffer?.length > 0) - return source; - - if (source instanceof IntegerIterator && (source as any)._step >= 0 ? (source as any)._next <= (source as any)._last : (source as any)._next >= (source as any)._last) - return source; - - - let item; - do { - if ((item = source.read()) !== null) - return source.prepend([item]); - await awaitReadable(source); - } while (!source.done); - return null; -} + return new Promise((res, rej) => { + let item; -function awaitReadable(source: AsyncIterator): Promise { - return new Promise((res, rej) => { - if (source.readable || source.done) - res(); + if ((item = source.read()) !== null) { + res(source.prepend([item])); + return; + } + if (source.done) { + res(null); + return; + } - function done() { - cleanup(); - res(); + function onReadable() { + if ((item = source.read()) !== null) { + cleanup(); + res(source.prepend([item])); + return; + } + if (source.done) { + cleanup(); + res(null); + } } function err(e: Error) { @@ -2005,14 +2002,13 @@ function awaitReadable(source: AsyncIterator): Promise { } function cleanup() { - source.removeListener('readable', done); - source.removeListener('end', done); + source.removeListener('readable', onReadable); + source.removeListener('end', onReadable); source.removeListener('error', err); } - source.on('readable', done); - source.on('end', done); + source.on('readable', onReadable); + source.on('end', onReadable); source.on('error', err); - }); + }) } -