Skip to content

Commit

Permalink
fix: clean up the async reducers
Browse files Browse the repository at this point in the history
fix: correctly handle `undefined` initial value.
  • Loading branch information
Jason3S committed Apr 8, 2020
1 parent e52957e commit 8811368
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/GenSequence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ describe('GenSequence Tests', () => {
const values = [1, 2, 3, 4, 5].map(x => Promise.resolve<number>(x));
const gs = genSequence(values);
const result = await gs.reduceAsync((a, v) => a + v);
expect(result).toEqual(16);
expect(result).toEqual(15);
});

test('tests reducing asynchronously a sequence with init', async () => {
Expand Down
108 changes: 107 additions & 1 deletion src/operators/operatorsBase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('Tests Operators', () => {

test('makeIterable from Iterable', () => {
const a = [1, 2, 3];
const i = op.makeIterable(a[Symbol.iterator]());
const i = op.makeIterable(toIterable(a));
expect([...i,...i]).toEqual(a);
});

Expand All @@ -53,6 +53,74 @@ describe('Tests Operators', () => {
const i = op.makeIterable(toIterator(a));
expect([...i,...i]).toEqual(a);
});

test('makeAsyncIterable from Iterable', async () => {
const a = [1, 2, 3];
const i = op.makeAsyncIterable(toIterable(a));
const r: typeof a = [];
for await (const v of i) {
r.push(v);
}
expect(r).toEqual(a);
});

test('makeAsyncIterable from Iterator', async () => {
const a = [1, 2, 3];
const i = op.makeAsyncIterable(toIterator(a));
const r: typeof a = [];
for await (const v of i) {
r.push(v);
}
expect(r).toEqual(a);
});

test('makeAsyncIterable from Iterator', async () => {
const a = [1, 2, 3];
const i = op.makeAsyncIterable(toAsyncIterable(a));
const r: typeof a = [];
for await (const v of i) {
r.push(v);
}
expect(r).toEqual(a);
});

test('reduce', () => {
const fn = (a: number, b: number) => a + b;
const a = [1, 2, 3];
expect(op.reduce(fn, undefined, a)).toBe(6);
expect(op.reduce(fn, 10, a)).toBe(16);
expect(op.reduce(fn, undefined, toIterable(a))).toBe(6);
expect(op.reduce(fn, 10, toIterable(a))).toBe(16);
});

test('reduceAsync', async () => {
const fn = (a: number, b: number) => a + b;
const a = [1, 2, 3];
expect(await op.reduceAsync(fn, a, undefined)).toBe(6);
expect(await op.reduceAsync(fn, a, 10)).toBe(16);
expect(await op.reduceAsync(fn, toIterable(a), undefined)).toBe(6);
expect(await op.reduceAsync(fn, toIterable(a), 10)).toBe(16);
});

test('reduceAsyncForAsyncIterator', async () => {
const fn = (a: number, b: number) => a + b;
const a = [1, 2, 3];
const b: Iterable<number> = a;
expect(await op.reduceAsyncForAsyncIterator(fn, toAsyncIterable(a), undefined)).toBe(6);
expect(await op.reduceAsyncForAsyncIterator(fn, toAsyncIterable(a), 10)).toBe(16);
expect(await op.reduceAsyncForAsyncIterator(fn, op.makeAsyncIterable(a), undefined)).toBe(6);
expect(await op.reduceAsyncForAsyncIterator(fn, op.makeAsyncIterable(b), 10)).toBe(16);
});

test('reduceAsync nested promise', async () => {
const timeout = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));
const toPromise = <T>(i: T) => timeout(0).then(() => i);
const a = [1, 2, 3];
const p = a.map(toPromise);
const pp = p.map(toPromise);
const r = await op.reduceAsync((a, b) => a + b, pp, 1);
expect(r).toBe(7);
});
});

function *forceIterable<T>(i: Iterable<T> | IterableIterator<T>): IterableIterator<T> {
Expand All @@ -66,3 +134,41 @@ function* fib() {
[a, b] = [b, a + b];
}
}

function toIterable<T>(a: T[]): Iterable<T> {
let i = 0;
let used = false;
const next = () => ({
done: i >= a.length,
value: a[i++]
})
const iterator = () => {
if (used) {
throw 'Iterator Retry Error'
}
used = true;
return { next };
};
return {
[Symbol.iterator]: iterator
}
}

function toAsyncIterable<T>(a: T[]): AsyncIterable<T> {
let i = 0;
let used = false;
const next = () => Promise.resolve({
done: i >= a.length,
value: a[i++]
});
const iterator = () => {
if (used) {
throw 'Iterator Retry Error'
}
used = true;
return { next };
};
return {
[Symbol.asyncIterator]: iterator
}
}
92 changes: 67 additions & 25 deletions src/operators/operatorsBase.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Maybe, IterableLike, ThenArg, AsyncIterableLike } from '../types';
import { Maybe, IterableLike, AsyncIterableLike, IterableOfPromise } from '../types';

/**
* Operators used by Sequence
Expand Down Expand Up @@ -171,76 +171,118 @@ export function reduce<T, U>(fnReduce: (prevValue: U, curValue: T, curIndex: num
export function reduce<T>(fnReduce: (prevValue: T, curValue: T, curIndex: number) => T, initialValue: T, i: IterableLike<T>): T;
export function reduce<T>(fnReduce: (prevValue: T, curValue: T, curIndex: number) => T, initialValue: Maybe<T>, i: IterableLike<T>): Maybe<T>;
export function reduce<T>(fnReduce: (prevValue: T, curValue: T, curIndex: number) => T, initialValue: Maybe<T>, i: IterableLike<T>): Maybe<T> {
// We need to create a new iterable to prevent for...of from restarting an array.
const iter = makeIterable(i[Symbol.iterator]());
let index = 0;
if (initialValue === undefined) {
index = 1;
const r = i[Symbol.iterator]().next();
const r = iter.next();
initialValue = r.value;
}
let prevValue: T = initialValue!;
for (const t of i) {
for (const t of iter) {
const nextValue = fnReduce(prevValue, t, index);
prevValue = nextValue;
index += 1;
}
return prevValue;
}

export async function reduceAsync<T, U>(fnReduceAsync: (previosValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U> | Promise<ThenArg<U>>, i: IterableLike<ThenArg<T>>, initialValue?: ThenArg<U>): Promise<ThenArg<U>>;
export async function reduceAsync<T>(fnReduceAsync: (previosValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T> | Promise<ThenArg<T>>, i: IterableLike<ThenArg<T>>, initialValue?: ThenArg<T>): Promise<ThenArg<T>>;
export async function reduceAsync<T>(fnReduceAsync: (previosValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>, i: IterableLike<ThenArg<T>>, initialValue?: ThenArg<T>): Promise<ThenArg<T>> {
export async function reduceAsync<T, U>(fnReduce: (previousValue: U, currentValue: T, currentIndex: number) => U | Promise<U>, i: IterableOfPromise<T>, initialValue: U | Promise<U>): Promise<U>;
export async function reduceAsync<T>(fnReduce: (previousValue: T, currentValue: T, currentIndex: number) => T | Promise<T>, i: IterableOfPromise<T>, initialValue?: T | Promise<T>): Promise<T>;
export async function reduceAsync<T>(fnReduce: (previousValue: T, currentValue: T, currentIndex: number) => T | Promise<T>, i: IterableOfPromise<T>, initialValue?: T | Promise<T>): Promise<T> {
// We need to create a new iterable to prevent for...of from restarting an array.
const iter = makeIterable((i as Iterable<Promise<T>>)[Symbol.iterator]());
let index = 0;
if (initialValue === undefined) {
index = 1;
const r = await i[Symbol.iterator]().next();
const r = iter.next();
initialValue = r.value;
}
let previosValue = await initialValue as ThenArg<T>;
let previousValue = await initialValue;

for await (const t of i) {
const nextValue = await fnReduceAsync(previosValue, t, index);
previosValue = nextValue;
for (const p of iter) {
const t = await p;
const nextValue = await fnReduce(previousValue!, t, index);
previousValue = nextValue;
index += 1;
}
return previosValue;
return previousValue!;
}

export async function reduceAsyncForAsyncIterator<T, U>(fnReduceAsync: (previosValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U> | Promise<ThenArg<U>>, i: AsyncIterableLike<ThenArg<T>>, initialValue?: ThenArg<U>): Promise<ThenArg<U>>;
export async function reduceAsyncForAsyncIterator<T>(fnReduceAsync: (previosValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T> | Promise<ThenArg<T>>, i: AsyncIterableLike<ThenArg<T>>, initialValue?: ThenArg<T>): Promise<ThenArg<T>>;
export async function reduceAsyncForAsyncIterator<T>(fnReduceAsync: (previosValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>, i: AsyncIterableLike<ThenArg<T>>, initialValue?: ThenArg<T>): Promise<ThenArg<T>> {
export async function reduceAsyncForAsyncIterator<T, U>(fnReduce: (previousValue: U, currentValue: T, currentIndex: number) => U | Promise<U>, i: AsyncIterableLike<T>, initialValue?: U | Promise<U>): Promise<U>;
export async function reduceAsyncForAsyncIterator<T>(fnReduce: (previousValue: T, currentValue: T, currentIndex: number) => T | Promise<T>, i: AsyncIterableLike<T>, initialValue?: T | Promise<T>): Promise<T>;
export async function reduceAsyncForAsyncIterator<T>(fnReduce: (previousValue: T, currentValue: T, currentIndex: number) => T | Promise<T>, i: AsyncIterableLike<T>, initialValue?: T | Promise<T>): Promise<T> {
const iter = makeAsyncIterable(i[Symbol.asyncIterator]());
let index = 0;
if (initialValue === undefined) {
index = 1;
const r = await i[Symbol.asyncIterator]().next();
const r = await iter.next();
initialValue = r.value;
}
let previosValue = await initialValue as ThenArg<T>;
let previousValue = await initialValue;

for await (const t of i) {
const nextValue = await fnReduceAsync(previosValue, t, index);
previosValue = nextValue;
for await (const t of iter) {
const nextValue = await fnReduce(previousValue!, t, index);
previousValue = nextValue;
index += 1;
}
return previosValue;
return previousValue!;
}

//// Utilities
/**
* Convert an Iterator into an IterableIterator
*/
export function makeIterable<T>(i: Iterator<T> | IterableIterator<T>) {
function* iterate() {
export function makeIterable<T>(i: Iterator<T> | Iterable<T> | IterableIterator<T>): IterableIterator<T> {
function* fromIterator(i: Iterator<T>) {
for (let r = i.next(); ! r.done; r = i.next()) {
yield r.value;
}
}
return isIterable(i) ? i : iterate();
function* fromIterable(i: Iterable<T>): IterableIterator<T> {
yield *i;
}
return isIterable(i) ? (isIterableIterator(i) ? i : fromIterable(i)) : fromIterator(i);
}

export function isIterable<T>(i: Iterator<T> | IterableLike<T>): i is IterableLike<T> {
export function isIterable<T>(i: Iterator<T> | IterableLike<T> | AsyncIterator<T> | AsyncIterableIterator<T>): i is IterableLike<T> {
return !!(i as IterableIterator<T>)[Symbol.iterator];
}

export function isIterableIterator<T>(i: IterableLike<T>): i is IterableIterator<T> {
return typeof (i as IterableIterator<T>).next == 'function';
}

export function makeAsyncIterable<T>(i: Iterator<T> | Iterable<T> | IterableIterator<T> | AsyncIterator<T> | AsyncIterable<T> | AsyncIterableIterator<T>): AsyncIterableIterator<T> {
async function* fromIterable(i: IterableIterator<T> | Iterable<T>) {
for (const v of i) {
yield v;
}
}
async function* fromIterator(i: AsyncIterator<T> | Iterator<T>) {
for (let r = await i.next(); ! r.done; r = await i.next()) {
yield r.value;
}
}

async function* fromAsyncIterable(i: AsyncIterable<T>) {
yield *i;
}

return isAsyncIterable(i) ? (isAsyncIterableIterator(i) ? i : fromAsyncIterable(i)) :
isIterable(i) ? fromIterable(i) :
fromIterator(i);
}

export function isAsyncIterable<T>(i: Iterator<T> | Iterable<T> | AsyncIterator<T> | AsyncIterable<T> | AsyncIterableIterator<T>): i is AsyncIterableLike<T> {
return !!(i as AsyncIterableLike<T>)[Symbol.asyncIterator];
}

export function isAsyncIterableIterator<T>(i: AsyncIterableLike<T>): i is AsyncIterableIterator<T> {
return typeof (i as AsyncIterableIterator<T>).next == 'function';
}

/**
* Creates a scan function that can be used in a map function.
*/
Expand Down
25 changes: 15 additions & 10 deletions src/types.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
export type Maybe<T> = T | undefined;
export type ThenArg<T> = T extends Promise<infer U> ? U : T
export type ThenArg<T> = T extends PromiseLike<PromiseLike<PromiseLike<infer U>>> ? U :
T extends PromiseLike<PromiseLike<infer U>> ? U :
T extends PromiseLike<infer U> ? U :
T;

export type IterableOfPromise<T> = IterableLike<Promise<Promise<T>>> | IterableLike<Promise<T>> | IterableLike<T>;

export interface IterableLike<T> {
[Symbol.iterator](): Iterator<T> | IterableIterator<T>;
}

export interface AsyncIterableLike<T> {
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
[Symbol.asyncIterator](): AsyncIterableIterator<T> | AsyncIterator<T>;
}

export interface GenIterable<T> extends IterableLike<T> {}
Expand Down Expand Up @@ -54,10 +59,10 @@ export interface Sequence<T> extends IterableLike<T> {
/** reduce function see Array.reduce */
reduce(fnReduce: (previousValue: T, currentValue: T, currentIndex: number) => T): Maybe<T>;
reduce<U>(fnReduce: (previousValue: U, currentValue: T, currentIndex: number) => U, initialValue: U): U;
reduceAsync(fnReduceAsync: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T>): Promise<ThenArg<T>>;
reduceAsync(fnReduceAsync: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>): Promise<ThenArg<T>>;
reduceAsync<U>(fnReduceAsync: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync<U>(fnReduceAsync: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<U>>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync(fnReduce: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T>): Promise<ThenArg<T>>;
reduceAsync(fnReduce: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>): Promise<ThenArg<T>>;
reduceAsync<U>(fnReduce: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync<U>(fnReduce: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<U>>, initialValue: U): Promise<ThenArg<U>>;
reduceToSequence<U, V extends GenIterable<U>>(fnReduce: (previousValue: V, currentValue: T, currentIndex: number) => V, initialValue: V): Sequence<U>;
reduceToSequence<U>(fnReduce: (previousValue: GenIterable<U>, currentValue: T, currentIndex: number) => GenIterable<U>, initialValue: GenIterable<U>): Sequence<U>;

Expand All @@ -78,10 +83,10 @@ export interface Sequence<T> extends IterableLike<T> {
}

export interface AsyncSequence<T> extends AsyncIterableLike<T> {
reduceAsync(fnReduceAsync: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T>): Promise<ThenArg<T>>;
reduceAsync(fnReduceAsync: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>): Promise<ThenArg<T>>;
reduceAsync<U>(fnReduceAsync: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync<U>(fnReduceAsync: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<U>>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync(fnReduce: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<T>): Promise<ThenArg<T>>;
reduceAsync(fnReduce: (previousValue: ThenArg<T>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<T>>): Promise<ThenArg<T>>;
reduceAsync<U>(fnReduce: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => ThenArg<U>, initialValue: U): Promise<ThenArg<U>>;
reduceAsync<U>(fnReduce: (previousValue: ThenArg<U>, currentValue: ThenArg<T>, currentIndex: number) => Promise<ThenArg<U>>, initialValue: U): Promise<ThenArg<U>>;
}

export interface SequenceBuilder<S, T = S> {
Expand Down

0 comments on commit 8811368

Please sign in to comment.