Core: Fix the kryo serialization issue in BaseFile. #2343
Conversation
@@ -155,7 +153,7 @@ public PartitionData copy() {
  /**
   * Copy constructor.
   *
-  * @param toCopy a generic data file to copy.
+  * @param toCopy a generic data file to copy.
Nit: unnecessary whitespace changes can cause commit conflicts.
Okay, will revert this change.
}

private LazyImmutableMap(Map<K, V> map) {
  this.copiedMap = Maps.newLinkedHashMap();
Why use LinkedHashMap? Do we need to preserve key order for some reason?
OK, I was thinking the map assertion would check the pairs' insertion order, so I used LinkedHashMap here. Reading the code again, we can use HashMap directly.
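For context, a small illustrative sketch (not part of the PR): LinkedHashMap iterates entries in insertion order, while HashMap makes no ordering guarantee, so the choice only matters if a test asserts on entry order.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderDemo {
    public static void main(String[] args) {
        // LinkedHashMap iterates entries in insertion order.
        Map<String, Integer> linked = new LinkedHashMap<>();
        linked.put("b", 2);
        linked.put("a", 1);
        linked.put("c", 3);
        System.out.println(linked.keySet()); // [b, a, c] -- insertion order preserved

        // HashMap iteration order is unspecified and may differ.
        Map<String, Integer> hashed = new HashMap<>(linked);
        System.out.println(hashed.keySet());
    }
}
```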
.add("null_value_counts", toReadableMap(nullValueCounts))
.add("nan_value_counts", toReadableMap(nanValueCounts))
.add("lower_bounds", toReadableMap(lowerBounds))
.add("upper_bounds", toReadableMap(upperBounds))
The original maps should work fine here. There's no risk of add modifying the map.
import java.util.Set;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class LazyImmutableMap<K, V> implements Map<K, V>, Serializable {
This name doesn't quite work because this map isn't immutable; it can only be used to get an immutable view of the map. Also, the of method copies the contents of the other map, so it would be better to name it copyOf instead. How about naming this class SerializableMap, since it is used to avoid problems with Kryo serialization?
Sounds good to me.
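A minimal sketch of the renaming idea discussed above (hypothetical and heavily simplified; the real Iceberg class implements Map<K, V> directly and has more methods): a Serializable wrapper whose copyOf copies the source map into a plain HashMap, and which hands callers an unmodifiable view on demand, so serializers only ever see the concrete HashMap.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a SerializableMap-style wrapper.
public class SerializableMapSketch<K, V> implements Serializable {
    private final HashMap<K, V> copiedMap; // concrete, serializable type

    private SerializableMapSketch(Map<K, V> map) {
        // Copy the contents so the wrapper owns a plain HashMap,
        // which both Java and Kryo can serialize without custom support.
        this.copiedMap = map == null ? new HashMap<>() : new HashMap<>(map);
    }

    // Named copyOf (not of) because it copies the source map.
    public static <K, V> SerializableMapSketch<K, V> copyOf(Map<K, V> map) {
        return new SerializableMapSketch<>(map);
    }

    // Callers get a read-only view; the unmodifiable wrapper is created
    // on access and never needs to be serialized itself.
    public Map<K, V> immutableMap() {
        return Collections.unmodifiableMap(copiedMap);
    }

    public static void main(String[] args) {
        Map<String, Integer> source = new HashMap<>();
        source.put("a", 1);
        System.out.println(copyOf(source).immutableMap()); // {a=1}
    }
}
```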
case 11:
-  return upperBounds;
+  return toReadableMap(upperBounds);
I don't think we need to use toReadableMap in this method because it is only used for serialization. We don't expose the Avro methods publicly, so we only need the unmodifiable maps to be returned by the ContentFile accessor methods.
OK, that makes sense!
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class TestDataFileSerialization {
Tests look good.
Let me rebase this PR onto the latest commits. Thanks.
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
Later, we should check whether we need two wrappers, but it isn't a blocker here.
We do need SerializableByteBufferMap for now, as byte buffers may not be serializable.
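For illustration (a hypothetical sketch, not the actual SerializableByteBufferMap code): java.nio.ByteBuffer does not implement Serializable, so a wrapper typically snapshots the buffer contents into a byte[] for transport and rebuilds a heap buffer on read.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;

// Hypothetical helper showing one way to make ByteBuffer contents
// serializable: snapshot into byte[] and wrap again on demand.
public class SerializableBuffer implements Serializable {
    private final byte[] bytes; // byte[] is serializable; ByteBuffer is not

    public SerializableBuffer(ByteBuffer buffer) {
        // Read through a duplicate so the caller's position is untouched;
        // this also works for direct buffers, which have no backing array.
        ByteBuffer copy = buffer.duplicate();
        this.bytes = new byte[copy.remaining()];
        copy.get(this.bytes);
    }

    public ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(bytes); // always a heap buffer
    }

    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        direct.putLong(0, 42L);
        SerializableBuffer sb = new SerializableBuffer(direct);
        System.out.println(sb.toByteBuffer().getLong(0)); // 42
    }
}
```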
Merged. Thanks for fixing this, @openinx!
I am afraid it does not solve our issues with Kryo. We are still going to fail if we get a non-serializable byte buffer. I think we probably have two options:
Does my analysis sound right to you, @openinx @rdblue? cc others as well, @RussellSpitzer @yyanyy @jackye1995
Also, to answer why Spark is capable of handling unmodifiable collections: Spark registers com.twitter.chill serializers by default that cover unmodifiable maps/lists. That's why our Spark tests work.
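To illustrate why such serializers are needed at all (a small sketch, not code from the PR): Collections.unmodifiableMap returns an instance of a private JDK class with no accessible constructor, which general-purpose serializers like Kryo typically need explicit support for.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableClassDemo {
    public static void main(String[] args) {
        Map<String, Integer> view = Collections.unmodifiableMap(new HashMap<>());
        // The runtime type is java.util.Collections$UnmodifiableMap, a
        // private nested class -- serializers must handle it explicitly.
        System.out.println(view.getClass().getName());
    }
}
```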
@aokolnychyi I tried to run the unit tests by switching to direct byte buffers:

diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java b/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
index ad67b181..4b43718d 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestDataFileSerialization.java
@@ -196,6 +196,6 @@ public class TestDataFileSerialization {
   }

   private static ByteBuffer longToBuffer(long value) {
-    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
+    return ByteBuffer.allocateDirect(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
index f5c2990b..eb22873d 100644
--- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
+++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java
@@ -164,6 +164,6 @@ public class TestDataFileSerialization {
   }

   private static ByteBuffer longToBuffer(long value) {
-    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
+    return ByteBuffer.allocateDirect(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
   }
 }
Hm, both suites fail for me after making the change. Flink:

Spark:
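A likely reason for failures with the change above (an illustrative sketch): direct buffers report hasArray() == false, so any serialization path that calls ByteBuffer.array() throws UnsupportedOperationException; copying the bytes out explicitly works for both buffer kinds.

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(8);
        ByteBuffer direct = ByteBuffer.allocateDirect(8);

        System.out.println(heap.hasArray());   // true: heap buffers expose array()
        System.out.println(direct.hasArray()); // false: direct buffers do not

        // Safe for both kinds: copy the bytes out explicitly.
        byte[] copy = new byte[direct.remaining()];
        direct.duplicate().get(copy);
        System.out.println(copy.length); // 8
    }
}
```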
OK, from the class name. Anyway, I think it's necessary to address this issue; let me consider how to get this done.
Let's collaborate, @openinx. I am looking into this as well. I feel we have the same problem in Flink and Spark.
I don't have a good solution so far. This is what I considered:
It seems this serialization bug persists when calling
@dubeme, that isn't the same problem. That issue is happening when you serialize
This PR resolves this comment: #2258 (comment)