Skip to content

Commit

Permalink
[FLINK-11329] Migrating ScalaOptionSerializer
Browse files Browse the repository at this point in the history
  • Loading branch information
Igal Shilman committed Jan 28, 2019
1 parent 9a0ddc1 commit 4e1a755
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 39 deletions.
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.api.scala.typeutils;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;

import scala.Option;

Expand All @@ -31,6 +33,7 @@
* allow calling different base class constructors from subclasses, while we need that
* for the default empty constructor.
*/
@Deprecated
public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot<Option<E>> {

private static final int VERSION = 1;
Expand All @@ -46,4 +49,12 @@ public ScalaOptionSerializerConfigSnapshot(TypeSerializer<E> elementSerializer)
public int getVersion() {
return VERSION;
}

@Override
public TypeSerializerSchemaCompatibility<Option<E>> resolveSchemaCompatibility(TypeSerializer<Option<E>> newSerializer) {
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
newSerializer,
new ScalaOptionSerializerSnapshot<>(),
getSingleNestedSerializerAndConfig().f1);
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.typeutils;

import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import scala.Option;

/**
* A {@link org.apache.flink.api.common.typeutils.TypeSerializerSnapshot} for the Scala {@link OptionSerializer}.
*/
public final class ScalaOptionSerializerSnapshot<E> extends CompositeTypeSerializerSnapshot<Option<E>, OptionSerializer<E>> {

private static final int VERSION = 2;

@SuppressWarnings("WeakerAccess")
public ScalaOptionSerializerSnapshot() {
super(underlyingClass());
}

public ScalaOptionSerializerSnapshot(OptionSerializer<E> serializerInstance) {
super(serializerInstance);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(OptionSerializer<E> outerSerializer) {
return new TypeSerializer[]{outerSerializer.elemSerializer()};
}

@Override
protected OptionSerializer<E> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked") TypeSerializer<E> nestedSerializer = (TypeSerializer<E>) nestedSerializers[0];
return new OptionSerializer<>(nestedSerializer);
}

@SuppressWarnings("unchecked")
private static <E> Class<OptionSerializer<E>> underlyingClass() {
return (Class<OptionSerializer<E>>) (Class<?>) OptionSerializer.class;
}
}
Expand Up @@ -101,45 +101,8 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------

override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
}

override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[Option[A]] = {

configSnapshot match {
case optionSerializerConfigSnapshot
: ScalaOptionSerializerConfigSnapshot[A] =>
ensureCompatibilityInternal(optionSerializerConfigSnapshot)
case legacyOptionSerializerConfigSnapshot
: OptionSerializer.OptionSerializerConfigSnapshot[A] =>
ensureCompatibilityInternal(legacyOptionSerializerConfigSnapshot)
case _ => CompatibilityResult.requiresMigration()
}
}

private def ensureCompatibilityInternal(
compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot[Option[A]])
: CompatibilityResult[Option[A]] = {

val compatResult = CompatibilityUtil.resolveCompatibilityResult(
compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
classOf[UnloadableDummyTypeSerializer[_]],
compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
elemSerializer)

if (compatResult.isRequiresMigration) {
if (compatResult.getConvertDeserializer != null) {
CompatibilityResult.requiresMigration(
new OptionSerializer[A](
new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
} else {
CompatibilityResult.requiresMigration()
}
} else {
CompatibilityResult.compatible()
}
override def snapshotConfiguration(): TypeSerializerSnapshot[Option[A]] = {
new ScalaOptionSerializerSnapshot[A](this)
}
}

Expand Down
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.typeutils;

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;

import scala.Option;

/**
* Migration test for the {@link ScalaEitherSerializerSnapshot}.
*/
@RunWith(Parameterized.class)
public class ScalaOptionSerializerSnapshotMigrationTest extends TypeSerializerSnapshotMigrationTestBase<Option<String>> {

private static final String SPEC_NAME = "scala-option-serializer";

public ScalaOptionSerializerSnapshotMigrationTest(TestSpecification<Option<String>> testSpecification) {
super(testSpecification);
}

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {

final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);

testSpecifications.add(
SPEC_NAME,
OptionSerializer.class,
ScalaOptionSerializerSnapshot.class,
() -> new OptionSerializer<>(StringSerializer.INSTANCE));

return testSpecifications.get();
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 4e1a755

Please sign in to comment.