diff --git a/src/util/StreamUtil.ts b/src/util/StreamUtil.ts index 6e3ab02c97..dfcb0e9e7a 100644 --- a/src/util/StreamUtil.ts +++ b/src/util/StreamUtil.ts @@ -1,5 +1,5 @@ -import type { Writable, ReadableOptions } from 'stream'; -import { Readable } from 'stream'; +import type { Writable, ReadableOptions, DuplexOptions } from 'stream'; +import { Readable, Transform } from 'stream'; import arrayifyStream from 'arrayify-stream'; import { getLoggerFor } from '../logging/LogUtil'; import type { Guarded } from './GuardedStream'; @@ -43,6 +43,59 @@ export function pipeSafely(readable: NodeJS.ReadableStream, return guardStream(destination); } +export interface AsyncTransformOptions extends DuplexOptions { + /** + * Transforms data from the source by calling the `push` method + */ + transform?: (this: Transform, data: T, encoding: string) => any | Promise; + + /** + * Performs any final actions after the source has ended + */ + flush?: (this: Transform) => any | Promise; +} + +/** + * Transforms a stream, ensuring that all errors are forwarded. + * @param source - The stream to be transformed + * @param options - The transformation options + * + * @returns The transformed stream + */ +export function transformSafely( + source: NodeJS.ReadableStream, + { + transform = function(data): void { + this.push(data); + }, + flush = (): null => null, + ...options + }: AsyncTransformOptions = {}, +): + Guarded { + return pipeSafely(source, new Transform({ + ...options, + async transform(data, encoding, callback): Promise { + let error: Error | null = null; + try { + await transform.call(this, data, encoding); + } catch (err: unknown) { + error = err as Error; + } + callback(error); + }, + async flush(callback): Promise { + let error: Error | null = null; + try { + await flush.call(this); + } catch (err: unknown) { + error = err as Error; + } + callback(error); + }, + })); +} + /** * Converts an iterable to a stream and applies an error guard so that it is {@link Guarded}. * @param iterable - Data to stream. diff --git a/test/unit/util/StreamUtil.test.ts b/test/unit/util/StreamUtil.test.ts index 42beb5bc89..f98d71f154 100644 --- a/test/unit/util/StreamUtil.test.ts +++ b/test/unit/util/StreamUtil.test.ts @@ -1,6 +1,7 @@ import { PassThrough } from 'stream'; +import arrayifyStream from 'arrayify-stream'; import streamifyArray from 'streamify-array'; -import { guardedStreamFrom, pipeSafely, readableToString } from '../../../src/util/StreamUtil'; +import { guardedStreamFrom, pipeSafely, transformSafely, readableToString } from '../../../src/util/StreamUtil'; describe('StreamUtil', (): void => { describe('#readableToString', (): void => { @@ -41,6 +42,104 @@ describe('StreamUtil', (): void => { }); }); + describe('#transformSafely', (): void => { + it('can transform a stream without arguments.', async(): Promise => { + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source); + transformed.setEncoding('utf8'); + const result = await arrayifyStream(transformed); + expect(result).toEqual([ 'data' ]); + }); + + it('can transform a stream synchronously.', async(): Promise => { + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + encoding: 'utf8', + transform(data: string): void { + this.push(`${data}1`); + this.push(`${data}2`); + }, + flush(): void { + this.push(`data3`); + }, + }); + const result = await arrayifyStream(transformed); + expect(result).toEqual([ 'data1', 'data2', 'data3' ]); + }); + + it('can transform a stream asynchronously.', async(): Promise => { + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + encoding: 'utf8', + async transform(data: string): Promise { + await new Promise((resolve): any => setImmediate(resolve)); + this.push(`${data}1`); + this.push(`${data}2`); + }, + async flush(): Promise { + await new Promise((resolve): any => setImmediate(resolve)); + this.push(`data3`); + }, + }); + const result = await arrayifyStream(transformed); + expect(result).toEqual([ 'data1', 'data2', 'data3' ]); + }); + + it('catches source errors.', async(): Promise => { + const error = new Error('stream error'); + const source = new PassThrough(); + const transformed = transformSafely(source); + source.emit('error', error); + await expect(arrayifyStream(transformed)).rejects.toThrow(error); + }); + + it('catches synchronous errors on transform.', async(): Promise => { + const error = new Error('stream error'); + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + transform(): never { + throw error; + }, + }); + await expect(arrayifyStream(transformed)).rejects.toThrow(error); + }); + + it('catches synchronous errors on flush.', async(): Promise => { + const error = new Error('stream error'); + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + async flush(): Promise { + await new Promise((resolve): any => setImmediate(resolve)); + throw error; + }, + }); + await expect(arrayifyStream(transformed)).rejects.toThrow(error); + }); + + it('catches asynchronous errors on transform.', async(): Promise => { + const error = new Error('stream error'); + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + transform(): never { + throw error; + }, + }); + await expect(arrayifyStream(transformed)).rejects.toThrow(error); + }); + + it('catches asynchronous errors on flush.', async(): Promise => { + const error = new Error('stream error'); + const source = streamifyArray([ 'data' ]); + const transformed = transformSafely(source, { + async flush(): Promise { + await new Promise((resolve): any => setImmediate(resolve)); + throw error; + }, + }); + await expect(arrayifyStream(transformed)).rejects.toThrow(error); + }); + }); + describe('#guardedStreamFrom', (): void => { it('converts data to a guarded stream.', async(): Promise => { const data = [ 'a', 'b' ];