From 317e6b863aafe562de4d7b6152eaaa5d46055b9d Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Sun, 27 Jan 2019 16:40:40 +0100 Subject: [PATCH] [FLINK-11329] Migrating the UnionSerializer --- .../api/datastream/CoGroupedStreams.java | 107 +++++++++++------- .../UnionSerializerMigrationTest.java | 67 +++++++++++ .../resources/flink-1.6-union-serializer-data | 1 + .../flink-1.6-union-serializer-snapshot | Bin 0 -> 1393 bytes .../resources/flink-1.7-union-serializer-data | 1 + .../flink-1.7-union-serializer-snapshot | Bin 0 -> 1403 bytes 6 files changed, 135 insertions(+), 41 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 19d978322f09c2..bfd74e05cab0f4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -25,14 +25,13 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -504,7 +503,9 @@ public boolean canEqual(Object obj) { } } - private static class UnionSerializer extends TypeSerializer> { + @VisibleForTesting + @Internal + static class UnionSerializer extends TypeSerializer> { private static final long serialVersionUID = 1L; private final TypeSerializer oneSerializer; @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot> snapshotConfiguration() { - return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof UnionSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousSerializersAndConfigs = - ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(0).f1, - oneSerializer); - - CompatibilityResult twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(1).f1, - twoSerializer); - - if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new UnionSerializer<>( - new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new UnionSerializerSnapshot<>(this); } } /** * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}. */ + @Deprecated public static class UnionSerializerConfigSnapshot - extends CompositeTypeSerializerConfigSnapshot> { + extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; - /** This empty nullary constructor is required for deserializing the configuration. */ - public UnionSerializerConfigSnapshot() {} + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public UnionSerializerConfigSnapshot() { + } public UnionSerializerConfigSnapshot(TypeSerializer oneSerializer, TypeSerializer twoSerializer) { super(oneSerializer, twoSerializer); } + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + List, TypeSerializerSnapshot>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs(); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new UnionSerializerSnapshot<>(), + nestedSerializersAndConfigs.get(0).f1, + nestedSerializersAndConfigs.get(1).f1 + ); + } + @Override public int getVersion() { return VERSION; } } + /** + * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}. + */ + public static class UnionSerializerSnapshot + extends CompositeTypeSerializerSnapshot, UnionSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("WeakerAccess") + public UnionSerializerSnapshot() { + super(correspondingSerializerClass()); + } + + UnionSerializerSnapshot(UnionSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(UnionSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer}; + } + + @SuppressWarnings("unchecked") + @Override + protected UnionSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new UnionSerializer<>((TypeSerializer) nestedSerializers[0], (TypeSerializer) nestedSerializers[1]); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) UnionSerializer.class; + } + } + // ------------------------------------------------------------------------ // Utility functions that implement the CoGroup logic based on the tagged // union window reduce diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java new file mode 100644 index 00000000000000..2b85d7e4909a7c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration tests for {@link UnionSerializer}. + */ +@RunWith(Parameterized.class) +public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + public UnionSerializerMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "union-serializer", + UnionSerializer.class, + UnionSerializerSnapshot.class, + UnionSerializerMigrationTest::stringLongRowSupplier); + + return testSpecifications.get(); + } + + private static TypeSerializer> stringLongRowSupplier() { + return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE); + } + +} + + + diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data new file mode 100644 index 00000000000000..cb29a995ec68b3 --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data @@ -0,0 +1 @@ +53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..178d007196e40066639db92d9369060aa1554aae GIT binary patch literal 1393 zcmd6m%}T>S6oqdBS^5^PCmRu@JBt(~3n2<_$7b3NCUfJ=l%`#{^;s0(!}k!EitgNc zGYO3$3btm^fx!GE-#z!t2>^KDhW7~&aGKKEdw+a`z-Tx{J!L%Sb2c z)H7Mq2>H&^aJ&JjttlpIDlDqy{oTV8Tb%d+McJiK0*^MpM);e&k`}mYzwGd~zU(2Y ziJZoY=Vpf1(E&)i8?O|w6)h8bKI*fVI&eE{dLN;3y^cCA|Cr4)u6ZV`(5Wz{9g50a Y2m7a8<*)x-JKs;5dJp?<`o6tcUz~8%(*OVf literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data new file mode 100644 index 00000000000000..d2f04fdc567368 --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data @@ -0,0 +1 @@ +53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..747eb82103473a1761b2986a7c6d1942d0dba4be GIT binary patch literal 1403 zcmd6m&q~8U5XL8wdh;!E-dseG-Yi9s9E4Qxy0+VPG1*CXxAf1!YoA5&F$CX3JSzGK z&L*vCDuU4*3<+c=$#1{;z6}8QaEW@vc*gr{XDi&JuL@tHypc=|itaxU|XqWNBI{pF36N}mKC*T6>j zojgZtxnSrbdVai6xuS9cu(wYg_w0GSCH}1~O~g)f`p~Q4>u&JDZe#Ov<>Wtl=VEwT b$IwjpjqlCILO8DxLUnTo)v^-6?_K)