Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/FUNDING.yml

This file was deleted.

3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ jobs:
contents: read

prettier:
if: ${{ github.event_name == 'push' || github.event.pull_request.head.repo.full_name == github.repository }}
uses: haraka/.github/.github/workflows/prettier.yml@master
permissions:
contents: write
with:
branch: ${{ github.head_ref }}
branch: ${{ github.event_name == 'pull_request' && github.head_ref || github.ref_name }}

coverage:
uses: haraka/.github/.github/workflows/coverage.yml@master
Expand Down
2 changes: 1 addition & 1 deletion .release
Submodule .release updated 2 files
+0 −12 js/bots.txt
+10 −12 js/contributors.cjs
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/).

### Unreleased

### [2.0.3] - 2026-04-23

Comment thread
msimerson marked this conversation as resolved.
- register `transformer.once('end'...)` before piping #21

### [2.0.2] - 2026-04-08

- fix: limit header size to prevent memory exhaustion
Expand Down Expand Up @@ -115,3 +119,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/).
[2.0.0]: https://github.com/haraka/message-stream/releases/tag/v2.0.0
[2.0.1]: https://github.com/haraka/message-stream/releases/tag/v2.0.1
[2.0.2]: https://github.com/haraka/message-stream/releases/tag/v2.0.2
[2.0.3]: https://github.com/haraka/message-stream/releases/tag/v2.0.3
2 changes: 1 addition & 1 deletion CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This handcrafted artisanal software is brought to you by:

| <img height="80" src="https://avatars.githubusercontent.com/u/261635?v=4"><br><a href="https://github.com/msimerson">msimerson</a> (<a href="https://github.com/haraka/message-stream/commits?author=msimerson">15</a>) | <img height="80" src="https://avatars.githubusercontent.com/u/14638441?v=4"><br><a href="https://github.com/bjarn">bjarn</a> (<a href="https://github.com/haraka/message-stream/commits?author=bjarn">1</a>) |
| <img height="80" src="https://avatars.githubusercontent.com/u/261635?v=4"><br><a href="https://github.com/msimerson">msimerson</a> (<a href="https://github.com/haraka/message-stream/commits?author=msimerson">16</a>) | <img height="80" src="https://avatars.githubusercontent.com/u/14638441?v=4"><br><a href="https://github.com/bjarn">bjarn</a> (<a href="https://github.com/haraka/message-stream/commits?author=bjarn">1</a>) |
| :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------: |

<sub>this file is generated by [.release](https://github.com/msimerson/.release).
Expand Down
7 changes: 5 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,9 @@ class MessageStream extends Stream {
const source = new PassThrough()
this.#currentSource = source

source.pipe(transformer).pipe(destination, { end: options.end !== false })

// Register before pipe() so these fire before the pipe's own 'end' handler,
// which calls destination.end() — potentially triggering a synchronous next()
// that would attempt a new pipe() while #inPipe is still true.
transformer.once('end', () => {
this.#inPipe = false
})
Expand All @@ -218,6 +219,8 @@ class MessageStream extends Stream {
})
source.once('error', (err) => this.emit('error', err))

source.pipe(transformer).pipe(destination, { end: options.end !== false })

// Constructor headers suppress raw-data headers; skip_headers also suppresses ctor headers
const emitCtorHeaders = this.headers.length > 0 && !skipHeaders
const skipRawHeaders = this.headers.length > 0 || skipHeaders
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "haraka-message-stream",
"version": "2.0.2",
"version": "2.0.3",
"description": "Haraka email message stream",
"main": "index.js",
"files": [
Expand Down
41 changes: 41 additions & 0 deletions test/message-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,44 @@ describe('pipe end option', () => {
)
})
})

describe('sequential pipe', () => {
it('allows a second pipe started synchronously from the first pipe end callback', (t, done) => {
// Regression test for haraka/Haraka#3551:
// When destination.end() is called synchronously inside the 'end' listener
// registered by pipe(), and that callback triggers another pipe(), the
// #inPipe guard must already be cleared or it throws "Cannot pipe while
// currently piping".
const ms = new MessageStream({ main: {} }, 'msg', [])
ms.add_line('Subject: test\r\n')
ms.add_line('\r\n')
ms.add_line('body\r\n')
ms.add_line_end()

const chunks1 = []
const chunks2 = []

// First destination: a writable that triggers a second pipe synchronously
// inside its end() — simulating what DKIMSignStream does.
const dest1 = new stream.Writable({
write(chunk, _enc, cb) {
chunks1.push(chunk.toString())
cb()
},
final(cb) {
cb()
// Synchronously start a second pipe, just like DKIMSignStream's callback
// calls next() which leads to process_delivery which pipes the stream again.
const dest2 = new stream.PassThrough()
dest2.on('data', (c) => chunks2.push(c.toString()))
dest2.on('end', () => {
assert.ok(chunks2.join('').length > 0, 'second pipe received data')
done()
})
ms.pipe(dest2)
},
})

ms.pipe(dest1)
})
})
Loading