From 516d39ae6b60695c69c511a8accc945b622ab0df Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Sun, 27 Jan 2019 14:50:36 +0100 Subject: [PATCH 1/8] [FLINK-11329] Migrating CompositeSerializer --- .../common/typeutils/CompositeSerializer.java | 8 +- .../typeutils/CompositeSerializerTest.java | 5 ++ .../tests/TtlVerifyUpdateFunction.java | 6 +- .../streaming/tests/verify/ValueWithTs.java | 60 +++++++++++++- .../runtime/state/ttl/TtlStateFactory.java | 74 ++++++++++++++++-- .../ttl/TtlSerializerStateMigrationTest.java | 60 ++++++++++++++ .../resources/flink-1.6-ttl-serializer-data | Bin 0 -> 230 bytes .../flink-1.6-ttl-serializer-snapshot | Bin 0 -> 1753 bytes .../resources/flink-1.7-ttl-serializer-data | Bin 0 -> 230 bytes .../flink-1.7-ttl-serializer-snapshot | Bin 0 -> 1763 bytes 10 files changed, 195 insertions(+), 18 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java create mode 100644 flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data create mode 100644 flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot create mode 100644 flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data create mode 100644 flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index 2db7a309ab647..8a0c9034eb011 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -204,13 +204,10 @@ public boolean canEqual(Object obj) { return obj != null && getClass().equals(obj.getClass()); } - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new ConfigSnapshot(fieldSerializers); - } - @Override public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + // We can not remove this method, as long as we support restoring into CompositeTypeSerializerConfigSnapshot. + // Previously (pre 1.8), multiple composite serializers were using this class directly as their snapshot class. if (configSnapshot instanceof ConfigSnapshot) { List, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = ((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); @@ -302,6 +299,7 @@ static PrecomputedParameters precompute( } /** Snapshot field serializers of composite type. */ + @Deprecated public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 0; diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java index fc5c241c20566..719ea493ab76f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java @@ -191,6 +191,11 @@ protected CompositeSerializer> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { return new TestListCompositeSerializer(precomputed, originalSerializers); } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + throw new UnsupportedOperationException(); + } } private static class CompositeSerializerTestInstance extends SerializerTestInstance> { diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index f9e492eb91bb3..18f6e358e914a 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -143,7 +144,10 @@ public void initializeState(FunctionInitializationContext context) { prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream() .collect(Collectors.toMap(TtlStateVerifier::getId, v -> { checkNotNull(v); - TypeSerializer> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer()); + final TypeSerializer> typeSerializer = new ValueWithTs.Serializer( + v.getUpdateSerializer(), + LongSerializer.INSTANCE); + ListStateDescriptor> stateDesc = new ListStateDescriptor<>( "TtlPrevValueState_" + v.getId(), typeSerializer); KeyedStateStore store = context.getKeyedStateStore(); diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java index a4f30804e4fd2..ba178f4aaa701 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -19,8 +19,9 @@ package org.apache.flink.streaming.tests.verify; import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nonnull; @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { - public Serializer(TypeSerializer userValueSerializer) { - super(true, userValueSerializer, LongSerializer.INSTANCE); + public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) { + super(true, valueSerializer, timestampSerializer); } @SuppressWarnings("unchecked") @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, int index) { protected CompositeSerializer> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { - return new Serializer(precomputed, (TypeSerializer) originalSerializers[0]); + + return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]); + } + + TypeSerializer getValueSerializer() { + return fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer getTimestampSerializer() { + TypeSerializer fieldSerializer = fieldSerializers[1]; + return (TypeSerializer) fieldSerializer; + } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new ValueWithTsSerializerSnapshot(this); + } + } + + /** + * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer. + */ + public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot, Serializer> { + + private final static int VERSION = 2; + + @SuppressWarnings("unused") + public ValueWithTsSerializerSnapshot() { + super(Serializer.class); + } + + ValueWithTsSerializerSnapshot(Serializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(Serializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()}; + } + + @SuppressWarnings("unchecked") + @Override + protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + TypeSerializer valueSerializer = nestedSerializers[0]; + TypeSerializer timeSerializer = (TypeSerializer) nestedSerializers[1]; + return new Serializer(valueSerializer, timeSerializer); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 453f43aded6df..6f05e1c2f7a26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -28,7 +28,9 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -126,7 +128,7 @@ private IS createState() throws Exception { @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor)); } @@ -134,7 +136,7 @@ private IS createValueState() throws Exception { private IS createListState() throws Exception { ListStateDescriptor listStateDesc = (ListStateDescriptor) stateDesc; ListStateDescriptor> ttlDescriptor = new ListStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer())); return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor)); } @@ -144,7 +146,7 @@ private IS createMapState() throws Exception { MapStateDescriptor> ttlDescriptor = new MapStateDescriptor<>( stateDesc.getName(), mapStateDesc.getKeySerializer(), - new TtlSerializer<>(mapStateDesc.getValueSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor)); } @@ -154,7 +156,7 @@ private IS createReducingState() throws Exception { ReducingStateDescriptor> ttlDescriptor = new ReducingStateDescriptor<>( stateDesc.getName(), new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), - new TtlSerializer<>(stateDesc.getSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor)); } @@ -165,7 +167,7 @@ private IS createAggregatingState() throws Exception { TtlAggregateFunction ttlAggregateFunction = new TtlAggregateFunction<>( aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider); AggregatingStateDescriptor, OUT> ttlDescriptor = new AggregatingStateDescriptor<>( - stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>(createTtlStateContext(ttlDescriptor), ttlAggregateFunction); } @@ -178,7 +180,7 @@ private IS createFoldingState() throws Exception { stateDesc.getName(), ttlInitAcc, new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), - new TtlSerializer<>(stateDesc.getSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor)); } @@ -237,8 +239,8 @@ public static class TtlSerializer extends CompositeSerializer> { private static final long serialVersionUID = 131020282727167064L; @SuppressWarnings("WeakerAccess") - public TtlSerializer(TypeSerializer userValueSerializer) { - super(true, LongSerializer.INSTANCE, userValueSerializer); + public TtlSerializer(TypeSerializer timestampSerializer, TypeSerializer userValueSerializer) { + super(true, timestampSerializer, userValueSerializer); } @SuppressWarnings("WeakerAccess") @@ -272,5 +274,61 @@ protected CompositeSerializer> createSerializerInstance( Preconditions.checkArgument(originalSerializers.length == 2); return new TtlSerializer<>(precomputed, originalSerializers); } + + @SuppressWarnings("unchecked") + TypeSerializer getTimestampSerializer() { + return (TypeSerializer) (TypeSerializer) fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer getValueSerializer() { + return (TypeSerializer) fieldSerializers[1]; + } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new TtlSerializerSnapshot<>(this); + } + } + + /** + * A {@link TypeSerializerSnapshot} for TtlSerializer. + */ + public static final class TtlSerializerSnapshot extends CompositeTypeSerializerSnapshot, TtlSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings({"WeakerAccess", "unused"}) + public TtlSerializerSnapshot() { + super(correspondingSerializerClass()); + } + + TtlSerializerSnapshot(TtlSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(TtlSerializer outerSerializer) { + return new TypeSerializer[]{ outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()}; + } + + @Override + @SuppressWarnings("unchecked") + protected TtlSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + TypeSerializer timestampSerializer = (TypeSerializer) nestedSerializers[0]; + TypeSerializer valueSerializer = (TypeSerializer) nestedSerializers[1]; + + return new TtlSerializer<>(timestampSerializer, valueSerializer); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) TtlSerializer.class; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java new file mode 100644 index 0000000000000..87ab3962c3e50 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration test for {@link TtlSerializerStateMigrationTest}. + */ +@RunWith(Parameterized.class) +public class TtlSerializerStateMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + private static final String SPEC_NAME = "ttl-serializer"; + + public TtlSerializerStateMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + SPEC_NAME, + TtlSerializer.class, + TtlSerializerSnapshot.class, + () -> new TtlSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} + diff --git a/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..9d156cae2fd5c4b0d881e76fef95a17bed4f4062 GIT binary patch literal 230 zcmX}lJr03E5P;#mfy5DXx|!Me*|~^Dt5GdSTqZa9zpmMaL42o4%zmPp*%uR#0K%Nz-PXtV9R&1~1DRgPie z3O{#PZ^qK}>X`P<>pO2h-u`(SJKz}F;CKxYx{B$*IfgDwAm> zYo_nsy#Fj--H-BKzzM9gy!Jm}yW|mErhIWs`6wdVBW#pnEI&8(vAygUqoL7ljA6vv<4Qcl8#`3i^$35kJ?p?!8jjN1%XH3K*KPWJEq@Be?#01(1c>k=U|IT)cBYGsai zqr||bsWpNo85X2;E_Q92*<2CE=#nG>y-G0YCt-mVEvRw{izN zSJW<`gm543 z4i)Od`jVVrYrn(nXl6rH*}xkW&1{GkHbgx(r2ewqfjgN)ek&^o4+&m)VwSnttwCMs zWQv3?Z*$oX{mO1in-yGE4CAbD^9ntJZerToZy$U(x&P;V#bId3v3llkAafKQYm(Zq ze^ohno*laKE{eG#u8m-1f08(3#~?Q39Ta*SFou;gtr@3|k(JJZ^P z^*(H>G%ZLTXzati#dA-(1gWg~8Ej}Y361)2yBeBdp~qPeob#H4c!+1~2Y2`}M|8v^ zxWPv}ituedivMY5tLSd7GK8*QNK1OM)ojtfzq5*ZBiBm%zt8PoRZp&bXjT?L?-~JI Op1RiMgGLB9@$LfJ4Pl!A literal 0 HcmV?d00001 From 8bba09246cb84a372a7ad739060152ee50e7d992 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Sun, 27 Jan 2019 15:27:50 +0100 Subject: [PATCH 2/8] [FLINK-11329] Migrating the NullableSerializer --- .../typeutils/runtime/NullableSerializer.java | 175 +++++++++++++----- ...peSerializerSnapshotMigrationTestBase.java | 52 +++++- .../NullableSerializerMigrationTest.java | 83 +++++++++ ...nk-1.6-nullable-not-padded-serializer-data | Bin 0 -> 58 bytes ....6-nullable-not-padded-serializer-snapshot | Bin 0 -> 944 bytes .../flink-1.6-nullable-padded-serializer-data | Bin 0 -> 90 bytes ...nk-1.6-nullable-padded-serializer-snapshot | Bin 0 -> 952 bytes ...nk-1.7-nullable-not-padded-serializer-data | Bin 0 -> 58 bytes ....7-nullable-not-padded-serializer-snapshot | Bin 0 -> 941 bytes .../flink-1.7-nullable-padded-serializer-data | Bin 0 -> 90 bytes ...nk-1.7-nullable-padded-serializer-snapshot | Bin 0 -> 949 bytes 11 files changed, 260 insertions(+), 50 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java index fe392e4b1cb75..72b8620a779f7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java @@ -19,24 +19,23 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import java.io.IOException; -import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; /** * Serializer wrapper to add support of {@code null} value serialization. @@ -65,12 +64,15 @@ public class NullableSerializer extends TypeSerializer { private final byte[] padding; private NullableSerializer(@Nonnull TypeSerializer originalSerializer, boolean padNullValueIfFixedLen) { - this.originalSerializer = originalSerializer; - this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen); + this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen)); + } + private NullableSerializer(@Nonnull TypeSerializer originalSerializer, byte[] padding) { + this.originalSerializer = originalSerializer; + this.padding = padding; } - private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { + private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen; return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY; } @@ -79,7 +81,7 @@ private static byte[] createPadding(int originalSerializerLength, boolean pa * This method tries to serialize {@code null} value with the {@code originalSerializer} * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}. * - * @param originalSerializer serializer to wrap and add {@code null} support + * @param originalSerializer serializer to wrap and add {@code null} support * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer * @return serializer which supports {@code null} values */ @@ -99,22 +101,24 @@ public static boolean checkIfNullSupported(@Nonnull TypeSerializer serial DataOutputSerializer dos = new DataOutputSerializer(length); try { serializer.serialize(null, dos); - } catch (IOException | RuntimeException e) { + } + catch (IOException | RuntimeException e) { return false; } - Preconditions.checkArgument( + checkArgument( serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length, "The serialized form of the null value should have the same length " + "as any other if the length is fixed in the serializer"); DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer()); try { - Preconditions.checkArgument(serializer.deserialize(dis) == null); - } catch (IOException e) { + checkArgument(serializer.deserialize(dis) == null); + } + catch (IOException e) { throw new RuntimeException( String.format("Unexpected failure to deserialize just serialized null value with %s", serializer.getClass().getName()), e); } - Preconditions.checkArgument( + checkArgument( serializer.copy(null) == null, "Serializer %s has to be able properly copy null value if it can serialize it", serializer.getClass().getName()); @@ -125,10 +129,18 @@ private boolean padNullValue() { return padding.length > 0; } + private int nullPaddingLength() { + return padding.length; + } + + private TypeSerializer originalSerializer() { + return originalSerializer; + } + /** * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped. * - * @param originalSerializer serializer to wrap and add {@code null} support + * @param originalSerializer serializer to wrap and add {@code null} support * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer * @return wrapped serializer which supports {@code null} values */ @@ -176,7 +188,8 @@ public void serialize(T record, DataOutputView target) throws IOException { if (record == null) { target.writeBoolean(true); target.write(padding); - } else { + } + else { target.writeBoolean(false); originalSerializer.serialize(record, target); } @@ -209,7 +222,8 @@ public void copy(DataInputView source, DataOutputView target) throws IOException target.writeBoolean(isNull); if (isNull) { target.write(padding); - } else { + } + else { originalSerializer.copy(source, target); } } @@ -233,45 +247,25 @@ public int hashCode() { } @Override - public NullableSerializerConfigSnapshot snapshotConfiguration() { - return new NullableSerializerConfigSnapshot<>(originalSerializer); + public TypeSerializerSnapshot snapshotConfiguration() { + return new NullableSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof NullableSerializerConfigSnapshot) { - List, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = - ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousKvSerializersAndConfigs.get(0).f1, - originalSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new NullableSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue())); - } - } - - return CompatibilityResult.requiresMigration(); - } /** * Configuration snapshot for serializers of nullable types, containing the * configuration snapshot of its original serializer. */ + @Deprecated @Internal - public static class NullableSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static class NullableSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; - /** This empty nullary constructor is required for deserializing the configuration. */ - @SuppressWarnings("unused") - public NullableSerializerConfigSnapshot() {} + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public NullableSerializerConfigSnapshot() { + } NullableSerializerConfigSnapshot(TypeSerializer originalSerializer) { super(originalSerializer); @@ -281,5 +275,88 @@ public NullableSerializerConfigSnapshot() {} public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + NullableSerializer previousSerializer = (NullableSerializer) restoreSerializer(); + NullableSerializerSnapshot newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength()); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + newCompositeSnapshot, + getSingleNestedSerializerAndConfig().f1 + ); + } + } + + /** + * Snapshot for serializers of nullable types, containing the + * snapshot of its original serializer. + */ + @SuppressWarnings({"unchecked", "WeakerAccess"}) + public static class NullableSerializerSnapshot extends CompositeTypeSerializerSnapshot> { + + private static final int VERSION = 2; + private int nullPaddingLength; + + @SuppressWarnings("unused") + public NullableSerializerSnapshot() { + super(serializerClass()); + } + + public NullableSerializerSnapshot(NullableSerializer serializerInstance) { + super(serializerInstance); + this.nullPaddingLength = serializerInstance.nullPaddingLength(); + } + + private NullableSerializerSnapshot(int nullPaddingLength) { + super(serializerClass()); + checkArgument(nullPaddingLength >= 0, + "Computed NULL padding can not be negative. %d", + nullPaddingLength); + + this.nullPaddingLength = nullPaddingLength; + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(NullableSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.originalSerializer()}; + } + + @Override + protected NullableSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + checkState(nullPaddingLength >= 0, + "Negative padding size after serializer construction: %d", + nullPaddingLength); + + final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength]; + TypeSerializer nestedSerializer = (TypeSerializer) nestedSerializers[0]; + return new NullableSerializer<>(nestedSerializer, padding); + } + + @Override + protected void writeOuterSnapshot(DataOutputView out) throws IOException { + out.writeInt(nullPaddingLength); + } + + @Override + protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + nullPaddingLength = in.readInt(); + } + + @Override + protected boolean isOuterSnapshotCompatible(NullableSerializer newSerializer) { + return nullPaddingLength == newSerializer.nullPaddingLength(); + } + + private static Class> serializerClass() { + return (Class>) (Class) NullableSerializer.class; + } } + } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java index 57c939d54b0d0..419861da20272 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java @@ -25,6 +25,7 @@ import org.apache.flink.testutils.migration.MigrationVersion; import org.apache.flink.util.TestLogger; +import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -89,9 +90,11 @@ public void restoredSerializerIsAbleToDeserializePreviousData() throws IOExcepti TypeSerializer serializer = snapshot.restoreSerializer(); DataInputView input = dataUnderTest(); + + final Matcher matcher = testSpecification.testDataElementMatcher; for (int i = 0; i < testSpecification.testDataCount; i++) { final ElementT result = serializer.deserialize(input); - assertThat(result, notNullValue()); + assertThat(result, matcher); } } @@ -204,6 +207,9 @@ protected static final class TestSpecification { private String testDataLocation; private int testDataCount; + @SuppressWarnings("unchecked") + private Matcher testDataElementMatcher = (Matcher)notNullValue(); + @SuppressWarnings("unchecked") public static TestSpecification builder( String name, @@ -253,6 +259,11 @@ public TestSpecification withTestData(String testDataLocation, int testDataCo return this; } + public TestSpecification withTestDataMatcher(Matcher matcher) { + testDataElementMatcher = matcher; + return this; + } + private TypeSerializer createSerializer() { try { return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get(); @@ -341,6 +352,45 @@ public void add( } } + /** + * Adds a test specification to be tested for all specified test versions. + * + *

