Skip to content

Commit

Permalink
[FLINK-6178] [core] Allow serializer upgrades for managed state
Browse files Browse the repository at this point in the history
This commit adds the functionality of allowing serializer upgrades for
Flink's managed state. It consists of 2 major changes: 1) new
user-facing API in `TypeSerializer`, and 2) activate serializer upgrades
in state backends.

For 1) new user-facing API for `TypeSerializer`, the following is added:
- new class: TypeSerializerConfigSnapshot
- new class: CompatibilityResult
- new method: TypeSerializer#snapshotConfiguration()
- new method:
  TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)

Generally speaking, configuration snapshots contains a point-in-time
view of a serializer's state / configuration, and is persisted along
with checkpoints. On restore, the configuration is confronted with the
new serializer of the state to check for compatibility, which may
introduce reconfiguration of the new serializer to be compatible.

This compatibility check is integrated in the state backends' restore
flow in 2). Currently, if the check results in the need to perform state
migration, the restore simply fails as the state migration feature isn't
yet available.
  • Loading branch information
tzulitai committed May 7, 2017
1 parent 409319a commit 8aa5e05
Show file tree
Hide file tree
Showing 120 changed files with 6,429 additions and 1,011 deletions.
Expand Up @@ -20,7 +20,11 @@


import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
Expand All @@ -30,7 +34,8 @@

import java.io.IOException;

public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
@Internal
public final class WritableSerializer<T extends Writable> extends TypeSerializer<T> {

private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -149,4 +154,42 @@ public boolean equals(Object obj) {
public boolean canEqual(Object obj) {
return obj instanceof WritableSerializer;
}

// --------------------------------------------------------------------------------------------
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------

@Override
public WritableSerializerConfigSnapshot<T> snapshotConfiguration() {
return new WritableSerializerConfigSnapshot<>(typeClass);
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof WritableSerializerConfigSnapshot
&& typeClass.equals(((WritableSerializerConfigSnapshot) configSnapshot).getTypeClass())) {

return CompatibilityResult.compatible();
} else {
return CompatibilityResult.requiresMigration(null);
}
}

public static final class WritableSerializerConfigSnapshot<T extends Writable>
extends GenericTypeSerializerConfigSnapshot<T> {

private static final int VERSION = 1;

/** This empty nullary constructor is required for deserializing the configuration. */
public WritableSerializerConfigSnapshot() {}

public WritableSerializerConfigSnapshot(Class<T> writableTypeClass) {
super(writableTypeClass);
}

@Override
public int getVersion() {
return VERSION;
}
}
}

Large diffs are not rendered by default.

Expand Up @@ -75,7 +75,7 @@ public static class FinalSemiAsyncSnapshot {
}

private static void throwExceptionOnLoadingThisClass() {
throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. "
throw new RuntimeException("Attempt to requiresMigration RocksDB state created with semi async snapshot mode failed. "
+ "Unfortunately, this is not supported. Please create a new savepoint for the job using fully "
+ "async mode in Flink 1.1 and run migration again with the new savepoint.");
}
Expand Down
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

/**
* A {@code CompatibilityResult} contains information about whether or not data migration
* is required in order to continue using new serializers for previously serialized data.
*
* @param <T> the type of the data being migrated.
*/
@PublicEvolving
public final class CompatibilityResult<T> {

/** Whether or not migration is required. */
private final boolean requiresMigration;

/**
* The convert deserializer to use for reading previous data during migration,
* in the case that the preceding serializer cannot be found.
*
* <p>This is only relevant if migration is required.
*/
private final TypeSerializer<T> convertDeserializer;

/**
* Returns a strategy that signals that the new serializer is compatible and no migration is required.
*
* @return a result that signals migration is not required for the new serializer
*/
public static <T> CompatibilityResult<T> compatible() {
return new CompatibilityResult<>(false, null);
}

/**
* Returns a strategy that signals migration to be performed.
*
* <p>Furthermore, in the case that the preceding serializer cannot be found or restored to read the
* previous data during migration, a provided convert deserializer can be used (may be {@code null}
* if one cannot be provided).
*
* <p>In the case that the preceding serializer cannot be found and a convert deserializer is not
* provided, the migration will fail due to the incapability of reading previous data.
*
* @return a result that signals migration is necessary, possibly providing a convert deserializer.
*/
public static <T> CompatibilityResult<T> requiresMigration(TypeSerializer<T> convertDeserializer) {
return new CompatibilityResult<>(true, convertDeserializer);
}

private CompatibilityResult(boolean requiresMigration, TypeSerializer<T> convertDeserializer) {
this.requiresMigration = requiresMigration;
this.convertDeserializer = convertDeserializer;
}

public TypeSerializer<T> getConvertDeserializer() {
return convertDeserializer;
}

public boolean requiresMigration() {
return requiresMigration;
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Arrays;

/**
* A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers.
* The configuration snapshot consists of the configuration snapshots of all nested serializers.
*/
@Internal
public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {

private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;

/** This empty nullary constructor is required for deserializing the configuration. */
public CompositeTypeSerializerConfigSnapshot() {}

public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
}

@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots);
}

@Override
public void read(DataInputView in) throws IOException {
super.read(in);
nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
}

public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
return nestedSerializerConfigSnapshots;
}

public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
return nestedSerializerConfigSnapshots[0];
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}

if (obj == null) {
return false;
}

return (obj.getClass().equals(getClass()))
&& Arrays.equals(
nestedSerializerConfigSnapshots,
((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
}

@Override
public int hashCode() {
return Arrays.hashCode(nestedSerializerConfigSnapshots);
}
}
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* Configuration snapshot for serializers for generic types.
*
* @param <T> The type to be instantiated.
*/
@Internal
public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {

private Class<T> typeClass;

/** This empty nullary constructor is required for deserializing the configuration. */
public GenericTypeSerializerConfigSnapshot() {}

public GenericTypeSerializerConfigSnapshot(Class<T> typeClass) {
this.typeClass = Preconditions.checkNotNull(typeClass);
}

@Override
public void write(DataOutputView out) throws IOException {
super.write(out);

// write only the classname to avoid Java serialization
out.writeUTF(typeClass.getName());
}

@SuppressWarnings("unchecked")
@Override
public void read(DataInputView in) throws IOException {
super.read(in);

String genericTypeClassname = in.readUTF();
try {
typeClass = (Class<T>) Class.forName(genericTypeClassname, true, getUserCodeClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Could not find the requested class " + genericTypeClassname + " in classpath.", e);
}
}

public Class<T> getTypeClass() {
return typeClass;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}

if (obj == null) {
return false;
}

return (obj.getClass().equals(getClass()))
&& typeClass.equals(((GenericTypeSerializerConfigSnapshot) obj).getTypeClass());
}

@Override
public int hashCode() {
return typeClass.hashCode();
}
}

0 comments on commit 8aa5e05

Please sign in to comment.