From 0280855679de666d1467ae33ad29adc1d4536106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustaf=20R=C3=A4ntil=C3=A4?= Date: Thu, 30 Dec 2021 10:08:08 +0100 Subject: [PATCH] feat(chunk): added support for chunking (by time or idle) filter and map operations --- README.md | 43 ++++++++++ lib/index.ts | 105 +++++++++++++++++++++--- test/index.ts | 215 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 353 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index e5ef834..8f73ecc 100644 --- a/README.md +++ b/README.md @@ -252,6 +252,29 @@ const outArray = await filter( inArray, filterFun ); const outArray = await filter( inArray, { concurrency: 4 }, filterFun ); ``` +### filter operations chunked by idle time + +Some filter operations (predicate functions) are heavy on calculations. To not starve the system (e.g. a browser) from CPU resources, the filter can be chopped up in smaller chunks with either a `setTimeout(0)` or by using [`requestIdleCallback`](https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback). + +The options used to specify concurrency can instead specify `chunk`. This implies a concurrency of 1, i.e. no concurrency. Chunking is mostly useful in synchronously heavy operations, not asynchronous. + +Specify a chunk time explicitly, e.g. 50ms: + +```ts +import { filter } from 'already' + +const outArray = await filter( inArray, { chunk: 50 }, filterFun ); +``` + +or use `requestIdleCallback` to try to maintain a hang-free experience in browsers: + +```ts +import { filter } from 'already' + +const outArray = await filter( inArray, { chunk: 'idle' }, filterFun ); +``` + + ## map Same as with `filter`, `map` acts like awaiting all promises in an array, and then applying `array.map( )` on the result. Also, just like with `filter`, it will await the resulting promises from the map callback (if they actually are promises). @@ -286,6 +309,26 @@ const outArray = await map( inArray, mapFun ); const outArray = await map( inArray, { concurrency: 4 }, mapFun ); ``` +### map operations chunked by idle time + +Some map operations (predicate functions) are heavy on calculations, just like `filter`. And for [the same reasons](#filter-operations-chunked-by-idle-time), you can select `chunk` to chunk up a map operation to not starve system from CPU resources in (synchronously) heavy map operations: + +Specify a chunk time explicitly, e.g. 50ms: + +```ts +import { map } from 'already' + +const outArray = await map( inArray, { chunk: 50 }, mapFun ); +``` + +or use `requestIdleCallback` to try to maintain a hang-free experience in browsers: + +```ts +import { map } from 'already' + +const outArray = await map( inArray, { chunk: 'idle' }, mapFun ); +``` + ## flatMap diff --git a/lib/index.ts b/lib/index.ts index 6be09d2..2343673 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -228,8 +228,14 @@ export interface ConcurrencyOptions { concurrency: number; } -export type FilterMapOptions = Partial< ConcurrencyOptions >; -const defaultFilterMapOptions: FilterMapOptions = { concurrency: Infinity }; +export interface ChunkOptions +{ + chunk: number | 'idle'; +} +export type FilterMapOptions = Partial< ConcurrencyOptions | ChunkOptions >; +const defaultFilterMapOptions: FilterMapOptions = { + concurrency: Infinity +}; export type MapArray< T > = Array< T | PromiseLike< T > > | @@ -349,12 +355,91 @@ export function map< T, U >( ? defaultFilterMapOptions : < FilterMapOptions >arr; - const { concurrency = Infinity } = opts; - const promiseMapFn = ( t: T, index: number, arr: ConcatArray< T | PromiseLike< T > > ) => Promise.resolve( ( < MapFn< T, U > >mapFn )( t, index, arr ) ); + const { concurrency = Infinity } = opts as Partial< ConcurrencyOptions >; + const { chunk } = opts as Partial< ChunkOptions >; + + if ( typeof chunk !== 'undefined' ) + { + if ( typeof chunk !== 'number' && chunk !== 'idle' ) + throw new Error( `Invalid 'chunk' option to 'map': ${chunk}` ); + + const useIdle = chunk === 'idle'; + const timeout = + chunk === 'idle' + ? 15 // 15ms, allow 1.666ms for browser work (to fill up 16.666 ms) + : chunk; + + const looper = async ( cb: ( timeout: number ) => Promise< void > ) => + new Promise< void >( ( resolve, reject ) => + { + const resolver = async ( timeout: number ) => + { + try + { + await cb( timeout ); + resolve( ); + } + catch ( err ) + { + reject( err ); + } + } + + ( useIdle && typeof requestIdleCallback !== 'undefined' ) + ? requestIdleCallback( + idleDeadline => + resolver( + // Round down to millisecond, subtract 1. + // That gives a little bit of time for the browser + // to do whatever it wants, and will to some + // degree prevent us to step over the time budget. + Math.floor( idleDeadline.timeRemaining( ) ) - 1 + ), + { timeout: 0 } + ) + : setTimeout( ( ) => resolver( timeout ), 0 ); + } ); + + return ( t: ConcatArray< T | PromiseLike< T > > ) + : Promise< Array< U > > => + { + return Promise.resolve( t ) + .then( async ( values: ConcatArray< T | PromiseLike< T > > ) => + { + const arr = toReadonlyArray( values ); + const ret: Array< U > = [ ]; + let i = 0; + + const loop = async ( timeout: number ) => + { + const start = Date.now( ); + + for ( ; i < arr.length; ) + { + const u = await promiseMapFn( await arr[ i ], i, arr ); + ret.push( u ); + ++i; + + if ( Date.now( ) - start >= timeout ) + { + await looper( loop ); + break; + } + } + } + + await loop( timeout ); + + return ret; + } ) + .then( values => Promise.all( values ) ); + }; + } + const concurrently = concurrent( concurrency ); return ( t: ConcatArray< T | PromiseLike< T > > ) @@ -362,12 +447,12 @@ export function map< T, U >( { return Promise.resolve( t ) .then( ( values: ConcatArray< T | PromiseLike< T > > ) => - toReadonlyArray( values ).map( - ( val, index, arr ) => - ( ( ) => Promise.resolve( val ) )( ) - .then( ( val: T ) => - concurrently( promiseMapFn, val, index, arr ) - ) + toReadonlyArray( values ) + .map( ( val, index, arr ) => + ( ( ) => Promise.resolve( val ) )( ) + .then( ( val: T ) => + concurrently( promiseMapFn, val, index, arr ) + ) ) ) .then( values => Promise.all( values ) ); diff --git a/test/index.ts b/test/index.ts index 91015fa..9221775 100644 --- a/test/index.ts +++ b/test/index.ts @@ -360,6 +360,221 @@ describe( "map", ( ) => } ); } ); +describe( "time-chunking", ( ) => +{ + const withIdle = ( cb: ( mock: jest.Mock ) => Promise< void > ) => + async ( ) => + { + const old = global.requestIdleCallback; + + const mock: jest.Mock< any, any > = jest.fn( + ( cb: ( idleDeadline: IdleDeadline ) => void ) => + setTimeout( + ( ) => + cb( { + didTimeout: false, + timeRemaining: ( ) => 10, + } ), + 1 + ) + ); + global.requestIdleCallback = mock; + try + { + await cb( mock ); + } + finally + { + global.requestIdleCallback = old; + } + }; + + const withTimeout = ( cb: ( mock: jest.Mock ) => Promise< void > ) => + async ( ) => + { + const old = global.setTimeout; + + const mock: jest.Mock< any, any > = jest.fn( + ( cb: ( ) => void, timeout: number ) => + old( cb, timeout ) + ); + global.setTimeout = mock as any; + try + { + await cb( mock ); + } + finally + { + global.setTimeout = old; + } + }; + + it( "map with idle", withIdle( async ( mock ) => + { + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + const arr2 = await map( + arr, + { + chunk: 'idle', + }, + t => + ( t % 2 === 0 ) + ? delay( 5 ).then( ( ) => ( { t } ) ) + : ( { t } ) + ); + const arr3 = arr2.map( ( { t } ) => t ); + + expect( arr3 ).toEqual( [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] ); + expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); + } ) ); + + it( "map with idle (error before scheduling)", withIdle( async ( mock ) => + { + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + const reflection = await reflect( map( + arr, + { + chunk: 'idle', + }, + t => + ( t % 2 === 0 ) + ? ( ( ) => { throw new Error( 'iteration error' ); } )( ) + : ( { t } ) + ) ); + + expect( mock.mock.calls.length ).toBe( 0 ); + expect( reflection.isRejected ).toBe( true ); + expect( reflection.error?.message ).toBe( 'iteration error' ); + } ) ); + + it( "map with idle (error after scheduling)", withIdle( async ( mock ) => + { + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + const reflection = await reflect( map( + arr, + { + chunk: 'idle', + }, + t => + ( t % 9 === 0 ) + ? ( ( ) => { throw new Error( 'iteration error' ); } )( ) + : ( t % 2 === 0 ) + ? delay( 5 ).then( ( ) => ( { t } ) ) + : ( { t } ) + ) ); + + expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); + expect( reflection.isRejected ).toBe( true ); + expect( reflection.error?.message ).toBe( 'iteration error' ); + } ) ); + + it( "map with timeout", withTimeout( async ( mock ) => + { + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + const arr2 = await map( + arr, + { + chunk: 7, + }, + t => + ( t % 2 === 0 ) + ? delay( 5 ).then( ( ) => ( { t } ) ) + : ( { t } ) + ); + const arr3 = arr2.map( ( { t } ) => t ); + + expect( arr3 ).toEqual( [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] ); + expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); + } ) ); + + it( "map invalid chunk", async ( ) => + { + expect( + ( ) => + map( + [ ], + { + chunk: 'invalid value' as 'idle', + }, + t => ( { t } ) + ) + ) + .toThrowError( ); + } ); + + it( "filter with idle", withIdle( async ( mock ) => + { + const arr = [ + 1, + 2, + Promise.resolve( 3 ), + delayChain( 5 )( 4 ), + 5, + 6, + 7, + 8, + 9, + 10, + ]; + const arr2 = await filter( + arr, + { + chunk: 'idle', + }, + t => + ( t % 2 === 0 ) + ? delay( 5 ).then( ( ) => t % 3 === 0 ) + : t % 3 === 0 + ); + + expect( arr2 ).toEqual( [ 3, 6, 9 ] ); + expect( mock.mock.calls.length ).toBeGreaterThan( 0 ); + } ) ); +} ); describe( "flatMap", ( ) => {