[SPARK-56674][SS] Add streaming shuffle wire protocol #55620

Open
jerrypeng wants to merge 1 commit into apache:master from jerrypeng:stack/streaming-shuffle-pr1-network-protocol

Conversation


@jerrypeng jerrypeng commented Apr 30, 2026

What changes were proposed in this pull request?

Introduces a new Java package org.apache.spark.network.shuffle.streaming in the network-common module with the complete binary wire protocol for streaming shuffle:

  • StreamingShuffleMessageType: enum of 4 message IDs (DATA, CREDIT_CONTROL, TERMINATION_CONTROL, TERMINATION_ACK)
  • StreamingShuffleMessage: abstract sealed base class; encodes a common 12-byte header (message-type int + sequence-number long); dispatches decode() to concrete subclasses
  • DataMessage: carries serialized records from writer to reader; header includes shuffleWriterId, shuffleReaderId, dataSize, and CRC32C checksum
  • CreditControlMessage: flow-control signal from reader to writer; also serves as the initial handshake on connection establishment
  • TerminationControlMessage: end-of-stream from writer to reader
  • TerminationAckMessage: end-of-stream acknowledgment from reader to writer, echoing the last sequence number seen
  • ShuffleChecksum: stateful CRC32C helper supporting both heap and direct ByteBuf inputs

All classes are pure Java and depend only on Netty (already present in network-common) and the JDK's java.util.zip.CRC32C (Java 9+).
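The common header described above can be sketched with plain JDK buffers. This is a hypothetical illustration of the 12-byte layout (a 4-byte message-type id followed by an 8-byte sequence number); the actual classes in the PR use Netty ByteBuf, and the class and method names here are not from the patch.

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the 12-byte common header: message-type int
// plus sequence-number long. The real protocol classes encode into
// Netty ByteBuf; plain ByteBuffer keeps this sketch JDK-only.
public class HeaderSketch {
    static final int HEADER_SIZE = 4 + 8; // int message type + long sequence number

    static ByteBuffer encodeHeader(int messageType, long sequenceNumber) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        buf.putInt(messageType);       // 4 bytes: message-type id
        buf.putLong(sequenceNumber);   // 8 bytes: per-connection sequence number
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer header = encodeHeader(0 /* e.g. DATA */, 42L);
        assert header.remaining() == HEADER_SIZE;
        assert header.getInt() == 0;
        assert header.getLong() == 42L;
        System.out.println("header round-trip ok");
    }
}
```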

Adds the STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE error condition, which StreamingShuffleMessage.decode() throws if the wire's message-type ordinal does not map to a known StreamingShuffleMessageType.
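The decode() dispatch on the unknown-type path can be sketched as follows. The message-type ordinals and the exception type here are assumptions for illustration; the real code raises the STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE error condition through Spark's error framework.

```java
// Illustrative sketch of decode() dispatch: read the message-type id
// from the wire and fail fast on values that do not map to a known
// StreamingShuffleMessageType. Ordinals here are assumed, not the
// actual wire ids from the patch.
public class DecodeSketch {
    enum StreamingShuffleMessageType {
        DATA, CREDIT_CONTROL, TERMINATION_CONTROL, TERMINATION_ACK
    }

    static StreamingShuffleMessageType decodeType(int wireOrdinal) {
        StreamingShuffleMessageType[] types = StreamingShuffleMessageType.values();
        if (wireOrdinal < 0 || wireOrdinal >= types.length) {
            // Stand-in for the STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE error condition.
            throw new IllegalStateException(
                "STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE: " + wireOrdinal);
        }
        return types[wireOrdinal];
    }

    public static void main(String[] args) {
        assert decodeType(0) == StreamingShuffleMessageType.DATA;
        boolean threw = false;
        try { decodeType(99); } catch (IllegalStateException e) { threw = true; }
        assert threw;
        System.out.println("decode dispatch ok");
    }
}
```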

Why are the changes needed?

This is one part of the streaming shuffle implementation needed for Real-time Mode (RTM).

The streaming shuffle contains a fair amount of code, so I plan to create a series of PRs, each containing a reviewable chunk of the implementation. This is the plan:

PR 1 — stack/streaming-shuffle-pr1-network-protocol introduces the streaming shuffle wire protocol as pure Java in the network-common module: seven protocol classes
(StreamingShuffleMessage, DataMessage, CreditControlMessage, TerminationControlMessage, TerminationAckMessage, StreamingShuffleMessageType, ShuffleChecksum) plus an encode/decode test
suite, and registers the STREAMING_SHUFFLE_UNEXPECTED_MESSAGE_TYPE error class. The protocol depends only on Netty (already in network-common) and java.util.zip.CRC32C, with no other
Spark dependencies.

PR 2 — stack/streaming-shuffle-pr2-tracker adds the driver-side coordination service StreamingShuffleOutputTracker, including its Master, Worker, MasterEndpoint, location records
(StreamingShuffleTaskLocation, ShuffleLocationResponse, StreamingShuffleInfo), the NUM_MAPPERS/NUM_REDUCERS/TASK_LOCATION log keys, and an 11-test suite covering both RPC and direct
in-process behavior.

PR 3 — stack/streaming-shuffle-pr3-shuffle-manager is the plugin entry point: StreamingShuffleHandle, StreamingShuffleManager (with getWriter/getReader as UnsupportedOperationException
stubs that later PRs replace), the bidirectional Netty StreamingShuffleServerHandler and StreamingShuffleClientHandler, and the TaskContextAwareLogging mixin. It also adds a
TransportClient.send(ByteBuf) overload (changing send(ByteBuffer) to return ChannelFuture for listener attachment), the STREAMING_QUERY_ID log key, and the
STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER error class.

PR 4 — stack/streaming-shuffle-pr4-sparkenv-integration wires the tracker into SparkEnv (adds _streamingShuffleOutputTracker plus the initializeStreamingShuffleOutputTracker() lifecycle
method, invoked from initializeShuffleManager) and into DAGScheduler.createShuffleMapStage, which now registers the shuffle with the tracker when one is present.

PR 5 — stack/streaming-shuffle-pr5-writer adds the ~480-line StreamingShuffleWriter — a per-task TransportServer, per-partition TimestampedBuffer instances, semaphore-based memory
backpressure, a time-based flush thread, and the termination handshake — and replaces the getWriter stub from PR 3 with the real implementation. It also adds the four writer configs
(spark.shuffle.streaming.{checksum.enabled, networkBufferSize, networkBufferMaxWaitTimeMs, writerMaxMemory}), exposes TransportServer.getPooledByteBufAllocator, and adds four log keys
(SHUFFLE_WRITER_ID, SHUFFLE_READER_ID, NUM_SHUFFLE_READERS, NUM_TERMINATION_ACKS).
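The semaphore-based memory backpressure mentioned for PR 5 can be sketched like this: permits model bytes of buffered data, and a write blocks once the writer is over its memory budget until the flush path releases permits. The class and method names are hypothetical; only writerMaxMemory corresponds to a config named in the PR.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch of semaphore-based writer memory backpressure:
// each permit represents one byte of buffered shuffle data. Names are
// hypothetical; this is not the StreamingShuffleWriter implementation.
public class BackpressureSketch {
    private final Semaphore memoryPermits;

    BackpressureSketch(int writerMaxMemoryBytes) {
        memoryPermits = new Semaphore(writerMaxMemoryBytes);
    }

    void reserveForWrite(int bytes) {
        // Blocks when buffering `bytes` more would exceed the budget.
        memoryPermits.acquireUninterruptibly(bytes);
    }

    void releaseAfterFlush(int bytes) {
        memoryPermits.release(bytes); // flush thread returns the memory
    }

    int available() {
        return memoryPermits.availablePermits();
    }

    public static void main(String[] args) {
        BackpressureSketch bp = new BackpressureSketch(1024);
        bp.reserveForWrite(1000);
        assert bp.available() == 24;
        bp.releaseAfterFlush(1000);
        assert bp.available() == 1024;
        System.out.println("backpressure sketch ok");
    }
}
```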

PR 6 — stack/streaming-shuffle-pr6-reader adds the ~500-line StreamingShuffleReader with parallel client creation via a task-discovery thread, a LinkedBlockingQueue-backed message queue,
per-message CRC32C verification, sequence-number enforcement, and executor-lifecycle cleanup — and replaces the getReader stub from PR 3 with the real implementation. It also adds the
readerMaxMemory config, the STREAMING_SHUFFLE_CHECKSUM_VERIFICATION_FAILED error class, and three more log keys (NUM_CONNECTED_SHUFFLE_WRITERS, NUM_SHUFFLE_WRITERS, SHUFFLE_WRITERS).
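The reader-side sequence-number enforcement mentioned for PR 6 can be sketched as follows: each writer's messages must arrive with gap-free, strictly increasing sequence numbers, and a mismatch fails the read. The error-class name mirrors STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER from PR 3, but the logic and names here are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of per-writer sequence-number enforcement on the
// reader: track the last sequence number seen per writer and reject any
// message that is not the immediate successor. Hypothetical names.
public class SequenceSketch {
    private final Map<Long, Long> lastSeqByWriter = new ConcurrentHashMap<>();

    void onMessage(long writerId, long sequenceNumber) {
        long expected = lastSeqByWriter.getOrDefault(writerId, -1L) + 1;
        if (sequenceNumber != expected) {
            // Stand-in for the STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER error class.
            throw new IllegalStateException(
                "STREAMING_SHUFFLE_INCORRECT_SEQUENCE_NUMBER: expected " + expected
                + " but got " + sequenceNumber + " from writer " + writerId);
        }
        lastSeqByWriter.put(writerId, sequenceNumber);
    }

    public static void main(String[] args) {
        SequenceSketch s = new SequenceSketch();
        s.onMessage(7L, 0L);
        s.onMessage(7L, 1L);
        boolean threw = false;
        try { s.onMessage(7L, 3L); } catch (IllegalStateException e) { threw = true; }
        assert threw;
        System.out.println("sequence enforcement ok");
    }
}
```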

PR 7 — stack/streaming-shuffle-pr7-multi-manager introduces MultiShuffleManager (and MultiShuffleHandle), a routing wrapper around SortShuffleManager and StreamingShuffleManager that
selects per-shuffle based on the task-local property spark.shuffle.streaming.useForCurrentQuery and caches the decision per shuffleId. It also updates
SparkEnv.initializeStreamingShuffleOutputTracker to additionally recognize MultiShuffleManager as a tracker-requiring manager.
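The per-shuffle routing described for PR 7 can be sketched as a decision cache: consult the task-local property once per shuffleId and reuse that answer afterwards. In this sketch the two managers are stand-in strings; the real MultiShuffleManager wraps actual SortShuffleManager and StreamingShuffleManager instances.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of MultiShuffleManager-style routing: the first
// decision for a shuffleId (driven by the task-local property
// spark.shuffle.streaming.useForCurrentQuery) is cached and reused.
public class RoutingSketch {
    private final Map<Integer, Boolean> useStreamingByShuffleId = new ConcurrentHashMap<>();

    String managerFor(int shuffleId, boolean useForCurrentQuery) {
        boolean streaming = useStreamingByShuffleId
            .computeIfAbsent(shuffleId, id -> useForCurrentQuery);
        return streaming ? "StreamingShuffleManager" : "SortShuffleManager";
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch();
        assert r.managerFor(1, true).equals("StreamingShuffleManager");
        // The decision is cached per shuffleId: a later conflicting
        // property value does not change the routing.
        assert r.managerFor(1, false).equals("StreamingShuffleManager");
        assert r.managerFor(2, false).equals("SortShuffleManager");
        System.out.println("routing sketch ok");
    }
}
```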

PR 8 — stack/streaming-shuffle-pr8-tests is the ~1300-line StreamingShuffleSuite covering 30 end-to-end scenarios: server lifecycle, the SC-165888 race fix, read/write correctness,
backpressure, the termination handshake, error propagation through reader/writer/Netty threads, checksum failure, resource cleanup on a variety of task-completion timings, and
client-factory/EventLoopGroup shutdown. It also exposes TransportServer.channelFuture() (used only by tests).

PR 9 — stack/streaming-shuffle-pr9-docs adds docs/streaming-shuffle.md (~370 lines): when to use streaming shuffle, the architecture, the wire-protocol layout, coordination model,
backpressure, termination handshake, memory management, checksum integrity, the MultiShuffleManager routing model, the full spark.shuffle.streaming.* configuration reference, a comparison
table with sort shuffle, and the explicit list of limitations.

The final product can be viewed here for context:

https://github.com/apache/spark/compare/master...jerrypeng:spark:stack/streaming-shuffle-pr9-docs?expand=1

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests cover encode/decode round-trips for all message types, determinism of the checksum, heap/direct buffer equivalence, and the unknown-type error path.
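The checksum properties these tests cover (determinism and heap/direct buffer equivalence) can be illustrated with the JDK's CRC32C directly. The real helper, ShuffleChecksum, operates on Netty ByteBuf; plain NIO buffers keep this sketch JDK-only, and the class name here is not from the patch.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Illustrative sketch: CRC32C over the same bytes is deterministic,
// whether the bytes live in a heap or a direct buffer. Mirrors the
// properties the PR's unit tests assert for ShuffleChecksum.
public class ChecksumSketch {
    static long crc32c(ByteBuffer buf) {
        CRC32C crc = new CRC32C();
        crc.update(buf.duplicate()); // duplicate() leaves the caller's position untouched
        return crc.getValue();
    }

    public static void main(String[] args) {
        byte[] payload = "streaming shuffle".getBytes();
        ByteBuffer heap = ByteBuffer.wrap(payload);
        ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
        direct.put(payload).flip();

        long a = crc32c(heap);
        long b = crc32c(direct);
        assert a == b;            // heap/direct equivalence
        assert a == crc32c(heap); // determinism across repeated runs
        System.out.println("crc32c ok: " + Long.toHexString(a));
    }
}
```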

Was this patch authored or co-authored using generative AI tooling?

No

Co-authored-by: Isaac

jerrypeng force-pushed the stack/streaming-shuffle-pr1-network-protocol branch from c13f536 to fd91d36 on April 30, 2026 at 17:31