Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4545] preparations for removing the network buffers parameter #3467

Closed
wants to merge 6 commits into from

Conversation

NicoK
Copy link
Contributor

@NicoK NicoK commented Mar 3, 2017

This PR includes some preparations for following PRs that ultimately lead to removing the network buffer parameter that was hard to tune.

@zhijiangW
Copy link
Contributor

Hi @NicoK , I am interested in this issue and I like the way of asserting hold lock in this PR.

It is really necessary to manage network buffers by framework, because it is difficult to set the exact number of buffers by users. And our current simple solution is to expand the ResourceProfile by adding the total number of input and output edges for Execution. Then the ResourceManager would calculate the buffer amounts based on that and overwrite the parameter value to TaskManager configuration.

From @StephanEwen mentioned before, I know a little for this issue. Would you share some detail designs for plans for it if have, then I can learn and track the progress in time. Thank you !

@NicoK
Copy link
Contributor Author

NicoK commented Mar 6, 2017

Hi @zhijiangW,
actually, the solution I am working on is to replace the network buffers parameter by something like "max memory in percent" and "min MB to use". For this to not create buffer bloat in our network stack, I have started to implement limited LocalBufferPool instances which tune their size based on the actual number of outgoing and ingoing channels. It is actually not much more complicated than this and I already started on this in my local branch at https://github.com/NicoK/flink/tree/flink-4545 - expect a new PR within the week with more details.

Nico Kruber added 2 commits March 6, 2017 14:19
…t partition type

This removes
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern)
and requires the developer to call
JobVertex#connectNewDataSetAsInput(JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType)
instead and think about the partition type to add.
These were implying a default result partition type which we want the developer
to actively decide upon.
@StephanEwen
Copy link
Contributor

I think this is a good change, merging this...

@zhijiangW Managing the buffers changes in some followup PR, first adjusting the local pools, then the global pool. Managing buffers in a global pool can help when caching data, such as for batch jobs. But we can take suggestions followup improvements as a separate thread, after this improvement is in.

@@ -265,11 +281,15 @@ public String toString() {
// ------------------------------------------------------------------------

private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I have a question about assert, because I found that assertion is disabled in java by default. why not use explicit synchronized(availableMemorySegments) which may be more common usage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using synchronized again would impact performance, while assertions only do when they are enabled which is the case in our unit tests (see https://maven.apache.org/surefire/maven-surefire-plugin/test-mojo.html#enableAssertions).

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Mar 9, 2017
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Mar 9, 2017
StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request Mar 9, 2017
@asfgit asfgit closed this in 8b49ee5 Mar 9, 2017
@zhijiangW
Copy link
Contributor

@NicoK ,thank you for explanation, and I already trace the code in your local branch. Wish your further change commit in global pool.

@StephanEwen , thanks for further elaboration. From my understanding, each task can decide the core number of buffers in LocalBufferPool based on input, output channels and configuration, the maximum number of buffers based on ResultPartitionType. And all the LocalBufferPools make effect on the total number of buffers in NetworkBufferPool, may need consider maximum memory usages.

And my concern is to consider the memory usages in NetworkBufferPool before starts the TaskManager, and this part of memory should be added into the total resource of TaskManager.
I am willing to do that as a part of my current work in Fine-grained Resource Configuration after this feature completes.

@StephanEwen
Copy link
Contributor

@zhijiangW Yes, let's discuss this when the feature is complete.
Our thinking so far is:

  • One can specify an absolute amount of network memory (similar as one can specify an absolute amount of managed memory for batch)
  • If no absolute amount is specified, a relative fraction of the JVM heap will be pre-allocated as network buffers.

p16i pushed a commit to p16i/flink that referenced this pull request Apr 16, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants