Skip to content

Commit

Permalink
feat: TransformMapOptions.onDone
Browse files Browse the repository at this point in the history
which allows to "export" the stats after the pipeline is done
  • Loading branch information
kirillgroshkov committed Apr 17, 2024
1 parent 69e6445 commit f65af56
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 104 deletions.
8 changes: 8 additions & 0 deletions src/stream/progressLogger.ts
Expand Up @@ -101,6 +101,12 @@ export interface ProgressLoggerCfg<T = any> {
*/
extra?: (chunk: T | undefined, index: number) => AnyObject

/**
* Hook that is called when the last item is passed through.
* Passes the final stats as `ProgressLogItem`.
*/
onProgressDone?: (stats: ProgressLogItem) => any

/**
* If specified - will multiply the counter by this number.
* Useful e.g when using `transformChunk({ chunkSize: 500 })`, so
Expand Down Expand Up @@ -298,6 +304,8 @@ export class ProgressLogger<T> implements Disposable {
batchedProgress,
)} rows with total RPS of ${yellow(rpsTotal)}`,
)

this.cfg.onProgressDone?.(o)
}
}
}
Expand Down
18 changes: 17 additions & 1 deletion src/stream/transform/transformLogProgress.test.ts
@@ -1,6 +1,12 @@
import { Readable } from 'node:stream'
import { pDelay } from '@naturalcycles/js-lib'
import { _pipeline, progressReadableMapper, readableFrom, writableVoid } from '../..'
import {
_pipeline,
ProgressLogItem,
progressReadableMapper,
readableFrom,
writableVoid,
} from '../..'
import { transformLogProgress } from './transformLogProgress'

// todo: AsyncIterable2 (or Iterable2.mapAsync) should be implemented in js-lib
Expand All @@ -19,6 +25,7 @@ test('transformLogProgress', async () => {
// const readable = readableFromArray(_range(0, 11), i => pDelay(10, i))
// const readable = Readable.from(AsyncSequence.create(1, i => (i === 10 ? END : pDelay(10, i + 1))))
const readable = Readable.from(rangeItAsync(1, 11, 10))
let stats: ProgressLogItem

await _pipeline([
readable,
Expand All @@ -36,10 +43,19 @@ test('transformLogProgress', async () => {
aaa: index,
}
},
onProgressDone: s => (stats = s),
}),
// transformLogProgress({logProgressInterval: 10}),
writableVoid(),
])

expect(stats!).toEqual({
progress_final: 10,
peakRSS: expect.any(Number),
rps10: expect.any(Number),
rpsTotal: expect.any(Number),
rss: expect.any(Number),
})
})

test('progressReadableMapper', async () => {
Expand Down
69 changes: 64 additions & 5 deletions src/stream/transform/transformMap.test.ts
@@ -1,6 +1,12 @@
import { Readable } from 'node:stream'
import { AsyncMapper, ErrorMode, _range, pExpectedError, _stringify } from '@naturalcycles/js-lib'
import { readableFromArray, _pipeline, _pipelineToArray, transformMap } from '../../index'
import {
readableFromArray,
_pipeline,
_pipelineToArray,
transformMap,
TransformMapStats,
} from '../../index'

interface Item {
id: string
Expand Down Expand Up @@ -37,10 +43,15 @@ test('transformMap with mapping', async () => {
})

test('transformMap emit array as multiple items', async () => {
let stats: TransformMapStats
const data = _range(1, 4)
const data2 = await _pipelineToArray<number>([
readableFromArray(data),
transformMap(n => [n * 2, n * 2 + 1], { flattenArrayOutput: true }),
transformMap(n => [n * 2, n * 2 + 1], {
flattenArrayOutput: true,
// async is to test that it's awaited
onDone: async s => (stats = s),
}),
])

const expected: number[] = []
Expand All @@ -51,6 +62,16 @@ test('transformMap emit array as multiple items', async () => {
// console.log(data2)

expect(data2).toEqual(expected)

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [],
"countErrors": 0,
"countIn": 3,
"countOut": 6,
"ok": true,
}
`)
})

// non-object mode is not supported anymore
Expand All @@ -69,32 +90,47 @@ test('transformMap emit array as multiple items', async () => {
// })

test('transformMap errorMode=THROW_IMMEDIATELY', async () => {
let stats: TransformMapStats
const data: Item[] = _range(1, 5).map(n => ({ id: String(n) }))
const readable = readableFromArray(data)
const data2: Item[] = []

await expect(
_pipeline([
readable,
transformMap(mapperError3, { concurrency: 1 }),
transformMap(mapperError3, { concurrency: 1, onDone: s => (stats = s) }),
transformMap<Item, void>(r => void data2.push(r)),
]),
).rejects.toThrow('my error')

expect(data2).toEqual(data.filter(r => Number(r.id) < 3))

// expect(readable.destroyed).toBe(true)

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [],
"countErrors": 1,
"countIn": 3,
"countOut": 2,
"ok": false,
}
`)
})

test('transformMap errorMode=THROW_AGGREGATED', async () => {
let stats: TransformMapStats
const data: Item[] = _range(1, 5).map(n => ({ id: String(n) }))
const readable = readableFromArray(data)
const data2: Item[] = []

const err = await pExpectedError(
_pipeline([
readable,
transformMap(mapperError3, { errorMode: ErrorMode.THROW_AGGREGATED }),
transformMap(mapperError3, {
errorMode: ErrorMode.THROW_AGGREGATED,
onDone: s => (stats = s),
}),
transformMap<Item, void>(r => void data2.push(r)),
]),
AggregateError,
Expand All @@ -108,20 +144,43 @@ test('transformMap errorMode=THROW_AGGREGATED', async () => {
expect(data2).toEqual(data.filter(r => r.id !== '3'))

// expect(readable.destroyed).toBe(true)

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [
[Error: my error],
],
"countErrors": 1,
"countIn": 4,
"countOut": 3,
"ok": false,
}
`)
})

test('transformMap errorMode=SUPPRESS', async () => {
let stats: TransformMapStats
const data: Item[] = _range(1, 5).map(n => ({ id: String(n) }))
const readable = readableFromArray(data)

const data2: Item[] = []
await _pipeline([
readable,
transformMap(mapperError3, { errorMode: ErrorMode.SUPPRESS }),
transformMap(mapperError3, { errorMode: ErrorMode.SUPPRESS, onDone: s => (stats = s) }),
transformMap<Item, void>(r => void data2.push(r)),
])

expect(data2).toEqual(data.filter(r => r.id !== '3'))

// expect(readable.destroyed).toBe(true)

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [],
"countErrors": 1,
"countIn": 4,
"countOut": 3,
"ok": true,
}
`)
})
53 changes: 53 additions & 0 deletions src/stream/transform/transformMap.ts
Expand Up @@ -49,6 +49,18 @@ export interface TransformMapOptions<IN = any, OUT = IN> {
*/
onError?: (err: Error, input: IN) => any

/**
* A hook that is called when the last item is finished processing.
* stats object is passed, containing countIn and countOut -
* number of items that entered the transform and number of items that left it.
*
* Callback is called **before** [possible] Aggregated error is thrown,
* and before [possible] THROW_IMMEDIATELY error.
*
* onDone callback will be called before Error is thrown.
*/
onDone?: (stats: TransformMapStats) => any

/**
* Progress metric
*
Expand All @@ -59,6 +71,20 @@ export interface TransformMapOptions<IN = any, OUT = IN> {
logger?: CommonLogger
}

export interface TransformMapStats {
/**
* True if transform was successful (didn't throw Immediate or Aggregated error).
*/
ok: boolean
/**
* Only used (and returned) for ErrorMode.Aggregated
*/
collectedErrors: Error[]
countErrors: number
countIn: number
countOut: number
}