This method adds the specification with pre-defined snapshot and data filenames, + * with the format "flink-<testVersion>-<specName>-<data/snapshot>", + * and each specification's test data count is assumed to always be 10. + * + * @param name test specification name. + * @param serializerClass class of the current serializer. + * @param snapshotClass class of the current serializer snapshot class. + * @param serializerProvider provider for an instance of the current serializer. + * @param elementMatcher an {@code hamcrest} matcher to match test data. + * + * @param type of the test data. + */ + public void add( + String name, + Class serializerClass, + Class snapshotClass, + Supplier> serializerProvider, + Matcher elementMatcher) { + for (MigrationVersion testVersion : testVersions) { + testSpecifications.add( + TestSpecification.builder( + getSpecNameForVersion(name, testVersion), + serializerClass, + snapshotClass, + testVersion) + .withNewSerializerProvider(serializerProvider) + .withSnapshotDataLocation( + String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name)) + .withTestData( + String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name), + DEFAULT_TEST_DATA_COUNT) + .withTestDataMatcher(elementMatcher) + ); + } + } + /** * Adds a test specification to be tested for all specified test versions. * diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java new file mode 100644 index 0000000000000..1da7da7bcd0c4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer.NullableSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * {@link NullableSerializer} migration test. + */ +@RunWith(Parameterized.class) +public class NullableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public NullableSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "nullable-padded-serializer", + NullableSerializer.class, + NullableSerializerSnapshot.class, + () -> NullableSerializer.wrap(LongSerializer.INSTANCE, true), + NULL_OR_LONG); + + testSpecifications.add( + "nullable-not-padded-serializer", + NullableSerializer.class, + NullableSerializerSnapshot.class, + () -> NullableSerializer.wrap(LongSerializer.INSTANCE, false), + NULL_OR_LONG); + + return testSpecifications.get(); + } + + @SuppressWarnings("unchecked") + private static final Matcher NULL_OR_LONG = new NullableMatcher(); + + private static final class NullableMatcher extends BaseMatcher { + + @Override + public boolean matches(Object item) { + return item == null || item instanceof Long; + } + + @Override + public void describeTo(Description description) { + description.appendText("a null or a long"); + } + } + +} diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..ca6d29f57679c0259041c3befd8211cd98719bda GIT binary patch literal 58 XcmZQzfB;rTD1(8OkrBa$3t}<=1Hk|z literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..8c8b27f0ea31533907970047e8219b1d937f76fb GIT binary patch literal 944 zcmd5)Jxjwt7=DxD$0#n&g7ak&K?H}22ogk0w+`~Qy{4y^yLfl0t%Iw7qob8B{tEww zI2N2-9K9y3X=sKj4&Ida!aZN_0{{-B$9V>*4D6?!o#E&mR!ni5}%Fjq8>q@el(oT8$@TH@45(H}lMy^!1EiL@KIhf1O*PDdYGpR4P&*HrP0 z6fR5omR{4&;^vN|XtOX780(HSSu18ufw65!cY60WZOT85?k|dwIz1_kc%fhCs4$|2 QDo(X7t~QG_1^aziU((z@6#xJL literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..42439f7bcd29f68304b2d13fe4eaadfa122ac9a7 GIT binary patch literal 90 VcmZQzfB;q~&46Gq!i30U000G@02crN literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..9bd3af75b1d8b4ba168d1deec86d35b19da442aa GIT binary patch literal 952 zcmd6lF-yZh6vtmu)Q;lfEI3~l5kzpPh#*14bn773_L`nv?&960whpd-4aX{79Q_1- z4RI_uxj1^cgr=bxsyLV>_b%lB`~Tm20N{XqoEL!F*c?smJWoDgMM*diGDJAWojyT5 zM){Q~D7wT0jPxibiKa{;nQ%=bf=(00aGQ~UNQxP~B@&&Sj~~{jTQ0ahYzRq1$}uax zYuNP-!OIAqC76PG9T7!>dozaDG_>Ns{T8gnxZ9;X)Zn%nLkT;TPBWc3uVtMqIXEuB zFg9`bmJ?}XMUXFeSU8$jlF;~pPT5glA~B)`A3feby`r0=)Y>)NK`oJK(#hxM*J{1_ zW*r-7JqM>R2QDhPOOI=Nae{|}_h=Y!9IL+27Bbryn404tzkAO&=AX9kFBZfmJ+5r= Y!k(RBi6f$kWJIa*h_jYjvEMiA8|ddgH2?qr literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..aecf5b80e57660de7b2ecd021d988206bb43a7ff GIT binary patch literal 58 VcmZQ%fC5%VI1A1|2q9Q-NdN=A03!eZ literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..a2e0807bd9345be35b470a78d6f75a4480a5d3c3 GIT binary patch literal 941 zcmZQzU|?d1&o4^XODsrC&Pdfu%gM~k268g>vJ%S@^-3xWQcFuRbBgtfO7luGb5r&F zN^^1&lX6mnQ;RYab26(^i&QX$ob&V2GSh?e5(|nm@=L0Mk9=TYU|$O{1p==WnHZRS7=-eRGSf5j5_1qvDq%450otGs@svK;Q$S9pesX?pZhoFV z+*|q~Ko27HSw}Ol7bK>nWag!pFfc_sRTMFpV09_P9HihuH3#Th5MX3tV5lende@RT zLy{7UQ}ulE^U@J6f*1rbsffXe1anYb6b$rIPHIVhUggU6Rmb(pT|rJOWT<0cE-eE2 zbj@e+A8Z^U55O*lSqkz3l>G|GtcUU+K_hXH`idHH2J|TtLjVb(jT&nzs9ab^0GmpF Gr~v@&^Fn+8 literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..426221eddd6d59a47346368f298f18b24862ca48 GIT binary patch literal 90 YcmZQ%Km)9dFabCNCV)>0t_IEk00eaa9{>OV literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..685f8dd5afacda67d3521f9139652fa60002c74c GIT binary patch literal 949 zcmZQzU|?d1&o4^XODsrC&Pdfu%gM~k268g>vJ%S@^-3xWQcFuRbBgtfO7luGb5r&F zN^^1&lX6mnQ;RYab26(^i&QX$ob&V2GSh?e5(|nm@=L0Mk9=TYU|$O{1p==WnHZRS7=-eRGSf5j5_1qvDq%450otGs@svK;Q$S9pesX?pZhoFV z+*|q~Ko27HSw}Ol7bK>nWag!pFfc_sRTMFpV09_P9HihuH3#Th5MX3tV5lende@RT zLy{7UQ}ulE^U@J6f*1rbsffXe1anYb6b$rIPHIVhUggU6Rmb(pT|rJOWT<0cE-eE2 zbj@e+A8Z^U55O)4TFL>n7(@Xh4vazUS3qVxl>Z1CnS<1K)QCHvPnj43NC Date: Sun, 27 Jan 2019 15:41:49 +0100 Subject: [PATCH 3/8] [FLINK-11329] Migrating the RowSerializer --- .../java/typeutils/runtime/RowSerializer.java | 135 ++++++++++-------- .../runtime/RowSerializerMigrationTest.java | 64 +++++++++ .../resources/flink-1.6-row-serializer-data | Bin 0 -> 240 bytes .../flink-1.6-row-serializer-snapshot | Bin 0 -> 1444 bytes .../resources/flink-1.7-row-serializer-data | Bin 0 -> 240 bytes .../flink-1.7-row-serializer-snapshot | Bin 0 -> 1454 bytes 6 files changed, 136 insertions(+), 63 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java create mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.6-row-serializer-snapshot create mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-data create mode 100644 flink-core/src/test/resources/flink-1.7-row-serializer-snapshot diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index e29f68132540e..47ff695a9c9a2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -15,18 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Row; @@ -34,7 +32,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Arrays; -import java.util.List; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask; @@ -95,7 +92,8 @@ public Row copy(Row from) { if (fromField != null) { Object copy = fieldSerializers[i].copy(fromField); result.setField(i, copy); - } else { + } + else { result.setField(i, null); } } @@ -123,11 +121,13 @@ public Row copy(Row from, Row reuse) { if (reuseField != null) { Object copy = fieldSerializers[i].copy(fromField, reuseField); reuse.setField(i, copy); - } else { + } + else { Object copy = fieldSerializers[i].copy(fromField); reuse.setField(i, copy); } - } else { + } + else { reuse.setField(i, null); } } @@ -163,7 +163,6 @@ public void serialize(Row record, DataOutputView target) throws IOException { } } - @Override public Row deserialize(DataInputView source) throws IOException { int len = fieldSerializers.length; @@ -176,7 +175,8 @@ public Row deserialize(DataInputView source) throws IOException { for (int i = 0; i < len; i++) { if (nullMask[i]) { result.setField(i, null); - } else { + } + else { result.setField(i, fieldSerializers[i].deserialize(source)); } } @@ -198,11 +198,13 @@ public Row deserialize(Row reuse, DataInputView source) throws IOException { for (int i = 0; i < len; i++) { if (nullMask[i]) { reuse.setField(i, null); - } else { + } + else { Object reuseField = reuse.getField(i); if (reuseField != null) { reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source)); - } else { + } + else { reuse.setField(i, fieldSerializers[i].deserialize(source)); } } @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility + // Serializer configuration snapshoting & compatibility // -------------------------------------------------------------------------------------------- @Override - public RowSerializerConfigSnapshot snapshotConfiguration() { - return new RowSerializerConfigSnapshot(fieldSerializers); + public TypeSerializerSnapshot snapshotConfiguration() { + return new RowSerializerSnapshot(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof RowSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousFieldSerializersAndConfigs = - ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) { - boolean requireMigration = false; - TypeSerializer[] convertDeserializers = new TypeSerializer[fieldSerializers.length]; - - CompatibilityResult compatResult; - int i = 0; - for (Tuple2, TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) { - compatResult = CompatibilityUtil.resolveCompatibilityResult( - f.f0, - UnloadableDummyTypeSerializer.class, - f.f1, - fieldSerializers[i]); - - if (compatResult.isRequiresMigration()) { - requireMigration = true; - - if (compatResult.getConvertDeserializer() == null) { - // one of the field serializers cannot provide a fallback deserializer - return CompatibilityResult.requiresMigration(); - } else { - convertDeserializers[i] = - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); - } - } + /** + * A snapshot for {@link RowSerializer}. + */ + @Deprecated + public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - i++; - } + private static final int VERSION = 1; - if (requireMigration) { - return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers)); - } else { - return CompatibilityResult.compatible(); - } - } + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public RowSerializerConfigSnapshot() { + } + + public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) { + super(fieldSerializers); } - return CompatibilityResult.requiresMigration(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + TypeSerializerSnapshot[] nestedSnapshots = getNestedSerializersAndConfigs() + .stream() + .map(t -> t.f1) + .toArray(TypeSerializerSnapshot[]::new); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new RowSerializerSnapshot(), + nestedSnapshots); + } } - public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + /** + * A {@link TypeSerializerSnapshot} for RowSerializer. + */ + public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot { - private static final int VERSION = 1; + private static final int VERSION = 2; - /** This empty nullary constructor is required for deserializing the configuration. */ - public RowSerializerConfigSnapshot() {} + @SuppressWarnings("WeakerAccess") + public RowSerializerSnapshot() { + super(RowSerializer.class); + } - public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) { - super(fieldSerializers); + RowSerializerSnapshot(RowSerializer serializerInstance) { + super(serializerInstance); } @Override - public int getVersion() { + protected int getCurrentOuterSnapshotVersion() { return VERSION; } + + @Override + protected TypeSerializer[] getNestedSerializers(RowSerializer outerSerializer) { + return outerSerializer.fieldSerializers; + } + + @Override + protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new RowSerializer(nestedSerializers); + } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java new file mode 100644 index 0000000000000..7a7888aca61b4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; +import org.apache.flink.types.Row; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration test for {@link RowSerializer}. + */ +@RunWith(Parameterized.class) +public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public RowSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "row-serializer", + RowSerializer.class, + RowSerializerSnapshot.class, + RowSerializerMigrationTest::stringLongRowSupplier); + + return testSpecifications.get(); + } + + private static TypeSerializer stringLongRowSupplier() { + RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + return rowTypeInfo.createSerializer(new ExecutionConfig()); + } +} diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-data b/flink-core/src/test/resources/flink-1.6-row-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..2cd0bd312d8c203ea349c3db8cc0fe721b4083d6 GIT binary patch literal 240 zcmZSJH#IObwlp*_HZ(OhGdE&j(Blg|Sf|DS6Sp)lGPE!>2Z@JCefLC&8yT2dnn1*5 z56ni08=G4iK*bdjA0os}O-(H=4Z-Fpi|s*(o10r0nVNyc)v5vz;+7`HmIlUPagBd3 qVB&@bhK44_W)N}h>0AhLBTEaQ99X?xMioNb#N61x7^2=#_$B}}WiLAb literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..9200e23bfec37035ed3d2fe1e8f0aa3b88cb0488 GIT binary patch literal 1444 zcmd5+yH3L}6uq%ju`)3+!icjVp{g4QA*2q7z|_gKPHRZ)V7v4&ATjX+4EzBC^#}L@ z{sS?B#LmKP5|KtpX%QV7DN?SSd(OGXM;8Dbh##^AP`SfTtjXr+11d&CRp~Zm5o>lh z^#dUHe^|C&-s|*7_Q@OT{IP6Y}znnJ(x@SdgwxH5BKZ0adHs ztUo-jeccmCvCy&Fe4@^n;ncHQ$aXf=q&5w^e?ZF5=pkyrt&)Ss+ou=W+aC+OgliyE zue2~~EU>Po$s41spk4<~OJ&Xzk&Rqn_LTHR+mkFZ9c5!^yq(mNcVrr3*~0fTJ$p+t zy0nfjD4`Ad z3qyu1ML~!#5r!}s-yWxT=shNmiXdO_%Aj@6DqM>BbboPI1_m;pHHW_=)G7G&^X=6= rO3Hn|c%9u5Njclja&vd5qIJl;eRl4-9#Z??t#(IuMG|D|*Q4eGkD4~k literal 0 HcmV?d00001 diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..f632dc4ec17c03e30136ce69651d7469c066172f GIT binary patch literal 1454 zcmd5+yH3L}6uk{9tV~RdVRaTHRAmD(Aay{bOr1<|Q$uQ}jzgOcNKE|&{s4ja0d^$* z12KZc&VrqWw(bKO$UwG}=pLWPbpXJEV=jDzL)>T))C?#)SAe23e1VZnLJ~_FL`cL; z(l$Xyyz3D`aX|Y-*aq6;tVw;3;V^1(+4mmb0idK#*t&m%awMSg>w^@Leo7Wt2T;a> z%A^kKO-h2(v6@K2PQ9wmaE4}_)QkeuY4CQNGiPMSsVQUQNw+6pd#)6C4rbNkN%EHZ`eDz-|sE)1#+Y10p#~ zZU=WysJpKPcHk1Mg_X`&O$Ao@B6*r^P4}xK%yhphI8d+yu(Fzcd&w5*-^NnZ8bK{b uUDL(e(=WcRWSZ_Dv(ptle|w+vmS6a87FNPxjuOn}woPahF#j*OOlqGv9_?HJ literal 0 HcmV?d00001 From 58a369295d5cf92ecbe925d2030df42c57ea9600 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Sun, 27 Jan 2019 16:40:40 +0100 Subject: [PATCH 4/8] [FLINK-11329] Migrating the UnionSerializer --- .../api/datastream/CoGroupedStreams.java | 107 +++++++++++------- .../UnionSerializerMigrationTest.java | 67 +++++++++++ .../resources/flink-1.6-union-serializer-data | 1 + .../flink-1.6-union-serializer-snapshot | Bin 0 -> 1393 bytes .../resources/flink-1.7-union-serializer-data | 1 + .../flink-1.7-union-serializer-snapshot | Bin 0 -> 1403 bytes 6 files changed, 135 insertions(+), 41 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 19d978322f09c..bfd74e05cab0f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -25,14 +25,13 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -504,7 +503,9 @@ public boolean canEqual(Object obj) { } } - private static class UnionSerializer extends TypeSerializer> { + @VisibleForTesting + @Internal + static class UnionSerializer extends TypeSerializer> { private static final long serialVersionUID = 1L; private final TypeSerializer oneSerializer; @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot> snapshotConfiguration() { - return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof UnionSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousSerializersAndConfigs = - ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(0).f1, - oneSerializer); - - CompatibilityResult twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(1).f1, - twoSerializer); - - if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new UnionSerializer<>( - new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new UnionSerializerSnapshot<>(this); } } /** * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}. */ + @Deprecated public static class UnionSerializerConfigSnapshot - extends CompositeTypeSerializerConfigSnapshot> { + extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; - /** This empty nullary constructor is required for deserializing the configuration. */ - public UnionSerializerConfigSnapshot() {} + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public UnionSerializerConfigSnapshot() { + } public UnionSerializerConfigSnapshot(TypeSerializer oneSerializer, TypeSerializer twoSerializer) { super(oneSerializer, twoSerializer); } + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + List, TypeSerializerSnapshot>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs(); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new UnionSerializerSnapshot<>(), + nestedSerializersAndConfigs.get(0).f1, + nestedSerializersAndConfigs.get(1).f1 + ); + } + @Override public int getVersion() { return VERSION; } } + /** + * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}. + */ + public static class UnionSerializerSnapshot + extends CompositeTypeSerializerSnapshot, UnionSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("WeakerAccess") + public UnionSerializerSnapshot() { + super(correspondingSerializerClass()); + } + + UnionSerializerSnapshot(UnionSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(UnionSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer}; + } + + @SuppressWarnings("unchecked") + @Override + protected UnionSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new UnionSerializer<>((TypeSerializer) nestedSerializers[0], (TypeSerializer) nestedSerializers[1]); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) UnionSerializer.class; + } + } + // ------------------------------------------------------------------------ // Utility functions that implement the CoGroup logic based on the tagged // union window reduce diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java new file mode 100644 index 0000000000000..2b85d7e4909a7 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration tests for {@link UnionSerializer}. + */ +@RunWith(Parameterized.class) +public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + public UnionSerializerMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "union-serializer", + UnionSerializer.class, + UnionSerializerSnapshot.class, + UnionSerializerMigrationTest::stringLongRowSupplier); + + return testSpecifications.get(); + } + + private static TypeSerializer> stringLongRowSupplier() { + return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE); + } + +} + + + diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data new file mode 100644 index 0000000000000..cb29a995ec68b --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data @@ -0,0 +1 @@ +53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..178d007196e40066639db92d9369060aa1554aae GIT binary patch literal 1393 zcmd6m%}T>S6oqdBS^5^PCmRu@JBt(~3n2<_$7b3NCUfJ=l%`#{^;s0(!}k!EitgNc zGYO3$3btm^fx!GE-#z!t2>^KDhW7~&aGKKEdw+a`z-Tx{J!L%Sb2c z)H7Mq2>H&^aJ&JjttlpIDlDqy{oTV8Tb%d+McJiK0*^MpM);e&k`}mYzwGd~zU(2Y ziJZoY=Vpf1(E&)i8?O|w6)h8bKI*fVI&eE{dLN;3y^cCA|Cr4)u6ZV`(5Wz{9g50a Y2m7a8<*)x-JKs;5dJp?<`o6tcUz~8%(*OVf literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data new file mode 100644 index 0000000000000..d2f04fdc56736 --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data @@ -0,0 +1 @@ +53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..747eb82103473a1761b2986a7c6d1942d0dba4be GIT binary patch literal 1403 zcmd6m&q~8U5XL8wdh;!E-dseG-Yi9s9E4Qxy0+VPG1*CXxAf1!YoA5&F$CX3JSzGK z&L*vCDuU4*3<+c=$#1{;z6}8QaEW@vc*gr{XDi&JuL@tHypc=|itaxU|XqWNBI{pF36N}mKC*T6>j zojgZtxnSrbdVai6xuS9cu(wYg_w0GSCH}1~O~g)f`p~Q4>u&JDZe#Ov<>Wtl=VEwT b$IwjpjqlCILO8DxLUnTo)v^-6?_K) Date: Sun, 27 Jan 2019 20:54:23 +0100 Subject: [PATCH 5/8] [FLINK-11329] Migrate BufferEntrySerializer --- .../operators/co/IntervalJoinOperator.java | 100 +++++++++++------- .../BufferEntrySerializerMigrationTest.java | 57 ++++++++++ .../flink-1.6-buffer-entry-serializer-data | Bin 0 -> 160 bytes ...flink-1.6-buffer-entry-serializer-snapshot | Bin 0 -> 935 bytes .../flink-1.7-buffer-entry-serializer-data | Bin 0 -> 160 bytes ...flink-1.7-buffer-entry-serializer-snapshot | Bin 0 -> 936 bytes 6 files changed, 120 insertions(+), 37 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 43085cb42c475..2ef0c1c5968c8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -22,17 +22,15 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.StateInitializationContext; @@ -361,7 +359,9 @@ public void output(OutputTag outputTag, X value) { * This will contain the element itself along with a flag indicating * if it has been joined or not. */ - private static class BufferEntry { + @Internal + @VisibleForTesting + static class BufferEntry { private final T element; private final boolean hasBeenJoined; @@ -375,13 +375,15 @@ private static class BufferEntry { /** * A {@link TypeSerializer serializer} for the {@link BufferEntry}. */ - private static class BufferEntrySerializer extends TypeSerializer> { + @Internal + @VisibleForTesting + static class BufferEntrySerializer extends TypeSerializer> { private static final long serialVersionUID = -20197698803836236L; private final TypeSerializer elementSerializer; - private BufferEntrySerializer(TypeSerializer elementSerializer) { + BufferEntrySerializer(TypeSerializer elementSerializer) { this.elementSerializer = Preconditions.checkNotNull(elementSerializer); } @@ -464,40 +466,16 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new BufferSerializerConfigSnapshot<>(elementSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof BufferSerializerConfigSnapshot) { - Tuple2, TypeSerializerConfigSnapshot> previousSerializerAndConfig = - ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult compatResult = - CompatibilityUtil.resolveCompatibilityResult( - previousSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousSerializerAndConfig.f1, - elementSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new BufferEntrySerializer<>( - new TypeDeserializerAdapter<>( - compatResult.getConvertDeserializer()))); - } - } - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new BufferEntrySerializerSnapshot<>(this); } } /** * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer. */ - public static class BufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + @Deprecated + public static class BufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; @@ -512,6 +490,54 @@ public BufferSerializerConfigSnapshot(final TypeSerializer userTypeSerializer public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new BufferEntrySerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); + } + } + + /** + * A {@link TypeSerializerSnapshot} for {@link BufferEntrySerializer}. + */ + public static final class BufferEntrySerializerSnapshot + extends CompositeTypeSerializerSnapshot, BufferEntrySerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings({"unused", "WeakerAccess"}) + public BufferEntrySerializerSnapshot() { + super(correspondingSerializerClass()); + } + + BufferEntrySerializerSnapshot(BufferEntrySerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(BufferEntrySerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.elementSerializer}; + } + + @Override + @SuppressWarnings("unchecked") + protected BufferEntrySerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new BufferEntrySerializer<>((TypeSerializer) nestedSerializers[0]); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) BufferEntrySerializer.class; + } } @VisibleForTesting diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java new file mode 100644 index 0000000000000..d4d267386c53d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration tests for {@link BufferEntrySerializer}. + */ +@RunWith(Parameterized.class) +public class BufferEntrySerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + public BufferEntrySerializerMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "buffer-entry-serializer", + BufferEntrySerializer.class, + BufferEntrySerializerSnapshot.class, + () -> new BufferEntrySerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..a4af1fc5caf765da00f55269dc2195738971aeea GIT binary patch literal 160 zcmYk!!4beP2m?WPkXArgfckGNd1Q>AH;Y&gsfM-EndtSmylF;Ei^;UpFcvfpXSeTh Wq$RKWXofD|a_I8!J*#G)Q3$*&5gl{@ literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..6141180d0d6008010ac4cd393bf88c9f8c2a6205 GIT binary patch literal 935 zcmd5)J5Izf5S=XI0NjDvX%Njy6pK~}v=T@`6g0?0lU)nP4r4EDwxB}d2sD&5AW8~u zfVc-GoB<{u2&+)=D{y1aSo7x1n+E^^lqa|Zm^X0!fS?2VHIsc$^x5t5QxO_5&iRt68JySE(( z2C#q{Q#8K$_OLwQ#$nYsEE{v6!q}#%HKETl9DAi*xXrJot~()EYB4T&R)yldP|Oxq z3P8{Sm}ES5_iG^g(xHevu^J7Dk5~k~Zy?QbiWpU+wUQ%E+}VRYl-}+nYJX!6xsY5otxca LC29%w8@XNqU`ar0 literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..36c9dc75b657d229d94e11a51e945b3c36889c40 GIT binary patch literal 160 zcmYkx!4beP2m`Qe&{b@V3GBaEKfNYToz~+*W^sBuXRpr@J*1C?oDTl40D_lgYUaG5 UQXW?U?c)}$`kg>lT;?u^UJNuG{r~^~ literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..af92e1b8fb8a4717a11c758b48731264d913c47d GIT binary patch literal 936 zcmd5*y-EW?5S}~m0epwldyUuxEFvkC1B(k5HnN7fWN~lzoZV}|*itj?PSjenP3qpWt&a}I~138fnMdFDCXoskwYAk}@3P_7HC5)=kTT|T3 zZ=b|PzZ~uiF8Lu;8kIU#_R8+OiM=$s{aXU-5Qw>kFj!%+%Mx{#17AO&LjKF{J^{I* BM!o<5 literal 0 HcmV?d00001 From 0fa9adb0b8f77a523ed4d90014f8d72305321328 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Mon, 28 Jan 2019 16:32:14 +0100 Subject: [PATCH 6/8] [FLINK-11329] Migrate StreamElementSerializer --- .../streamrecord/StreamElementSerializer.java | 93 ++++++++++-------- .../StreamElementSerializerMigrationTest.java | 55 +++++++++++ .../flink-1.6-stream-element-serializer-data | Bin 0 -> 158 bytes ...ink-1.6-stream-element-serializer-snapshot | Bin 0 -> 931 bytes .../flink-1.7-stream-element-serializer-data | Bin 0 -> 158 bytes ...ink-1.7-stream-element-serializer-snapshot | Bin 0 -> 932 bytes 6 files changed, 109 insertions(+), 39 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data create mode 100644 flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index f84ff15a6ee91..e8af0f9cdaf6c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -19,15 +19,11 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -283,56 +279,75 @@ public int hashCode() { // -------------------------------------------------------------------------------------------- @Override - public StreamElementSerializerConfigSnapshot snapshotConfiguration() { - return new StreamElementSerializerConfigSnapshot<>(typeSerializer); + public StreamElementSerializerSnapshot snapshotConfiguration() { + return new StreamElementSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - Tuple2, TypeSerializerSnapshot> previousTypeSerializerAndConfig; + /** + * Configuration snapshot specific to the {@link StreamElementSerializer}. + * @deprecated see {@link StreamElementSerializerSnapshot}. + */ + @Deprecated + public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - // we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer - if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { - previousTypeSerializerAndConfig = - ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - } else { - return CompatibilityResult.requiresMigration(); - } + private static final int VERSION = 1; - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); + /** This empty nullary constructor is required for deserializing the configuration. */ + public StreamElementSerializerConfigSnapshot() {} - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new StreamElementSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } else { - return CompatibilityResult.requiresMigration(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new StreamElementSerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); } } /** * Configuration snapshot specific to the {@link StreamElementSerializer}. */ - public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class StreamElementSerializerSnapshot + extends CompositeTypeSerializerSnapshot> { - private static final int VERSION = 1; + private static final int VERSION = 2; - /** This empty nullary constructor is required for deserializing the configuration. */ - public StreamElementSerializerConfigSnapshot() {} + @SuppressWarnings("WeakerAccess") + public StreamElementSerializerSnapshot() { + super(serializerClass()); + } - public StreamElementSerializerConfigSnapshot(TypeSerializer typeSerializer) { - super(typeSerializer); + StreamElementSerializerSnapshot(StreamElementSerializer serializerInstance) { + super(serializerInstance); } @Override - public int getVersion() { + protected int getCurrentOuterSnapshotVersion() { return VERSION; } + + @Override + protected TypeSerializer[] getNestedSerializers(StreamElementSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()}; + } + + @Override + protected StreamElementSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer casted = (TypeSerializer) nestedSerializers[0]; + + return new StreamElementSerializer<>(casted); + } + + @SuppressWarnings("unchecked") + private static Class> serializerClass() { + Class streamElementSerializerClass = StreamElementSerializer.class; + return (Class>) streamElementSerializerClass; + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java new file mode 100644 index 0000000000000..b6169d48a5729 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * Migration tests for {@link StreamElementSerializer}. + */ +@RunWith(Parameterized.class) +public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public StreamElementSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "stream-element-serializer", + StreamElementSerializer.class, + StreamElementSerializerSnapshot.class, + () -> new StreamElementSerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..81b80c3422c2b1a3febea536e6d59a75cdfdb010 GIT binary patch literal 158 zcmZQHOi9bm%E?M8PRq$o$pVwbSwMDjSw)^so{6E6WoAK2abkX2W_EE(T8^~>69a>D z;|k*q+n5<3fE7wJLTR7^1{M|&hmjGYIJZ=V2*q4w$rZVIKznl;Ia1QHvT{<=0LbMZ A6aWAK literal 0 HcmV?d00001 diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..8ffdb43b17b32db0a2e112ecd0b1d8ef05e12b98 GIT binary patch literal 931 zcmd5)J5Izf5S=Wd}6bqusd(kx*IfN69a?u z^hL}jF3b!NzzU@qp)^nd0}Bg?!^j9x9F!hOgkr9;J5S?9#mQzsM4Wda0ifGa~7ck3YW8u$YJIbe^jl@kr90Q4qP=dq}7<(7O zf{=(v5zCFoe$VgC8vrQ5IA1f_$ZCpgqLsa3A$pWX*_!#8jmltG@mM*3$wa;$YBXr0 zK(C}$EA&Ud)X$uqs9IRrgsF=w@%RP+1?}61S2zeB&UYk$Xwxc6C`ND?mkn;jMK~GJ zH@x-9vrjrGUO8i&<>~b-DjoQ^hZ`(B9RJWqK-L1;Cy@1lt(GJP77dvk{Eg&7222g6 z=5NG*AXbgh+2!5({)x?pN!&Hu&=BNAQYv}9&~e?>p1YLx*1#?TBCjQk Vw`lD5h|=))GW(eK%>19;y#v_(L16#@ literal 0 HcmV?d00001 From 307f6ee394629ca9fef15c662e3cf65fb6d3d38a Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Mon, 28 Jan 2019 16:56:50 +0100 Subject: [PATCH 7/8] [FLINK-11329] Migrating ScalaOptionSerializer --- .../ScalaOptionSerializerConfigSnapshot.java | 11 ++++ .../ScalaOptionSerializerSnapshot.java | 62 ++++++++++++++++++ .../scala/typeutils/OptionSerializer.scala | 41 +----------- ...OptionSerializerSnapshotMigrationTest.java | 58 ++++++++++++++++ .../flink-1.6-scala-option-serializer-data | Bin 0 -> 31 bytes ...flink-1.6-scala-option-serializer-snapshot | Bin 0 -> 876 bytes .../flink-1.7-scala-option-serializer-data | Bin 0 -> 31 bytes ...flink-1.7-scala-option-serializer-snapshot | Bin 0 -> 877 bytes 8 files changed, 133 insertions(+), 39 deletions(-) create mode 100644 flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java create mode 100644 flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java create mode 100644 flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data create mode 100644 flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot create mode 100644 flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data create mode 100644 flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java index 215bd447c05d4..e6bc88c518a05 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java @@ -19,8 +19,10 @@ package org.apache.flink.api.scala.typeutils; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import scala.Option; @@ -31,6 +33,7 @@ * allow calling different base class constructors from subclasses, while we need that * for the default empty constructor. */ +@Deprecated public final class ScalaOptionSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; @@ -46,4 +49,12 @@ public ScalaOptionSerializerConfigSnapshot(TypeSerializer elementSerializer) public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new ScalaOptionSerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); + } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java new file mode 100644 index 0000000000000..dfa91780c9cc6 --- /dev/null +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils; + +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import scala.Option; + +/** + * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}. + */ +public final class ScalaOptionSerializerSnapshot extends CompositeTypeSerializerSnapshot, OptionSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("WeakerAccess") + public ScalaOptionSerializerSnapshot() { + super(underlyingClass()); + } + + public ScalaOptionSerializerSnapshot(OptionSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(OptionSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.elemSerializer()}; + } + + @Override + protected OptionSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") TypeSerializer nestedSerializer = (TypeSerializer) nestedSerializers[0]; + return new OptionSerializer<>(nestedSerializer); + } + + @SuppressWarnings("unchecked") + private static Class> underlyingClass() { + return (Class>) (Class) OptionSerializer.class; + } +} diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 7f3aa8cd81f59..ea8f22ad452e4 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = { - new ScalaOptionSerializerConfigSnapshot[A](elemSerializer) - } - - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = { - - configSnapshot match { - case optionSerializerConfigSnapshot - : ScalaOptionSerializerConfigSnapshot[A] => - ensureCompatibilityInternal(optionSerializerConfigSnapshot) - case legacyOptionSerializerConfigSnapshot - : OptionSerializer.OptionSerializerConfigSnapshot[A] => - ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot) - case _ => CompatibilityResult.requiresMigration() - } - } - - private def ensureCompatibilityInternal( - compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]]) - : CompatibilityResult[Option[A]] = { - - val compatResult = CompatibilityUtil.resolveCompatibilityResult( - compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0, - classOf[UnloadableDummyTypeSerializer[_]], - compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1, - elemSerializer) - - if (compatResult.isRequiresMigration) { - if (compatResult.getConvertDeserializer != null) { - CompatibilityResult.requiresMigration( - new OptionSerializer[A]( - new TypeDeserializerAdapter(compatResult.getConvertDeserializer))) - } else { - CompatibilityResult.requiresMigration() - } - } else { - CompatibilityResult.compatible() - } + override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = { + new ScalaOptionSerializerSnapshot[A](this) } } diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java new file mode 100644 index 0000000000000..efc94ec4b8439 --- /dev/null +++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +import scala.Option; + +/** + * Migration test for the {@link ScalaEitherSerializerSnapshot}. + */ +@RunWith(Parameterized.class) +public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + private static final String SPEC_NAME = "scala-option-serializer"; + + public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + SPEC_NAME, + OptionSerializer.class, + ScalaOptionSerializerSnapshot.class, + () -> new OptionSerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..3cdb252f8c9914cae2794b9d4b6f7a970063204b GIT binary patch literal 31 ScmZQ%FHT7VB1SZt0RjMv^9P3j literal 0 HcmV?d00001 diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..d7be0c22891bbac3c24f14de696f941b7d674482 GIT binary patch literal 876 zcmd6lF-`+95Jks9wA_QnZh$5o2tg4EBvaCu)nsGEYcIBeO$tN@R{?PkYVLqU16M#y z)`882EkID<#vWVq^yhyA0E2q5D}a?_I#SK5`&U>E9yXoNxhhpQLOwLwj;YW*WXdQW zCncUmZ2~_iqjpxJ*UIQA`kVFemOVb7_kayx4Gk8}TZF9vK}CX!8cKy0nJWrs#V1u9 z5x?o{R~|Mx)uhG+Iqk+Y9k3oiRnkcP??FyfKrW-#Hg7~MvOH}6fYjC`iE1!9JGs5S zd*BzlGjXSI3AB^{`<>ur*X1mQW$SmG9qL?KRfds^Db0_B;f3JS`Q6hz;eTt#U&LL{ by4UIEmtS+JyecrFHyA=Yu4HNr`;A@ieK#nd literal 0 HcmV?d00001 diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data new file mode 100644 index 0000000000000000000000000000000000000000..3cdb252f8c9914cae2794b9d4b6f7a970063204b GIT binary patch literal 31 ScmZQ%FHT7VB1SZt0RjMv^9P3j literal 0 HcmV?d00001 diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot new file mode 100644 index 0000000000000000000000000000000000000000..bdedf0e84250f6aec258d4eb19ebe55ae02d315a GIT binary patch literal 877 zcmZQzU|?eK$S+FQODsrC&Pdfu%gM~k268g>ijxy_67@IR!kN!S4Zq!cev literal 0 HcmV?d00001 From 688b91007b3c1ea32afb39e26c49879ade74a393 Mon Sep 17 00:00:00 2001 From: Igal Shilman Date: Mon, 28 Jan 2019 20:21:55 +0100 Subject: [PATCH 8/8] [FLINK-11329] Adjusting checkstyle --- .../TypeSerializerSnapshotMigrationTestBase.java | 15 ++++++++------- .../flink/streaming/tests/verify/ValueWithTs.java | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java index 419861da20272..2e363fb278e78 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java @@ -25,11 +25,9 @@ import org.apache.flink.testutils.migration.MigrationVersion; import org.apache.flink.util.TestLogger; -import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; - import org.junit.Test; import java.io.IOException; @@ -196,6 +194,10 @@ private static Path resourcePath(String resourceName) { // Test Specification // -------------------------------------------------------------------------------------------------------------- + /** + * Test Specification. + */ + @SuppressWarnings("WeakerAccess") protected static final class TestSpecification { private final Class> serializerType; private final Class> snapshotClass; @@ -208,7 +210,7 @@ protected static final class TestSpecification { private int testDataCount; @SuppressWarnings("unchecked") - private Matcher testDataElementMatcher = (Matcher)notNullValue(); + private Matcher testDataElementMatcher = (Matcher) notNullValue(); @SuppressWarnings("unchecked") public static TestSpecification builder( @@ -285,10 +287,6 @@ private MigrationVersion getTestMigrationVersion() { return testMigrationVersion; } - public Class> getSnapshotClass() { - return snapshotClass; - } - @Override public String toString() { return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName()); @@ -435,6 +433,9 @@ private static String getSpecNameForVersion(String baseName, MigrationVersion te } } + /** + * Supplier of paths based on {@link MigrationVersion}. + */ protected interface TestResourceFilenameSupplier { String get(MigrationVersion testVersion); } diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java index ba178f4aaa701..7070c6d0c08b2 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -118,7 +118,7 @@ public TypeSerializerSnapshot> snapshotConfiguration() { */ public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot, Serializer> { - private final static int VERSION = 2; + private static final int VERSION = 2; @SuppressWarnings("unused") public ValueWithTsSerializerSnapshot() {