diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index c7b356ed4421b0..638a753cbcdf43 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -488,18 +488,11 @@ function onEofChunk(stream, state) { } state.ended = true; - if (state.sync && state.length) { - // if we are sync and have data in the buffer, wait until next tick - // to emit the data. otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call - emitReadable(stream); - } else { - // emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - emitReadable_(stream); - } + // emit 'readable' now to make sure it gets picked up. + state.needReadable = false; + if (!state.emittedReadable) { + state.emittedReadable = true; + emitReadable_(stream); } } diff --git a/test/parallel/test-stream-pipe-flow.js b/test/parallel/test-stream-pipe-flow.js deleted file mode 100644 index 1f8564182a3107..00000000000000 --- a/test/parallel/test-stream-pipe-flow.js +++ /dev/null @@ -1,67 +0,0 @@ -'use strict'; -const common = require('../common'); -const { Readable, Writable, PassThrough } = require('stream'); - -{ - let ticks = 17; - - const rs = new Readable({ - objectMode: true, - read: () => { - if (ticks-- > 0) - return process.nextTick(() => rs.push({})); - rs.push({}); - rs.push(null); - } - }); - - const ws = new Writable({ - highWaterMark: 0, - objectMode: true, - write: (data, end, cb) => setImmediate(cb) - }); - - rs.on('end', common.mustCall()); - ws.on('finish', common.mustCall()); - rs.pipe(ws); -} - -{ - let missing = 8; - - const rs = new Readable({ - objectMode: true, - read: () => { - if (missing--) rs.push({}); - else rs.push(null); - } - }); - - const pt = rs - .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })) - .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 })); - - pt.on('end', function() { - wrapper.push(null); - }); - - const wrapper = new Readable({ - objectMode: true, - read: () => { - process.nextTick(function() { - let data = pt.read(); - if (data === null) { - pt.once('readable', function() { - data = pt.read(); - if (data !== null) wrapper.push(data); - }); - } else { - wrapper.push(data); - } - }); - } - }); - - wrapper.resume(); - wrapper.on('end', common.mustCall()); -} diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js deleted file mode 100644 index 505327e247da38..00000000000000 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict'; - -const { Readable } = require('stream'); -const common = require('../common'); - -let ticks = 18; -let expectedData = 19; - -const rs = new Readable({ - objectMode: true, - read: () => { - if (ticks-- > 0) - return process.nextTick(() => rs.push({})); - rs.push({}); - rs.push(null); - } -}); - -rs.on('end', common.mustCall()); -readAndPause(); - -function readAndPause() { - // Does a on(data) -> pause -> wait -> resume -> on(data) ... loop. - // Expects on(data) to never fire if the stream is paused. - const ondata = common.mustCall((data) => { - rs.pause(); - - expectedData--; - if (expectedData <= 0) - return; - - setImmediate(function() { - rs.removeListener('data', ondata); - readAndPause(); - rs.resume(); - }); - }, 1); // only call ondata once - - rs.on('data', ondata); -}