[FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger #6479

Closed
wants to merge 4 commits into from


tillrohrmann
Contributor

What is the purpose of the change

This commit changes the behaviour of the UnilateralSortMerger to keep references to the
InMemorySorters it creates so that they can be explicitly disposed when the sort merger is closed.
This prevents InMemorySorters from leaking and blocking the garbage collection of the
MemorySegments they reference.

cc @StephanEwen

Brief change log

  • Introduce InMemorySorterFactory to make UnilateralSortMerger testable
  • Keep reference to created InMemorySorters in UnilateralSortMerger
  • Dispose all InMemorySorters before releasing memory to MemoryManager
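
The ordering described in the change log (track every created sorter, dispose them all, then release memory) might look roughly like the sketch below. All class and method names here are hypothetical stand-ins written for illustration; they are not the Flink classes touched by this pull request, and plain byte arrays stand in for memory segments.

```java
import java.util.ArrayList;
import java.util.List;

final class DisposeBeforeReleaseSketch {

    /** Hypothetical stand-in for an in-memory sorter that references memory segments. */
    static final class Sorter {
        private List<byte[]> segments;

        Sorter(List<byte[]> segments) {
            this.segments = segments;
        }

        /** Drops the references so the segments become eligible for garbage collection. */
        void dispose() {
            segments = null;
        }

        boolean isDisposed() {
            return segments == null;
        }
    }

    /** Hypothetical stand-in for the sort merger that owns the sorters. */
    static final class Merger implements AutoCloseable {
        final List<Sorter> sorters = new ArrayList<>();
        final List<byte[]> ownedSegments = new ArrayList<>();

        Sorter newSorter(int numSegments) {
            List<byte[]> segments = new ArrayList<>();
            for (int i = 0; i < numSegments; i++) {
                segments.add(new byte[32]);
            }
            ownedSegments.addAll(segments);
            Sorter sorter = new Sorter(segments);
            sorters.add(sorter); // keep a reference so close() can dispose it later
            return sorter;
        }

        @Override
        public void close() {
            // Step 1: dispose every sorter that was ever created.
            for (Sorter sorter : sorters) {
                sorter.dispose();
            }
            sorters.clear();
            // Step 2: only now release the memory (stands in for returning it to a memory manager).
            ownedSegments.clear();
        }
    }

    public static void main(String[] args) {
        Merger merger = new Merger();
        Sorter a = merger.newSorter(2);
        Sorter b = merger.newSorter(2);
        merger.close();
        System.out.println(a.isDisposed() && b.isDisposed()); // prints "true"
    }
}
```

Without the `sorters` list, nothing in `close()` could reach the sorters, so they would keep their segment references alive for as long as anything else referenced the sorters themselves.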

Verifying this change

  • Added UnilateralSortMergerTest#testInMemorySorterDisposal
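
A disposal test of this kind could be built on a factory that records every sorter it hands out, which is what the factory indirection makes possible. The sketch below is a minimal illustration under that assumption; the types (`RecordingFactory`, `TestingSorter`, `Merger`) are hypothetical and do not reproduce the actual test added in this pull request.

```java
import java.util.ArrayList;
import java.util.List;

final class RecordingFactorySketch {

    interface Sorter {
        void dispose();
    }

    interface SorterFactory {
        Sorter create();
    }

    /** Test double: a sorter that only remembers whether dispose() was called. */
    static final class TestingSorter implements Sorter {
        boolean disposed;

        @Override
        public void dispose() {
            disposed = true;
        }
    }

    /** Test double: a factory that records every sorter it creates. */
    static final class RecordingFactory implements SorterFactory {
        final List<TestingSorter> created = new ArrayList<>();

        @Override
        public Sorter create() {
            TestingSorter sorter = new TestingSorter();
            created.add(sorter);
            return sorter;
        }
    }

    /** Hypothetical component under test: creates sorters through the injected factory. */
    static final class Merger {
        private final List<Sorter> sorters = new ArrayList<>();

        Merger(SorterFactory factory, int numSorters) {
            for (int i = 0; i < numSorters; i++) {
                sorters.add(factory.create());
            }
        }

        void close() {
            sorters.forEach(Sorter::dispose);
            sorters.clear();
        }
    }

    /** The check itself: every sorter the factory handed out is disposed after close(). */
    static boolean allDisposedAfterClose(int numSorters) {
        RecordingFactory factory = new RecordingFactory();
        Merger merger = new Merger(factory, numSorters);
        merger.close();
        return factory.created.size() == numSorters
                && factory.created.stream().allMatch(s -> s.disposed);
    }

    public static void main(String[] args) {
        System.out.println(allDisposedAfterClose(3)); // prints "true"
    }
}
```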

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

if (initialSerializer == null) {
    typeSerializer = typeSerializerFactory.getSerializer();
} else {
    typeSerializer = initialSerializer;
Contributor

Couldn't we simplify this by always discarding the serializer created in the constructor? We could then remove the initialSerializer field and always create a new serializer here.

Contributor Author

Yes, the initialSerializer is an optimization to decrease the number of created TypeSerializer instances at the cost of a slightly more complicated create method. I guess both ways should work given that the UnilateralSortMerger does not create thousands of InMemorySorters.
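
The trade-off discussed in this thread can be illustrated with two small factories. This is a sketch only: `ReusingFactory` and `SimpleFactory` are hypothetical names, a `Supplier<S>` stands in for the serializer factory, and the assumption that the constructor needs one serializer instance for its own setup is inferred from the comments rather than taken from the actual code.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class InitialSerializerSketch {

    /** Variant with the optimization: the constructor's serializer is handed out once. */
    static final class ReusingFactory<S> {
        private final Supplier<S> serializerFactory;
        private S initialSerializer; // created in the constructor, reused by the first call

        ReusingFactory(Supplier<S> serializerFactory) {
            this.serializerFactory = serializerFactory;
            this.initialSerializer = serializerFactory.get();
        }

        S nextSerializer() {
            if (initialSerializer == null) {
                return serializerFactory.get();
            }
            S result = initialSerializer;
            initialSerializer = null; // never hand the same instance out twice
            return result;
        }
    }

    /** Simpler variant: the constructor's serializer is discarded, every call creates a new one. */
    static final class SimpleFactory<S> {
        private final Supplier<S> serializerFactory;

        SimpleFactory(Supplier<S> serializerFactory) {
            this.serializerFactory = serializerFactory;
            serializerFactory.get(); // assumed to be needed for setup only, then dropped
        }

        S nextSerializer() {
            return serializerFactory.get();
        }
    }

    public static void main(String[] args) {
        AtomicInteger reusingCount = new AtomicInteger();
        ReusingFactory<Integer> reusing = new ReusingFactory<>(reusingCount::incrementAndGet);
        AtomicInteger simpleCount = new AtomicInteger();
        SimpleFactory<Integer> simple = new SimpleFactory<>(simpleCount::incrementAndGet);
        for (int i = 0; i < 3; i++) {
            reusing.nextSerializer();
            simple.nextSerializer();
        }
        System.out.println("reusing variant created " + reusingCount.get() + " instances");
        System.out.println("simple variant created " + simpleCount.get() + " instances");
    }
}
```

For three requested serializers the reusing variant creates three instances and the simple one creates four, so the saving is a single instance per factory, which matches the remark that both approaches should work when only a few sorters are created.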

new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
}

protected UnilateralSortMerger(
Contributor

@VisibleForTesting?

Member

Why? We might want to configure which InMemorySorterFactory to use if needed. Semantically, it doesn't need to be @VisibleForTesting.

Contributor Author

I agree with @tisonkun
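
The argument in this thread is that a protected constructor accepting a factory is an ordinary extension point, not just a test hook. A minimal sketch of that constructor layout follows, using hypothetical names (`Merger`, `SorterFactory`, `CustomMerger`) rather than the real classes: the public constructor picks a default factory and delegates, while subclasses can pass a different one.

```java
final class ConstructorDelegationSketch {

    interface SorterFactory {
        String describe();
    }

    static final class DefaultSorterFactory implements SorterFactory {
        @Override
        public String describe() {
            return "default";
        }
    }

    static class Merger {
        private final SorterFactory factory;

        /** Public entry point: callers get the default factory without knowing about it. */
        public Merger() {
            this(new DefaultSorterFactory());
        }

        /** Extension point: subclasses (or tests) can choose which factory to use. */
        protected Merger(SorterFactory factory) {
            this.factory = factory;
        }

        String factoryDescription() {
            return factory.describe();
        }
    }

    /** A subclass that configures a different factory through the protected constructor. */
    static final class CustomMerger extends Merger {
        CustomMerger() {
            super(() -> "custom");
        }
    }

    public static void main(String[] args) {
        System.out.println(new Merger().factoryDescription());       // prints "default"
        System.out.println(new CustomMerger().factoryDescription()); // prints "custom"
    }
}
```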

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;

Member

@tisonkun tisonkun Aug 3, 2018

This import change could be dropped when merging.

Contributor Author

You're right, it's unrelated. I will remove it from the commit.

tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Aug 3, 2018
…SortMerger

This commit changes the behaviour of the UnilateralSortMerger to keep references of the created
InMemorySorters in order to explicitly dispose them when the sort merger is closed. This prevents
that InMemorySorters leak and block the garbage collection of MemorySegments to which they keep
references.

This closes apache#6479.
StephanEwen and others added 4 commits August 3, 2018 09:22
To guard against memory leaks, the Task releases the reference to its AbstractInvokable
when it shuts down or cancels.

This closes apache#6480.
…SortMerger

This commit changes the behaviour of the UnilateralSortMerger to keep references of the created
InMemorySorters in order to explicitly dispose them when the sort merger is closed. This prevents
that InMemorySorters leak and block the garbage collection of MemorySegments to which they keep
references.

This closes apache#6479.
@asfgit asfgit closed this in bf94d28 Aug 3, 2018
asfgit pushed a commit that referenced this pull request Aug 3, 2018
…SortMerger

This commit changes the behaviour of the UnilateralSortMerger to keep references of the created
InMemorySorters in order to explicitly dispose them when the sort merger is closed. This prevents
that InMemorySorters leak and block the garbage collection of MemorySegments to which they keep
references.

This closes #6479.
@tillrohrmann tillrohrmann deleted the fixYarnBatchMemory branch August 9, 2018 14:32