Skip to content

Commit

Permalink
[FIX] Ensure return is called on AsyncIterators (#348)
Browse files Browse the repository at this point in the history
* fix(tests): await values in concatMap and flatMap tests

* test(debounce): test that debounce triggers finalize

* fix(timeout): ensure timeout triggers finalize

* fix(tap): ensure tap triggers finalize

* fix(flattenconcurrentasynciterable): ensure FlattenConcurrentAsyncIterable triggers finalize

* ignore non-js files when debugging
  • Loading branch information
trxcllnt committed Jul 26, 2022
1 parent 32137a7 commit 72c37ec
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 125 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"console": "integratedTerminal",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"skipFiles": [
"<node_internals>/**/*.js",
"${workspaceFolder}/node_modules/**/*.js"
"<node_internals>/**/*",
"${workspaceFolder}/node_modules/**/*",
],
"env": {
"NODE_NO_WARNINGS": "1",
Expand Down
32 changes: 16 additions & 16 deletions spec/asynciterable-operators/concatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { hasNext, hasErr, noNext } from '../asynciterablehelpers';
import { of, range, sequenceEqual, throwError } from 'ix/asynciterable';
import { map, tap, concatMap } from 'ix/asynciterable/operators';

Expand All @@ -7,13 +7,13 @@ test('AsyncIterable#concatMap with range', async () => {
const ys = xs.pipe(concatMap(async (x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 2);
noNext(it);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasNext(it, 0);
await hasNext(it, 1);
await hasNext(it, 2);
await noNext(it);
});

test('AsyncIterable#concatMap order of effects', async () => {
Expand All @@ -35,10 +35,10 @@ test('AsyncIterable#concatMap selector returns throw', async () => {
const ys = xs.pipe(concatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasErr(it, err);
});

test('AsyncIterable#concatMap with error throws', async () => {
Expand All @@ -63,8 +63,8 @@ test('AsyncIterable#concatMap selector throws error', async () => {
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasNext(it, 1);
await hasErr(it, err);
});
57 changes: 54 additions & 3 deletions spec/asynciterable-operators/debounce-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
import { debounce } from 'ix/asynciterable/operators';
import { hasNext, hasErr, noNext, delayError, delayValue } from '../asynciterablehelpers';
import { debounce, finalize } from 'ix/asynciterable/operators';
import { as } from 'ix/asynciterable';
import { AbortError } from 'ix/Ix';

Expand Down Expand Up @@ -53,8 +53,59 @@ test(
const it = ys[Symbol.asyncIterator](controller.signal);
await hasNext(it, 1);
setImmediate(() => controller.abort());
await expect(hasNext(it, 3)).rejects.toThrow(AbortError);
await hasErr(it, AbortError);
await noNext(it);
},
10 * 1000
);

test(
'AsyncIterable#debounce triggers finalize on error',
async () => {
let done = false;
const e = new Error();
const xs = async function* () {
yield await delayValue(1, 100);
yield await delayError(e, 100);
yield await delayValue(3, 100);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
debounce(50)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasErr(it, e);
await noNext(it);
expect(done).toBeTruthy();
},
10 * 1000
);

test(
'AsyncIterable#debounce triggers finalize on complete',
async () => {
let done = false;
const xs = async function* () {
yield await delayValue(1, 200);
yield await delayValue(2, 400);
yield await delayValue(3, 200);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
debounce(300)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasNext(it, 3);
await noNext(it);
expect(done).toBeTruthy();
},
10 * 1000
);
74 changes: 69 additions & 5 deletions spec/asynciterable-operators/finalize-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { hasNext, hasErr, noNext } from '../asynciterablehelpers';
import { range, throwError } from 'ix/asynciterable';
import { finalize } from 'ix/asynciterable/operators';
import { flatMap, finalize, tap } from 'ix/asynciterable/operators';

test('AsyncIterable#finally defers behavior', async () => {
test('AsyncIterable#finalize defers behavior', async () => {
let done = false;

const xs = range(0, 2).pipe(
Expand All @@ -25,7 +25,7 @@ test('AsyncIterable#finally defers behavior', async () => {
expect(done).toBeTruthy();
});

test('AsyncIterable#finally calls even with error', async () => {
test('AsyncIterable#finalize calls even with error', async () => {
let done = false;

const err = new Error();
Expand All @@ -34,12 +34,76 @@ test('AsyncIterable#finally calls even with error', async () => {
done = true;
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await hasErr(it, err);

expect(done).toBeTruthy();
});

test('AsyncIterable#finalize calls with downstream error', async () => {
let done = false;

const err = new Error();
const xs = range(0, 2).pipe(
finalize(async () => {
done = true;
}),
tap(async () => {
throw err;
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await expect(hasNext(it, 0)).rejects.toThrow(err);
await hasErr(it, err);

expect(done).toBeTruthy();
});

test('AsyncIterable#finalize calls with downstream error from flattening', async () => {
let done = false;
// let srcValues = [] as number[];

const err = new Error();
const xs = range(0, 4).pipe(
finalize(async () => {
done = true;
}),
flatMap(async (x) => {
// srcValues.push(x);
if (x === 1) {
return throwError(err);
}
return [x];
})
);

expect(done).toBeFalsy();

const it = xs[Symbol.asyncIterator]();

expect(done).toBeFalsy();

await hasNext(it, 0);
await hasErr(it, err);
await noNext(it);

expect(done).toBeTruthy();
// The source will yield one more value after the throwError(err),
// because the internal Promise.race([outer, inner]) call resolves
// to the last outer value before the inner error. This is an artifact
// of the JS Promise scheduler's breadth-first scheduling behavior, not
// a bug in IxJS.
// TODO: This is broken in google-closure-compiler?
// expect(srcValues).toEqual([0, 1, 2]);
});
46 changes: 36 additions & 10 deletions spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { hasNext } from '../asynciterablehelpers';
import { hasNext, hasErr } from '../asynciterablehelpers';
import { of, range, throwError, toArray } from 'ix/asynciterable';
import { flatMap } from 'ix/asynciterable/operators';

Expand All @@ -10,15 +10,25 @@ test('AsyncIterable#flatMap with range', async () => {
});

test('AsyncIterable#flatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap((x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap async selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap with error throws', async () => {
Expand All @@ -27,10 +37,27 @@ test('AsyncIterable#flatMap with error throws', async () => {
const ys = xs.pipe(flatMap((x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
await hasErr(it, err);
});

test('AsyncIterable#flatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
flatMap((x) => {
if (x < 3) {
return range(0, x);
}
throw err;
})
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 0);
await hasErr(it, err);
});

test('AsyncIterable#flatMap async selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
Expand All @@ -43,8 +70,7 @@ test('AsyncIterable#flatMap selector throws error', async () => {
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
await hasNext(it, 0);
await hasNext(it, 0);
await hasErr(it, err);
});
26 changes: 23 additions & 3 deletions spec/asynciterable-operators/timeout-spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { hasNext, noNext, delayValue } from '../asynciterablehelpers';
import { timeout } from 'ix/asynciterable/operators';
import { hasNext, hasErr, noNext, delayValue } from '../asynciterablehelpers';
import { timeout, finalize } from 'ix/asynciterable/operators';
import { as } from 'ix/asynciterable';
import { TimeoutError } from 'ix/asynciterable/operators/timeout';

Expand Down Expand Up @@ -27,6 +27,26 @@ test('AsyncIterable#timeout throws when delayed', async () => {

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await expect(it.next()).rejects.toThrow(TimeoutError);
await hasErr(it, TimeoutError);
await noNext(it);
});

test('AsyncIterable#timeout triggers finalize', async () => {
let done = false;
const xs = async function* () {
yield await delayValue(1, 50);
yield await delayValue(2, 200);
};
const ys = as(xs()).pipe(
finalize(() => {
done = true;
}),
timeout(100)
);

const it = ys[Symbol.asyncIterator]();
await hasNext(it, 1);
await hasErr(it, TimeoutError);
await noNext(it);
expect(done).toBeTruthy();
});
19 changes: 14 additions & 5 deletions spec/asynciterablehelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import { AsyncIterableX } from 'ix/asynciterable';
import { Observer, PartialObserver } from '../src/observer';

export async function hasNext<T>(source: AsyncIterator<T>, expected: T) {
const { done, value } = await source.next();
expect(done).toBeFalsy();
expect(value).toEqual(expected);
await expect(source.next()).resolves.toEqual({ done: false, value: expected });
}

export async function hasErr(source: AsyncIterator<any>, expected: any) {
await expect(source.next()).rejects.toThrow(expected);
}

export async function noNext<T>(source: AsyncIterator<T>) {
const next = await source.next();
expect(next.done).toBeTruthy();
await expect(source.next()).resolves.toEqual({ done: true, value: undefined });
}

export function delayValue<T>(item: T, delay: number): Promise<T> {
Expand All @@ -21,6 +22,14 @@ export function delayValue<T>(item: T, delay: number): Promise<T> {
});
}

export function delayError<T>(item: T, delay: number): Promise<void> {
return new Promise<void>((_, reject) => {
setTimeout(() => {
reject(item);
}, delay);
});
}

const noop = (_?: any) => {
/**/
};
Expand Down
4 changes: 2 additions & 2 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ export abstract class AsyncIterableX<T> implements AsyncIterable<T> {
*/
static as(source: string): AsyncIterableX<string>;
/**
* Converts the async iterable like input into an async-iterable.
* Converts the AsyncIterable-like input or single element into an AsyncIterable.
*
* @template T The type of elements in the async-iterable like sequence.
* @param {AsyncIterableInput<T>} source The async-iterable like input to convert to an async-iterable.
* @returns {AsyncIterableX<T>} An async-iterable stream from elements in the async-iterable like sequence.
*/
static as<T>(source: AsyncIterableInput<T>): AsyncIterableX<T>;
static as<T>(source: AsyncIterableInput<T> | T): AsyncIterableX<T>;
/**
* Converts the single element into an async-iterable sequence.
*
Expand Down

0 comments on commit 72c37ec

Please sign in to comment.