[spark] Refactor BatchWrite subclasses into base logic + per-version wrappers#7723
Conversation
b4569e5 to
d274193
Compare
There was a problem hiding this comment.
Pull request overview
Refactors Spark V2 BatchWrite implementations to avoid Spark 4.1’s inherited BatchWrite.commit(.., WriteSummary) signature from leaking into classes used on Spark 4.0 runtimes (which can trigger ClassNotFoundException: WriteSummary during task serialization). The change centralizes write business logic in Spark-version-agnostic base classes and moves the extends BatchWrite mixin into per-version thin wrappers constructed via SparkShim factories.
Changes:
- Introduce
PaimonBatchWriteBase/FormatTableBatchWriteBaseinpaimon-spark-common(do not extendBatchWrite) and add per-version wrapper classes that mix inBatchWrite. - Add
SparkShim.createPaimonBatchWriteandSparkShim.createFormatTableBatchWritefactories and route call sites throughSparkShimLoader. - Add Spark 4.0 shadow wrappers (and shim wiring) to ensure Spark 4.0-target-compiled class metadata is used at runtime.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala | Implements new SparkShim factories for Spark 4.x to construct version-compiled BatchWrite wrappers. |
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 4.x thin BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 4.x thin BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala | Implements new SparkShim factories for Spark 3.x to construct version-compiled BatchWrite wrappers. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 3.x thin BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 3.x thin BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala | Adds BatchWrite factory methods to route instantiation through per-version shims. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala | Switches V2 batch write construction to SparkShimLoader.shim.createPaimonBatchWrite. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWriteBase.scala | Extracts BatchWrite logic into a Spark-version-agnostic base class (no extends BatchWrite). |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala | Switches FormatTable batch write construction to SparkShimLoader.shim.createFormatTableBatchWrite and removes the inline BatchWrite impl. |
| paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala | Extracts FormatTable BatchWrite logic into a base class (no extends BatchWrite). |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala | Spark 4.0 shim override wiring the new factories to Spark-4.0-compiled wrappers. |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala | Spark 4.0-compatible shadow BatchWrite wrapper delegating to PaimonBatchWriteBase. |
| paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWrite.scala | Spark 4.0-compatible shadow BatchWrite wrapper delegating to FormatTableBatchWriteBase. |
Comments suppressed due to low confidence (2)
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala:74
batchWriteBuilder.newCommit()returns a commit object that should be closed. Here the commit is never closed (including on exceptions), which can leak resources/file handles. Wrap the commit in try/finally (or equivalent) and close it in the finally block while preserving the existing error logging.
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/FormatTableBatchWriteBase.scala:86abortMessagescreates a new commit viabatchWriteBuilder.newCommit()but never closes it. Close the commit in a finally block after aborting to avoid leaking resources.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
d274193 to
642014f
Compare
|
Let me check it |
JingsongLi
left a comment
There was a problem hiding this comment.
Review: base + per-version wrapper refactoring for BatchWrite
Overall
The approach is sound. Spark 4.1's addition of a default method BatchWrite.commit(.., WriteSummary) means any class compiled against 4.1 that mixes in BatchWrite will carry the WriteSummary reference in its method table, breaking Spark 4.0 at runtime via lazy-linking. Splitting the implementation into a base class (no extends BatchWrite) in paimon-spark-common plus thin per-version wrappers compiled against each Spark version is the correct fix. The code is well-documented with clear scaladoc explaining the rationale.
Correctness
-
commitStartedvisibility change (private -> protected): Correct and necessary sinceabortMessagesnow lives in the base class and checks this flag. -
FormatTableBatchWriteBase.abortMessageshas nocommitStartedguard: This is consistent with the originalFormatTableBatchWritewhich also lacked the guard. ThePaimonBatchWriteBase.abortMessagesretains it. No regression here. -
FormatTableDataWriterrelocation: The class moved out ofPaimonFormatTable.scala(in both the 4.0 shadow and common) intoFormatTableBatchWriteBase.scalain common. This keeps the writer co-located with the batch-write base logic which makes sense, but please verify it is visible to per-version wrappers that callcreateFormatTableDataWriterFactory().
Design observations
-
Companion
object PaimonBatchWrite.applyin per-version modules: After this PR, all instantiation routes throughSparkShimLoader.shim.createPaimonBatchWrite(...)which callsnew PaimonBatchWrite(...)directly. The companionapplymethods appear unused. If they exist solely for binary compatibility with call-sites that used the formercase classconstructor syntax, please confirm whether any such call-sites remain; otherwise these could be removed to avoid confusion. -
with Serializableon both base classes: SinceBatchWriteinstances live exclusively on the driver (onlyDataWriterFactoryis serialized to executors),Serializableis not strictly required. It is harmless and preserves the implicit contract of the formercase class, but worth noting. The transitive serializability ofSparkMetricRegistryandBatchWriteBuilderheld byPaimonBatchWriteBasecould cause surprisingNotSerializableExceptions if something does accidentally attempt serialization. -
Source duplication across wrappers: The three
PaimonBatchWritewrappers (spark3-common, spark4-common, spark-4.0) and threeFormatTableBatchWritewrappers are near-identical. This is unavoidable given the design constraint. A brief comment at the top of each (already present -- good) explaining this is a deliberate compile-target copy helps future maintainers.
Minor nits
- In the
paimon-spark-4.0PaimonBatchWrite.scalascaladoc, the phrase "maven shade order pickspaimon-spark-4.0/target/classesahead of the shaded 4-common copy" is an implementation detail that may become stale; consider referencing the build mechanism more generically (e.g., "classpath ordering ensures the 4.0 shadow takes precedence"). - The
FormatTableBatchWriteBaseconstructor exposesbatchWriteBuilderasprotected val. If no subclass other than the thin wrappers needs it,private[format]would tighten visibility.
Summary
Well-structured refactoring that correctly isolates the Spark version-specific mixin. The pattern is consistent across both affected classes. No functional regressions observed. Only cosmetic/hygiene items noted above.
|
+1 |
Purpose
Follow-up of #7648 (Spark 4.1 module) and a sibling of #7721. After landing the reverse-shim layout, two of the files under
paimon-spark-4.0/src/mainonly existed as shadows because their compilation unit defined a Scala class thatextends BatchWrite. Spark 4.1 added a default methodBatchWrite.commit(WriterCommitMessage[], WriteSummary)whoseWriteSummaryparameter type does not exist on Spark 4.0; a class compiled against 4.1 that mixes inBatchWritecarries the inheritedcommit(.., WriteSummary)signature in its method table, which JVMObjectStreamClass.getPrivateMethodlazy-links during Spark task serialization and crashes 4.0 withClassNotFoundException: WriteSummary.This PR refactors both affected classes into the same base + per-version wrapper pattern:
PaimonBatchWrite(used by V2 writes)FormatTableBatchWrite(used byFormatTableV2 writes — was previously aprivate case classinsidePaimonFormatTable.scala)For each, the body lives in a new abstract base in
paimon-spark-commonthat deliberately does not extendBatchWrite(renamed protected helpers:commitMessages,abortMessages,createPaimonDataWriterFactory,createFormatTableDataWriterFactory). Each per-version module (paimon-spark3-common,paimon-spark4-common,paimon-spark-4.0/src/main) ships a thin wrapper that mixes inBatchWriteand forwards the fourBatchWritemethods to the base helpers. Routing happens through two newSparkShimfactories so each Spark version's scalac compiles the rightextends BatchWritemixin.The Spark 4.0 shadow of
PaimonFormatTable.scalais no longer needed and is deleted; only the new thinFormatTableBatchWrite.scalawrapper remains underpaimon-spark-4.0/src/main.Tests
CI
API and Format
No new public API. Two internal factories added to
org.apache.spark.sql.paimon.shims.SparkShim:createPaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)createFormatTableBatchWrite(table, overwriteDynamic, overwritePartitions, writeSchema)Documentation
No user-facing changes.