From 46494ce11250ad0568f36106c539767528045a59 Mon Sep 17 00:00:00 2001 From: Denis DelGrosso Date: Tue, 9 Aug 2022 16:35:21 +0000 Subject: [PATCH] implement final, fix _write to invoke callback on nextTick --- src/util.ts | 25 +++++++++++++++++++------ test/file.ts | 6 ++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/src/util.ts b/src/util.ts index 9322ad56d..90ebafcca 100644 --- a/src/util.ts +++ b/src/util.ts @@ -13,7 +13,7 @@ // limitations under the License. import * as querystring from 'querystring'; -import {PassThrough} from 'stream'; +import {PassThrough, TransformCallback} from 'stream'; export function normalize( optionsOrCallback?: T | U, @@ -172,10 +172,6 @@ export class PassThroughShim extends PassThrough { private shouldEmitReading = true; private shouldEmitWriting = true; - constructor() { - super(); - } - _read(size: number): void { if (this.shouldEmitReading) { this.emit('reading'); @@ -193,6 +189,23 @@ export class PassThroughShim extends PassThrough { this.emit('writing'); this.shouldEmitWriting = false; } - super._write(chunk, encoding, callback); + // Per the nodejs documention, callback must be invoked on the next tick + process.nextTick(() => { + super._write(chunk, encoding, callback); + }); + } + + _final(callback: (error?: Error | null | undefined) => void): void { + // If the stream is empty (i.e. empty file) final will be invoked before _read / _write + // and we should still emit the proper events. + if (this.shouldEmitReading) { + this.emit('reading'); + this.shouldEmitReading = false; + } + if (this.shouldEmitWriting) { + this.emit('writing'); + this.shouldEmitWriting = false; + } + super._final(callback); } } diff --git a/test/file.ts b/test/file.ts index fb980dcf3..9c9a2ffe8 100644 --- a/test/file.ts +++ b/test/file.ts @@ -1916,10 +1916,8 @@ describe('File', () => { const uploadStream = new PassThrough(); file.startResumableUpload_ = (dup: duplexify.Duplexify) => { - process.nextTick(() => { - dup.setWritable(uploadStream); - uploadStream.emit('error', error); - }); + dup.setWritable(uploadStream); + uploadStream.emit('error', error); }; const writable = file.createWriteStream();