[VL] Remove invalid code path for shuffle read #12403

Closed
marin-ma wants to merge 1 commit into apache:main from marin-ma:cleanup-shuffle-reader-codepath

@marin-ma marin-ma commented Jun 30, 2026

Refine the code paths for the shuffle reader and deserialiser:

  1. ColumnarShuffleReader only accepts a ColumnarShuffleDependency as its shuffle dependency. Other types of dependency are invalid.
  2. ColumnarBatchSerializer calls the native reader to deserialise input streams in batch, so deserializeStream is not used.
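
Point 1 can be illustrated with a minimal sketch that narrows the dependency once and fails fast on anything else. The types `Dep`, `ColumnarDep`, and `RowDep` and the helper `requireColumnar` are hypothetical stand-ins invented for this illustration; they are not Spark's or Gluten's classes, and the exception type is an assumption rather than what the PR throws.

```scala
// Hypothetical stand-ins; these are NOT Spark's or Gluten's real classes.
object DependencyNarrowingSketch {
  sealed trait Dep { def name: String }
  final case class ColumnarDep(name: String) extends Dep
  final case class RowDep(name: String) extends Dep

  // Narrow the dependency once, up front, and reject every other type
  // instead of silently falling back to a different read path.
  def requireColumnar(dep: Dep): ColumnarDep = dep match {
    case d: ColumnarDep => d
    case other =>
      throw new IllegalStateException(s"Expected a columnar dependency but got: $other")
  }
}
```

With this shape, a caller that passes the wrong dependency type gets an immediate error naming the offending value, rather than a silent switch to a row-based path.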

Copilot AI review requested due to automatic review settings June 30, 2026 11:33
@github-actions github-actions Bot added the VELOX label Jun 30, 2026

Copilot AI left a comment

Pull request overview

This PR refines the Velox backend shuffle read/deserialization flow by removing the fallback (row-based) deserialization path and tightening expectations so that the shuffle reader uses Gluten’s columnar serializer/reader path only.

Changes:

  • Simplified ColumnarShuffleReader (and the Delta optimized-writer shuffle reader) to always use ColumnarBatchSerializerInstance.deserializeStreams(...) for batch deserialization.
  • Removed the now-unused deserializeStream(...) implementation from the Velox ColumnarBatchSerializer instance implementation.
  • Centralized “unsupported” SerializerInstance APIs in ColumnarBatchSerializerInstance (now throwing UnsupportedOperationException).
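
The "centralized unsupported APIs" idea in the last bullet might look roughly like the sketch below: a shared base class rejects the generic entry points in one place, so a concrete backend only implements the batch read path. `SimpleSerializerInstance`, `BatchOnlyInstance`, and `ByteYieldingInstance` are hypothetical, simplified types made up for this example; the real `SerializerInstance` interface has a different and larger surface.

```scala
import java.io.InputStream

object UnsupportedDefaultsSketch {
  // Hypothetical, simplified interface; not Spark's SerializerInstance.
  trait SimpleSerializerInstance {
    def serialize(value: String): Array[Byte]
    def deserialize(bytes: Array[Byte]): String
    def deserializeStreams(streams: Iterator[InputStream]): Iterator[Int]
  }

  // The shared base class rejects the generic entry points in one place...
  abstract class BatchOnlyInstance extends SimpleSerializerInstance {
    override def serialize(value: String): Array[Byte] =
      throw new UnsupportedOperationException("serialize")
    override def deserialize(bytes: Array[Byte]): String =
      throw new UnsupportedOperationException("deserialize")
  }

  // ...so a concrete backend only has to implement the batch read path.
  class ByteYieldingInstance extends BatchOnlyInstance {
    // Toy "deserialization": yield every byte of every stream, in order.
    override def deserializeStreams(streams: Iterator[InputStream]): Iterator[Int] =
      streams.flatMap(in => Iterator.continually(in.read()).takeWhile(_ != -1))
  }
}
```

The trade-off of this design is that unsupported calls only fail at runtime, so any remaining caller of a rejected method is discovered when it runs rather than at compile time.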

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File | Description
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala | Removes non-columnar dependency fallback and always deserializes via deserializeStreams.
backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializerInstance.scala | Marks generic SerializerInstance APIs as unsupported for this columnar shuffle serializer instance.
backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala | Removes the unused single-stream deserialization and redundant unsupported overrides from the concrete serializer instance.
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala | Aligns Delta optimized writer shuffle reader with the columnar-only deserialization path.

Comment on lines +32 to 35
  // These methods are never called by shuffle code.
  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    throw new UnsupportedOperationException
  }
Copilot AI review requested due to automatic review settings June 30, 2026 11:48

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

Comment on lines +91 to +95
      fetchContinuousBlocksInBatch
    ).toCompletionIterator

    val recordIter = dep match {
      case columnarDep: ColumnarShuffleDependency[K, _, C] =>
        // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
        columnarDep.serializer
          .newInstance()
          .asInstanceOf[ColumnarBatchSerializerInstance]
          .deserializeStreams(wrappedStreams)
          .asKeyValueIterator
      case _ =>
        val serializerInstance = dep.serializer.newInstance()
        // Create a key/value iterator for each stream
        wrappedStreams.flatMap {
          case (blockId, wrappedStream) =>
            // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
            // NextIterator. The NextIterator makes sure that close() is called on the
            // underlying InputStream when all records have been read.
            serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
        }
    val columnarDep = dep match {
      case d: ColumnarShuffleDependency[_, _, _] =>
Comment on lines +337 to +341
@@ -338,25 +338,18 @@ private class GlutenOptimizedWriterShuffleReader(
    ).toCompletionIterator

    // Create a key/value iterator for each stream
    val recordIter = dep match {
      case columnarDep: ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] =>
        // If the dependency is a ColumnarShuffleDependency, we use the columnar serializer.
        columnarDep.serializer
          .newInstance()
          .asInstanceOf[ColumnarBatchSerializerInstance]
          .deserializeStreams(wrappedStreams)
          .asKeyValueIterator
      case _ =>
        val serializerInstance = dep.serializer.newInstance()
        // Create a key/value iterator for each stream
        wrappedStreams.flatMap {
          case (blockId, wrappedStream) =>
            // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
            // NextIterator. The NextIterator makes sure that close() is called on the
            // underlying InputStream when all records have been read.
            serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
        }
    val columnarDep = dep match {
Copilot AI review requested due to automatic review settings June 30, 2026 11:55

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

Copilot AI review requested due to automatic review settings June 30, 2026 12:30
@marin-ma marin-ma force-pushed the cleanup-shuffle-reader-codepath branch from f5676f6 to f17d014 Compare June 30, 2026 12:30

Copilot AI left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.

@marin-ma

Closing this as deserializeStream is still used by uniffle.
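
The closing reason can be pictured with a small sketch: once the single-stream method only throws, a reader that hands over all streams in one call keeps working, while any reader that still goes stream by stream fails at runtime. Everything here (`Instance`, `BatchOnly`, `batchReader`, `perStreamReader`) is hypothetical and uses plain integer sequences in place of input streams; the Uniffle integration itself is not shown in this thread, so `perStreamReader` is only a stand-in for that kind of caller.

```scala
object RemainingCallerSketch {
  // Hypothetical serializer with both entry points; not Gluten's real API.
  trait Instance {
    def deserializeStream(stream: Seq[Int]): Iterator[Int]
    def deserializeStreams(streams: Iterator[Seq[Int]]): Iterator[Int]
  }

  // What an instance could look like after the single-stream path is removed.
  object BatchOnly extends Instance {
    def deserializeStream(stream: Seq[Int]): Iterator[Int] =
      throw new UnsupportedOperationException("deserializeStream")
    def deserializeStreams(streams: Iterator[Seq[Int]]): Iterator[Int] =
      streams.flatMap(_.iterator)
  }

  // Reader that hands all streams over in a single batch call.
  def batchReader(inst: Instance, streams: Seq[Seq[Int]]): List[Int] =
    inst.deserializeStreams(streams.iterator).toList

  // Stand-in for another integration that still reads stream by stream.
  def perStreamReader(inst: Instance, streams: Seq[Seq[Int]]): List[Int] =
    streams.flatMap(s => inst.deserializeStream(s)).toList
}
```

Calling `batchReader` with `BatchOnly` succeeds, whereas `perStreamReader` with the same instance throws `UnsupportedOperationException`, which is the kind of breakage that keeping `deserializeStream` avoids.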

@marin-ma marin-ma closed this Jun 30, 2026