-
Notifications
You must be signed in to change notification settings - Fork 13.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-11334][core] Migrate EnumValueSerializer to use new serialization compatibility abstractions #7734
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. Bot commandsThe @flinkbot bot supports the following commands:
|
…ion compatibility abstractions This commit migrate EnumValueSerializer to use new serialization compatibilty abstractions * add a new class `ScalaEnumSerializerSnapshot` * return a `ScalaEnumSerializerConfigSnapshot ` with `ScalaEnumSerializerSnapshot` when calling `EnumValueSerializer#snapshotConfiguration` * add a migration test `EnumValueSerializerSnapshotMigrationTest` to test the compatibility * remove function `EnumValueSerializer#ensureCompatibility()`
41718d5
to
00e1022
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution! Overall, the code is good, but I requested some changes to functionality. That we also need to discuss with @igalshilman
finally if (inViewWrapper != null) inViewWrapper.close() | ||
} | ||
|
||
override def restoreSerializer(): TypeSerializer[E#Value] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not create a serializer but tries to create an instance of the enum (which I think would fail). I'm wondering why this method is never called in the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aljoscha I think the implementation is wrong, there should create an EnumValueSerializer
, I'll update it, and I'll dig it a bit to find why this function did not been called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the function will be called when restore
, in the migration test we just restore the serializer of 1.6 and 1.7, so there didn't have any place will call this function. I will add a test to test this function.
if (!previousEnumConstant.equals(enumValue.toString)) { | ||
// compatible only if new enum constants are only appended, | ||
// and original constants must be in the exact same order | ||
return TypeSerializerSchemaCompatibility.compatibleAfterMigration() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that the previous checking code in EnumValueSerializer
also returned compatible after migration here but I don't think we do that migration here, so what happens if the index of the value actually changed? Maybe we should be conservative here and disallow any changes to the enum.
@igalshilman, what do you think about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the index of the value actually changed, there should be incompatible, I'll update it. because in EnumValueSerializer#serializer
we use Enumeration#Value.id
readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { | ||
val inViewWrapper = new DataInputViewStream(in) | ||
try { | ||
if (readVersion == 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can't happen that this path would ever be taken, because we never try and restore the old TypeSerializerConfigSnapshot
in here.
@igalshilman Could you confirm?
} | ||
|
||
object ScalaEnumSerializerSnapshot { | ||
val VERSION = 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be in line with the other newly added TypeSerializerSnapshots
, this should maybe be version 2
. But I see that version 3
could also be valid since the old TypeSerializerConfigSnapshot
for enums was already at version 2
.
@igalshilman Sorry for bothering again, but what do you think?
@@ -100,15 +100,15 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike { | |||
*/ | |||
@Test | |||
def checkRemovedField(): Unit = { | |||
assertTrue(checkCompatibility(enumA, enumC).isIncompatible) | |||
assertTrue(checkCompatibility(enumA, enumC).isCompatibleAfterMigration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, this should probably be incompatible.
} | ||
|
||
/** | ||
* Check that changing the enum field order requires migration | ||
*/ | ||
@Test | ||
def checkDifferentFieldOrder(): Unit = { | ||
assertTrue(checkCompatibility(enumA, enumD).isIncompatible) | ||
assertTrue(checkCompatibility(enumA, enumD).isCompatibleAfterMigration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned above, this should probably be incompatible.
Please don't change anything right now. I'm preparing a set of changes on your PR. |
Got it. |
@klion26 Thanks for your contribution again! 😄It turns out that the enum serializers are quite tricky, @igalshilman and I spent some time to create an updated PR that includes your commit: #7766 |
What is the purpose of the change
Migrate
EnumValueSerializer
to use new serialization compatibility abstractionsBrief change log
This patch contains:
ScalaEnumSerializerSnapshot
ScalaEnumSerializerConfigSnapshot
withScalaEnumSerializerSnapshot
when callingEnumValueSerializer#snapshotConfiguration
EnumValueSerializerSnapshotMigrationTest
to test the compatibilityEnumValueSerializer#ensureCompatibility()
Verifying this change
This change is already covered by existing tests
EnumValueSerializerTest
andEnumValueSerializerUpgradeTest
and add a migration test
EnumValueSerializerSnapshotMigrationTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation