[SPARK-56227][CORE] Fix GcmTransportCipher to correctly handle multiple messages per channel#55621

Open
aajisaka wants to merge 3 commits into apache:branch-3.5 from aajisaka:fix-rpc-encryption-branch-3.5

@aajisaka
Member

What changes were proposed in this pull request?

Backport #55028 to branch-3.5 for testing.

Why are the changes needed?

To run Spark jobs successfully on YARN with spark.network.crypto.cipher="AES/GCM/NoPadding".

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added new regression tests.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

aajisaka and others added 2 commits April 30, 2026 16:26
…le messages per channel

Four bugs in `GcmTransportCipher` cause failures in production YARN clusters
when AES-GCM RPC encryption is enabled (`spark.network.crypto.cipher=AES/GCM/NoPadding`).

**Bug 1 — DecryptionHandler is single-use per channel (YARN container launch failure)**

After decoding the first post-auth message, `completed = true` was never reset.
`AesGcmHkdfStreaming` is a one-shot streaming primitive: each GCM message carries its
own random IV and requires a fresh `StreamSegmentDecrypter`. With `decrypter` declared
`final` and all guard flags stuck at their terminal values, every subsequent message
on the channel was silently discarded.

Fix: make `decrypter` non-final, add `resetForNextMessage()` that reinstates all
per-message state (including a fresh `StreamSegmentDecrypter`), and call it after each
successfully decoded message.

**Bug 2 — TCP-coalesced messages lost (SparkSQL IllegalStateException)**

When TCP delivers multiple back-to-back GCM messages in a single `channelRead()` call
(common under shuffle load), the old code released the `ByteBuf` after decoding the
first message, discarding any remaining bytes. The next `channelRead()` then read bytes
from the middle of the second message as its length header, producing a negative `long`
and throwing `IllegalStateException("Invalid expected ciphertext length")`.

Fix: wrap the decode logic in an outer `while(true)` loop that exhausts all complete
messages from the buffer before releasing it; call `resetForNextMessage()` inside the
loop between messages.
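
The per-message reset (Bug 1) and the outer drain loop (Bug 2) can be sketched without Netty or Tink. This is a minimal illustration only, assuming a simplified wire format of an 8-byte length prefix followed by an opaque payload and no decryption; `FrameDrainSketch`, `copyAvailable`, and `frame` are hypothetical names, and the `resetForNextMessage()` here is a stand-in rather than the method in the patch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for a per-channel decoder; not the Spark class. */
final class FrameDrainSketch {
    private static final int LENGTH_HEADER_BYTES = 8;

    // Per-message state. Everything here must be reset after each decoded message.
    private final ByteBuffer lengthBuf = ByteBuffer.allocate(LENGTH_HEADER_BYTES);
    private long expectedLength = -1;
    private ByteBuffer payload;

    private void resetForNextMessage() {
        lengthBuf.clear();
        expectedLength = -1;
        payload = null;
    }

    /** Copies min(src.remaining(), dst.remaining()) bytes from src to dst. */
    private static void copyAvailable(ByteBuffer src, ByteBuffer dst) {
        int toRead = Math.min(src.remaining(), dst.remaining());
        ByteBuffer slice = src.duplicate();
        slice.limit(slice.position() + toRead);
        dst.put(slice);
        src.position(src.position() + toRead);
    }

    /** Consumes all of `in` and returns every message completed by this call. */
    List<byte[]> channelRead(ByteBuffer in) {
        List<byte[]> out = new ArrayList<>();
        while (true) {
            if (expectedLength < 0) {
                copyAvailable(in, lengthBuf);
                if (lengthBuf.hasRemaining()) {
                    return out; // length prefix incomplete; wait for the next read
                }
                lengthBuf.flip();
                expectedLength = lengthBuf.getLong();
                if (expectedLength < 0 || expectedLength > Integer.MAX_VALUE) {
                    throw new IllegalStateException("Invalid expected length");
                }
                payload = ByteBuffer.allocate((int) expectedLength);
            }
            copyAvailable(in, payload);
            if (payload.hasRemaining()) {
                return out; // payload incomplete; wait for the next read
            }
            out.add(payload.array());
            resetForNextMessage(); // without this, later messages would be dropped
        }
    }

    /** Builds one length-prefixed frame (hypothetical helper for the demo). */
    static byte[] frame(byte[] body) {
        ByteBuffer b = ByteBuffer.allocate(LENGTH_HEADER_BYTES + body.length);
        b.putLong(body.length);
        b.put(body);
        return b.array();
    }

    public static void main(String[] args) {
        byte[] a = frame(new byte[] {1, 2, 3});
        byte[] b = frame(new byte[] {4, 5});
        ByteBuffer coalesced = ByteBuffer.allocate(a.length + b.length);
        coalesced.put(a);
        coalesced.put(b);
        coalesced.flip();
        // Both coalesced frames are decoded by a single call.
        System.out.println(new FrameDrainSketch().channelRead(coalesced).size());
    }
}
```

The loop only returns when the input is exhausted mid-frame or exactly at a frame boundary, so trailing bytes are never discarded and the decoder stays usable for the next read.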

**Bug 3 — TCP-fragmented frame header causes IndexOutOfBoundsException (benchmark)**

`ByteBuf.readBytes(ByteBuffer dst)` requires at least `dst.remaining()` readable bytes in
the source and throws `IndexOutOfBoundsException` if the source is shorter. Under high
load, TCP can fragment a GCM message's 24-byte internal header (or 8-byte length prefix)
across multiple `channelRead()` calls. The code incorrectly assumed `readBytes` would
stop early and leave `hasRemaining() == true`.

Fix: compute `toRead = Math.min(readable, dst.remaining())`, temporarily narrow
`dst.limit` to `position + toRead`, call `readBytes(dst)`, then restore `limit`.
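
The limit-narrowing step can be sketched in plain Java as follows. `Source` is a hypothetical stand-in that mimics only the `readBytes(ByteBuffer)` contract described above (it is not Netty's `ByteBuf`), and `readAvailable` is an illustrative helper name, not a method from the patch.

```java
import java.nio.ByteBuffer;

final class BoundedReadSketch {
    /** Hypothetical stand-in: throws if fewer than dst.remaining() bytes are readable. */
    static final class Source {
        private final byte[] data;
        private int readerIndex;

        Source(byte[] data) {
            this.data = data;
        }

        int readableBytes() {
            return data.length - readerIndex;
        }

        void readBytes(ByteBuffer dst) {
            int n = dst.remaining();
            if (n > readableBytes()) {
                throw new IndexOutOfBoundsException(
                    "need " + n + " bytes, have " + readableBytes());
            }
            dst.put(data, readerIndex, n);
            readerIndex += n;
        }
    }

    /** Copies only what is available into dst and returns the number of bytes copied. */
    static int readAvailable(Source src, ByteBuffer dst) {
        int toRead = Math.min(src.readableBytes(), dst.remaining());
        int savedLimit = dst.limit();
        dst.limit(dst.position() + toRead); // narrow so readBytes asks for exactly toRead
        try {
            src.readBytes(dst);
        } finally {
            dst.limit(savedLimit); // restore so hasRemaining() reports the true shortfall
        }
        return toRead;
    }
}
```

With a 24-byte header buffer and a 4-byte fragment, `readAvailable` copies 4 bytes and leaves `hasRemaining() == true`, which is the condition the original code expected `readBytes` to produce on its own.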

**Bug 4 — EncryptionHandler shares mutable buffers across GcmEncryptedMessage instances**

`plaintextBuffer` and `ciphertextBuffer` were `EncryptionHandler` fields reused across
all `GcmEncryptedMessage` instances. Under Netty's write pipeline a new message can be
constructed (via `write()`) before a prior one's `transferTo()` completes; the new
constructor's `ciphertextBuffer.limit(0)` would corrupt the in-flight message's buffer.

Fix: allocate `plaintextBuffer` and `ciphertextBuffer` inside the `GcmEncryptedMessage`
constructor so each message owns its own buffers.

- Cache `headerLength` in `DecryptionHandler` to avoid repeated `getHeaderLength()` calls
- Replace `Integer.min()` with `Math.min()` for style consistency

- `testMultipleMessages`: regression for Bug 1 — same `DecryptionHandler` decodes two
  independent messages delivered via separate `channelRead()` calls
- `testBatchedMessages`: regression for Bug 2 — two ciphertexts concatenated into one
  `ByteBuf` and delivered in a single `channelRead()` call
- `testSplitHeader`: regression for Bug 3 — ciphertext split at byte 12 (8-byte length
  field + 4 bytes into the 24-byte GCM header) across two `channelRead()` calls

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nd reduce EventLoop callbacks

Two follow-up fixes to SPARK-56227 (`GcmTransportCipher`):

**Fix 1 — `encryptedCount` miscalculation for plaintext sizes in (32728, 32752]**

`GcmEncryptedMessage.encryptedCount` was computed as:
  `LENGTH_HEADER_BYTES + expectedCiphertextSize(P)`

Tink's `expectedCiphertextSize(P)` internally adds `getCiphertextOffset()` (24 bytes) to P
before dividing by `plaintextSegmentSize` to count segments. For P in (32728, 32752], this
predicts two ciphertext segments, but `transferTo()` writes the Tink header separately and
passes all P bytes to a single `encryptSegment()` call, producing only one segment. The
resulting `encryptedCount` was inflated by `TAG_SIZE_IN_BYTES` (16 bytes). After all ciphertext
was written, `count() > transferred()`, so subsequent `transferTo()` calls returned 0 and the
receiver stalled waiting indefinitely for 16 bytes that were never sent.

Fix: subtract `getCiphertextOffset()` from the argument to `expectedCiphertextSize()` and add
`getHeaderLength()` explicitly.
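
The 16-byte discrepancy can be checked with plain arithmetic. The sketch below re-implements the size formula as it is described above instead of calling Tink, and assumes a 32 KB ciphertext segment (hence a 32752-byte plaintext segment), a 24-byte header, and a 16-byte tag; the class and method names are hypothetical.

```java
final class EncryptedCountSketch {
    static final int LENGTH_HEADER_BYTES = 8;
    static final int HEADER_LENGTH = 24;     // also the ciphertext offset, per the text
    static final int TAG_SIZE_IN_BYTES = 16;
    static final int CIPHERTEXT_SEGMENT_SIZE = 32 * 1024; // assumed segment size
    static final int PLAINTEXT_SEGMENT_SIZE = CIPHERTEXT_SEGMENT_SIZE - TAG_SIZE_IN_BYTES;

    /** Size formula as described above: the offset is added before counting segments. */
    static long expectedCiphertextSize(long plaintextSize) {
        long withOffset = plaintextSize + HEADER_LENGTH;
        long fullSegments = withOffset / PLAINTEXT_SEGMENT_SIZE;
        long size = fullSegments * CIPHERTEXT_SEGMENT_SIZE;
        long lastSegment = withOffset % PLAINTEXT_SEGMENT_SIZE;
        if (lastSegment > 0) {
            size += lastSegment + TAG_SIZE_IN_BYTES;
        }
        return size;
    }

    /** Count before the fix. */
    static long oldCount(long p) {
        return LENGTH_HEADER_BYTES + expectedCiphertextSize(p);
    }

    /** Count after the fix: offset removed from the argument, header added explicitly. */
    static long fixedCount(long p) {
        return LENGTH_HEADER_BYTES + HEADER_LENGTH + expectedCiphertextSize(p - HEADER_LENGTH);
    }

    /** Bytes written for a message that fits in one segment: prefix, header, data, tag. */
    static long writtenSingleSegment(long p) {
        return LENGTH_HEADER_BYTES + HEADER_LENGTH + p + TAG_SIZE_IN_BYTES;
    }

    public static void main(String[] args) {
        for (long p : new long[] {32728, 32729, 32740, 32752}) {
            System.out.println(p + ": old overshoot = "
                + (oldCount(p) - writtenSingleSegment(p))
                + ", fixed overshoot = " + (fixedCount(p) - writtenSingleSegment(p)));
        }
    }
}
```

Under these assumptions the old count matches at 32728 bytes and overshoots by `TAG_SIZE_IN_BYTES` for every size in (32728, 32752], while the fixed count matches throughout.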

**Fix 2 — Executor heartbeat timeout under concurrent shuffle load**

`DecryptionHandler` called `ctx.fireChannelRead()` once per 32 KB ciphertext segment. Decoding
a 10 MB shuffle block produced ~320 synchronous EventLoop callbacks within a single
`processSelectedKeys()` invocation. With 50+ concurrent shuffle connections, the Netty EventLoop
thread was occupied for seconds at a stretch, leaving no time for `runAllTasks()` to execute the
executor-driver heartbeat task.

Fix: accumulate all decrypted segments zero-copy into a `CompositeByteBuf` and issue a single
`ctx.fireChannelRead()` when the complete message is decoded. `maxNumComponents` is set to
`Integer.MAX_VALUE` to disable `consolidateIfNeeded()` (which would otherwise copy all data
on an O(n²) schedule once the default cap of 16 components is exceeded).

- `testEncryptedCountBoundary`: encrypts and decrypts messages at 32729, 32740, and 32752 bytes,
  asserting `count() == transferred()` after encryption and correct round-trip for each.
- `testSingleFirePerMessage`: encrypts a 5-segment plaintext and asserts exactly one
  `fireChannelRead` callback is issued for the full message.
- Existing tests updated: `times(2)` → `times(1)` and assertions adjusted for full-message
  `CompositeByteBuf` in `testGcmEncryptedMessage`, `testGcmEncryptedMessageFileRegion`,
  and `testGcmUnalignedDecryption`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…compatibility

When compiled with a JDK 11/17 using -source 8 -target 8 (without --release 8),
calls like byteBuffer.flip() emit invokevirtual ByteBuffer.flip():ByteBuffer —
a descriptor that does not exist in Java 8's ByteBuffer. At Java 8 runtime this
throws NoSuchMethodError, causing the External Shuffle Service (which runs in the
NodeManager under Java 8) to drop connections.

Fix: add ((Buffer) ...) casts in all new/rewritten code sections introduced by
the previous commits — resetForNextMessage(), initializeExpectedLength(),
initializeDecrypter(), and the channelRead() inner loop — to match the pattern
already used throughout the rest of the file.
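
A minimal sketch of the cast pattern, using only `java.nio`; `BufferCastSketch` and `fillAndFlip` are hypothetical names for illustration.

```java
import java.nio.Buffer;
import java.nio.ByteBuffer;

final class BufferCastSketch {
    /** Copies data into a new buffer and prepares it for reading. */
    static ByteBuffer fillAndFlip(byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(data.length);
        buf.put(data);
        // The cast makes the call site bind to Buffer.flip():Buffer, which exists on
        // Java 8, instead of the covariant ByteBuffer.flip():ByteBuffer from Java 9+.
        ((Buffer) buf).flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = fillAndFlip(new byte[] {1, 2, 3});
        System.out.println(buf.remaining());
    }
}
```

The same cast applies to the other `Buffer` methods that gained covariant overrides, such as `clear()`, `limit(int)`, and `position(int)`. Compiling with `--release 8` would avoid the problem without casts, since it checks against the Java 8 API.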

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@aajisaka
Member Author

aajisaka commented May 1, 2026

Pushed 1 commit for Java 8 compatibility
