[FLINK-11329][core] Migrating CompositeSerializers #7590
Review for commit 516d39a.
@@ -302,6 +299,7 @@ static PrecomputedParameters precompute(
}

/** Snapshot field serializers of composite type. */
nit: Add a @deprecated message and direct to the new snapshot class.
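For illustration, a deprecation note of the kind requested here might look like the sketch below. The class names LegacyFieldSnapshot and NewFieldSnapshot are hypothetical placeholders, not the classes touched in this PR, and the wording of the message is only an assumption about what would be useful to readers.

```java
// Hypothetical replacement class; stands in for the new snapshot class.
final class NewFieldSnapshot {
}

/**
 * Snapshot field serializers of composite type.
 *
 * @deprecated This class is assumed to remain only for reading older
 *             snapshots; new code should use {@link NewFieldSnapshot}.
 */
@Deprecated
final class LegacyFieldSnapshot {
}
```

Pairing the Javadoc tag with the @Deprecated annotation lets both the generated documentation and the compiler point users at the replacement.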
@@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {

public Serializer(TypeSerializer<?> userValueSerializer) {
super(true, userValueSerializer, LongSerializer.INSTANCE);
public Serializer(TypeSerializer<?> valueSerializer, TypeSerializer<Long> timestampSerializer) {
I don't think we need a public constructor that accepts the timestamp serializer.
This should be a private constructor used only by the snapshot class.
We should still have a public constructor that accepts the user value serializer, and by default just uses LongSerializer.INSTANCE as the new timestamp serializer.
As far as I can tell, this is not a user-facing serializer but one used internally by the TtlSerializer, and I think it is important to make it explicit that this is a composite serializer and these are the nested serializers that define it.
As a way to reduce verbosity, we can add a static factory method with an explicit name.
Feel free to decide for yourself :-)
ok, that's a fair argument that I can agree with. Since I don't have a strong opinion on this, I'll leave the constructors as is in this PR.
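The static factory option raised in this thread could be sketched roughly as follows. Every type here is a hypothetical stand-in (SketchSerializer, SketchLongSerializer, SketchTtlSerializer) and the factory name withDefaultTimestampSerializer is invented for illustration; none of this is Flink's actual API or the code in this PR.

```java
// Hypothetical stand-in for Flink's TypeSerializer; not the real interface.
interface SketchSerializer<T> {
    String name();
}

// Hypothetical stand-in for LongSerializer.INSTANCE.
final class SketchLongSerializer implements SketchSerializer<Long> {
    static final SketchLongSerializer INSTANCE = new SketchLongSerializer();

    private SketchLongSerializer() {
    }

    @Override
    public String name() {
        return "long";
    }
}

// Hypothetical user value serializer used in the usage example.
final class SketchStringSerializer implements SketchSerializer<String> {
    @Override
    public String name() {
        return "string";
    }
}

// Composite serializer whose constructor names both nested serializers explicitly.
final class SketchTtlSerializer<T> {
    private final SketchSerializer<Long> timestampSerializer;
    private final SketchSerializer<T> valueSerializer;

    SketchTtlSerializer(
            SketchSerializer<Long> timestampSerializer,
            SketchSerializer<T> valueSerializer) {
        this.timestampSerializer = timestampSerializer;
        this.valueSerializer = valueSerializer;
    }

    // Explicitly named factory: callers skip the default timestamp serializer,
    // while the constructor still shows the full composite structure.
    static <T> SketchTtlSerializer<T> withDefaultTimestampSerializer(
            SketchSerializer<T> valueSerializer) {
        return new SketchTtlSerializer<>(SketchLongSerializer.INSTANCE, valueSerializer);
    }

    SketchSerializer<Long> timestampSerializer() {
        return timestampSerializer;
    }

    SketchSerializer<T> valueSerializer() {
        return valueSerializer;
    }
}
```

A call site would then read SketchTtlSerializer.withDefaultTimestampSerializer(new SketchStringSerializer()), which avoids repeating the default while keeping the two-argument constructor available to a snapshot class.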
@Override
protected Serializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
TypeSerializer<?> valueSerializer = nestedSerializers[0];
TypeSerializer<Long> timeSerializer = (TypeSerializer<Long>) nestedSerializers[1];
nit: time --> timestamp for naming consistency
@@ -126,15 +128,15 @@ private IS createState() throws Exception {
@SuppressWarnings("unchecked")
private IS createValueState() throws Exception {
ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
As mentioned above, having to pass in a LongSerializer.INSTANCE every time we're instantiating a TtlSerializer seems very redundant.
I disagree, same reasons as above.
@@ -56,8 +57,8 @@ public String toString() {
/** Serializer for Serializer. */
public static class Serializer extends CompositeSerializer<ValueWithTs<?>> {
This serializer class previously did not have a serialVersionUID defined.
We need to explicitly set it to what it was before, because I guess the serial version UID would have changed when adding the new constructors.
OTOH, a migration test seems to be missing for this serializer; it would have caught this problem.
Ah scratch that, just realized that this is only a serializer used in tests.
But still, I'd actually suggest adding that, to enforce good practices.
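A minimal sketch of pinning the UID, assuming a hypothetical PinnedUidSerializer class: the value 1L is a placeholder, whereas the review asks for the value the class had before the change. ObjectStreamClass.lookup is used only to show which UID Java serialization will actually see.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical test serializer; the UID below is a placeholder value.
final class PinnedUidSerializer implements Serializable {
    // Declared explicitly so that adding constructors or methods later
    // does not change the UID that Java serialization computes by default.
    private static final long serialVersionUID = 1L;

    PinnedUidSerializer() {
    }

    // A later-added constructor; it does not affect the declared UID.
    PinnedUidSerializer(String unusedExtraArgument) {
    }
}

final class SerialUidDemo {
    public static void main(String[] args) {
        long uid = ObjectStreamClass.lookup(PinnedUidSerializer.class).getSerialVersionUID();
        // Reports the declared value rather than a hash of the class shape.
        System.out.println("serialVersionUID = " + uid);
    }
}
```

Running the same lookup against the old version of a class is one way to recover the previously implicit UID before pinning it.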
Review for 8bba092.
Changes here look good 👍
}

return CompatibilityResult.requiresMigration();
}

/**
* Configuration snapshot for serializers of nullable types, containing the
* configuration snapshot of its original serializer.
nit: Add a @deprecated message.
d876fd0 LGTM 👍
}
/**
* A snapshot for {@link RowSerializer}.
*/
nit: Add a @deprecated message.
58a3692 LGTM 👍
return CompatibilityResult.requiresMigration();
public TypeSerializerSnapshot<TaggedUnion<T1, T2>> snapshotConfiguration() {
return new UnionSerializerSnapshot<>(this);
}
}

/**
* The {@link TypeSerializerConfigSnapshot} for the {@link UnionSerializer}.
nit: Add a @deprecated message.
c375ca0 LGTM 👍
0fa9adb LGTM 👍
One problem with 307f6ee.
Please see comment.
@@ -31,6 +33,7 @@
* allow calling different base class constructors from subclasses, while we need that
* for the default empty constructor.
nit: Add a @deprecated message.
: ScalaOptionSerializerConfigSnapshot[A] =>
ensureCompatibilityInternal(optionSerializerConfigSnapshot)
case legacyOptionSerializerConfigSnapshot
: OptionSerializer.OptionSerializerConfigSnapshot[A] =>
Removing this path will lead to problems when restoring from Flink 1.3, because this snapshot class was used back in Flink 1.3.
OTOH, it should be possible to redirect OptionSerializerConfigSnapshot's compatibility check to the new ScalaOptionSerializerSnapshot.
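The redirection suggested here can be pictured with the simplified Java model below. It is a sketch under heavy assumptions: SketchCompat, SketchSnapshot, NewOptionSnapshot and LegacyOptionConfigSnapshot are hypothetical types, serializers are reduced to string identifiers, and the real Flink classes (which are Scala in this case) have different signatures.

```java
// Hypothetical, simplified compatibility outcome; not Flink's actual result type.
enum SketchCompat {
    COMPATIBLE_AS_IS,
    INCOMPATIBLE
}

// Hypothetical snapshot contract, with serializers reduced to string ids.
interface SketchSnapshot {
    SketchCompat resolveCompatibility(String newElementSerializerId);
}

// Stands in for the new-style snapshot that owns the compatibility logic.
final class NewOptionSnapshot implements SketchSnapshot {
    private final String elementSerializerId;

    NewOptionSnapshot(String elementSerializerId) {
        this.elementSerializerId = elementSerializerId;
    }

    @Override
    public SketchCompat resolveCompatibility(String newElementSerializerId) {
        return elementSerializerId.equals(newElementSerializerId)
                ? SketchCompat.COMPATIBLE_AS_IS
                : SketchCompat.INCOMPATIBLE;
    }
}

// Stands in for the legacy snapshot class that old savepoints may still contain.
final class LegacyOptionConfigSnapshot implements SketchSnapshot {
    private final String elementSerializerId;

    LegacyOptionConfigSnapshot(String elementSerializerId) {
        this.elementSerializerId = elementSerializerId;
    }

    @Override
    public SketchCompat resolveCompatibility(String newElementSerializerId) {
        // Redirect: rebuild the new-style snapshot from the legacy data and
        // let it decide, so the legacy restore path keeps working.
        return new NewOptionSnapshot(elementSerializerId)
                .resolveCompatibility(newElementSerializerId);
    }
}
```

With this shape the legacy class stays readable from old savepoints while all compatibility decisions live in one place.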
…lizerConfigSnapshot to ScalaOptionSerializerSnapshot This closes apache#7590.
Thanks for the work @igalshilman! For every serializer that is touched, I checked that:
LGTM. I'll merge https://github.com/tzulitai/flink/tree/igal-dawid-gordon-composite once Travis is green!
What is the purpose of the change

This PR migrates the following serializers to the new TypeSerializerSnapshot abstraction:

Brief change log

Each commit migrates a single serializer's TypeSerializerConfigSnapshot to the new TypeSerializerSnapshot.

Verifying this change

Each migrated serializer has an associated StateMigrationTest that verifies snapshot backwards compatibility with previous versions.

Does this pull request potentially affect one of the following parts:

@Public(Evolving): (yes / no)

Documentation