
Add Spark custom Kryo registrator #549

Closed · wants to merge 1 commit

Conversation

@jzhuge (Member) commented Oct 15, 2019

Kryo can't handle Guava's ImmutableList and ImmutableMap. This commit adds a Spark custom Kryo registrator for these classes, which are used by GenericDataFile.
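
A minimal sketch of such a registrator, assuming the Guava serializer helpers from de.javakaffee:kryo-serializers (illustrative; the class in this commit may differ in detail):

    import com.esotericsoftware.kryo.Kryo;
    import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
    import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
    import org.apache.spark.serializer.KryoRegistrator;

    // Registers serializers for the Guava immutable collections that
    // Kryo cannot handle out of the box.
    public class SparkKryoRegistrator implements KryoRegistrator {
      @Override
      public void registerClasses(Kryo kryo) {
        ImmutableListSerializer.registerSerializers(kryo);
        ImmutableMapSerializer.registerSerializers(kryo);
      }
    }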

The unit test TestKryoSerialization is adapted from the one in #546.

A downstream project works fine after adding this Spark conf:

spark.kryo.registrator                org.apache.iceberg.spark.SparkKryoRegistrator
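
The same conf can also be set when building a session; a sketch (the wrapper class is illustrative, and Kryo must be the active serializer for the registrator to take effect):

    import org.apache.spark.sql.SparkSession;

    public class KryoConfExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[*]")
            // The registrator only runs when Kryo is the active serializer.
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.kryo.registrator", "org.apache.iceberg.spark.SparkKryoRegistrator")
            .getOrCreate();
        spark.stop();
      }
    }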

Fixes #446.

This PR replaces #546, which may be merged first before the release.

build.gradle Outdated
@@ -429,6 +429,8 @@ project(':iceberg-spark') {
compile project(':iceberg-parquet')
compile project(':iceberg-hive')

compile 'de.javakaffee:kryo-serializers'
Contributor:

I think this should be included in iceberg-spark-runtime, which means that we will need to make sure we add it to the shaded LICENSE and NOTICE.

Member Author:

Added the additional LICENSE and NOTICE entries.

.build();

private SparkConf conf = new SparkConf()
.set("spark.kryo.registrator", SparkKryoRegistrator.class.getCanonicalName());
Contributor:

Can we add the registration to Spark's KryoSerializer from IcebergSource? It would be nice if users didn't have to add config properties.

Member Author:

Like the idea. Let me explore. Might have to instantiate the global SparkEnv.

versions.lock Outdated
@@ -5,8 +5,10 @@ aopalliance:aopalliance:1.0 (1 constraints: 170a83ac)
asm:asm:3.1 (2 constraints: 4f19c3c6)
com.carrotsearch:hppc:0.7.2 (1 constraints: f70cda14)
com.clearspring.analytics:stream:2.7.0 (1 constraints: 1a0dd136)
com.esotericsoftware:kryo:4.0.2 (1 constraints: 720da324)
Member Author:

Is there a way to substitute kryo with kryo-shaded?
I took a quick stab at Gradle dependency substitution but couldn't get it working because --write-locks still wrote out kryo, not kryo-shaded.

@jzhuge force-pushed the spark-custom-kryo-registrator branch from 9b0b7ff to 4928991 on October 15, 2019 19:43
@jzhuge (Member Author) commented Oct 15, 2019

Rebase after merging #546.

TODO:

  • Substitute the kryo dependency with kryo-shaded
  • Explore registering Kryo serializers in IcebergSource so that users no longer need to set the Spark conf spark.kryo.registrator.

@aokolnychyi (Contributor):

This PR will also unblock #553 as we need to handle SerializableConfiguration in a special way when broadcasting.

@jzhuge force-pushed the spark-custom-kryo-registrator branch from 869ea31 to 93eafaa on October 20, 2019 02:14
@jzhuge (Member Author) commented Oct 20, 2019

Spark's KryoSerializer calls Twitter's com.twitter.chill.AllScalaRegistrar, which includes serializers for many Java unmodifiable collections.

@rdblue (Contributor) commented Dec 18, 2019

I think we want to avoid requiring a custom registrator for Spark to work with Kryo. Instead, we will need to use non-guava collections in places that may be serialized with Kryo.

@jzhuge, @aokolnychyi, should we close this or is there still value in having a custom registrator even if we try to avoid needing it?

@jzhuge (Member Author) commented Dec 18, 2019

Ok to close.

@rdblue closed this on Dec 18, 2019
@flyrain (Contributor) commented Jul 28, 2021

Are we going to replace Guava maps with non-Guava ones? There are many places using Guava's ImmutableMap. For example, there are 4 Guava maps in class TableMetadata, shown below. We hit this issue when we were trying to broadcast one of them.

    this.snapshotsById = indexAndValidateSnapshots(snapshots, lastSequenceNumber);
    this.schemasById = indexSchemas();
    this.specsById = indexSpecs(specs);
    this.sortOrdersById = indexSortOrders(sortOrders);

@jzhuge (Member Author) commented Jul 29, 2021

What version of Kryo do you use? Newer versions support ImmutableMap.

@flyrain (Contributor) commented Jul 29, 2021

Thanks for the information. Nice to see you here, @jzhuge. We are using kryo-shaded-4.0.2.jar in Spark. Which version has the change in https://github.com/magro/kryo-serializers/blob/master/src/main/java/de/javakaffee/kryoserializers/guava/ImmutableMapSerializer.java?
Also, Spark 3.1.2 uses Kryo v4 per this doc: https://spark.apache.org/docs/latest/tuning.html#data-serialization.

@jzhuge (Member Author) commented Jul 29, 2021

Oops, I referenced the wrong repo.

If possible, you can add the following code to KryoSerializer.newKryo():

    ImmutableListSerializer.registerSerializers(kryo);
    ImmutableMapSerializer.registerSerializers(kryo);

Be sure to add a dependency on de.javakaffee:kryo-serializers.

@flyrain (Contributor) commented Jul 30, 2021

Hi @jzhuge, I'm not sure I understand. Do I add these lines to my Spark application or to Spark itself?

    ImmutableListSerializer.registerSerializers(kryo);
    ImmutableMapSerializer.registerSerializers(kryo);

@jzhuge (Member Author) commented Jul 30, 2021

Spark itself, if you can.
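
(An application-side alternative, if patching Spark is not possible, is the mechanism this PR proposed: ship a custom KryoRegistrator, like the sketch in the description above, and point spark.kryo.registrator at it.)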


Linked issue: KryoException when writing Iceberg tables in Spark (#446)