Skip to content

Commit

Permalink
[FLINK-11329] Migrating the UnionSerializer
Browse files Browse the repository at this point in the history
  • Loading branch information
Igal Shilman committed Jan 28, 2019
1 parent 3a1a867 commit 317e6b8
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 41 deletions.
Expand Up @@ -25,14 +25,13 @@
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -504,7 +503,9 @@ public boolean canEqual(Object obj) {
}
}

private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
@VisibleForTesting
@Internal
static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;

private final TypeSerializer<T1> oneSerializer;
Expand Down Expand Up @@ -618,63 +619,87 @@ public boolean canEqual(Object obj) {
}

@Override
public TypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
return new UnionSerializerConfigSnapshot<>(oneSerializer, twoSerializer);
}

@Override
public CompatibilityResult<TaggedUnion<T1, T2>> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof UnionSerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> previousSerializersAndConfigs =
((UnionSerializerConfigSnapshot<?, ?>) configSnapshot).getNestedSerializersAndConfigs();

CompatibilityResult<T1> oneSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
previousSerializersAndConfigs.get(0).f0,
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(0).f1,
oneSerializer);

CompatibilityResult<T2> twoSerializerCompatResult = CompatibilityUtil.resolveCompatibilityResult(
previousSerializersAndConfigs.get(1).f0,
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(1).f1,
twoSerializer);

if (!oneSerializerCompatResult.isRequiresMigration() && !twoSerializerCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (oneSerializerCompatResult.getConvertDeserializer() != null && twoSerializerCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new UnionSerializer<>(
new TypeDeserializerAdapter<>(oneSerializerCompatResult.getConvertDeserializer()),
new TypeDeserializerAdapter<>(twoSerializerCompatResult.getConvertDeserializer())));
}
}

return CompatibilityResult.requiresMigration();
public TypeSerializerSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
return new UnionSerializerSnapshot<>(this);
}
}

/**
* The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
*/
@Deprecated
public static class UnionSerializerConfigSnapshot<T1, T2>
extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {
extends CompositeTypeSerializerConfigSnapshot<TaggedUnion<T1, T2>> {

private static final int VERSION = 1;

/** This empty nullary constructor is required for deserializing the configuration. */
public UnionSerializerConfigSnapshot() {}
/**
* This empty nullary constructor is required for deserializing the configuration.
*/
public UnionSerializerConfigSnapshot() {
}

public UnionSerializerConfigSnapshot(TypeSerializer<T1> oneSerializer, TypeSerializer<T2> twoSerializer) {
super(oneSerializer, twoSerializer);
}

@Override
public TypeSerializerSchemaCompatibility<TaggedUnion<T1, T2>> resolveSchemaCompatibility(TypeSerializer<TaggedUnion<T1, T2>> newSerializer) {
List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs = getNestedSerializersAndConfigs();

return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
newSerializer,
new UnionSerializerSnapshot<>(),
nestedSerializersAndConfigs.get(0).f1,
nestedSerializersAndConfigs.get(1).f1
);
}

@Override
public int getVersion() {
return VERSION;
}
}

/**
* The {@link TypeSerializerSnapshot} for the {@link UnionSerializer}.
*/
public static class UnionSerializerSnapshot<T1, T2>
extends CompositeTypeSerializerSnapshot<TaggedUnion<T1, T2>, UnionSerializer<T1, T2>> {

private static final int VERSION = 2;

@SuppressWarnings("WeakerAccess")
public UnionSerializerSnapshot() {
super(correspondingSerializerClass());
}

UnionSerializerSnapshot(UnionSerializer<T1, T2> serializerInstance) {
super(serializerInstance);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(UnionSerializer<T1, T2> outerSerializer) {
return new TypeSerializer[]{outerSerializer.oneSerializer, outerSerializer.twoSerializer};
}

@SuppressWarnings("unchecked")
@Override
protected UnionSerializer<T1, T2> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
return new UnionSerializer<>((TypeSerializer<T1>) nestedSerializers[0], (TypeSerializer<T2>) nestedSerializers[1]);
}

@SuppressWarnings("unchecked")
private static <T1, T2> Class<UnionSerializer<T1, T2>> correspondingSerializerClass() {
return (Class<UnionSerializer<T1, T2>>) (Class<?>) UnionSerializer.class;
}
}

// ------------------------------------------------------------------------
// Utility functions that implement the CoGroup logic based on the tagged
// union window reduce
Expand Down
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializerSnapshot;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;

/**
* State migration tests for {@link UnionSerializer}.
*/
@RunWith(Parameterized.class)
public class UnionSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TaggedUnion<String, Long>> {

public UnionSerializerMigrationTest(TestSpecification<TaggedUnion<String, Long>> testSpecification) {
super(testSpecification);
}

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {

final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);

testSpecifications.add(
"union-serializer",
UnionSerializer.class,
UnionSerializerSnapshot.class,
UnionSerializerMigrationTest::stringLongRowSupplier);

return testSpecifications.get();
}

private static TypeSerializer<TaggedUnion<String, Long>> stringLongRowSupplier() {
return new UnionSerializer<>(StringSerializer.INSTANCE, LongSerializer.INSTANCE);
}

}



@@ -0,0 +1 @@
53778725243338537787315927955377873178348653778731961974537787321387515377873232740853778732510096537787327007985377873288617553778733069270
Binary file not shown.
@@ -0,0 +1 @@
53711258937562537112614539985371126164162753711261837231537112620123325371126218543153711262413321537112625282785371126263382753711262735393
Binary file not shown.

0 comments on commit 317e6b8

Please sign in to comment.