[Bug] Fix ChainSplit NPE after branch table cache invalidation #7299

@juntaozhang

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

main

Compute Engine

spark 3.5

Minimal reproduce step

spark-sql (default)> CREATE TABLE default.t (
                   >     `t1` string ,
                   >     `t2` string ,
                   >     `t3` string
                   > ) PARTITIONED BY (`date` string)
                   > TBLPROPERTIES (
                   >   'chain-table.enabled' = 'true',
                   >   -- props about primary key table
                   >   'primary-key' = 'date,t1',
                   >   'sequence.field' = 't2',
                   >   'bucket-key' = 't1',
                   >   'bucket' = '2',
                   >   -- props about partition
                   >   'partition.timestamp-pattern' = '$date',
                   >   'partition.timestamp-formatter' = 'yyyyMMdd'
                   > );
26/02/24 13:05:12 WARN Mimetypes: Unable to find 'mime.types' file in classpath
Time taken: 0.823 seconds
spark-sql (default)> CALL sys.create_branch('default.t', 'snapshot');
true
Time taken: 0.725 seconds, Fetched 1 row(s)
spark-sql (default)>
                   > CALL sys.create_branch('default.t', 'delta');
true
Time taken: 0.441 seconds, Fetched 1 row(s)
spark-sql (default)> ALTER TABLE default.t SET tblproperties
                   >     ('scan.fallback-snapshot-branch' = 'snapshot',
                   >      'scan.fallback-delta-branch' = 'delta');
Time taken: 0.961 seconds
spark-sql (default)>
                   > ALTER TABLE `default`.`t$branch_snapshot` SET tblproperties
                   >     ('scan.fallback-snapshot-branch' = 'snapshot',
                   >      'scan.fallback-delta-branch' = 'delta');
Time taken: 0.667 seconds
spark-sql (default)>
                   > ALTER TABLE `default`.`t$branch_delta` SET tblproperties
                   >     ('scan.fallback-snapshot-branch' = 'snapshot',
                   >      'scan.fallback-delta-branch' = 'delta');
Time taken: 0.954 seconds
spark-sql (default)> insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810')
                   >     values ('1', '1', '1');
Time taken: 24.562 seconds
spark-sql (default)> insert overwrite `default`.`t$branch_delta` partition (date = '20250811')
                   >     values ('2', '1', '1');
Time taken: 21.339 seconds
spark-sql (default)>
                   > select t1, t2, t3 from default.t where date = '20250811'
                   > ;
26/02/24 13:06:21 ERROR TaskSetManager: Failed to serialize task 4, not attempting to retry it.
java.lang.NullPointerException
	at java.base/java.io.DataOutputStream.writeUTF(Unknown Source)
	at java.base/java.io.DataOutputStream.writeUTF(Unknown Source)
	at org.apache.paimon.table.source.ChainSplit.serialize(ChainSplit.java:146)
	at org.apache.paimon.table.source.ChainSplit.writeObject(ChainSplit.java:115)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:516)
	at jdk.internal.reflect.GeneratedMethodAccessor130.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:516)
	at jdk.internal.reflect.GeneratedMethodAccessor130.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
	at org.apache.spark.scheduler.TaskSetManager.prepareLaunchingTask(TaskSetManager.scala:530)
	at org.apache.spark.scheduler.TaskSetManager.$anonfun$resourceOffer$2(TaskSetManager.scala:494)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:470)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2(TaskSchedulerImpl.scala:414)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$2$adapted(TaskSchedulerImpl.scala:409)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOfferSingleTaskSet$1(TaskSchedulerImpl.scala:409)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:399)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20(TaskSchedulerImpl.scala:606)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$20$adapted(TaskSchedulerImpl.scala:601)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16(TaskSchedulerImpl.scala:601)
	at org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$resourceOffers$16$adapted(TaskSchedulerImpl.scala:574)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:574)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.$anonfun$makeOffers$1(CoarseGrainedSchedulerBackend.scala:366)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$$withLock(CoarseGrainedSchedulerBackend.scala:1058)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:360)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:188)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
26/02/24 13:06:21 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_6.0 was not serializable
Job aborted due to stage failure: Failed to serialize task 4, not attempting to retry it. Exception during serialization: java.lang.NullPointerException
org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 4, not attempting to retry it. Exception during serialization: java.lang.NullPointerException
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:475)
	at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.$anonfun$run$2(SparkSQLDriver.scala:76)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:76)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:501)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:619)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:613)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:613)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:310)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1034)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:199)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:222)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1125)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1134)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
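The top frames show `DataOutputStream.writeUTF` throwing inside `ChainSplit.serialize`, and `writeUTF` throws `NullPointerException` whenever it is handed a null string. A minimal Java sketch (standalone illustration, not Paimon's actual `ChainSplit` code; `branchName` is a hypothetical field) reproduces the failure mode and shows a presence-flag pattern that serializes a possibly-null string safely:

```java
import java.io.*;

public class NullSafeWriteUtf {

    // Null-safe variant: write a presence flag, then the string only if present.
    static byte[] serialize(String branchName) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeBoolean(branchName != null);
        if (branchName != null) {
            out.writeUTF(branchName);
        }
        return bos.toByteArray();
    }

    static String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return in.readBoolean() ? in.readUTF() : null;
    }

    public static void main(String[] args) throws IOException {
        // writeUTF(null) throws NPE -- the same failure as in the stack trace,
        // hit when the split carries a null branch name after cache invalidation.
        boolean npe = false;
        try {
            new DataOutputStream(new ByteArrayOutputStream()).writeUTF(null);
        } catch (NullPointerException e) {
            npe = true;
        }
        System.out.println("npe=" + npe);
        System.out.println("roundtrip=" + deserialize(serialize("snapshot")));
        System.out.println("nullRoundtrip=" + (deserialize(serialize(null)) == null));
    }
}
```

An alternative fix, of course, is to ensure the branch name is never null by the time the split is serialized (i.e., repopulate it when the cached branch table is invalidated), which is what the issue title suggests.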

What doesn't meet your expectations?

The query should not fail with an NPE; it should complete and return the rows, as in this expected run:

spark-sql (default)> select t1, t2, t3 from default.t where date = '20250811';
1	1	1
2	1	1
Time taken: 6.136 seconds, Fetched 2 row(s)

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels

bug (Something isn't working)