[FLINK-8836] Fix duplicate method in KryoSerializer to perform deep c… #5880

Closed
wants to merge 1 commit into from

Conversation

@StefanRRichter
Contributor

StefanRRichter commented Apr 19, 2018

…opy of default/registered serializer instances.

This method did not create deep copies of registered or default serializer instances, and
as a result those serializer instances could accidentally be shared across different threads.

Brief change log

This PR fixes a problem with the duplicate method of KryoSerializer. We now perform deep copies of the default and registered serializer objects.

Otherwise, if duplicated KryoSerializer instances are used by different threads, some of their internal serializers might be the same shared instance of a stateful object.
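
To illustrate the difference between a shallow and a deep duplicate, here is a minimal, self-contained sketch. It does not use Flink or Kryo: `StatefulSerializer`, `OuterSerializer`, `shallowDuplicate`, `deepDuplicate`, and `deepCopy` are hypothetical stand-ins invented for this example, and the deep copy is done with a plain Java serialization round trip, which is an assumption and not necessarily how the PR implements it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;

public class DeepCopyDuplicateSketch {

    /** Hypothetical stand-in for a stateful, non-thread-safe registered serializer. */
    static class StatefulSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        int scratch; // mutable state that must not be shared between threads
    }

    /** Hypothetical stand-in for the outer serializer that holds the registrations. */
    static class OuterSerializer {
        final Map<String, StatefulSerializer> registrations;

        OuterSerializer(Map<String, StatefulSerializer> registrations) {
            this.registrations = registrations;
        }

        /** Copies the map but keeps pointing at the same serializer instances. */
        OuterSerializer shallowDuplicate() {
            return new OuterSerializer(new LinkedHashMap<>(registrations));
        }

        /** Copies the map and every serializer instance inside it. */
        OuterSerializer deepDuplicate() {
            Map<String, StatefulSerializer> copy = new LinkedHashMap<>();
            for (Map.Entry<String, StatefulSerializer> entry : registrations.entrySet()) {
                copy.put(entry.getKey(), deepCopy(entry.getValue()));
            }
            return new OuterSerializer(copy);
        }
    }

    /** Deep copies a Serializable object by writing it to bytes and reading it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep copy serializer instance", e);
        }
    }

    public static void main(String[] args) {
        StatefulSerializer inner = new StatefulSerializer();
        Map<String, StatefulSerializer> registrations = new LinkedHashMap<>();
        registrations.put("com.example.MyType", inner); // hypothetical registration key
        OuterSerializer original = new OuterSerializer(registrations);

        boolean shallowShares =
                original.shallowDuplicate().registrations.get("com.example.MyType") == inner;
        boolean deepShares =
                original.deepDuplicate().registrations.get("com.example.MyType") == inner;
        System.out.println("shallow duplicate shares instance: " + shallowShares);
        System.out.println("deep duplicate shares instance: " + deepShares);
    }
}
```

With the shallow variant, two threads that each hold their own duplicate still end up mutating the same inner object, which is the situation this PR is meant to rule out.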

Verifying this change

This change added tests and can be verified by running KryoSerializerConcurrencyTest.
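
The test itself is not reproduced here. As a rough sketch of the general idea behind such a check, the following stand-alone example detects whether two handles refer to a serializer instance that is used by two threads at the same time. `GuardedSerializer`, `enter`, `exit`, and `isSharedAcrossThreads` are hypothetical names made up for this illustration and are not taken from KryoSerializerConcurrencyTest.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentAccessCheckSketch {

    /** Hypothetical serializer stand-in that fails fast when used by two threads at once. */
    static class GuardedSerializer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        void enter() {
            // Only one thread may be inside the serializer at any time.
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new IllegalStateException("Concurrent access detected");
            }
        }

        void exit() {
            owner.set(null);
        }
    }

    /**
     * Holds 'first' inside a worker thread and then tries to use 'second' from the
     * calling thread. Returns true if both handles point at the same busy instance.
     */
    static boolean isSharedAcrossThreads(GuardedSerializer first, GuardedSerializer second) {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            first.enter();
            entered.countDown();
            try {
                release.await(); // stay "inside" the serializer until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.exit();
            }
        });
        worker.start();

        boolean shared = false;
        try {
            entered.await();
            try {
                second.enter();
                second.exit();
            } catch (IllegalStateException e) {
                shared = true; // the worker thread is still inside the same instance
            }
            release.countDown();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while checking for sharing", e);
        } finally {
            release.countDown(); // never leave the worker blocked
        }
        return shared;
    }

    public static void main(String[] args) {
        GuardedSerializer original = new GuardedSerializer();
        System.out.println("same instance: " + isSharedAcrossThreads(original, original));
        System.out.println("separate copy: "
                + isSharedAcrossThreads(original, new GuardedSerializer()));
    }
}
```

The latches make the overlap deterministic: the calling thread only touches the second handle while the worker is known to be inside the first one, so a shared instance is always caught and not just occasionally.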

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (indirectly)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)
}

// deep copy the serializer instances in kryoRegistrations
for (Map.Entry<String, KryoRegistration> entry : toCopy.kryoRegistrations.entrySet()) {

@tzulitai

tzulitai Apr 19, 2018

Contributor

One alternative approach to this loop (though I'm not sure it would be better) is that in the buildKryoRegistrations method we always make a copy of the ExecutionConfig.SerializableSerializer when instantiating its corresponding KryoRegistration.
See

. Here we can make a copy already when building the registrations.

Then, when duplicating the KryoSerializer, for duplicating the registrations, this would only be a matter of calling buildKryoRegistrations again with the execution config because that method would handle stateful serializer registrations properly.
IMO, this seems like a cleaner solution. What do you think?

@aljoscha

aljoscha Apr 19, 2018

Contributor

The problem is that we don't have the ExecutionConfig in the copy constructor.

*
* <p><b>Important:</b> This test only works if assertions are activated (-ea) on the JVM
* when running tests.
*/
public class KryoSerializerConcurrencyTest {

@Test
public void testDuplicateSerializerWithDefaultSerializerClass() {

@aljoscha

aljoscha Apr 19, 2018

Contributor

test names are mixed up, this and the next one should be switched

@sihuazhou

Contributor

sihuazhou commented Apr 20, 2018

+1, Will this PR also get into 1.4.x?

@StefanRRichter

Contributor Author

StefanRRichter commented Apr 20, 2018

@sihuazhou I think it can and should also go into 1.4.

@aljoscha is that a +1 once I have fixed the method names?

@aljoscha

Contributor

aljoscha commented Apr 20, 2018

Yes, +1 after that is fixed.

@StefanRRichter

Contributor Author

StefanRRichter commented Apr 20, 2018

Thanks for the comments, will fix the names and then merge.

@asfgit asfgit closed this in 7d0bfd5 Apr 20, 2018
@StefanRRichter StefanRRichter deleted the StefanRRichter:FLINK-8836 branch Apr 20, 2018
asfgit pushed a commit that referenced this pull request Apr 20, 2018
…opy of default/registered serializer instances.

This method did not create deep copies of registered or default serializer instances, and
as a result those serializer instances could accidentally be shared across different threads.

This closes #5880.

(cherry picked from commit 7d0bfd5)
asfgit pushed a commit that referenced this pull request Apr 20, 2018
…opy of default/registered serializer instances.

This method did not create deep copies of registered or default serializer instances, and
as a result those serializer instances could accidentally be shared across different threads.

This closes #5880.

(cherry picked from commit 7d0bfd5)
sampathBhat pushed a commit to sampathBhat/flink that referenced this pull request Jul 26, 2018
…opy of default/registered serializer instances.

This method did not create deep copies of registered or default serializer instances, and
as a result those serializer instances could accidentally be shared across different threads.

This closes apache#5880.
5 participants