[spark] Make LakeSplit extend Serializable to simplify Spark serialization #3123
Merged
luoyuxia merged 7 commits into apache:main on Apr 24, 2026
Conversation
…ation

LakeSplit objects were previously serialized to Array[Byte] via SimpleVersionedSerializer on the Spark driver side, stored in InputPartition case classes, then deserialized on executors using a re-created serializer. This added unnecessary complexity and coupling.

Since both Paimon's DataSplit and Iceberg's FileScanTask are already java.io.Serializable, LakeSplit can safely extend Serializable, allowing Spark to transport splits directly via Java serialization.

Changes:
- LakeSplit extends java.io.Serializable; PaimonSplit/TestingLakeSplit add serialVersionUID
- InputPartition case classes use LakeSplit directly instead of Array[Byte]
- Remove splitSerializer from batch/reader method chains
- Remove serializeLakeSplits/deserializeLakeSplits from FlussLakeUtils
- Add LakeSplitSerializationTest

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
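
The idea in this commit message can be sketched in plain Java, without Spark or Fluss on the classpath. Every name below (DemoSplit, DemoFileSplit, DemoPartition, roundTrip) is a hypothetical stand-in, not the actual Fluss code: DemoSplit plays the role of LakeSplit, and DemoPartition plays the role of an InputPartition case class that holds the split itself rather than a byte array. The round trip approximates what Java serialization does when a partition travels from driver to executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class DirectSplitTransportSketch {

    /** Hypothetical stand-in for LakeSplit: the interface itself extends Serializable. */
    interface DemoSplit extends Serializable {
        int bucket();
    }

    /** Hypothetical split implementation with a pinned serialVersionUID. */
    static final class DemoFileSplit implements DemoSplit {
        private static final long serialVersionUID = 1L;
        private final int bucket;
        private final String file;

        DemoFileSplit(int bucket, String file) {
            this.bucket = bucket;
            this.file = file;
        }

        @Override
        public int bucket() {
            return bucket;
        }

        String file() {
            return file;
        }
    }

    /** Hypothetical partition that holds the split directly instead of Array[Byte]. */
    static final class DemoPartition implements Serializable {
        private static final long serialVersionUID = 1L;
        private final DemoSplit split;

        DemoPartition(DemoSplit split) {
            this.split = split;
        }

        DemoSplit split() {
            return split;
        }
    }

    /** Writes the partition with Java serialization and reads a fresh copy back. */
    static DemoPartition roundTrip(DemoPartition partition) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(partition);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (DemoPartition) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DemoPartition sent = new DemoPartition(new DemoFileSplit(3, "part-0.parquet"));
        DemoPartition received = roundTrip(sent);
        // The split arrives intact without any hand-written serializer.
        System.out.println(received.split().bucket());
    }
}
```

Because the split field is itself Serializable, the partition needs no separate serializer argument, which is why splitSerializer can drop out of the method chains.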
5956695 to 727afbb
beryllw
reviewed
Apr 21, 2026
Contributor
Thanks for the PR. LGTM!
Use a unique table name in testJavaSerializationRoundTrip to avoid AlreadyExistsException when running alongside testSerializeAndDeserialize, since both tests share a static Iceberg catalog without per-test cleanup. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…lation Same issue as the Iceberg counterpart: testJavaSerializationRoundTrip shared DEFAULT_TABLE with testSerializeAndDeserialize. While Paimon's createTable uses ignoreIfExists=true so it wouldn't throw, the second test would silently append to the existing table, breaking test isolation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…nique table names Drop the default table before each test method in IcebergSourceTestBase and PaimonSourceTestBase to ensure test isolation. This is a more robust approach than requiring each test to use a unique table name. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
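
The isolation problem described in these three commits can be reproduced with a small in-memory model. This is only a sketch: InMemoryCatalog and runTest are hypothetical and do not use the Iceberg or Paimon catalog APIs. The ignoreIfExists flag mimics the two behaviours mentioned above (throwing on an existing table versus silently reusing it), and dropFirst mimics a per-test setup step that drops the default table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TestIsolationSketch {

    static final String DEFAULT_TABLE = "default_table";

    /** Hypothetical in-memory catalog shared by all tests, like a static test fixture. */
    static final class InMemoryCatalog {
        private final Map<String, List<String>> tables = new HashMap<>();

        /** Creates a table; throws if it exists unless ignoreIfExists is true. */
        void createTable(String name, boolean ignoreIfExists) {
            if (tables.containsKey(name)) {
                if (ignoreIfExists) {
                    return; // silently reuse the existing table and its rows
                }
                throw new IllegalStateException("Table already exists: " + name);
            }
            tables.put(name, new ArrayList<>());
        }

        void dropTableIfExists(String name) {
            tables.remove(name);
        }

        void append(String name, String row) {
            tables.get(name).add(row);
        }

        int rowCount(String name) {
            return tables.get(name).size();
        }
    }

    /** Simulates one test method: optional drop, create, write one row, count rows. */
    static int runTest(InMemoryCatalog catalog, boolean dropFirst, boolean ignoreIfExists) {
        if (dropFirst) {
            catalog.dropTableIfExists(DEFAULT_TABLE);
        }
        catalog.createTable(DEFAULT_TABLE, ignoreIfExists);
        catalog.append(DEFAULT_TABLE, "row");
        return catalog.rowCount(DEFAULT_TABLE);
    }

    public static void main(String[] args) {
        InMemoryCatalog shared = new InMemoryCatalog();
        System.out.println(runTest(shared, false, true)); // 1: first test sees a fresh table
        System.out.println(runTest(shared, false, true)); // 2: second test appends to leftovers
        System.out.println(runTest(shared, true, false)); // 1: dropping first restores isolation
    }
}
```

Dropping the table in the shared setup step handles both catalog behaviours at once, so individual tests no longer need to remember to pick unique table names.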
luoyuxia
reviewed
Apr 24, 2026
Contributor
luoyuxia
left a comment
@YannByron Thanks for the pr. Left minor comments.
/** Split for Iceberg table. */
public class IcebergSplit implements LakeSplit, Serializable {
    private static final long serialVersionUID = 1L;
Contributor
nit: why remove this serialVersionUID?
We may also want to add a serialVersionUID to PaimonSplit.
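
For context on this nit: when a Serializable class does not declare serialVersionUID, the JVM computes a default from the class's structure, so an otherwise harmless change to the class can make previously serialized bytes fail to deserialize with InvalidClassException. The sketch below compares the two cases using java.io.ObjectStreamClass. PinnedSplit and UnpinnedSplit are hypothetical classes for illustration, not the actual IcebergSplit or PaimonSplit.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

class SerialVersionUidSketch {

    /** Hypothetical split that pins its serialVersionUID explicitly. */
    static final class PinnedSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final String file;

        PinnedSplit(String file) {
            this.file = file;
        }
    }

    /** Hypothetical split that relies on the JVM-computed default UID. */
    static final class UnpinnedSplit implements Serializable {
        final String file;

        UnpinnedSplit(String file) {
            this.file = file;
        }
    }

    /** Returns the UID that Java serialization would write for the given class. */
    static long uidOf(Class<? extends Serializable> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // The pinned class reports the declared value; the unpinned class reports
        // a hash derived from its name, fields, and methods.
        System.out.println("pinned:   " + uidOf(PinnedSplit.class));
        System.out.println("unpinned: " + uidOf(UnpinnedSplit.class));
    }
}
```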
Ugbot pushed a commit to Ugbot/fluss that referenced this pull request on Apr 26, 2026
Summary
- Make LakeSplit extend java.io.Serializable so Spark can transport splits directly instead of manually serializing and deserializing bytes via SimpleVersionedSerializer
- Replace lakeSplitBytes: Array[Byte] with lakeSplit: LakeSplit / lakeSplits: java.util.List[LakeSplit] in InputPartition case classes
- Remove serializeLakeSplits/deserializeLakeSplits from FlussLakeUtils and splitSerializer from batch/reader method chains

Closes #3122
Test plan
- LakeSplitSerializationTest: verifies TestingLakeSplit round-trips through Java serialization
- fluss-common, fluss-lake-paimon, fluss-lake-iceberg, fluss-spark-common all compile
- fluss-spark/fluss-spark-ut/

🤖 Generated with Claude Code