
Kryo serialization exception when running add_files #3586

Closed

dubeme opened this issue Nov 19, 2021 · 9 comments

dubeme (Contributor) commented Nov 19, 2021

Hey all,

I'm exploring Iceberg 0.12.1 on Azure Synapse Analytics Spark Pools, running Spark 3.1. I've created my table:

DROP TABLE IF EXISTS the_catalog.iceberg.synapse.the_table;

CREATE TABLE the_catalog.iceberg.synapse.the_table (
    group_id bigint,
    col1 string,
    col2 double,
    col3 map<bigint,bigint>,
    event_date_time timestamp,
    data_version timestamp,
    date date
) USING ICEBERG
PARTITIONED BY (date, group_id)

Next, I add files (about 15,000 files) for one partition:

CALL the_catalog.system.add_files(
  table => 'the_catalog.iceberg.synapse.the_table',
  source_table => '`parquet`.`adl://<ADLS ACCOUNT>.azuredatalakestore.net/path/to/files/date=2021-11-01`'
)

This throws an exception; it seems to be complaining about Kryo serialization. Any ideas what might be going on? The only other related issue I can find is #446.

Error: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times, most recent failure: Lost task 2.3 in stage 4.0 (TID 504) (vm-29407630 executor 1): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
values (org.apache.iceberg.spark.SparkTableUtil$SparkPartition)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1412)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:69)
at sun.reflect.GeneratedMethodAccessor33.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:457)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
values (org.apache.iceberg.spark.SparkTableUtil$SparkPartition)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:391)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:35)
at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:23)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:297)
at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2(ParallelCollectionRDD.scala:79)
at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$2$adapted(ParallelCollectionRDD.scala:79)
at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:171)
at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$1(ParallelCollectionRDD.scala:79)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1405)
... 20 more
Caused by: java.lang.UnsupportedOperationException
at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:527)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 35 more

Driver stacktrace:
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2263)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2212)
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2211)
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2211)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1082)
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1082)
scala.Option.foreach(Option.scala:407)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1082)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2450)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2392)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2381)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:869)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:304)
org.apache.spark.RangePartitioner.(Partitioner.scala:171)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:267)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:155)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:155)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:213)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:251)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:248)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:209)
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:354)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:420)
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
org.apache.spark.sql.Dataset.$anonfun$collectAsList$1(Dataset.scala:2977)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:112)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:176)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:94)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:67)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2976)
org.apache.iceberg.spark.SparkTableUtil.importSparkPartitions(SparkTableUtil.java:485)
org.apache.iceberg.spark.procedures.AddFilesProcedure.importPartitions(AddFilesProcedure.java:181)
org.apache.iceberg.spark.procedures.AddFilesProcedure.importFileTable(AddFilesProcedure.java:169)
org.apache.iceberg.spark.procedures.AddFilesProcedure.lambda$importToIceberg$1(AddFilesProcedure.java:129)
org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:85)
org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:74)
org.apache.iceberg.spark.procedures.AddFilesProcedure.importToIceberg(AddFilesProcedure.java:121)
org.apache.iceberg.spark.procedures.AddFilesProcedure.call(AddFilesProcedure.java:108)
org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:33)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:46)
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:112)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:176)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:94)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:67)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
org.apache.spark.sql.Dataset.(Dataset.scala:228)
org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
org.apache.livy.repl.SQLInterpreter.execute(SQLInterpreter.scala:121)
org.apache.livy.repl.Session.$anonfun$executeCode$1(Session.scala:330)
scala.Option.map(Option.scala:230)
org.apache.livy.repl.Session.executeCode(Session.scala:328)
org.apache.livy.repl.Session.$anonfun$execute$2(Session.scala:202)
org.apache.livy.repl.Session.withRealtimeOutputSupport(Session.scala:451)
org.apache.livy.repl.Session.$anonfun$execute$1(Session.scala:202)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
scala.util.Success.$anonfun$map$1(Try.scala:255)
scala.util.Success.map(Try.scala:213)
scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)

dubeme (Contributor, Author) commented Nov 19, 2021

cc: @RussellSpitzer

dubeme changed the title from "Kryp serialization exception when running add_files" to "Kryo serialization exception when running add_files" on Nov 20, 2021
rdblue (Contributor) commented Nov 21, 2021

Looks like we just need to stop using an ImmutableMap in SparkPartition: https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L650

@dubeme would you like to open a PR with the fix and a test that serializes SparkPartition using Kryo?
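
For anyone hitting the same trace: Kryo's MapSerializer rebuilds a map by instantiating the target class and then calling put() for every entry, which Guava's ImmutableMap rejects, hence the UnsupportedOperationException at ImmutableMap.put. The standalone sketch below reproduces that failure mode outside Iceberg; it assumes the Kryo 4 API and only approximates how Spark/chill configures the instantiator strategy.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.objenesis.strategy.StdInstantiatorStrategy;

public class ImmutableMapKryoRepro {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    kryo.setRegistrationRequired(false);
    // Roughly mirrors Spark/chill: fall back to Objenesis for classes without a
    // no-arg constructor, which is the case for ImmutableMap implementations.
    kryo.setInstantiatorStrategy(
        new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));

    Map<String, String> values = ImmutableMap.of("date", "2021-11-01", "group_id", "42");

    Output output = new Output(1024, -1);
    kryo.writeClassAndObject(output, values); // writing succeeds

    // Kryo's MapSerializer creates a blank map instance and put()s each entry;
    // ImmutableMap.put throws UnsupportedOperationException at this point.
    Input input = new Input(output.toBytes());
    Object copy = kryo.readClassAndObject(input);
    System.out.println(copy);
  }
}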

dubeme added a commit to dubeme/iceberg that referenced this issue Nov 22, 2021
Issue apache#3586 shows situation where Kyro is unable to serialize the SparkPartition object (@rdblue points it out from the stack trace). This commit replaces the ImmutableMap with HashMap
dubeme (Contributor, Author) commented Nov 22, 2021

@rdblue, I'll give it a go. I'm not familiar with Gradle (more of a Maven person), but I can try working through it.

qq: You point to the Spark 3.2 source; I assume the fix will also apply to the 3.0 and 3.1 files?

rdblue (Contributor) commented Nov 22, 2021

Thanks for working on this! For now, you should update 3.2 and we can port the fix to the other Spark versions after it's in.

dubeme added a commit to dubeme/iceberg that referenced this issue Nov 23, 2021
Issue apache#3586 shows situation where Kyro is unable to serialize the SparkPartition object (@rdblue points it out from the stack trace). This commit replaces the ImmutableMap with HashMap
dubeme (Contributor, Author) commented Nov 23, 2021

Cool... I just created this small PR #3597
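
For readers skimming the thread, the change the commit message describes (keep the defensive copy in SparkPartition, but into a map Kryo can rebuild) would look roughly like the class below. This is an illustrative sketch that only mirrors the shape of SparkTableUtil.SparkPartition; it is not the actual diff in the PR.

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the shape of SparkTableUtil.SparkPartition to show
// the Kryo-friendly direction the commit message describes, not the real patch.
public class SparkPartitionSketch implements Serializable {
  private final Map<String, String> values;
  private final String uri;
  private final String format;

  public SparkPartitionSketch(Map<String, String> values, String uri, String format) {
    // A plain HashMap copy instead of ImmutableMap.copyOf(values): Kryo's
    // MapSerializer can recreate a HashMap and put() entries into it, whereas
    // ImmutableMap.put throws UnsupportedOperationException on deserialization.
    this.values = new HashMap<>(values);
    this.uri = uri;
    this.format = format;
  }

  public Map<String, String> getValues() {
    return values;
  }

  public String getUri() {
    return uri;
  }

  public String getFormat() {
    return format;
  }
}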

kbendick added this to the Iceberg 0.13.0 Release milestone Nov 30, 2021
kbendick (Contributor) commented Nov 30, 2021
I added the associated PR to the 0.13.0 release as this seems like something we'd want to fix before then.

Given that there is a PR, I'll remove this issue from the milestone.

kbendick removed this from the Iceberg 0.13.0 Release milestone Nov 30, 2021
kbendick (Contributor) commented Dec 1, 2021

Hi @dubeme! Thanks for the PR. We'd love to include it in the upcoming planned 0.13.0 release.

Please let me know if there's anything I can do to help you complete it.

Looks like it just needs a test; I linked a sample test in the PR that you can mostly copy from.
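
Not the sample linked in the PR, but roughly the kind of round-trip check being asked for, using Spark's KryoSerializer from Java. The SparkPartition constructor arguments and getValues() accessor are assumed from the class's public shape; treat it as a sketch rather than the test that eventually landed.

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.SerializerInstance;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class SparkPartitionKryoRoundTrip {
  public static void main(String[] args) {
    Map<String, String> values = new HashMap<>();
    values.put("date", "2021-11-01");
    values.put("group_id", "42");
    SparkPartition partition =
        new SparkPartition(values, "adl://account/path/date=2021-11-01", "parquet");

    SerializerInstance kryo = new KryoSerializer(new SparkConf()).newInstance();
    ClassTag<SparkPartition> tag = ClassTag$.MODULE$.apply(SparkPartition.class);

    ByteBuffer bytes = kryo.serialize(partition, tag);
    // Before the fix, this step fails with UnsupportedOperationException
    // thrown from ImmutableMap.put during Kryo deserialization.
    SparkPartition copy = kryo.deserialize(bytes, tag);
    System.out.println(copy.getValues());
  }
}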

rdblue (Contributor) commented Dec 5, 2021

Fixed by #3667.

rdblue closed this as completed Dec 5, 2021
dubeme (Contributor, Author) commented Dec 6, 2021

Thanks @kbendick for stepping in and getting this through... Been out of commission the past few weeks #OSSFTW
