From 49b07587c137c6dfa22dc486424998175300830e Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Thu, 22 Feb 2018 17:22:54 +0100 Subject: [PATCH 1/3] [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant --- .../TupleSerializerConfigSnapshot.java | 2 +- .../apache/flink/util/InstantiationUtil.java | 54 +++++++++- .../flink-1.3.2-scala-types-serializer-data | Bin 0 -> 97 bytes ...link-1.3.2-scala-types-serializer-snapshot | Bin 0 -> 7634 bytes .../TupleSerializerCompatibilityTest.scala | 86 ++++++++++++++++ ...SerializerCompatibilityTestGenerator.scala | 94 ++++++++++++++++++ 6 files changed, 230 insertions(+), 6 deletions(-) create mode 100644 flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data create mode 100644 flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot create mode 100644 flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTest.scala create mode 100644 flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerCompatibilityTestGenerator.scala diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java index 705099e9b2dbc..eac5200da9c6f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerConfigSnapshot.java @@ -61,7 +61,7 @@ public void read(DataInputView in) throws IOException { super.read(in); try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) { - tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader()); + tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true); } catch (ClassNotFoundException e) { throw new IOException("Could not find requested tuple class in classpath.", e); } diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 11e3990077a59..0f84f6c5a7684 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -121,10 +121,53 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer"); - scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer"); } + /** + * The serialVersionUID might change between Scala versions and since those classes are + * part of the tuple serializer config snapshots we need to ignore them. + * + * See also FLINK-8451. + */ + private static Set scalaTypes = new HashSet<>(); + static { + scalaTypes.add("scala.Tuple1"); + scalaTypes.add("scala.Tuple2"); + scalaTypes.add("scala.Tuple3"); + scalaTypes.add("scala.Tuple4"); + scalaTypes.add("scala.Tuple5"); + scalaTypes.add("scala.Tuple6"); + scalaTypes.add("scala.Tuple7"); + scalaTypes.add("scala.Tuple8"); + scalaTypes.add("scala.Tuple9"); + scalaTypes.add("scala.Tuple10"); + scalaTypes.add("scala.Tuple11"); + scalaTypes.add("scala.Tuple12"); + scalaTypes.add("scala.Tuple13"); + scalaTypes.add("scala.Tuple14"); + scalaTypes.add("scala.Tuple15"); + scalaTypes.add("scala.Tuple16"); + scalaTypes.add("scala.Tuple17"); + scalaTypes.add("scala.Tuple18"); + scalaTypes.add("scala.Tuple19"); + scalaTypes.add("scala.Tuple20"); + scalaTypes.add("scala.Tuple21"); + scalaTypes.add("scala.Tuple22"); + scalaTypes.add("scala.Tuple1$mcJ$sp"); + scalaTypes.add("scala.Tuple1$mcI$sp"); + scalaTypes.add("scala.Tuple1$mcD$sp"); + scalaTypes.add("scala.Tuple2$mcJJ$sp"); + scalaTypes.add("scala.Tuple2$mcJI$sp"); + scalaTypes.add("scala.Tuple2$mcJD$sp"); + scalaTypes.add("scala.Tuple2$mcIJ$sp"); + scalaTypes.add("scala.Tuple2$mcII$sp"); + scalaTypes.add("scala.Tuple2$mcID$sp"); + scalaTypes.add("scala.Tuple2$mcDJ$sp"); + scalaTypes.add("scala.Tuple2$mcDI$sp"); + scalaTypes.add("scala.Tuple2$mcDD$sp"); + } + /** * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of * anonymous classes or our Scala serializer classes and also replaces occurences of GenericData.Array @@ -158,12 +201,13 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo } } - Class localClass = resolveClass(streamClassDescriptor); - if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass() + final Class localClass = resolveClass(streamClassDescriptor); + final String name = localClass.getName(); + if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name)|| localClass.isAnonymousClass() // isAnonymousClass does not work for anonymous Scala classes; additionally check by classname - || localClass.getName().contains("$anon$") || localClass.getName().contains("$anonfun")) { + || name.contains("$anon$") || name.contains("$anonfun")) { - ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); + final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass); if (localClassDescriptor != null && localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) { LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.", diff --git a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..ddd6ac01112405f127b5c3f3d9412bf060009d3f GIT binary patch literal 97 zcmZQzV9;V@^GMCf$!B0I&qyq>chE=!0|o{L9w^oV(ioB{K*1A0oC4L(2vwm2(f%I@ H8i3LO{3sa1 literal 0 HcmV?d00001 diff --git a/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot b/flink-scala/src/test/resources/flink-1.3.2-scala-types-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..1ebdabb29fc94d31123061db091237561394f4f2 GIT binary patch literal 7634 zcmeHMO>7%Q6n?v@LzR-Gv=BiB5>!6)(p|-A%8!vM#r=_sC6FA1+5*+Yo!FDEcg^g$ z#1%i{0yo5=2jGejrQ*f`Rq6#eASzB=xKyf&%AZ8V1#tn9zM0*4H_60}z3a*dTUy(* z*1mb~dw<_N1AqbQHTosMBVXZHT#sJ<>=tBv4ljDVYLN!1)Tp)SF!wAjnB`Z9LoD8O z1*=nQzS(eSnQ}%PcAoNrS8ou(7MR0CYo7Yz7^P3_m!lMwnN;it#0KJlLJ~~0?9FR)=G{nfaL}*GAk=9|2BE3=tgheL0taH zw;ulF^3>n&OMI0)j0?T9Hb`1PK&_%HBD3SlahThheM-z)Bg@RKI#hUW>(h5GePg|P zL~8d6yrH6YRx37$+O|KeBTUpNk5qf}PlJA#eS6jfvjh_kU5u$M;9##_p5&}ri;0p- zj<;3KYgN?)Sr#0vjueh#DW#5~V`jAS)|>vk%VHWm^Qe`*c5&v%RS2b=Gack?l0Aj3 zCjM+#0H%Nb=lCxz^TgMy8_P;$s%Vww+#^@0%F|j^^bpN?o>^%L=f zA>z5EeU+)!d*8m>0dZ;Jj`rIggzBW$0lhW>*%I1Se4qKn$A=z#DsMuj2nIV0rkE~) zK?UqCsSRvd_U#~5r3XY#z(n{{;5)p#P2?DpD(_VV%7CI)YS|?Xzvc<_ zl#I$u=&|2$iT6gz*dspz7(*P#F9J+l0GQkhuqy}fz}4`X8(|L}%vzaJb1~qU7V8b^ z`5fV-P6g!-_5HQAJV0EbmIN#Glkr1{`M`GoBbDynWf|(!K>E^*W%*8{BgynL;J7x@ zHp%Ms-Cdj3n7-=uJBmM#-R5PyZmPWiZib=MO`X5101svcCxlBD+iM~6Nh2rqkRD9L zdx0s4@sSror2EM|v}|v)6U2I=+9nDSqZ=Og)^<--z_5NvN+*wJ0fA2MxR)vgPdz0i z>yv|ugk?v9s1TI$#*!TmCi_J~SG(0M)b%xVbPcb+q~dt}6lQX2ho#g#HJHUSDP9u$ zAybf(uGlS?K{hkV Date: Fri, 23 Feb 2018 14:28:41 +0100 Subject: [PATCH 2/3] Feedback addressed --- .../java/org/apache/flink/util/InstantiationUtil.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 0f84f6c5a7684..3664f4b5f929d 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas * *

This can be removed once 1.2 is no longer supported. */ - private static Set scalaSerializerClassnames = new HashSet<>(); + private final static Set scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -128,9 +128,9 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas * The serialVersionUID might change between Scala versions and since those classes are * part of the tuple serializer config snapshots we need to ignore them. * - * See also FLINK-8451. + * @see FLINK-8451 */ - private static Set scalaTypes = new HashSet<>(); + private final static Set scalaTypes = new HashSet<>(); static { scalaTypes.add("scala.Tuple1"); scalaTypes.add("scala.Tuple2"); @@ -203,7 +203,7 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo final Class localClass = resolveClass(streamClassDescriptor); final String name = localClass.getName(); - if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name)|| localClass.isAnonymousClass() + if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name) || localClass.isAnonymousClass() // isAnonymousClass does not work for anonymous Scala classes; additionally check by classname || name.contains("$anon$") || name.contains("$anonfun")) { From 31c1f56def258a2fa8f3a3e0144097626fec6f17 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Wed, 28 Feb 2018 12:33:28 +0100 Subject: [PATCH 3/3] Make checkstyle happy --- .../main/java/org/apache/flink/util/InstantiationUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 3664f4b5f929d..978d270e05f97 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -113,7 +113,7 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas * *

This can be removed once 1.2 is no longer supported. */ - private final static Set scalaSerializerClassnames = new HashSet<>(); + private static final Set scalaSerializerClassnames = new HashSet<>(); static { scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer"); scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer"); @@ -130,7 +130,7 @@ protected Class resolveClass(ObjectStreamClass desc) throws IOException, Clas * * @see FLINK-8451 */ - private final static Set scalaTypes = new HashSet<>(); + private static final Set scalaTypes = new HashSet<>(); static { scalaTypes.add("scala.Tuple1"); scalaTypes.add("scala.Tuple2");