[FLINK-11329][core] Migrating CompositeSerializers #7590

Closed
@@ -204,13 +204,10 @@ public boolean canEqual(Object obj) {
return obj != null && getClass().equals(obj.getClass());
}

@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return new ConfigSnapshot(fieldSerializers);
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
// We cannot remove this method as long as we support restoring into CompositeTypeSerializerConfigSnapshot.
// Previously (pre 1.8), multiple composite serializers were using this class directly as their snapshot class.
if (configSnapshot instanceof ConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
@@ -302,6 +299,7 @@ static PrecomputedParameters precompute(
}

/** Snapshot field serializers of composite type. */
Contributor

nit: Add a @deprecated Javadoc message that points to the new snapshot class.
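
As an illustration of this nit, a deprecation notice usually pairs the `@Deprecated` annotation with a `@deprecated` Javadoc tag that names the replacement. The sketch below uses hypothetical stand-in classes (`LegacyConfigSnapshot`, `NewSnapshot`), not the Flink classes touched by this PR.

```java
/** Hypothetical stand-ins for illustration; these are not the Flink classes. */
public class DeprecationExample {

    /** Stand-in for the replacement snapshot class. */
    static class NewSnapshot {
        int version() {
            return 2;
        }
    }

    /**
     * Stand-in for the legacy snapshot class.
     *
     * @deprecated Kept only so that old snapshots can still be read;
     *             new code should use {@link NewSnapshot} instead.
     */
    @Deprecated
    static class LegacyConfigSnapshot {
        int version() {
            return 0;
        }
    }

    public static void main(String[] args) {
        // @Deprecated is retained at runtime, so it can be detected reflectively.
        boolean deprecated = LegacyConfigSnapshot.class.isAnnotationPresent(Deprecated.class);
        System.out.println("LegacyConfigSnapshot deprecated: " + deprecated);
    }
}
```

The annotation drives compiler warnings, while the Javadoc tag is what tells a reader where to go instead.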

@Deprecated
public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 0;

@@ -191,6 +191,11 @@ protected CompositeSerializer<List<Object>> createSerializerInstance(
PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) {
return new TestListCompositeSerializer(precomputed, originalSerializers);
}

@Override
public TypeSerializerSnapshot<List<Object>> snapshotConfiguration() {
throw new UnsupportedOperationException();
}
}

private static class CompositeSerializerTestInstance extends SerializerTestInstance<List<Object>> {
@@ -25,6 +25,7 @@
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -143,7 +144,10 @@ public void initializeState(FunctionInitializationContext context) {
prevUpdatesByVerifierId = TtlStateVerifier.VERIFIERS.stream()
.collect(Collectors.toMap(TtlStateVerifier::getId, v -> {
checkNotNull(v);
- TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(v.getUpdateSerializer());
+ final TypeSerializer<ValueWithTs<?>> typeSerializer = new ValueWithTs.Serializer(
+ v.getUpdateSerializer(),
+ LongSerializer.INSTANCE);

ListStateDescriptor<ValueWithTs<?>> stateDesc = new ListStateDescriptor<>(
"TtlPrevValueState_" + v.getId(), typeSerializer);
KeyedStateStore store = context.getKeyedStateStore();
@@ -19,8 +19,9 @@
package org.apache.flink.streaming.tests.verify;

import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.util.FlinkRuntimeException;

import javax.annotation.Nonnull;
@@ -56,8 +57,8 @@ public String toString() {
/** Serializer for {@link ValueWithTs}. */
public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
Contributor

@tzulitai, Jan 30, 2019

This serializer class previously did not define a serialVersionUID.
We need to explicitly set it to the value it had before, because I guess the default serial version UID changed when the new constructors were added.

OTOH, a migration test for this serializer seems to be missing; it would have caught this problem.

Contributor

Ah, scratch that. I just realized that this serializer is only used in tests.

Contributor Author

But still, I'd actually suggest adding that, to enforce good practices.
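
To illustrate the serialVersionUID concern in this thread: when a `Serializable` class declares no UID, Java derives one from the class structure (the default computation takes non-private constructors and methods into account), whereas a declared UID stays fixed. The sketch below uses hypothetical classes and only standard `java.io` calls; the value `1L` is an arbitrary placeholder, not the UID of any Flink class.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidExample {

    /** No explicit UID: the runtime derives one from the class structure. */
    static class ImplicitUid implements Serializable {
        private final int field;

        ImplicitUid(int field) {
            this.field = field;
        }
    }

    /** Explicit UID: the value is whatever the class declares. */
    static class ExplicitUid implements Serializable {
        private static final long serialVersionUID = 1L; // placeholder value

        private final int field;

        ExplicitUid(int field) {
            this.field = field;
        }
    }

    /** Returns the UID that Java serialization would use for the given class. */
    static long uidOf(Class<?> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        // The derived value depends on the class structure, so it is not hard-coded here.
        System.out.println("implicit: " + uidOf(ImplicitUid.class));
        System.out.println("explicit: " + uidOf(ExplicitUid.class));
    }
}
```

Printing the derived UID of the old class version with `uidOf` is one way to find the value that would have to be pinned to stay compatible with previously serialized instances.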


- public Serializer(TypeSerializer<?> userValueSerializer) {
- super(true, userValueSerializer, LongSerializer.INSTANCE);
+ public Serializer(TypeSerializer<?> valueSerializer, TypeSerializer<Long> timestampSerializer) {
Contributor

I don't think we need a public constructor that accepts the timestamp serializer.
This should be a private constructor used only by the snapshot class.

We should still have a public constructor that accepts the user value serializer, and by default just uses LongSerializer.INSTANCE as the new timestamp serializer.

Contributor Author

As far as I can tell, this is not a user-facing serializer; rather, it is used internally by the TtlSerializer,
and I think it is important to make it explicit that this is a composite serializer and that these are the nested serializers that define it.

As a way to reduce verbosity, we can add a static factory method with an explicit name.
Feel free to decide for yourself :-)

Contributor

ok, that's a fair argument that I can agree with. Since I don't have a strong opinion on this, I'll leave the constructors as is in this PR.
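
The two positions in this thread can be combined roughly as follows: the constructor takes every nested serializer explicitly, and a named static factory supplies the default to reduce verbosity at call sites. This is a minimal sketch with hypothetical stand-in types (`FieldSerializer`, `TimestampedSerializer`, `withDefaultTimestampSerializer`); none of these names come from the PR.

```java
import java.util.Objects;

public class ConstructorDesignExample {

    /** Hypothetical minimal serializer interface; not Flink's TypeSerializer. */
    interface FieldSerializer<T> {
        String name();
    }

    static final FieldSerializer<Long> LONG = () -> "long";
    static final FieldSerializer<String> STRING = () -> "string";

    /** Composite whose constructor lists all nested serializers explicitly. */
    static final class TimestampedSerializer<T> {
        private final FieldSerializer<T> valueSerializer;
        private final FieldSerializer<Long> timestampSerializer;

        TimestampedSerializer(
                FieldSerializer<T> valueSerializer,
                FieldSerializer<Long> timestampSerializer) {
            this.valueSerializer = Objects.requireNonNull(valueSerializer);
            this.timestampSerializer = Objects.requireNonNull(timestampSerializer);
        }

        /** Named factory that fills in the default timestamp serializer. */
        static <T> TimestampedSerializer<T> withDefaultTimestampSerializer(
                FieldSerializer<T> valueSerializer) {
            return new TimestampedSerializer<>(valueSerializer, LONG);
        }

        String describe() {
            return valueSerializer.name() + "+" + timestampSerializer.name();
        }
    }

    public static void main(String[] args) {
        TimestampedSerializer<String> serializer =
                TimestampedSerializer.withDefaultTimestampSerializer(STRING);
        System.out.println(serializer.describe()); // prints "string+long"
    }
}
```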

+ super(true, valueSerializer, timestampSerializer);
}

@SuppressWarnings("unchecked")
@@ -92,7 +93,58 @@ protected Object getField(@Nonnull ValueWithTs<?> value, int index) {
protected CompositeSerializer<ValueWithTs<?>> createSerializerInstance(
PrecomputedParameters precomputed,
TypeSerializer<?>... originalSerializers) {
- return new Serializer(precomputed, (TypeSerializer<Object>) originalSerializers[0]);

+ return new Serializer(precomputed, originalSerializers[0], originalSerializers[1]);
}

TypeSerializer<?> getValueSerializer() {
return fieldSerializers[0];
}

@SuppressWarnings("unchecked")
TypeSerializer<Long> getTimestampSerializer() {
TypeSerializer<?> fieldSerializer = fieldSerializers[1];
return (TypeSerializer<Long>) fieldSerializer;
}

@Override
public TypeSerializerSnapshot<ValueWithTs<?>> snapshotConfiguration() {
return new ValueWithTsSerializerSnapshot(this);
}
}

/**
* A {@link TypeSerializerSnapshot} for ValueWithTs Serializer.
*/
public static final class ValueWithTsSerializerSnapshot extends CompositeTypeSerializerSnapshot<ValueWithTs<?>, Serializer> {

private final static int VERSION = 2;

@SuppressWarnings("unused")
public ValueWithTsSerializerSnapshot() {
super(Serializer.class);
}

ValueWithTsSerializerSnapshot(Serializer serializerInstance) {
super(serializerInstance);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(Serializer outerSerializer) {
return new TypeSerializer[]{outerSerializer.getValueSerializer(), outerSerializer.getTimestampSerializer()};
}

@SuppressWarnings("unchecked")
@Override
protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
TypeSerializer<?> valueSerializer = nestedSerializers[0];
TypeSerializer<Long> timeSerializer = (TypeSerializer<Long>) nestedSerializers[1];
Contributor

nit: time --> timestamp for naming consistency

return new Serializer(valueSerializer, timeSerializer);
}
}
}
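
One detail worth noting in the snapshot class above: `getNestedSerializers` and `createOuterSerializerWithNestedSerializers` have to agree on the order of the nested serializers (value at index 0, timestamp at index 1). The following is a simplified model of that round trip with hypothetical stand-in types that use plain strings in place of serializers; it is not Flink's `CompositeTypeSerializerSnapshot`.

```java
public class NestedOrderExample {

    /** Hypothetical composite holding two nested "serializers" (names only). */
    static final class Outer {
        final String valueSerializer;
        final String timestampSerializer;

        Outer(String valueSerializer, String timestampSerializer) {
            this.valueSerializer = valueSerializer;
            this.timestampSerializer = timestampSerializer;
        }
    }

    /** Snapshot side: flatten the outer serializer into an ordered array. */
    static String[] getNested(Outer outer) {
        return new String[] {outer.valueSerializer, outer.timestampSerializer};
    }

    /** Restore side: rebuild the outer serializer using the same indices. */
    static Outer createOuter(String[] nested) {
        return new Outer(nested[0], nested[1]);
    }

    public static void main(String[] args) {
        Outer restored = createOuter(getNested(new Outer("string", "long")));
        System.out.println(restored.valueSerializer + ", " + restored.timestampSerializer);
    }
}
```

If the two methods used different indices, the restored composite would silently swap its fields, which is the kind of mistake a migration test is meant to catch.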
@@ -28,7 +28,9 @@
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyedStateBackend;
@@ -126,15 +128,15 @@ private IS createState() throws Exception {
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
- stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
+ stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
Contributor

As mentioned above, having to pass in a LongSerializer.INSTANCE every time we're instantiating a TtlSerializer seems very redundant.

Contributor Author

I disagree, same reasons as above.

return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}

@SuppressWarnings("unchecked")
private <T> IS createListState() throws Exception {
ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
- stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
+ stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, listStateDesc.getElementSerializer()));
return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
}

@@ -144,7 +146,7 @@ private <UK, UV> IS createMapState() throws Exception {
MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
stateDesc.getName(),
mapStateDesc.getKeySerializer(),
- new TtlSerializer<>(mapStateDesc.getValueSerializer()));
+ new TtlSerializer<>(LongSerializer.INSTANCE, mapStateDesc.getValueSerializer()));
return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
}

@@ -154,7 +156,7 @@ private IS createReducingState() throws Exception {
ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
stateDesc.getName(),
new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
- new TtlSerializer<>(stateDesc.getSerializer()));
+ new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor));
}

@@ -165,7 +167,7 @@ private <IN, OUT> IS createAggregatingState() throws Exception {
TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
- stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
+ stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
return (IS) new TtlAggregatingState<>(createTtlStateContext(ttlDescriptor), ttlAggregateFunction);
}

@@ -178,7 +180,7 @@ private <T> IS createFoldingState() throws Exception {
stateDesc.getName(),
ttlInitAcc,
new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
- new TtlSerializer<>(stateDesc.getSerializer()));
+ new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
return (IS) new TtlFoldingState<>(createTtlStateContext(ttlDescriptor));
}

@@ -237,8 +239,8 @@ public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
private static final long serialVersionUID = 131020282727167064L;

@SuppressWarnings("WeakerAccess")
- public TtlSerializer(TypeSerializer<T> userValueSerializer) {
- super(true, LongSerializer.INSTANCE, userValueSerializer);
+ public TtlSerializer(TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
+ super(true, timestampSerializer, userValueSerializer);
}

@SuppressWarnings("WeakerAccess")
@@ -272,5 +274,61 @@ protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
Preconditions.checkArgument(originalSerializers.length == 2);
return new TtlSerializer<>(precomputed, originalSerializers);
}

@SuppressWarnings("unchecked")
TypeSerializer<Long> getTimestampSerializer() {
return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0];
}

@SuppressWarnings("unchecked")
TypeSerializer<T> getValueSerializer() {
return (TypeSerializer<T>) fieldSerializers[1];
}

@Override
public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
return new TtlSerializerSnapshot<>(this);
}
}

/**
* A {@link TypeSerializerSnapshot} for TtlSerializer.
*/
public static final class TtlSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {

private static final int VERSION = 2;

@SuppressWarnings({"WeakerAccess", "unused"})
public TtlSerializerSnapshot() {
super(correspondingSerializerClass());
}

TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
super(serializerInstance);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> outerSerializer) {
return new TypeSerializer[]{ outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()};
}

@Override
@SuppressWarnings("unchecked")
protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0];
TypeSerializer<T> valueSerializer = (TypeSerializer<T>) nestedSerializers[1];

return new TtlSerializer<>(timestampSerializer, valueSerializer);
}

@SuppressWarnings("unchecked")
private static <T> Class<TtlSerializer<T>> correspondingSerializerClass() {
return (Class<TtlSerializer<T>>) (Class<?>) TtlSerializer.class;
}
}
}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.ttl;

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer;
import org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializerSnapshot;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;

/**
* State migration test for {@link TtlSerializer}.
*/
@RunWith(Parameterized.class)
public class TtlSerializerStateMigrationTest extends TypeSerializerSnapshotMigrationTestBase<TtlValue<String>> {

private static final String SPEC_NAME = "ttl-serializer";

public TtlSerializerStateMigrationTest(TestSpecification<TtlValue<String>> testSpecification) {
super(testSpecification);
}

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {

final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);

testSpecifications.add(
SPEC_NAME,
TtlSerializer.class,
TtlSerializerSnapshot.class,
() -> new TtlSerializer<>(LongSerializer.INSTANCE, StringSerializer.INSTANCE));

return testSpecifications.get();
}
}

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.