Skip to content

[SUPPORT] Facing issues based on custom Payload implementation setting #7988

@michael1991

Description

@michael1991

Tips before filing an issue

  • Have you gone through our FAQs?

  • Yes, but there is no complete example.

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • No.

  • If you have triaged this as a bug, then file an issue directly.

  • No.

Describe the problem you faced

I'm using HUDI 0.12.0 on GCP Dataproc 2.1.3 with Spark 3.3.0, when I try to use custom Payload class following steps below:

  1. implement custom Payload class by Java extends OverwriteWithLatestAvroPayload to achieve partial updates
  2. package jar with single custom Payload class
  3. set spark.driver.extraClassPath and spark.executor.extraClassPath during SparkSession creation
  4. config WRITE_PAYLOAD_CLASS_NAME with custom Payload class on writing
    Please give me a favor to handle this situation, Thanks in advance !

To Reproduce

Steps to reproduce the behavior:

Expected behavior

A clear and concise description of what you expected to happen.

Environment Description

  • Hudi version : 0.12.0

  • Spark version : 3.3.0

  • Hive version : not used

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS..) : GCS

  • Running on Docker? (yes/no) : no

Additional context

Sorry, not sure.

Stacktrace

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 4 times, most recent failure: Lost task 0.3 in stage 15.0 (TID 797) (executor 2): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2673)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2609)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2608)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2861)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2803)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2792)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2257)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2276)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2301)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$countByKey$1(PairRDDFunctions.scala:367)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:367)
	at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:314)
	at org.apache.hudi.data.HoodieJavaPairRDD.countByKey(HoodieJavaPairRDD.java:105)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.buildProfile(BaseSparkCommitActionExecutor.java:187)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:156)
	at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:45)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:112)
	at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:76)
	at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:169)
	at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:166)
	at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:934)
	at org.apache.hudi.table.action.BaseActionExecutor.lambda$writeTableMetadata$2(BaseActionExecutor.java:77)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
	at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:77)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.finishRollback(BaseRollbackActionExecutor.java:247)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.runRollback(BaseRollbackActionExecutor.java:116)
	at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.execute(BaseRollbackActionExecutor.java:137)
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.rollback(HoodieSparkCopyOnWriteTable.java:281)
	at org.apache.hudi.client.BaseHoodieWriteClient.rollback(BaseHoodieWriteClient.java:772)
	... 52 more
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
	at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
	at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
	at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
	at jdk.internal.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1046)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2357)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:writerWrite client and core write operationspriority:criticalProduction degraded; pipelines stalled

    Type

    No type

    Projects

    Status

    ✅ Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions