Skip to content

Commit

Permalink
Default to flatMap concurrent (#347)
Browse files Browse the repository at this point in the history
* feat(flat): default AsyncIterable#flat's concurrent parameter to Infinity

* feat(mergeall): default AsyncIterable#mergeAll's concurrent parameter to Infinity

* feat(flatmap): default AsyncIterable#flatMap's concurrent parameter to Infinity
  • Loading branch information
trxcllnt committed Jul 26, 2022
1 parent 84c73b8 commit 32137a7
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 26 deletions.
10 changes: 5 additions & 5 deletions spec/asynciterable-operators/flat-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ function compareArrays<T>(fst: Iterable<T>, snd: Iterable<T>) {
expect(fst.toString()).toBe(snd.toString());
}

test('AsyncIterable#flat flattens all', async () => {
test('AsyncIterable#flat flattens all layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat()));

compareArrays(ys, [1, 2, 3, 4]);
compareArrays(ys, [1, 4, 2, 3]);
});

test('AsyncIterable#flat flattens all layers', async () => {
test('AsyncIterable#flat flattens all layers with concurrent = 1', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(-1)));
const ys = await toArray(xs.pipe(flat(-1, 1)));

compareArrays(ys, [1, 2, 3, 4]);
});
Expand All @@ -24,5 +24,5 @@ test('AsyncIterable#flat flattens two layers', async () => {
const xs = of(1, of(2, of(3)), 4);
const ys = await toArray(xs.pipe(flat(2)));

compareArrays(ys, [1, 2, 3, 4]);
compareArrays(ys, [1, 4, 2, 3]);
});
13 changes: 3 additions & 10 deletions spec/asynciterable-operators/flatmap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,12 @@
import { hasNext, noNext } from '../asynciterablehelpers';
import { of, range, throwError } from 'ix/asynciterable';
import { hasNext } from '../asynciterablehelpers';
import { of, range, throwError, toArray } from 'ix/asynciterable';
import { flatMap } from 'ix/asynciterable/operators';

test('AsyncIterable#flatMap with range', async () => {
const xs = of(1, 2, 3);
const ys = xs.pipe(flatMap(async (x) => range(0, x)));

const it = ys[Symbol.asyncIterator]();
hasNext(it, 0);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 0);
hasNext(it, 1);
hasNext(it, 2);
noNext(it);
expect(await toArray(ys)).toEqual([0, 0, 0, 1, 1, 2]);
});

test('AsyncIterable#flatMap selector returns throw', async () => {
Expand Down
2 changes: 1 addition & 1 deletion spec/asynciterable-operators/mergeall-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import { mergeAll } from 'ix/asynciterable/operators';

test('AsyncIterable#merge mergeAll behavior', async () => {
const res = of(of(1, 2, 3), of(4, 5)).pipe(mergeAll());
expect(await toArray(res)).toEqual([1, 2, 3, 4, 5]);
expect(await toArray(res)).toEqual([1, 2, 4, 3, 5]);
});
8 changes: 6 additions & 2 deletions src/add/asynciterable-operators/flat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import { flat } from '../../asynciterable/operators/flat';
/**
* @ignore
*/
export function flatProto<T, D extends number = -1>(this: AsyncIterableX<T>, depth: D = -1 as any) {
return flat(depth)(this);
export function flatProto<T, D extends number = -1>(
this: AsyncIterableX<T>,
depth: D = -1 as any,
concurrent = Infinity
) {
return flat(depth, concurrent)(this);
}

AsyncIterableX.prototype.flat = flatProto;
Expand Down
3 changes: 2 additions & 1 deletion src/add/asynciterable-operators/flatmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import { FlattenConcurrentSelector } from '../../asynciterable/operators/_flatte
export function flatMapProto<T, R>(
this: AsyncIterableX<T>,
selector: FlattenConcurrentSelector<T, R>,
concurrent = Infinity,
thisArg?: any
) {
return flatMap(selector, thisArg)(this);
return flatMap(selector, concurrent, thisArg)(this);
}

AsyncIterableX.prototype.flatMap = flatMapProto;
Expand Down
7 changes: 5 additions & 2 deletions src/add/asynciterable-operators/mergeall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { mergeAll } from '../../asynciterable/operators/mergeall';
/**
* @ignore
*/
export function mergeAllProto<T>(this: AsyncIterableX<AsyncIterable<T>>): AsyncIterableX<T> {
return mergeAll()(this);
export function mergeAllProto<T>(
this: AsyncIterableX<AsyncIterable<T>>,
concurrent = Infinity
): AsyncIterableX<T> {
return mergeAll(concurrent)(this);
}

AsyncIterableX.prototype.mergeAll = mergeAllProto;
Expand Down
4 changes: 2 additions & 2 deletions src/asynciterable/operators/flat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type FlattenWithDepth<Arr, Depth extends number> = {
* @param {number} [depth=Infinity] The depth to flatten the async-iterable sequence if specified, otherwise infinite.
* @returns {MonoTypeOperatorAsyncFunction<T>} An operator that flattens the async-iterable sequence.
*/
export function flat<D extends number = -1>(depth: D = -1 as any) {
export function flat<D extends number = -1>(depth: D = -1 as any, concurrent = Infinity) {
depth = (depth < 0 ? Infinity : depth) as any;
return function flattenOperatorFunction<T>(
source: AsyncIterable<T>
Expand All @@ -35,6 +35,6 @@ export function flat<D extends number = -1>(depth: D = -1 as any) {
return depth > 0 ? flat(depth - 1)(item) : item;
}
return [item];
})(source) as AsyncIterableX<Flattened<T, D>>;
}, concurrent)(source) as AsyncIterableX<Flattened<T, D>>;
};
}
3 changes: 2 additions & 1 deletion src/asynciterable/operators/flatmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import { OperatorAsyncFunction } from '../../interfaces';
*/
export function flatMap<TSource, TResult>(
selector: FlattenConcurrentSelector<TSource, TResult>,
concurrent = Infinity,
thisArg?: any
): OperatorAsyncFunction<TSource, TResult> {
return function flatMapOperatorFunction(source) {
return new FlattenConcurrentAsyncIterable(source, selector, 1, false, thisArg);
return new FlattenConcurrentAsyncIterable(source, selector, concurrent, false, thisArg);
};
}
4 changes: 2 additions & 2 deletions src/asynciterable/operators/mergeall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { FlattenConcurrentAsyncIterable } from './_flatten';
* @template TSource The type of the elements in the source sequences.
* @returns {OperatorAsyncFunction<AsyncIterable<TSource>, TSource>} The async-iterable sequence that merges the elements of the inner sequences.
*/
export function mergeAll() {
export function mergeAll(concurrent = Infinity) {
return function mergeAllOperatorFunction<TSource>(source: AsyncIterable<AsyncIterable<TSource>>) {
return new FlattenConcurrentAsyncIterable(source, (s) => s, 1, false);
return new FlattenConcurrentAsyncIterable(source, (s) => s, concurrent, false);
};
}

0 comments on commit 32137a7

Please sign in to comment.