Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-11073] (part 1) Introduce COMPATIBLE_WITH_RECONFIGURED_SERIALIZER option in TypeSerializerSchemaCompatibility #7329

Closed
wants to merge 9 commits into from

Conversation

tzulitai
Copy link
Contributor

What is the purpose of the change

This commit introduces a new option to the TypeSerializerSchemaCompatibility class:

TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(TypeSerializer<T> reconfiguredSerializerInstance);

The main point to introduce this option is so that we can allow cleaner serializer implementations which don't need to be mutable to accommodate the fact that sometimes it needs to be reconfigured (e.g. the KryoSerializer or PojoSerializer are example serializers that reconfigure themselves).
This is a step towards the principle that all serializers in Flink should be immutable.

After introducing this option, this PR also lets the state backends framework code respect the new option, i.e. using the returned reconfigured serializer instance to access state instead of the originally registered serializer. Serializer's that need to take this account include:

  1. State value serializers in keyed backends / operator backends
  2. Key serializers in keyed backends
  3. Namespace serializers in keyed backends

For 1. and 3., the change is pretty straightforward, and requires only small changes to the StateSerializerProvider; when we get a new registered serializer for restored state, we consider the reconfigured case there and maybe reassign the registered serializer reference to be the reconfigured one.

For 2., things are a bit more evolved since for the key serializer, we ALWAYS get the new serializer instance first, and THEN maybe get the previous key serializer snapshot if we're restored.
In this scenario, the StateSerializerProvider needs to be modified further to support this, as it previously only assumed that for restored state we always get the serializer snapshot first.
Simply put, the StateSerializerProvider had to be changed so that it supports both directions, either first getting the new serializer or the old serializer snapshot.

Brief change log

  • f5ab6b8: Extend the TypeSerializerSchemaCompatibility class to have the new option
  • c25b8fb: Preliminary test utilities extension. It introduces a ReconfigurationRequiringTestTypeSerializer to be used by various reconfiguring related migration tests later on.
  • 1a7f5a8: This modifies the StateSerializerProvider to work for the serializer reconfiguration cases 1. and 3. mentioned above. Tests are added to StateSerializerProviderTest for the new feature scope.
  • e028f29: This modifies the StateSerializerProvider to work for both directions. Tests are added for the new feature scope StateSerializerProviderTest.
  • b3ce23e: Let the key serializer in AbstractKeyedStateBackend be a StateSerializerProvider. Also makes accessing the key serializer more secure and well-defined.
  • 32485e1: Extend StateBackendMigrationTestBase to cover serializer reconfiguration cases for all kinds of state, including ValueState / ListState / key and namespace serializers in keyed state backends, and partitionable / union list state as well as broadcast state in operator state backends. This includes major refactoring of the StateBackendMigrationTestBase to let the cases be much clearer.

Verifying this change

Changes in StateBackendMigrationTestBase, StateSerializerProviderTest should reflect the new features and expected behaviours.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes (state access code paths are affected) / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

…ests

This commit prepares the test utilities we use for migration related
tests, so that we can test serializer reconfiguration cases.
…MPATIBLE_WITH_RECONFIGURED_SERIALIZER

This lets the StateSerializerProvider reassign the registeredSerializer
to be the wrapped reconfigured serializer instance, if that is the case
of the result compatibility.
…oth eagerly / lazily registered serializers

This is required so that the StateSerializerProvider can be used for the
key serializer in AbstractKeyedStateBackend. For the key serializer,
regardless of whether or not we're restoring old state, we always first
get the new key serializer, and then maybe get the previous key
serializer's snapshot in the restore phase.

This commit makes the StateSerializerProvider work in both directions,
meaning that it can be first instantiated with either a serializer
instance or the previous serializer's snapshot, and then complemented
with the other one later on.
… test names for StateSerializerProvider

With the new concepts in the StateSerializerProvider, the factory
methods and unit test names can be renamed to be more appropriate.
…erializer in AbstractKeyedStateBackend

This commit replaces the keySerializer instance field in
AbstractKeyedStateBackend to be a StateSerializerProvider, so that it
reuses the schema compatibility result handling / reconfigured
serializer assigning logic implemented in that utility class.

This commit makes access to the key serializer in state backend
subclasses more well defined.
…tBase to cover reconfigured serializer cases

Extends the migration tests to cover new cases where the new serializer
is reconfigured to be compatible, for ValueState, ListState, key /
namespace serializer in keyed backends, and partitionable ListState,
union ListState and boradcast state in operator state backends..

This also fixes some compatibility checks in state backends to have the
expected behaviours as tested in StateBackendMigrationTestBase.
…in KeyedBackendSerializationProxy

Remove "config" from the field names and getter name to reflect the new
abstraction TypeSerializerSnapshot.
@igalshilman
Copy link
Contributor

LGTM 👍

@tzulitai
Copy link
Contributor Author

tzulitai commented Jan 7, 2019

Thanks for the review and numerous offline discussions @igalshilman!
Since Travis is green, I'll proceed to merge this ..

@asfgit asfgit closed this in e03cea5 Jan 7, 2019
tisonkun pushed a commit to tisonkun/flink that referenced this pull request Jan 17, 2019
…state backends

This commit adapts the StateSerializerProvider to respect the
COMPATIBLE_WITH_RECONFIGURED_SERIALIZER case, which effectively lets all
state backends respect serializer reconfiguration because state
serializers are always obtained via StateSerializerProviders.

It also makes StateSerializerProvider work for both eagerly / lazily
registered serializers.
This is required so that the StateSerializerProvider can be used for the
key serializer in AbstractKeyedStateBackend. For the key serializer,
regardless of whether or not we're restoring old state, we always first
get the new key serializer, and then maybe get the previous key
serializer's snapshot in the restore phase.

Therefore, after this commit, access the key serializer in
AbstractKeyedStateBackend is also governed by a StateSerializerProvider.

For a more fine-grained explaination of all the changes that build up to
this, please refer to apache#7329.

This closes apache#7329.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants