Skip to content

Commit

Permalink
fix(flatMap): default flatMap concurrent parameter to 1 (#346)
Browse files Browse the repository at this point in the history
* fix(flatMap): default flatMap concurrent parameter to 1

* feat(switchall): add switchAll operator

* fix(concatmap): align concatMap source iteration behavior with concatAll
  • Loading branch information
trxcllnt committed Jul 26, 2022
1 parent 1a0ee90 commit bcfab5a
Show file tree
Hide file tree
Showing 23 changed files with 219 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
- name: Lint files
if: ${{ steps.files_changed.outputs.any_changed == 'true' || steps.files_changed.outputs.any_deleted == 'true' }}
run: |
yarn lint
yarn lint:ci
build-and-test-pull-request:
needs:
Expand Down
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
"build": "gulp build",
"clean": "gulp clean",
"debug": "gulp debug",
"lint": "run-p lint:*",
"lint": "run-p lint:src lint:spec",
"release": "./npm-release.sh",
"doc": "shx rm -rf ./doc && typedoc --options typedoc.js",
"postinstall": "patch-package --patch-dir ./patches",
"commitmsg": "validate-commit-msg",
"test:coverage": "gulp test -t src --coverage",
"lint:src": "eslint --ext .ts --fix \"src/**/*.ts\"",
"lint:spec": "eslint --ext .ts --fix \"spec/**/*.ts\"",
"lint:ci": "eslint src spec",
"lint:src": "eslint --fix src",
"lint:spec": "eslint --fix spec",
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1"
},
"author": "Matthew Podwysocki <matthewp@microsoft.com>",
Expand Down Expand Up @@ -52,8 +53,8 @@
"devDependencies": {
"@types/glob": "7.1.1",
"@types/jest": "27.4.0",
"@typescript-eslint/eslint-plugin": "5.12.0",
"@typescript-eslint/parser": "5.12.0",
"@typescript-eslint/eslint-plugin": "^5.31.0",
"@typescript-eslint/parser": "^5.31.0",
"abortcontroller-polyfill": "1.4.0",
"async-done": "1.3.2",
"benchmark": "2.1.4",
Expand All @@ -66,8 +67,8 @@
"coveralls": "3.0.9",
"cz-conventional-changelog": "3.1.0",
"del": "5.1.0",
"eslint": "8.9.0",
"eslint-plugin-jest": "26.1.1",
"eslint": "^8.20.0",
"eslint-plugin-jest": "^26.6.0",
"esm": "https://github.com/jsg2021/esm/releases/download/v3.x.x-pr883/esm-3.x.x-pr883.tgz",
"glob": "7.1.6",
"google-closure-compiler": "20220601.0.0",
Expand Down
70 changes: 70 additions & 0 deletions spec/asynciterable-operators/concatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { of, range, sequenceEqual, throwError } from 'ix/asynciterable';
import { map, tap, concatMap } from 'ix/asynciterable/operators';

test('AsyncIterable#concatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = xs.pipe(concatMap(async (x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 2);
noNext(it);
});

test('AsyncIterable#concatMap order of effects', async () => {
let i = 0;
const res = range(0, 3).pipe(
tap({ next: async () => ++i }),
concatMap((x) => range(0, x + 1)),
map((x) => i + ' - ' + x)
);

expect(
await sequenceEqual(res, of('1 - 0', '2 - 0', '2 - 1', '3 - 0', '3 - 1', '3 - 2'))
).toBeTruthy();
});

test('AsyncIterable#concatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(concatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
});

test('AsyncIterable#concatMap with error throws', async () => {
const err = new Error();
const xs = throwError(err);
const ys = xs.pipe(concatMap((x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
});

test('AsyncIterable#concatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
concatMap(async (x) => {
if (x < 3) {
return range(0, x);
}
throw err;
})
);

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
await expect(it.next()).rejects.toThrow(err);
});
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/distinctuntilchanged-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import '../asynciterablehelpers';
import { of, sequenceEqual } from 'ix/asynciterable';
import { distinctUntilChanged } from 'ix/asynciterable/operators';

test('Iterable#distinctUntilChanged no selector', async () => {
test('AsyncIterable#distinctUntilChanged no selector', async () => {
const res = of(1, 2, 2, 3, 3, 3, 2, 2, 1).pipe(distinctUntilChanged());
expect(await sequenceEqual(res, of(1, 2, 3, 2, 1))).toBeTruthy();
});

test('Iterable#distinctUntilChanged with selector', async () => {
test('AsyncIterable#distinctUntilChanged with selector', async () => {
const res = of(1, 1, 2, 3, 4, 5, 5, 6, 7).pipe(
distinctUntilChanged({ keySelector: (x) => Math.floor(x / 2) })
);
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/dowhile-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { defer, of, toArray } from 'ix/asynciterable';
import { doWhile, tap } from 'ix/asynciterable/operators';
import { sequenceEqual } from 'ix/iterable';

test('Iterable#doWhile some', async () => {
test('AsyncIterable#doWhile some', async () => {
let x = 5;

const res = await toArray(
Expand All @@ -16,7 +16,7 @@ test('Iterable#doWhile some', async () => {
expect(sequenceEqual(res, [5, 4, 3, 2, 1])).toBeTruthy();
});

test('Iterable#doWhile one', async () => {
test('AsyncIterable#doWhile one', async () => {
let x = 0;
const res = await toArray(
defer(() => of(x)).pipe(
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/except-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { except } from 'ix/asynciterable/operators';

test('Iterable#except with default comparer', async () => {
test('AsyncIterable#except with default comparer', async () => {
const xs = of(1, 2, 3);
const ys = of(3, 5, 1, 4);
const res = xs.pipe(except(ys));
Expand All @@ -12,7 +12,7 @@ test('Iterable#except with default comparer', async () => {
await noNext(it);
});

test('Iterable#except with custom comparer', async () => {
test('AsyncIterable#except with custom comparer', async () => {
const comparer = (x: number, y: number) => Math.abs(x) === Math.abs(y);
const xs = of(1, 2, -3);
const ys = of(3, 5, -1, 4);
Expand Down
8 changes: 4 additions & 4 deletions spec/asynciterable-operators/flat-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ test('AsyncIterable#flat flattens all', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat()));

compareArrays(ys, [1, 4, 2, 3]);
compareArrays(ys, [1, 2, 3, 4]);
});

test('AsyncIterable#flat flattens all with concurrent = 1', async () => {
test('AsyncIterable#flat flattens all layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(-1, 1)));
const ys = await toArray(xs.pipe(flat(-1)));

compareArrays(ys, [1, 2, 3, 4]);
});
Expand All @@ -24,5 +24,5 @@ test('AsyncIterable#flat flattens two layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(2)));

compareArrays(ys, [1, 4, 2, 3]);
compareArrays(ys, [1, 2, 3, 4]);
});
8 changes: 4 additions & 4 deletions spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of, range, throwError } from 'ix/asynciterable';
import { flatMap } from 'ix/asynciterable/operators';

test('Iterable#flatMap with range', async () => {
test('AsyncIterable#flatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => range(0, x)));

Expand All @@ -16,7 +16,7 @@ test('Iterable#flatMap with range', async () => {
noNext(it);
});

test('Iterable#flatMap selector returns throw', async () => {
test('AsyncIterable#flatMap selector returns throw', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => (x < 3 ? range(0, x) : throwError(err))));
Expand All @@ -28,7 +28,7 @@ test('Iterable#flatMap selector returns throw', async () => {
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#flatMap with error throws', async () => {
test('AsyncIterable#flatMap with error throws', async () => {
const err = new Error();
const xs = throwError(err);
const ys = xs.pipe(flatMap((x) => range(0, x)));
Expand All @@ -37,7 +37,7 @@ test('Iterable#flatMap with error throws', async () => {
await expect(it.next()).rejects.toThrow(err);
});

test('Iterable#flatMap selector throws error', async () => {
test('AsyncIterable#flatMap selector throws error', async () => {
const err = new Error();
const xs = of(1, 2, 3);
const ys = xs.pipe(
Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/ignoreelements.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import '../asynciterablehelpers';
import { range } from 'ix/asynciterable';
import { ignoreElements, take, tap } from 'ix/asynciterable/operators';

test('Iterable#ignoreElements has side effects', async () => {
test('AsyncIterable#ignoreElements has side effects', async () => {
let n = 0;
await range(0, 10)
.pipe(
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/intersect-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { intersect } from 'ix/asynciterable/operators';

test('Iterable#intersect with default comparer', async () => {
test('AsyncIterable#intersect with default comparer', async () => {
const xs = of(1, 2, 3);
const ys = of(3, 5, 1, 4);
const res = xs.pipe(intersect(ys));
Expand All @@ -13,7 +13,7 @@ test('Iterable#intersect with default comparer', async () => {
await noNext(it);
});

test('Iterable#intersect with custom comparer', async () => {
test('AsyncIterable#intersect with custom comparer', async () => {
const comparer = (x: number, y: number) => Math.abs(x) === Math.abs(y);
const xs = of(1, 2, -3);
const ys = of(3, 5, -1, 4);
Expand Down
4 changes: 2 additions & 2 deletions spec/asynciterable-operators/isempty-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import '../asynciterablehelpers';
import { of, empty, isEmpty } from 'ix/asynciterable';

test('Iterable#isEmpty empty', async () => {
test('AsyncIterable#isEmpty empty', async () => {
expect(await isEmpty(empty())).toBeTruthy();
});

test('Iterable#isEmpty not-empty', async () => {
test('AsyncIterable#isEmpty not-empty', async () => {
expect(await isEmpty(of(1))).toBeFalsy();
});
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/mergeall-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators';

test('AsyncIterable#merge mergeAll behavior', async () => {
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
expect(await toArray(res)).toEqual([1, 2, 3, 4, 5]);
});
6 changes: 3 additions & 3 deletions spec/asynciterable-operators/pluck-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { hasNext, noNext } from '../asynciterablehelpers';
import { of } from 'ix/asynciterable';
import { pluck } from 'ix/asynciterable/operators';

test('Iterable#pluck simple prop', async () => {
test('AsyncIterable#pluck simple prop', async () => {
const xs = of({ prop: 1 }, { prop: 2 }, { prop: 3 }, { prop: 4 }, { prop: 5 });
const ys = xs.pipe(pluck('prop'));

Expand All @@ -15,7 +15,7 @@ test('Iterable#pluck simple prop', async () => {
await noNext(it);
});

test('Iterable#pluck nested prop', async () => {
test('AsyncIterable#pluck nested prop', async () => {
const xs = of(
{ a: { b: { c: 1 } } },
{ a: { b: { c: 2 } } },
Expand All @@ -34,7 +34,7 @@ test('Iterable#pluck nested prop', async () => {
await noNext(it);
});

test('Iterable#pluck edge cases', async () => {
test('AsyncIterable#pluck edge cases', async () => {
const xs = of<any>(
{ a: { b: { c: 1 } } },
{ a: { b: 2 } },
Expand Down
46 changes: 46 additions & 0 deletions spec/asynciterable-operators/switchall-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import '../asynciterablehelpers';
import { of, toArray } from 'ix/asynciterable';
import { switchAll, delayEach, tap } from 'ix/asynciterable/operators';

describe(`AsyncIterable#switchAll`, () => {
test('switches inner sequences', async () => {
const innerValues = new Array<string>();

const ys = of('0', '1', '2', '3').pipe(
delayEach(200),
tap((x) => innerValues.push(x))
);
const xs = of(ys, ys, ys).pipe(delayEach(500));
const source = xs.pipe(switchAll());
const expected = [
'0',
'1', // xs=0
'0',
'1', // xs=1
'0',
'1',
'2',
'3', // xs=2
];

expect(await toArray(source)).toEqual(expected);

expect(innerValues).toEqual(expected);
});

test(`supports projecting to Arrays`, async () => {
const xs = of([0, 1, 2], [0, 1, 2], [0, 1, 2]).pipe(delayEach(100));
const source = xs.pipe(switchAll());
expect(await toArray(source)).toEqual([0, 1, 2, 0, 1, 2, 0, 1, 2]);
});

test(`supports projecting to Promise<Array>`, async () => {
const xs = of(
Promise.resolve([0, 1, 2]),
Promise.resolve([0, 1, 2]),
Promise.resolve([0, 1, 2])
).pipe(delayEach(100));
const source = xs.pipe(switchAll());
expect(await toArray(source)).toEqual([0, 1, 2, 0, 1, 2, 0, 1, 2]);
});
});
1 change: 1 addition & 0 deletions src/Ix.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import './add/asynciterable-operators/orderby';
import './add/asynciterable-operators/pairwise';
import './add/asynciterable-operators/pluck';
import './add/asynciterable-operators/publish';
import './add/asynciterable-operators/switchall';
import './add/asynciterable-operators/switchmap';
import './add/asynciterable-operators/reduceright';
import './add/asynciterable-operators/reduce';
Expand Down
17 changes: 17 additions & 0 deletions src/add/asynciterable-operators/switchall.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { switchAll } from '../../asynciterable/operators/switchall';

/**
* @ignore
*/
export function switchAllProto<T>(this: AsyncIterableX<AsyncIterable<T>>) {
return switchAll()(this);
}

AsyncIterableX.prototype.switchAll = switchAllProto;

declare module '../../asynciterable/asynciterablex' {
interface AsyncIterableX<T> {
switchAll: typeof switchAllProto;
}
}
2 changes: 1 addition & 1 deletion src/asynciterable/operators/_flatten.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class FlattenConcurrentAsyncIterable<TSource, TResult> extends AsyncItera
private _thisArg?: any
) {
super();
this._concurrent = this._switchMode ? 1 : Math.max(_concurrent, 0);
this._concurrent = this._switchMode ? 1 : Math.max(_concurrent, 1);
}
async *[Symbol.asyncIterator](outerSignal?: AbortSignal) {
throwIfAborted(outerSignal);
Expand Down

0 comments on commit bcfab5a

Please sign in to comment.