// doesn't work, cause here we don't construct our Transform instance ourselves
// export class TransformMap extends AbortableTransform {}

Expand All @@ -84,11 +110,13 @@ export function transformMap<IN = any, OUT = IN>(
errorMode = ErrorMode.THROW_IMMEDIATELY,
flattenArrayOutput,
onError,
onDone,
metric = 'stream',
logger = console,
} = opt

let index = -1
let countOut = 0
let isSettled = false
let errors = 0
const collectedErrors: Error[] = [] // only used if errorMode == THROW_AGGREGATED
Expand All @@ -102,6 +130,14 @@ export function transformMap<IN = any, OUT = IN>(
logErrorStats(true)

if (collectedErrors.length) {
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
})

// emit Aggregated error
cb(
new AggregateError(
Expand All @@ -111,6 +147,15 @@ export function transformMap<IN = any, OUT = IN>(
)
} else {
// emit no error

onDone?.({
ok: true,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
})

cb()
}
},
Expand All @@ -134,6 +179,7 @@ export function transformMap<IN = any, OUT = IN>(
},
)

countOut += passedResults.length
passedResults.forEach(r => this.push(r))

if (isSettled) {
Expand All @@ -155,6 +201,13 @@ export function transformMap<IN = any, OUT = IN>(

if (errorMode === ErrorMode.THROW_IMMEDIATELY) {
isSettled = true
onDone?.({
ok: false,
collectedErrors,
countErrors: errors,
countIn: index + 1,
countOut,
})
return cb(err) // Emit error immediately
}

Expand Down
42 changes: 35 additions & 7 deletions src/stream/transform/transformMapSync.test.ts
@@ -1,6 +1,6 @@
import { Readable } from 'node:stream'
import { AppError, ErrorMode, _range, pTry } from '@naturalcycles/js-lib'
import { writableVoid, _pipeline } from '../..'
import { writableVoid, _pipeline, TransformMapStats } from '../..'
import { transformMapSync } from './transformMapSync'

interface Item {
Expand All @@ -20,19 +20,25 @@ test('transformMapSync simple', async () => {
})

test('transformMapSync error', async () => {
let stats: TransformMapStats
const data = _range(100).map(String)

const data2: string[] = []
const [err] = await pTry(
_pipeline([
Readable.from(data),
transformMapSync<string, void>((r, i) => {
if (i === 50) {
throw new AppError('error on 50th')
}
transformMapSync<string, void>(
(r, i) => {
if (i === 50) {
throw new AppError('error on 50th')
}

data2.push(r)
}),
data2.push(r)
},
{
onDone: s => (stats = s),
},
),
writableVoid(),
]),
)
Expand All @@ -41,9 +47,20 @@ test('transformMapSync error', async () => {
expect(err).toMatchInlineSnapshot(`[AppError: error on 50th]`)

expect(data2).toEqual(data.slice(0, 50))

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [],
"countErrors": 1,
"countIn": 51,
"countOut": 50,
"ok": false,
}
`)
})

test('transformMapSync suppressed error', async () => {
let stats: TransformMapStats
const data = _range(100).map(String)

const data2: string[] = []
Expand All @@ -59,10 +76,21 @@ test('transformMapSync suppressed error', async () => {
},
{
errorMode: ErrorMode.SUPPRESS,
onDone: s => (stats = s),
},
),
writableVoid(),
])

expect(data2).toEqual(data.filter(r => r !== '50'))

expect(stats!).toMatchInlineSnapshot(`
{
"collectedErrors": [],
"countErrors": 1,
"countIn": 100,
"countOut": 99,
"ok": true,
}
`)
})

0 comments on commit f65af56

Please sign in to comment.