
[SUPPORT]com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36 #6621

Closed
dik111 opened this issue Sep 7, 2022 · 8 comments
dik111 commented Sep 7, 2022

Describe the problem you faced

I used Flink to update data in real time, and when I used Spark to read the data, it threw com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36.

When I used Flink to read the data, no such exception occurred.

To Reproduce

Steps to reproduce the behavior:

1. Use Flink to update data in real time.
2. Use Spark to read the data.

Environment Description

  • Hudi version: 0.12.0

  • Spark version: 2.4.4 and 3.2.1

  • Flink version: 1.13.3

  • Hive version: 3.0.0

  • Hadoop version: 3.0.0

  • Storage (HDFS/S3/GCS..): HDFS

  • Running on Docker? (yes/no): no

Additional context

Here is my code:

  import org.apache.spark.sql.SparkSession

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val spark: SparkSession = {
      SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[4]")
        // The Kryo serializer is already configured explicitly here.
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate()
    }

    val tableName = "pt_sfy_sfy_oe_order_lines_all_hudi_0908"
    val tablePath = "hdfs://xxx:8020/warehouse/tablespace/managed/hive/test.db/pt_sfy_sfy_oe_order_lines_all_hudi_090666"

    // Load the Hudi table and run a simple count; the exception is thrown here.
    val dataFrame = spark.read.format("org.apache.hudi").load(tablePath)
    dataFrame.createOrReplaceTempView("pt_sfy_sfy_oe_order_lines_all_hudi_090666")
    spark.sql("select count(*) from pt_sfy_sfy_oe_order_lines_all_hudi_090666").show()
    spark.stop()
  }

Stacktrace

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 36
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:388)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:106)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:91)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:473)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:343)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:402)
	at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.<init>(HoodieMergeOnReadRDD.scala:196)
	at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.<init>(HoodieMergeOnReadRDD.scala:278)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:132)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
22/09/07 16:14:36 ERROR Executor: Exception in task 73.0 in stage 3.0 (TID 251)
org.apache.hudi.exception.HoodieException: Exception when reading log file 
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:352)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:192)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:110)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:324)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:402)
	at org.apache.hudi.HoodieMergeOnReadRDD$LogFileIterator.<init>(HoodieMergeOnReadRDD.scala:196)
	at org.apache.hudi.HoodieMergeOnReadRDD$RecordMergingFileIterator.<init>(HoodieMergeOnReadRDD.scala:278)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:132)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

danny0405 (Contributor) commented

JohnSnowLabs/spark-nlp#1123

Did you try configuring the serializer explicitly: spark.serializer = org.apache.spark.serializer.KryoSerializer?
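
A minimal sketch of that configuration (the app name is a placeholder):

  import org.apache.spark.sql.SparkSession

  // Sketch: configure the Kryo serializer explicitly when building the session.
  val spark = SparkSession.builder()
    .appName("hudi-reader") // placeholder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()

  // Or equivalently on the command line:
  //   spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer ...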

danny0405 added the spark and reader-core labels Sep 9, 2022
danny0405 (Contributor) commented

@alexeykudinkin are you interested in taking a look at this?

yihua added and removed the priority:critical and priority:blocker labels Sep 26, 2022
yihua assigned xushiyan and unassigned bvaradar Sep 26, 2022

codope (Member) commented Sep 30, 2022

@dik111 did you get a chance to try out this suggestion (#6621 (comment))?

alexeykudinkin (Contributor) commented Sep 30, 2022

Created HUDI-4959 to track this issue.

TL;DR: The problem is that we're using Kryo to serialize/deserialize HoodieKey, and deserialization fails because Kryo can find neither a class registration nor a class associated with the encoded ID.


EDIT

I'm taking back my hypothesis that the issue is in the class encoding: after writing a small test to validate it, I confirmed that Kryo actually writes out the full class name for all implicitly registered classes (as it should).

It seems the problem is indeed a misalignment of the Avro versions, as reported by @KnightChess. Quick-checking, I see that between Avro 1.8.2 and 1.10.2, Utf8 gained one more field:

  // 1.8.2 
  private byte[] bytes = EMPTY;
  private int length;
  private String string;

  // 1.10.2
  private byte[] bytes;
  private int hash;
  private int length;
  private String string;

Given that we rely on Kryo to generate a serializer for orderingVal, which can be a Utf8 (via FieldSerializer), this would explain why it can't be deserialized back: the writer and the reader end up with serializers built from different field layouts.
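
To make the failure mode concrete, here is a minimal, self-contained sketch of the FieldSerializer mismatch. Utf8Old and Utf8New are hypothetical stand-ins for the two Avro layouts (not the real classes), and the registration ID 40 is arbitrary. Because FieldSerializer streams fields positionally (sorted by name) with no tags, inserting the hash field makes the reader misinterpret the writer's bytes:

  import com.esotericsoftware.kryo.Kryo
  import com.esotericsoftware.kryo.io.{Input, Output}

  // Stand-in for Utf8 as laid out in Avro 1.8.2.
  class Utf8Old {
    var bytes: Array[Byte] = Array.emptyByteArray
    var length: Int = 0
    var string: String = null
  }

  // Stand-in for Utf8 as laid out in Avro 1.10.2 (extra `hash` field).
  class Utf8New {
    var bytes: Array[Byte] = null
    var hash: Int = 0
    var length: Int = 0
    var string: String = null
  }

  object FieldMismatchDemo {
    def main(args: Array[String]): Unit = {
      // "Flink side": serializes the old layout.
      val writer = new Kryo()
      writer.setReferences(false) // keep the demo's byte stream simple
      writer.register(classOf[Utf8Old], 40)

      // "Spark side": same registration ID, but one more field in the class.
      val reader = new Kryo()
      reader.setReferences(false)
      reader.register(classOf[Utf8New], 40)

      val obj = new Utf8Old
      obj.bytes = "hello".getBytes("UTF-8")
      obj.length = 5
      obj.string = "hello"

      val out = new Output(1024)
      writer.writeClassAndObject(out, obj)
      out.close()

      // The reader consumes `length` as `hash`, then drifts off by one field:
      // this throws a KryoException or produces a garbage object.
      val in = new Input(out.toBytes)
      println(reader.readClassAndObject(in))
    }
  }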

KnightChess (Contributor) commented Oct 14, 2022

We also encountered this problem. In our environment, the flink-bundle Avro version is 1.10.0 and the spark3-bundle Avro version is 1.10.2, and the failure shows up after Flink writes a MOR table with a delete log block.

Spark:
A Spark query throws the exception below (Kryo maps classId - 2 to an index, which is where the ID 36 comes from):

Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 36, Size: 7
Serialization trace:
string (org.apache.hudi.org.apache.avro.util.Utf8)
orderingVal (org.apache.hudi.common.model.DeleteRecord)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.deserialize(HoodieDeleteBlock.java:106)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getRecordsToDelete(HoodieDeleteBlock.java:91)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:476)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scanInternal(AbstractHoodieLogRecordReader.java:346)
	... 30 more
Caused by: java.lang.IndexOutOfBoundsException: Index: 36, Size: 7
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:857)
	at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:780)
	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
	... 44 more

Presto:
Presto throws the same exception you hit; the Presto Avro version is 1.8.2.

Resolution:
In Spark, I changed the Avro version to 1.10.0, the same as Flink, and the query now works fine. I didn't check Presto; you can give it a try in Spark, @dik111.
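
For example, with sbt the version pin might look like this (a sketch; it assumes your Flink bundle ships Avro 1.10.0, so adjust accordingly):

  // build.sbt (sketch): force one Avro version across the job's classpath,
  // so Spark's Kryo reads the same Utf8 field layout that Flink wrote.
  dependencyOverrides += "org.apache.avro" % "avro" % "1.10.0"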

Armelabdelkbir commented Oct 14, 2022

Hello, I also have this problem when trying to read data with Spark; the same job sometimes works fine and sometimes fails.

Steps to reproduce:

1. Use Spark Structured Streaming to fetch data from Kafka in real time.
2. Use Spark to read the data.

I hit the problem when using spark-submit, but I can't reproduce the issue with Jupyter.

My stack:
Hudi version: 0.11.0
Spark version: 2.4.6 / 3.1.4
Hive version: 1.2.1000
Storage (HDFS): 2.7.3

58782 [dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 8 (parquet at Utilities.scala:27) failed in 8.241 s due to Job aborted due to stage failure: Task 1 in stage 8.0 failed 4 times, most recent failure: Lost task 1.3 in stage 8.0 (TID 35, cnode24.datalake.com, executor 36): org.apache.hudi.exception.HoodieException: Exception when reading log file
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:332)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:178)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.performScan(HoodieMergedLogRecordScanner.java:103)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:96)
	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner$Builder.build(HoodieMergedLogRecordScanner.java:291)
	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:370)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:240)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:230)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:77)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: [Lorg.apache.hudi.common.model.DeleteRecord;
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:804)
	at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:104)
	at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:78)
	at org.apache.hudi.common.table.log.block.HoodieDeleteBlock.getKeysToDelete(HoodieDeleteBlock.java:89)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.processQueuedBlocksForInstant(AbstractHoodieLogRecordReader.java:430)
	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordReader.scan(AbstractHoodieLogRecordReader.java:323)
	... 34 more

alexeykudinkin (Contributor) commented

@dik111 please check my updated comment: can you try the remediation @KnightChess called out (making sure that the versions of Avro used by Flink and Spark are identical)?

In the meantime, I'm going to put up a fix to properly handle Utf8 in Kryo.
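
One plausible shape for such a fix, sketched here only for illustration (this is not necessarily what the eventual HUDI-4959 patch does): register a custom Kryo serializer for Utf8 that persists only the UTF-8 bytes, so the wire format no longer depends on the class's internal field layout.

  import com.esotericsoftware.kryo.{Kryo, Serializer}
  import com.esotericsoftware.kryo.io.{Input, Output}
  import org.apache.avro.util.Utf8

  // Version-stable serializer: only the UTF-8 bytes hit the stream, never
  // the field layout, so Avro 1.8.x writers and 1.10.x readers interoperate.
  class Utf8Serializer extends Serializer[Utf8] {
    override def write(kryo: Kryo, output: Output, utf8: Utf8): Unit = {
      output.writeVarInt(utf8.getByteLength, true)
      output.writeBytes(utf8.getBytes, 0, utf8.getByteLength)
    }
    override def read(kryo: Kryo, input: Input, clazz: Class[Utf8]): Utf8 = {
      val length = input.readVarInt(true)
      new Utf8(input.readBytes(length))
    }
  }

  // Hypothetical registration on the Kryo instance used for log blocks:
  // kryo.register(classOf[Utf8], new Utf8Serializer)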

alexeykudinkin (Contributor) commented

Please follow HUDI-4959 for updates on the fix. Closing this issue in favor of the Jira.
