diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index 2db7a309ab647..8a0c9034eb011 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -204,13 +204,10 @@ public boolean canEqual(Object obj) { return obj != null && getClass().equals(obj.getClass()); } - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new ConfigSnapshot(fieldSerializers); - } - @Override public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + // We can not remove this method, as long as we support restoring into CompositeTypeSerializerConfigSnapshot. + // Previously (pre 1.8), multiple composite serializers were using this class directly as their snapshot class. if (configSnapshot instanceof ConfigSnapshot) { List, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = ((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); @@ -302,6 +299,7 @@ static PrecomputedParameters precompute( } /** Snapshot field serializers of composite type. */ + @Deprecated public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 0; diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java index fe392e4b1cb75..72b8620a779f7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java @@ -19,24 +19,23 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; import java.io.IOException; -import java.util.List; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; /** * Serializer wrapper to add support of {@code null} value serialization. @@ -65,12 +64,15 @@ public class NullableSerializer extends TypeSerializer { private final byte[] padding; private NullableSerializer(@Nonnull TypeSerializer originalSerializer, boolean padNullValueIfFixedLen) { - this.originalSerializer = originalSerializer; - this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen); + this(originalSerializer, createPadding(originalSerializer.getLength(), padNullValueIfFixedLen)); + } + private NullableSerializer(@Nonnull TypeSerializer originalSerializer, byte[] padding) { + this.originalSerializer = originalSerializer; + this.padding = padding; } - private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { + private static byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen; return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY; } @@ -79,7 +81,7 @@ private static byte[] createPadding(int originalSerializerLength, boolean pa * This method tries to serialize {@code null} value with the {@code originalSerializer} * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}. * - * @param originalSerializer serializer to wrap and add {@code null} support + * @param originalSerializer serializer to wrap and add {@code null} support * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer * @return serializer which supports {@code null} values */ @@ -99,22 +101,24 @@ public static boolean checkIfNullSupported(@Nonnull TypeSerializer serial DataOutputSerializer dos = new DataOutputSerializer(length); try { serializer.serialize(null, dos); - } catch (IOException | RuntimeException e) { + } + catch (IOException | RuntimeException e) { return false; } - Preconditions.checkArgument( + checkArgument( serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length, "The serialized form of the null value should have the same length " + "as any other if the length is fixed in the serializer"); DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer()); try { - Preconditions.checkArgument(serializer.deserialize(dis) == null); - } catch (IOException e) { + checkArgument(serializer.deserialize(dis) == null); + } + catch (IOException e) { throw new RuntimeException( String.format("Unexpected failure to deserialize just serialized null value with %s", serializer.getClass().getName()), e); } - Preconditions.checkArgument( + checkArgument( serializer.copy(null) == null, "Serializer %s has to be able properly copy null value if it can serialize it", serializer.getClass().getName()); @@ -125,10 +129,18 @@ private boolean padNullValue() { return padding.length > 0; } + private int nullPaddingLength() { + return padding.length; + } + + private TypeSerializer originalSerializer() { + return originalSerializer; + } + /** * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped. * - * @param originalSerializer serializer to wrap and add {@code null} support + * @param originalSerializer serializer to wrap and add {@code null} support * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer * @return wrapped serializer which supports {@code null} values */ @@ -176,7 +188,8 @@ public void serialize(T record, DataOutputView target) throws IOException { if (record == null) { target.writeBoolean(true); target.write(padding); - } else { + } + else { target.writeBoolean(false); originalSerializer.serialize(record, target); } @@ -209,7 +222,8 @@ public void copy(DataInputView source, DataOutputView target) throws IOException target.writeBoolean(isNull); if (isNull) { target.write(padding); - } else { + } + else { originalSerializer.copy(source, target); } } @@ -233,45 +247,25 @@ public int hashCode() { } @Override - public NullableSerializerConfigSnapshot snapshotConfiguration() { - return new NullableSerializerConfigSnapshot<>(originalSerializer); + public TypeSerializerSnapshot snapshotConfiguration() { + return new NullableSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof NullableSerializerConfigSnapshot) { - List, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = - ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousKvSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousKvSerializersAndConfigs.get(0).f1, - originalSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new NullableSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue())); - } - } - - return CompatibilityResult.requiresMigration(); - } /** * Configuration snapshot for serializers of nullable types, containing the * configuration snapshot of its original serializer. */ + @Deprecated @Internal - public static class NullableSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static class NullableSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { private static final int VERSION = 1; - /** This empty nullary constructor is required for deserializing the configuration. */ - @SuppressWarnings("unused") - public NullableSerializerConfigSnapshot() {} + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public NullableSerializerConfigSnapshot() { + } NullableSerializerConfigSnapshot(TypeSerializer originalSerializer) { super(originalSerializer); @@ -281,5 +275,88 @@ public NullableSerializerConfigSnapshot() {} public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + NullableSerializer previousSerializer = (NullableSerializer) restoreSerializer(); + NullableSerializerSnapshot newCompositeSnapshot = new NullableSerializerSnapshot<>(previousSerializer.nullPaddingLength()); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + newCompositeSnapshot, + getSingleNestedSerializerAndConfig().f1 + ); + } + } + + /** + * Snapshot for serializers of nullable types, containing the + * snapshot of its original serializer. + */ + @SuppressWarnings({"unchecked", "WeakerAccess"}) + public static class NullableSerializerSnapshot extends CompositeTypeSerializerSnapshot> { + + private static final int VERSION = 2; + private int nullPaddingLength; + + @SuppressWarnings("unused") + public NullableSerializerSnapshot() { + super(serializerClass()); + } + + public NullableSerializerSnapshot(NullableSerializer serializerInstance) { + super(serializerInstance); + this.nullPaddingLength = serializerInstance.nullPaddingLength(); + } + + private NullableSerializerSnapshot(int nullPaddingLength) { + super(serializerClass()); + checkArgument(nullPaddingLength >= 0, + "Computed NULL padding can not be negative. %d", + nullPaddingLength); + + this.nullPaddingLength = nullPaddingLength; + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(NullableSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.originalSerializer()}; + } + + @Override + protected NullableSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + checkState(nullPaddingLength >= 0, + "Negative padding size after serializer construction: %d", + nullPaddingLength); + + final byte[] padding = (nullPaddingLength == 0) ? EMPTY_BYTE_ARRAY : new byte[nullPaddingLength]; + TypeSerializer nestedSerializer = (TypeSerializer) nestedSerializers[0]; + return new NullableSerializer<>(nestedSerializer, padding); + } + + @Override + protected void writeOuterSnapshot(DataOutputView out) throws IOException { + out.writeInt(nullPaddingLength); + } + + @Override + protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + nullPaddingLength = in.readInt(); + } + + @Override + protected boolean isOuterSnapshotCompatible(NullableSerializer newSerializer) { + return nullPaddingLength == newSerializer.nullPaddingLength(); + } + + private static Class> serializerClass() { + return (Class>) (Class) NullableSerializer.class; + } } + } diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java index e29f68132540e..47ff695a9c9a2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java @@ -15,18 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.Row; @@ -34,7 +32,6 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.util.Arrays; -import java.util.List; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoAndCopyNullMask; import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask; @@ -95,7 +92,8 @@ public Row copy(Row from) { if (fromField != null) { Object copy = fieldSerializers[i].copy(fromField); result.setField(i, copy); - } else { + } + else { result.setField(i, null); } } @@ -123,11 +121,13 @@ public Row copy(Row from, Row reuse) { if (reuseField != null) { Object copy = fieldSerializers[i].copy(fromField, reuseField); reuse.setField(i, copy); - } else { + } + else { Object copy = fieldSerializers[i].copy(fromField); reuse.setField(i, copy); } - } else { + } + else { reuse.setField(i, null); } } @@ -163,7 +163,6 @@ public void serialize(Row record, DataOutputView target) throws IOException { } } - @Override public Row deserialize(DataInputView source) throws IOException { int len = fieldSerializers.length; @@ -176,7 +175,8 @@ public Row deserialize(DataInputView source) throws IOException { for (int i = 0; i < len; i++) { if (nullMask[i]) { result.setField(i, null); - } else { + } + else { result.setField(i, fieldSerializers[i].deserialize(source)); } } @@ -198,11 +198,13 @@ public Row deserialize(Row reuse, DataInputView source) throws IOException { for (int i = 0; i < len; i++) { if (nullMask[i]) { reuse.setField(i, null); - } else { + } + else { Object reuseField = reuse.getField(i); if (reuseField != null) { reuse.setField(i, fieldSerializers[i].deserialize(reuseField, source)); - } else { + } + else { reuse.setField(i, fieldSerializers[i].deserialize(source)); } } @@ -260,73 +262,80 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility + // Serializer configuration snapshoting & compatibility // -------------------------------------------------------------------------------------------- @Override - public RowSerializerConfigSnapshot snapshotConfiguration() { - return new RowSerializerConfigSnapshot(fieldSerializers); + public TypeSerializerSnapshot snapshotConfiguration() { + return new RowSerializerSnapshot(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof RowSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousFieldSerializersAndConfigs = - ((RowSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - if (previousFieldSerializersAndConfigs.size() == fieldSerializers.length) { - boolean requireMigration = false; - TypeSerializer[] convertDeserializers = new TypeSerializer[fieldSerializers.length]; - - CompatibilityResult compatResult; - int i = 0; - for (Tuple2, TypeSerializerSnapshot> f : previousFieldSerializersAndConfigs) { - compatResult = CompatibilityUtil.resolveCompatibilityResult( - f.f0, - UnloadableDummyTypeSerializer.class, - f.f1, - fieldSerializers[i]); - - if (compatResult.isRequiresMigration()) { - requireMigration = true; - - if (compatResult.getConvertDeserializer() == null) { - // one of the field serializers cannot provide a fallback deserializer - return CompatibilityResult.requiresMigration(); - } else { - convertDeserializers[i] = - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()); - } - } + /** + * A snapshot for {@link RowSerializer}. + */ + @Deprecated + public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - i++; - } + private static final int VERSION = 1; - if (requireMigration) { - return CompatibilityResult.requiresMigration(new RowSerializer(convertDeserializers)); - } else { - return CompatibilityResult.compatible(); - } - } + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public RowSerializerConfigSnapshot() { + } + + public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) { + super(fieldSerializers); } - return CompatibilityResult.requiresMigration(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + TypeSerializerSnapshot[] nestedSnapshots = getNestedSerializersAndConfigs() + .stream() + .map(t -> t.f1) + .toArray(TypeSerializerSnapshot[]::new); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new RowSerializerSnapshot(), + nestedSnapshots); + } } - public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + /** + * A {@link TypeSerializerSnapshot} for RowSerializer. + */ + public static final class RowSerializerSnapshot extends CompositeTypeSerializerSnapshot { - private static final int VERSION = 1; + private static final int VERSION = 2; - /** This empty nullary constructor is required for deserializing the configuration. */ - public RowSerializerConfigSnapshot() {} + @SuppressWarnings("WeakerAccess") + public RowSerializerSnapshot() { + super(RowSerializer.class); + } - public RowSerializerConfigSnapshot(TypeSerializer[] fieldSerializers) { - super(fieldSerializers); + RowSerializerSnapshot(RowSerializer serializerInstance) { + super(serializerInstance); } @Override - public int getVersion() { + protected int getCurrentOuterSnapshotVersion() { return VERSION; } + + @Override + protected TypeSerializer[] getNestedSerializers(RowSerializer outerSerializer) { + return outerSerializer.fieldSerializers; + } + + @Override + protected RowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new RowSerializer(nestedSerializers); + } } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java index fc5c241c20566..719ea493ab76f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeSerializerTest.java @@ -191,6 +191,11 @@ protected CompositeSerializer> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { return new TestListCompositeSerializer(precomputed, originalSerializers); } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + throw new UnsupportedOperationException(); + } } private static class CompositeSerializerTestInstance extends SerializerTestInstance> { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java index 57c939d54b0d0..2e363fb278e78 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java @@ -28,7 +28,6 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; - import org.junit.Test; import java.io.IOException; @@ -89,9 +88,11 @@ public void restoredSerializerIsAbleToDeserializePreviousData() throws IOExcepti TypeSerializer serializer = snapshot.restoreSerializer(); DataInputView input = dataUnderTest(); + + final Matcher matcher = testSpecification.testDataElementMatcher; for (int i = 0; i < testSpecification.testDataCount; i++) { final ElementT result = serializer.deserialize(input); - assertThat(result, notNullValue()); + assertThat(result, matcher); } } @@ -193,6 +194,10 @@ private static Path resourcePath(String resourceName) { // Test Specification // -------------------------------------------------------------------------------------------------------------- + /** + * Test Specification. + */ + @SuppressWarnings("WeakerAccess") protected static final class TestSpecification { private final Class> serializerType; private final Class> snapshotClass; @@ -204,6 +209,9 @@ protected static final class TestSpecification { private String testDataLocation; private int testDataCount; + @SuppressWarnings("unchecked") + private Matcher testDataElementMatcher = (Matcher) notNullValue(); + @SuppressWarnings("unchecked") public static TestSpecification builder( String name, @@ -253,6 +261,11 @@ public TestSpecification withTestData(String testDataLocation, int testDataCo return this; } + public TestSpecification withTestDataMatcher(Matcher matcher) { + testDataElementMatcher = matcher; + return this; + } + private TypeSerializer createSerializer() { try { return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get(); @@ -274,10 +287,6 @@ private MigrationVersion getTestMigrationVersion() { return testMigrationVersion; } - public Class> getSnapshotClass() { - return snapshotClass; - } - @Override public String toString() { return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName()); @@ -341,6 +350,45 @@ public void add( } } + /** + * Adds a test specification to be tested for all specified test versions. + * + *

