From 55483b71f36b84ac57d03a9b83e0e9d9b9b98eab Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Tue, 17 Jan 2017 19:10:33 +0100 Subject: [PATCH 1/2] [FLINK-5484] [serialization] Add test for registered Kryo types --- .../resources/flink_11-kryo_registrations | 86 +++++++++++++++ .../KryoGenericTypeSerializerTest.scala | 100 +++++++++++++++++- pom.xml | 1 + 3 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 flink-tests/src/test/resources/flink_11-kryo_registrations diff --git a/flink-tests/src/test/resources/flink_11-kryo_registrations b/flink-tests/src/test/resources/flink_11-kryo_registrations new file mode 100644 index 0000000000000..7000e62819a06 --- /dev/null +++ b/flink-tests/src/test/resources/flink_11-kryo_registrations @@ -0,0 +1,86 @@ +0,int +1,java.lang.String +2,float +3,boolean +4,byte +5,char +6,short +7,long +8,double +9,void +10,scala.collection.convert.Wrappers$SeqWrapper +11,scala.collection.convert.Wrappers$IteratorWrapper +12,scala.collection.convert.Wrappers$MapWrapper +13,scala.collection.convert.Wrappers$JListWrapper +14,scala.collection.convert.Wrappers$JMapWrapper +15,scala.Some +16,scala.util.Left +17,scala.util.Right +18,scala.collection.immutable.Vector +19,scala.collection.immutable.Set$Set1 +20,scala.collection.immutable.Set$Set2 +21,scala.collection.immutable.Set$Set3 +22,scala.collection.immutable.Set$Set4 +23,scala.collection.immutable.HashSet$HashTrieSet +24,scala.collection.immutable.Map$Map1 +25,scala.collection.immutable.Map$Map2 +26,scala.collection.immutable.Map$Map3 +27,scala.collection.immutable.Map$Map4 +28,scala.collection.immutable.HashMap$HashTrieMap +29,scala.collection.immutable.Range$Inclusive +30,scala.collection.immutable.NumericRange$Inclusive +31,scala.collection.immutable.NumericRange$Exclusive +32,scala.collection.mutable.BitSet +33,scala.collection.mutable.HashMap +34,scala.collection.mutable.HashSet +35,scala.collection.convert.Wrappers$IterableWrapper +36,scala.Tuple1 +37,scala.Tuple2 +38,scala.Tuple3 +39,scala.Tuple4 +40,scala.Tuple5 +41,scala.Tuple6 +42,scala.Tuple7 +43,scala.Tuple8 +44,scala.Tuple9 +45,scala.Tuple10 +46,scala.Tuple11 +47,scala.Tuple12 +48,scala.Tuple13 +49,scala.Tuple14 +50,scala.Tuple15 +51,scala.Tuple16 +52,scala.Tuple17 +53,scala.Tuple18 +54,scala.Tuple19 +55,scala.Tuple20 +56,scala.Tuple21 +57,scala.Tuple22 +58,scala.Tuple1$mcJ$sp +59,scala.Tuple1$mcI$sp +60,scala.Tuple1$mcD$sp +61,scala.Tuple2$mcJJ$sp +62,scala.Tuple2$mcJI$sp +63,scala.Tuple2$mcJD$sp +64,scala.Tuple2$mcIJ$sp +65,scala.Tuple2$mcII$sp +66,scala.Tuple2$mcID$sp +67,scala.Tuple2$mcDJ$sp +68,scala.Tuple2$mcDI$sp +69,scala.Tuple2$mcDD$sp +70,scala.Symbol +71,scala.reflect.ClassTag +72,scala.runtime.BoxedUnit +73,java.util.Arrays$ArrayList +74,java.util.BitSet +75,java.util.PriorityQueue +76,java.util.regex.Pattern +77,java.sql.Date +78,java.sql.Time +79,java.sql.Timestamp +80,java.net.URI +81,java.net.InetSocketAddress +82,java.util.UUID +83,java.util.Locale +84,java.text.SimpleDateFormat +85,org.apache.avro.generic.GenericData$Array diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala index 08a0a96d32731..e001799ebe88c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/KryoGenericTypeSerializerTest.scala @@ -17,17 +17,19 @@ */ package org.apache.flink.api.scala.runtime -import com.esotericsoftware.kryo.{Kryo, Serializer} -import com.esotericsoftware.kryo.io.{Input, Output} +import java.io._ +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Kryo, Serializer} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeutils.SerializerTestInstance import org.apache.flink.api.java.typeutils.GenericTypeInfo - +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.joda.time.LocalDate - import org.junit.Test +import scala.collection.mutable +import scala.io.Source import scala.reflect._ class KryoGenericTypeSerializerTest { @@ -146,6 +148,96 @@ class KryoGenericTypeSerializerTest { runTests(list) } + /** + * Tests that the registered classes in Kryo did not change. + * + * Once we have proper serializer versioning this test will become obsolete. + * But currently a change in the serializers can break savepoint backwards + * compatability between Flink versions. + */ + @Test + def testDefaultKryoRegisteredClassesDidNotChange(): Unit = { + // Previous registration (id => registered class (Class#getName)) + val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]() + + val stream = Thread.currentThread().getContextClassLoader() + .getResourceAsStream("flink_11-kryo_registrations") + Source.fromInputStream(stream).getLines().foreach{ + line => + val Array(id, registeredClass) = line.split(",") + previousRegistrations.put(id.toInt, registeredClass) + } + + // Get Kryo and verify that the registered IDs and types in + // Kryo have not changed compared to the provided registrations + // file. + val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo + val nextId = kryo.getNextRegistrationId + for (i <- 0 until nextId) { + val registration = kryo.getRegistration(i) + + previousRegistrations.get(registration.getId) match { + case None => throw new IllegalStateException(s"Expected no entry with ID " + + s"${registration.getId}, but got one for type ${registration.getType.getName}. This " + + s"can lead to registered user types being deserialized with the wrong serializer when " + + s"restoring a savepoint.") + case Some(registeredClass) => + if (registeredClass != registration.getType.getName) { + throw new IllegalStateException(s"Expected type ${registration.getType.getName} with " + + s"ID ${registration.getId}, but got $registeredClass.") + } + } + } + + // Verify number of registrations (required to check if current number of + // registrations is less than before). + if (previousRegistrations.size != nextId) { + throw new IllegalStateException(s"Number of registered classes changed (previously " + + s"${previousRegistrations.size}, but now $nextId). This can lead to registered user " + + s"types being deserialized with the wrong serializer when restoring a savepoint.") + } + } + + /** + * Creates a Kryo serializer and writes the default registrations out to a + * comma separated file with one entry per line: + * + * id,class + * + * The produced file is used to check that the registered IDs don't change + * in future Flink versions. + * + * This method is not used in the tests, but documents how the test file + * has been created and can be used to re-create it if needed. + * + * @param filePath File path to write registrations to + */ + private def writeDefaultKryoRegistrations(filePath: String) = { + val file = new File(filePath) + if (file.exists()) { + file.delete() + } + + val writer = new BufferedWriter(new FileWriter(file)) + + try { + val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo + + val nextId = kryo.getNextRegistrationId + for (i <- 0 until nextId) { + val registration = kryo.getRegistration(i) + val str = registration.getId + "," + registration.getType.getName + writer.write(str, 0, str.length) + writer.newLine() + } + + println(s"Created file with registrations at $file.") + } finally { + writer.close() + } + } + + case class ComplexType(id: String, number: Int, values: List[Int]){ override def equals(obj: Any): Boolean ={ if(obj != null && obj.isInstanceOf[ComplexType]){ diff --git a/pom.xml b/pom.xml index cdb4e2da3ff7c..04934525ddca2 100644 --- a/pom.xml +++ b/pom.xml @@ -874,6 +874,7 @@ under the License. flink-tests/src/test/resources/testdata/terainput.txt + flink-tests/src/test/resources/flink_11-kryo_registrations flink-connectors/flink-avro/src/test/resources/avro/*.avsc out/test/flink-avro/avro/user.avsc flink-libraries/flink-table/src/test/scala/resources/*.out From ebd656310ac9e6323fc7b09632c8aef08f06ba48 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 18 Jan 2017 11:27:43 +0100 Subject: [PATCH 2/2] Revert "[FLINK-2608] Updated Twitter Chill version." This reverts commit 0d3ff88b369fbb1b0a8fb0e8263c9ce0a9da1583. --- flink-runtime/pom.xml | 13 -- .../util/CollectionDataSets.java | 32 ++- .../kryo/KryoCollectionsSerializerTest.java | 185 ------------------ pom.xml | 2 +- 4 files changed, 16 insertions(+), 216 deletions(-) delete mode 100644 flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 4a35304e4d3d2..ab1ff6bf73484 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -154,19 +154,6 @@ under the License. com.twitter chill_${scala.binary.version} ${chill.version} - - - - com.esotericsoftware - kryo-shaded - - - - - - - com.esotericsoftware.kryo - kryo diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java index 389a18fa5b4d3..ba48e121a168d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/util/CollectionDataSets.java @@ -23,8 +23,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -48,10 +46,10 @@ /** * ####################################################################################################### - * - * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. + * + * BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA. * IF YOU MODIFY THE DATA MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING! - * + * * ####################################################################################################### */ public class CollectionDataSets { @@ -203,7 +201,7 @@ public static DataSet, String, Integer>> getGrou return env.fromCollection(data, type); } - + public static DataSet> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) { List> data = new ArrayList<>(); data.add(new Tuple2<>(new byte[]{0, 4}, 1)); @@ -212,12 +210,12 @@ public static DataSet> getTuple2WithByteArrayDataSet(Exe data.add(new Tuple2<>(new byte[]{2, 1}, 3)); data.add(new Tuple2<>(new byte[]{0}, 0)); data.add(new Tuple2<>(new byte[]{2, 0}, 1)); - + TupleTypeInfo> type = new TupleTypeInfo<>( PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO ); - + return env.fromCollection(data, type); } @@ -349,13 +347,13 @@ public static DataSet(3, "Third", 30, 300, 3000L, "Three", 30000L)); return env.fromCollection(data); } - + public static DataSet> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) { List> data = new ArrayList<>(); data.add(new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First")); data.add(new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second")); data.add(new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third")); - + return env.fromCollection(data); } @@ -612,22 +610,22 @@ public static class PojoWithDateAndEnum { public Date date; public Category cat; } - + public static DataSet getPojoWithDateAndEnum(ExecutionEnvironment env) { List data = new ArrayList<>(); - + PojoWithDateAndEnum one = new PojoWithDateAndEnum(); one.group = "a"; one.date = new Date(666); one.cat = Category.CAT_A; data.add(one); - + PojoWithDateAndEnum two = new PojoWithDateAndEnum(); two.group = "a"; two.date = new Date(666); two.cat = Category.CAT_A; data.add(two); - + PojoWithDateAndEnum three = new PojoWithDateAndEnum(); three.group = "b"; three.date = new Date(666); three.cat = Category.CAT_B; data.add(three); - + return env.fromCollection(data); } @@ -695,7 +693,7 @@ public static DataSet getPojoWithCollection(ExecutionEnviron pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); pwc1.scalaBigInt = BigInt.int2bigInt(10); pwc1.bigDecimalKeepItNull = null; - + // use calendar to make it stable across time zones GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18); pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis()); @@ -712,7 +710,7 @@ public static DataSet getPojoWithCollection(ExecutionEnviron pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN); pwc2.scalaBigInt = BigInt.int2bigInt(31104000); pwc2.bigDecimalKeepItNull = null; - + GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3); pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976 diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java deleted file mode 100644 index 0e8f4821ab6c4..0000000000000 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/kryo/KryoCollectionsSerializerTest.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.runtime.kryo; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.esotericsoftware.kryo.Kryo; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest; -import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Random; -import java.util.Set; - -@SuppressWarnings("unchecked") -public class KryoCollectionsSerializerTest extends AbstractGenericTypeSerializerTest { - - private ExecutionConfig ec = new ExecutionConfig(); - - @Test - public void testJavaList(){ - Collection a = new ArrayList<>(); - fillCollection(a); - runTests(a); - } - - @Test - public void testJavaSet(){ - Collection b = new HashSet<>(); - fillCollection(b); - runTests(b); - } - - @Test - public void testJavaDequeue(){ - Collection c = new LinkedList<>(); - fillCollection(c); - runTests(c); - } - - @Test - public void testJavaArraysAsList(){ - Collection a = Arrays.asList(42, 1337, 49, 1); - runTests(a); - } - - @Test - public void testJavaUnmodifiableSet(){ - Set b = new HashSet<>(); - fillCollection(b); - runTests(Collections.unmodifiableSet(b)); - } - - @Test - public void testJavaSingletonList(){ - Collection c = Collections.singletonList(42); - runTests(c); - } - - private void fillCollection(Collection coll) { - coll.add(42); - coll.add(1337); - coll.add(49); - coll.add(1); - } - - @Override - protected TypeSerializer createSerializer(Class type) { - return new KryoSerializer(type, ec); - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileSerializing() { - try { - // construct a long string - String str; - { - char[] charData = new char[40000]; - Random rnd = new Random(); - - for (int i = 0; i < charData.length; i++) { - charData[i] = (char) rnd.nextInt(10000); - } - - str = new String(charData); - } - - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000); - KryoSerializer serializer = new KryoSerializer(String.class, new ExecutionConfig()); - - try { - serializer.serialize(str, target); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - /** - * Make sure that the kryo serializer forwards EOF exceptions properly when serializing - */ - @Test - public void testForwardEOFExceptionWhileDeserializing() { - try { - int numElements = 100; - // construct a memory target that is too small for the string - TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements); - KryoSerializer serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig()); - - for(int i = 0; i < numElements; i++){ - serializer.serialize(i, target); - } - - ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer()); - - for(int i = 0; i < numElements; i++){ - int value = serializer.deserialize(source); - assertEquals(i, value); - } - - try { - serializer.deserialize(source); - fail("should throw a java.io.EOFException"); - } - catch (java.io.EOFException e) { - // that is how we like it :-) - } - catch (Exception e) { - fail("throws wrong exception: should throw a java.io.EOFException, has thrown a " + e.getClass().getName()); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void validateReferenceMappingEnabled() { - KryoSerializer serializer = new KryoSerializer<>(String.class, new ExecutionConfig()); - Kryo kryo = serializer.getKryo(); - assertTrue(kryo.getReferences()); - } -} diff --git a/pom.xml b/pom.xml index 04934525ddca2..1f2197fd040aa 100644 --- a/pom.xml +++ b/pom.xml @@ -98,7 +98,7 @@ under the License. 2.10.4 2.10 - 0.8.1 + 0.7.4 5.0.4 3.4.6 2.8.0