This method adds the specification with pre-defined snapshot and data filenames, + * with the format "flink-<testVersion>-<specName>-<data/snapshot>", + * and each specification's test data count is assumed to always be 10. + * + * @param name test specification name. + * @param serializerClass class of the current serializer. + * @param snapshotClass class of the current serializer snapshot class. + * @param serializerProvider provider for an instance of the current serializer. + * @param elementMatcher an {@code hamcrest} matcher to match test data. + * + * @param type of the test data. + */ + public void add( + String name, + Class serializerClass, + Class snapshotClass, + Supplier> serializerProvider, + Matcher elementMatcher) { + for (MigrationVersion testVersion : testVersions) { + testSpecifications.add( + TestSpecification.builder( + getSpecNameForVersion(name, testVersion), + serializerClass, + snapshotClass, + testVersion) + .withNewSerializerProvider(serializerProvider) + .withSnapshotDataLocation( + String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name)) + .withTestData( + String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name), + DEFAULT_TEST_DATA_COUNT) + .withTestDataMatcher(elementMatcher) + ); + } + } + /** * Adds a test specification to be tested for all specified test versions. * @@ -385,6 +433,9 @@ private static String getSpecNameForVersion(String baseName, MigrationVersion te } } + /** + * Supplier of paths based on {@link MigrationVersion}. + */ protected interface TestResourceFilenameSupplier { String get(MigrationVersion testVersion); } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java new file mode 100644 index 0000000000000..1da7da7bcd0c4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerMigrationTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer.NullableSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * {@link NullableSerializer} migration test. + */ +@RunWith(Parameterized.class) +public class NullableSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public NullableSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "nullable-padded-serializer", + NullableSerializer.class, + NullableSerializerSnapshot.class, + () -> NullableSerializer.wrap(LongSerializer.INSTANCE, true), + NULL_OR_LONG); + + testSpecifications.add( + "nullable-not-padded-serializer", + NullableSerializer.class, + NullableSerializerSnapshot.class, + () -> NullableSerializer.wrap(LongSerializer.INSTANCE, false), + NULL_OR_LONG); + + return testSpecifications.get(); + } + + @SuppressWarnings("unchecked") + private static final Matcher NULL_OR_LONG = new NullableMatcher(); + + private static final class NullableMatcher extends BaseMatcher { + + @Override + public boolean matches(Object item) { + return item == null || item instanceof Long; + } + + @Override + public void describeTo(Description description) { + description.appendText("a null or a long"); + } + } + +} diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java new file mode 100644 index 0000000000000..7a7888aca61b4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/RowSerializerMigrationTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.RowSerializer.RowSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; +import org.apache.flink.types.Row; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration test for {@link RowSerializer}. + */ +@RunWith(Parameterized.class) +public class RowSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public RowSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "row-serializer", + RowSerializer.class, + RowSerializerSnapshot.class, + RowSerializerMigrationTest::stringLongRowSupplier); + + return testSpecifications.get(); + } + + private static TypeSerializer stringLongRowSupplier() { + RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + return rowTypeInfo.createSerializer(new ExecutionConfig()); + } +} diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data new file mode 100644 index 0000000000000..ca6d29f57679c Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot new file mode 100644 index 0000000000000..8c8b27f0ea315 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-not-padded-serializer-snapshot differ diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data new file mode 100644 index 0000000000000..42439f7bcd29f Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot new file mode 100644 index 0000000000000..9bd3af75b1d8b Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-nullable-padded-serializer-snapshot differ diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-data b/flink-core/src/test/resources/flink-1.6-row-serializer-data new file mode 100644 index 0000000000000..2cd0bd312d8c2 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-row-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot new file mode 100644 index 0000000000000..9200e23bfec37 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.6-row-serializer-snapshot differ diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data new file mode 100644 index 0000000000000..aecf5b80e5766 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot new file mode 100644 index 0000000000000..a2e0807bd9345 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-not-padded-serializer-snapshot differ diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data new file mode 100644 index 0000000000000..426221eddd6d5 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot new file mode 100644 index 0000000000000..685f8dd5afacd Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-nullable-padded-serializer-snapshot differ diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-data b/flink-core/src/test/resources/flink-1.7-row-serializer-data new file mode 100644 index 0000000000000..41aa07837b417 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-row-serializer-data differ diff --git a/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot new file mode 100644 index 0000000000000..f632dc4ec17c0 Binary files /dev/null and b/flink-core/src/test/resources/flink-1.7-row-serializer-snapshot differ diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java index f9e492eb91bb3..18f6e358e914a 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/TtlVerifyUpdateFunction.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -143,7 +144,10 @@ public void initializeState(FunctionInitializationContext context) { prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream() .collect(Collectors.toMap(TtlStateVerifier::getId, v -> { checkNotNull(v); - TypeSerializer> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer()); + final TypeSerializer> typeSerializer = new ValueWithTs.Serializer( + v.getUpdateSerializer(), + LongSerializer.INSTANCE); + ListStateDescriptor> stateDesc = new ListStateDescriptor<>( "TtlPrevValueState_" + v.getId(), typeSerializer); KeyedStateStore store = context.getKeyedStateStore(); diff --git a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java index a4f30804e4fd2..7070c6d0c08b2 100644 --- a/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java +++ b/flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/ValueWithTs.java @@ -19,8 +19,9 @@ package org.apache.flink.streaming.tests.verify; import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nonnull; @@ -56,8 +57,8 @@ public String toString() { /** Serializer for Serializer. */ public static class Serializer extends CompositeSerializer> { - public Serializer(TypeSerializer userValueSerializer) { - super(true, userValueSerializer, LongSerializer.INSTANCE); + public Serializer(TypeSerializer valueSerializer, TypeSerializer timestampSerializer) { + super(true, valueSerializer, timestampSerializer); } @SuppressWarnings("unchecked") @@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs value, int index) { protected CompositeSerializer> createSerializerInstance( PrecomputedParameters precomputed, TypeSerializer... originalSerializers) { - return new Serializer(precomputed, (TypeSerializer) originalSerializers[0]); + + return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]); + } + + TypeSerializer getValueSerializer() { + return fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer getTimestampSerializer() { + TypeSerializer fieldSerializer = fieldSerializers[1]; + return (TypeSerializer) fieldSerializer; + } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new ValueWithTsSerializerSnapshot(this); + } + } + + /** + * A {@link TypeSerializerSnapshot} for ValueWithTs Serializer. + */ + public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot, Serializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("unused") + public ValueWithTsSerializerSnapshot() { + super(Serializer.class); + } + + ValueWithTsSerializerSnapshot(Serializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(Serializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()}; + } + + @SuppressWarnings("unchecked") + @Override + protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + TypeSerializer valueSerializer = nestedSerializers[0]; + TypeSerializer timeSerializer = (TypeSerializer) nestedSerializers[1]; + return new Serializer(valueSerializer, timeSerializer); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 453f43aded6df..6f05e1c2f7a26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -28,7 +28,9 @@ import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.KeyedStateBackend; @@ -126,7 +128,7 @@ private IS createState() throws Exception { @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor)); } @@ -134,7 +136,7 @@ private IS createValueState() throws Exception { private IS createListState() throws Exception { ListStateDescriptor listStateDesc = (ListStateDescriptor) stateDesc; ListStateDescriptor> ttlDescriptor = new ListStateDescriptor<>( - stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); + stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer())); return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor)); } @@ -144,7 +146,7 @@ private IS createMapState() throws Exception { MapStateDescriptor> ttlDescriptor = new MapStateDescriptor<>( stateDesc.getName(), mapStateDesc.getKeySerializer(), - new TtlSerializer<>(mapStateDesc.getValueSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor)); } @@ -154,7 +156,7 @@ private IS createReducingState() throws Exception { ReducingStateDescriptor> ttlDescriptor = new ReducingStateDescriptor<>( stateDesc.getName(), new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), - new TtlSerializer<>(stateDesc.getSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor)); } @@ -165,7 +167,7 @@ private IS createAggregatingState() throws Exception { TtlAggregateFunction ttlAggregateFunction = new TtlAggregateFunction<>( aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider); AggregatingStateDescriptor, OUT> ttlDescriptor = new AggregatingStateDescriptor<>( - stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); + stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>(createTtlStateContext(ttlDescriptor), ttlAggregateFunction); } @@ -178,7 +180,7 @@ private IS createFoldingState() throws Exception { stateDesc.getName(), ttlInitAcc, new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), - new TtlSerializer<>(stateDesc.getSerializer())); + new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer())); return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor)); } @@ -237,8 +239,8 @@ public static class TtlSerializer extends CompositeSerializer> { private static final long serialVersionUID = 131020282727167064L; @SuppressWarnings("WeakerAccess") - public TtlSerializer(TypeSerializer userValueSerializer) { - super(true, LongSerializer.INSTANCE, userValueSerializer); + public TtlSerializer(TypeSerializer timestampSerializer, TypeSerializer userValueSerializer) { + super(true, timestampSerializer, userValueSerializer); } @SuppressWarnings("WeakerAccess") @@ -272,5 +274,61 @@ protected CompositeSerializer> createSerializerInstance( Preconditions.checkArgument(originalSerializers.length == 2); return new TtlSerializer<>(precomputed, originalSerializers); } + + @SuppressWarnings("unchecked") + TypeSerializer getTimestampSerializer() { + return (TypeSerializer) (TypeSerializer) fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer getValueSerializer() { + return (TypeSerializer) fieldSerializers[1]; + } + + @Override + public TypeSerializerSnapshot> snapshotConfiguration() { + return new TtlSerializerSnapshot<>(this); + } + } + + /** + * A {@link TypeSerializerSnapshot} for TtlSerializer. + */ + public static final class TtlSerializerSnapshot extends CompositeTypeSerializerSnapshot, TtlSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings({"WeakerAccess", "unused"}) + public TtlSerializerSnapshot() { + super(correspondingSerializerClass()); + } + + TtlSerializerSnapshot(TtlSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(TtlSerializer outerSerializer) { + return new TypeSerializer[]{ outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()}; + } + + @Override + @SuppressWarnings("unchecked") + protected TtlSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + TypeSerializer timestampSerializer = (TypeSerializer) nestedSerializers[0]; + TypeSerializer valueSerializer = (TypeSerializer) nestedSerializers[1]; + + return new TtlSerializer<>(timestampSerializer, valueSerializer); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) TtlSerializer.class; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java new file mode 100644 index 0000000000000..87ab3962c3e50 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlSerializerStateMigrationTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer; +import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration test for {@link TtlSerializerStateMigrationTest}. + */ +@RunWith(Parameterized.class) +public class TtlSerializerStateMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + private static final String SPEC_NAME = "ttl-serializer"; + + public TtlSerializerStateMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + SPEC_NAME, + TtlSerializer.class, + TtlSerializerSnapshot.class, + () -> new TtlSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} + diff --git a/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data new file mode 100644 index 0000000000000..9d156cae2fd5c Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-data differ diff --git a/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot new file mode 100644 index 0000000000000..0d3a8a0771879 Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.6-ttl-serializer-snapshot differ diff --git a/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data new file mode 100644 index 0000000000000..16c2bda923c48 Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-data differ diff --git a/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot new file mode 100644 index 0000000000000..74aca45a9693d Binary files /dev/null and b/flink-runtime/src/test/resources/flink-1.7-ttl-serializer-snapshot differ diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java index 215bd447c05d4..e6bc88c518a05 100644 --- a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java @@ -19,8 +19,10 @@ package org.apache.flink.api.scala.typeutils; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import scala.Option; @@ -31,6 +33,7 @@ * allow calling different base class constructors from subclasses, while we need that * for the default empty constructor. */ +@Deprecated public final class ScalaOptionSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; @@ -46,4 +49,12 @@ public ScalaOptionSerializerConfigSnapshot(TypeSerializer elementSerializer) public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new ScalaOptionSerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); + } } diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java new file mode 100644 index 0000000000000..dfa91780c9cc6 --- /dev/null +++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshot.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils; + +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import scala.Option; + +/** + * A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}. + */ +public final class ScalaOptionSerializerSnapshot extends CompositeTypeSerializerSnapshot, OptionSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("WeakerAccess") + public ScalaOptionSerializerSnapshot() { + super(underlyingClass()); + } + + public ScalaOptionSerializerSnapshot(OptionSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(OptionSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.elemSerializer()}; + } + + @Override + protected OptionSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") TypeSerializer nestedSerializer = (TypeSerializer) nestedSerializers[0]; + return new OptionSerializer<>(nestedSerializer); + } + + @SuppressWarnings("unchecked") + private static Class> underlyingClass() { + return (Class>) (Class) OptionSerializer.class; + } +} diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 7f3aa8cd81f59..ea8f22ad452e4 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) // Serializer configuration snapshotting & compatibility // -------------------------------------------------------------------------------------------- - override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = { - new ScalaOptionSerializerConfigSnapshot[A](elemSerializer) - } - - override def ensureCompatibility( - configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = { - - configSnapshot match { - case optionSerializerConfigSnapshot - : ScalaOptionSerializerConfigSnapshot[A] => - ensureCompatibilityInternal(optionSerializerConfigSnapshot) - case legacyOptionSerializerConfigSnapshot - : OptionSerializer.OptionSerializerConfigSnapshot[A] => - ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot) - case _ => CompatibilityResult.requiresMigration() - } - } - - private def ensureCompatibilityInternal( - compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]]) - : CompatibilityResult[Option[A]] = { - - val compatResult = CompatibilityUtil.resolveCompatibilityResult( - compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0, - classOf[UnloadableDummyTypeSerializer[_]], - compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1, - elemSerializer) - - if (compatResult.isRequiresMigration) { - if (compatResult.getConvertDeserializer != null) { - CompatibilityResult.requiresMigration( - new OptionSerializer[A]( - new TypeDeserializerAdapter(compatResult.getConvertDeserializer))) - } else { - CompatibilityResult.requiresMigration() - } - } else { - CompatibilityResult.compatible() - } + override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = { + new ScalaOptionSerializerSnapshot[A](this) } } diff --git a/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java new file mode 100644 index 0000000000000..efc94ec4b8439 --- /dev/null +++ b/flink-scala/src/test/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerSnapshotMigrationTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.typeutils; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +import scala.Option; + +/** + * Migration test for the {@link ScalaEitherSerializerSnapshot}. + */ +@RunWith(Parameterized.class) +public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + private static final String SPEC_NAME = "scala-option-serializer"; + + public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + SPEC_NAME, + OptionSerializer.class, + ScalaOptionSerializerSnapshot.class, + () -> new OptionSerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data new file mode 100644 index 0000000000000..3cdb252f8c991 Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-data differ diff --git a/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot new file mode 100644 index 0000000000000..d7be0c22891bb Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.6-scala-option-serializer-snapshot differ diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data new file mode 100644 index 0000000000000..3cdb252f8c991 Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-data differ diff --git a/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot new file mode 100644 index 0000000000000..bdedf0e84250f Binary files /dev/null and b/flink-scala/src/test/resources/flink-1.7-scala-option-serializer-snapshot differ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java index 19d978322f09c..bfd74e05cab0f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java @@ -25,14 +25,13 @@ import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.translation.WrappingFunction; import org.apache.flink.api.java.tuple.Tuple2; @@ -504,7 +503,9 @@ public boolean canEqual(Object obj) { } } - private static class UnionSerializer extends TypeSerializer> { + @VisibleForTesting + @Internal + static class UnionSerializer extends TypeSerializer> { private static final long serialVersionUID = 1L; private final TypeSerializer oneSerializer; @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot> snapshotConfiguration() { - return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof UnionSerializerConfigSnapshot) { - List, TypeSerializerSnapshot>> previousSerializersAndConfigs = - ((UnionSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); - - CompatibilityResult oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(0).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(0).f1, - oneSerializer); - - CompatibilityResult twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(1).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(1).f1, - twoSerializer); - - if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new UnionSerializer<>( - new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()), - new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new UnionSerializerSnapshot<>(this); } } /** * The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}. */ + @Deprecated public static class UnionSerializerConfigSnapshot - extends CompositeTypeSerializerConfigSnapshot> { + extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; - /** This empty nullary constructor is required for deserializing the configuration. */ - public UnionSerializerConfigSnapshot() {} + /** + * This empty nullary constructor is required for deserializing the configuration. + */ + public UnionSerializerConfigSnapshot() { + } public UnionSerializerConfigSnapshot(TypeSerializer oneSerializer, TypeSerializer twoSerializer) { super(oneSerializer, twoSerializer); } + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + List, TypeSerializerSnapshot>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs(); + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new UnionSerializerSnapshot<>(), + nestedSerializersAndConfigs.get(0).f1, + nestedSerializersAndConfigs.get(1).f1 + ); + } + @Override public int getVersion() { return VERSION; } } + /** + * The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}. + */ + public static class UnionSerializerSnapshot + extends CompositeTypeSerializerSnapshot, UnionSerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings("WeakerAccess") + public UnionSerializerSnapshot() { + super(correspondingSerializerClass()); + } + + UnionSerializerSnapshot(UnionSerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(UnionSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer}; + } + + @SuppressWarnings("unchecked") + @Override + protected UnionSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new UnionSerializer<>((TypeSerializer) nestedSerializers[0], (TypeSerializer) nestedSerializers[1]); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) UnionSerializer.class; + } + } + // ------------------------------------------------------------------------ // Utility functions that implement the CoGroup logic based on the tagged // union window reduce diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java index 43085cb42c475..2ef0c1c5968c8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java @@ -22,17 +22,15 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.StateInitializationContext; @@ -361,7 +359,9 @@ public void output(OutputTag outputTag, X value) { * This will contain the element itself along with a flag indicating * if it has been joined or not. */ - private static class BufferEntry { + @Internal + @VisibleForTesting + static class BufferEntry { private final T element; private final boolean hasBeenJoined; @@ -375,13 +375,15 @@ private static class BufferEntry { /** * A {@link TypeSerializer serializer} for the {@link BufferEntry}. */ - private static class BufferEntrySerializer extends TypeSerializer> { + @Internal + @VisibleForTesting + static class BufferEntrySerializer extends TypeSerializer> { private static final long serialVersionUID = -20197698803836236L; private final TypeSerializer elementSerializer; - private BufferEntrySerializer(TypeSerializer elementSerializer) { + BufferEntrySerializer(TypeSerializer elementSerializer) { this.elementSerializer = Preconditions.checkNotNull(elementSerializer); } @@ -464,40 +466,16 @@ public boolean canEqual(Object obj) { } @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new BufferSerializerConfigSnapshot<>(elementSerializer); - } - - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof BufferSerializerConfigSnapshot) { - Tuple2, TypeSerializerConfigSnapshot> previousSerializerAndConfig = - ((BufferSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult compatResult = - CompatibilityUtil.resolveCompatibilityResult( - previousSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousSerializerAndConfig.f1, - elementSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new BufferEntrySerializer<>( - new TypeDeserializerAdapter<>( - compatResult.getConvertDeserializer()))); - } - } - return CompatibilityResult.requiresMigration(); + public TypeSerializerSnapshot> snapshotConfiguration() { + return new BufferEntrySerializerSnapshot<>(this); } } /** * The {@link CompositeTypeSerializerConfigSnapshot configuration} of our serializer. */ - public static class BufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + @Deprecated + public static class BufferSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot> { private static final int VERSION = 1; @@ -512,6 +490,54 @@ public BufferSerializerConfigSnapshot(final TypeSerializer userTypeSerializer public int getVersion() { return VERSION; } + + @Override + public TypeSerializerSchemaCompatibility> resolveSchemaCompatibility(TypeSerializer> newSerializer) { + + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new BufferEntrySerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); + } + } + + /** + * A {@link TypeSerializerSnapshot} for {@link BufferEntrySerializer}. + */ + public static final class BufferEntrySerializerSnapshot + extends CompositeTypeSerializerSnapshot, BufferEntrySerializer> { + + private static final int VERSION = 2; + + @SuppressWarnings({"unused", "WeakerAccess"}) + public BufferEntrySerializerSnapshot() { + super(correspondingSerializerClass()); + } + + BufferEntrySerializerSnapshot(BufferEntrySerializer serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer[] getNestedSerializers(BufferEntrySerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.elementSerializer}; + } + + @Override + @SuppressWarnings("unchecked") + protected BufferEntrySerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + return new BufferEntrySerializer<>((TypeSerializer) nestedSerializers[0]); + } + + @SuppressWarnings("unchecked") + private static Class> correspondingSerializerClass() { + return (Class>) (Class) BufferEntrySerializer.class; + } } @VisibleForTesting diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index f84ff15a6ee91..e8af0f9cdaf6c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -19,15 +19,11 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -283,56 +279,75 @@ public int hashCode() { // -------------------------------------------------------------------------------------------- @Override - public StreamElementSerializerConfigSnapshot snapshotConfiguration() { - return new StreamElementSerializerConfigSnapshot<>(typeSerializer); + public StreamElementSerializerSnapshot snapshotConfiguration() { + return new StreamElementSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - Tuple2, TypeSerializerSnapshot> previousTypeSerializerAndConfig; + /** + * Configuration snapshot specific to the {@link StreamElementSerializer}. + * @deprecated see {@link StreamElementSerializerSnapshot}. + */ + @Deprecated + public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - // we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer - if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { - previousTypeSerializerAndConfig = - ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - } else { - return CompatibilityResult.requiresMigration(); - } + private static final int VERSION = 1; - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); + /** This empty nullary constructor is required for deserializing the configuration. */ + public StreamElementSerializerConfigSnapshot() {} - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new StreamElementSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } else { - return CompatibilityResult.requiresMigration(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new StreamElementSerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); } } /** * Configuration snapshot specific to the {@link StreamElementSerializer}. */ - public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class StreamElementSerializerSnapshot + extends CompositeTypeSerializerSnapshot> { - private static final int VERSION = 1; + private static final int VERSION = 2; - /** This empty nullary constructor is required for deserializing the configuration. */ - public StreamElementSerializerConfigSnapshot() {} + @SuppressWarnings("WeakerAccess") + public StreamElementSerializerSnapshot() { + super(serializerClass()); + } - public StreamElementSerializerConfigSnapshot(TypeSerializer typeSerializer) { - super(typeSerializer); + StreamElementSerializerSnapshot(StreamElementSerializer serializerInstance) { + super(serializerInstance); } @Override - public int getVersion() { + protected int getCurrentOuterSnapshotVersion() { return VERSION; } + + @Override + protected TypeSerializer[] getNestedSerializers(StreamElementSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()}; + } + + @Override + protected StreamElementSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer casted = (TypeSerializer) nestedSerializers[0]; + + return new StreamElementSerializer<>(casted); + } + + @SuppressWarnings("unchecked") + private static Class> serializerClass() { + Class streamElementSerializerClass = StreamElementSerializer.class; + return (Class>) streamElementSerializerClass; + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java new file mode 100644 index 0000000000000..2b85d7e4909a7 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerMigrationTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer; +import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration tests for {@link UnionSerializer}. + */ +@RunWith(Parameterized.class) +public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + public UnionSerializerMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "union-serializer", + UnionSerializer.class, + UnionSerializerSnapshot.class, + UnionSerializerMigrationTest::stringLongRowSupplier); + + return testSpecifications.get(); + } + + private static TypeSerializer> stringLongRowSupplier() { + return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE); + } + +} + + + diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java new file mode 100644 index 0000000000000..d4d267386c53d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/BufferEntrySerializerMigrationTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntry; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializer; +import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.BufferEntrySerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * State migration tests for {@link BufferEntrySerializer}. + */ +@RunWith(Parameterized.class) +public class BufferEntrySerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase> { + + public BufferEntrySerializerMigrationTest(TestSpecification> testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "buffer-entry-serializer", + BufferEntrySerializer.class, + BufferEntrySerializerSnapshot.class, + () -> new BufferEntrySerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java new file mode 100644 index 0000000000000..b6169d48a5729 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * Migration tests for {@link StreamElementSerializer}. + */ +@RunWith(Parameterized.class) +public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public StreamElementSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "stream-element-serializer", + StreamElementSerializer.class, + StreamElementSerializerSnapshot.class, + () -> new StreamElementSerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data new file mode 100644 index 0000000000000..a4af1fc5caf76 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot new file mode 100644 index 0000000000000..6141180d0d600 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-buffer-entry-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data new file mode 100644 index 0000000000000..81b80c3422c2b Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot new file mode 100644 index 0000000000000..8ffdb43b17b32 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data new file mode 100644 index 0000000000000..cb29a995ec68b --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-data @@ -0,0 +1 @@ +53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot new file mode 100644 index 0000000000000..178d007196e40 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-union-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data new file mode 100644 index 0000000000000..36c9dc75b657d Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot new file mode 100644 index 0000000000000..af92e1b8fb8a4 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-buffer-entry-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data new file mode 100644 index 0000000000000..01f05e7e3f994 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot new file mode 100644 index 0000000000000..dc7f76b7144b9 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data new file mode 100644 index 0000000000000..d2f04fdc56736 --- /dev/null +++ b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-data @@ -0,0 +1 @@ +53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393 \ No newline at end of file diff --git a/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot new file mode 100644 index 0000000000000..747eb82103473 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-union-serializer-snapshot differ