Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4545] replace the network buffers parameter #3721

Closed
wants to merge 10 commits into from

Conversation

NicoK
Copy link
Contributor

@NicoK NicoK commented Apr 13, 2017

(based on #3708 and #3713)

Instead, allow the configuration with the following three new (more flexible) parameters:

  • taskmanager.network.memory.fraction: fraction of JVM memory to use for network buffers (default: 0.1)
  • taskmanager.network.memory.min: minimum memory size for network buffers (default: 64 MB)
  • taskmanager.network.memory.max: maximum memory size for network buffers (default: 1 GB)

Note that I needed to adapt two unit tests which would have been killed on Travis CI because these defaults result in ~150MB memory being used for network buffers which apparently was too much there.

@zentol
Copy link
Contributor

zentol commented Apr 18, 2017

I've merged the 2 PR's that this one build upon; could you rebase this one?

Nico Kruber added 9 commits April 18, 2017 13:54
Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

 # Please enter the commit message for your changes. Lines starting
These verify that the results are the same as in the calculation done by Java.
Also update the descriptions of taskmanager.memory.fraction not being relative
to the full size of taskmanager.heap.mb but that network buffer memory is
subtracted before!
Due to the increased defaults for network buffer memory use, some builds on
Travis CI fail with unit tests being killed. This affects
* RocksDbBackendEventTimeWindowCheckpointingITCase and
* HBaseConnectorITCase

We fix this by limiting the maximum amount of network buffer memory to 80MB
(current defaults would yield 150MB, previously 64MB were used).
### Configuring the Network Buffers

If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, you
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually don't do manual line breaks in the documentation; otherwise if you resize the window funky things start to happen.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik those things only happen if you leave an empty space at the end of the line or so - here, the lines are properly joined inside HTML

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What error message occurred (if any) when the tests failed on travis?

network_buffers_bytes=`awk "BEGIN { x = lshift(${FLINK_TM_HEAP},20) * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
fi

# recalculate the JVM heap memory by taking the network buffers into account
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a verification isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, "recalculate" implied that it would change some configuration value, but that's not happening. It's only verifying that the memory for network buffers is less than the heap memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, actually, the user may give the FLINK_TM_HEAP environment variable or configure the "flink heap size" via taskmanager.heap.mb but this is not the real "heap" size - rather the overall memory size used by flink (including off-heap). So this function removes the off-heap part and returns the real heap sizes to use with -Xmx and -Xms

public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
assert totalJavaMemorySize > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use Preconditions instead?


long javaMem = Math.max(max + 1, ran.nextLong());

final long networkBufMem = TaskManagerServices.calculateNetworkBuf(javaMem, config);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we definitely need here is a catch block that prints the used parameters if any assertion fails. It is a bit odd to use random parameters in the first place though :/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random testing is quite powerful to identify unforeseen errors, especially since these inputs are user-configurable. I'll keep that but add better error messages as suggested.

*
* @return memory to use for network buffers (in bytes)
*/
public static long calculateNetworkBuf(long totalJavaMemorySize, Configuration config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a slightly longer method name?

// old + 1 new parameter = new:
Configuration config1 = config.clone();
config1.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
assertEquals(Math.min(defaultMax, Math.max(defaultMin, (long) (0.1f * (10L << 20)))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems overly complicated. If we would use a set of well-defined values instead of the defaults we could get rid of the min/max calls, no? This applies to the whole test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, if we define our own test-defaults, your are right - however, we would decouple the test from the defaults that are set for real-world applications and I wanted to keep them as close as possible including any future change in the default values


config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that i see this test it got me thinking: the managedMemory and networkBuffersmemory frations do not work on the same base value; i.e if both are set at 0.5 then one (the network i think) gets 0.5 of the total memory, while the managedMemory gets 0.25. I'm wondering how intuitive this is; they are similar when used alone, but when both are used 0.5 doesn't equal 0.5 in a way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, unfortunately, that is the case and it has always been that way in the past (with the network buffers not being a fraction but a fixed amount of memory) but it is also properly documented (now).

With the possibility to specify min and max values for the network buffer memory size, the actual fraction may be different than the given one and we don't really want to fail jobs because we can't ensure the given fraction for the managed memory in that case, do we?
As a side note, this is also the safest way to ensure that the invariants hold, especially for the 0.5 vs. 0.5 example: inside Java, we will always allocate the network buffer memory first and then identify the remaining free heap space (if on-heap) from which we will use the given fraction.

@@ -450,7 +450,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
}
catch (Throwable t) {
if (LOG.isErrorEnabled()) {
LOG.error("Unexpected problen while getting the file statistics for file '" + this.filePath + "': "
LOG.error("Unexpected problem while getting the file statistics for file '" + this.filePath + "': "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's see whether i can remember to not squash this commit :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, should I create a separate PR? (a separate JIRA is definitely overkill for this)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can leave it as it is.


@Before
public void checkOperatingSystem() {
Assume.assumeTrue("This test checks shell scripts not available on Windows.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing "that are"?

Random ran = new Random();
for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
// tolerate that values differ by 1% (due to different floating point precisions)
compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As with the other randomized tests we should print the configured used for the test in case of failure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, here, I actually do already print the configuration in the error message


float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);

// long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these lines can be removed i guess

@NicoK
Copy link
Contributor Author

NicoK commented Apr 19, 2017

For the two tests that failed on Travis CI: they were simply killed and a "Killed" appeared in their logs which is usually an indicator that memory ran out and the kernel killed a process

@StephanEwen
Copy link
Contributor

I am checking this PR out now...

@StephanEwen
Copy link
Contributor

StephanEwen commented May 4, 2017

Code is good in general and well tested (including the shell scripts, which is great!)

I would do some on-the fly polishing while merging.
The main thing I want to adjust is that configuration parameters are specified in Megabytes, not Bytes. All other memory related parameters are in Megabytes, so that one should be as well, for consistency. Also, I think we don't need a finer granularity these days.

@StephanEwen
Copy link
Contributor

Okay, taking a step back. Looking through the code some more, the internal arithmetric should certainly stay in bytes. However, bytes are tedious to configure.

I suggest to add support to the configuration to interpret memory units, so that we can configure values via

  • 512m
  • 10 kb
  • ...

I have started some utility here: https://github.com/StephanEwen/incubator-flink/tree/mem_size

That means that we would keep the PR in this form and add memory configuration parsing as a followup.

@StephanEwen
Copy link
Contributor

Merging this.
I filed a follow-up JIRA to address the "configuration with units" to make sure all memory-related parameters behave the same way, without loss of byte precision where needed: https://issues.apache.org/jira/browse/FLINK-6469

StephanEwen pushed a commit to StephanEwen/flink that referenced this pull request May 6, 2017
Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

This closes apache#3721
@asfgit asfgit closed this in 0bb49e5 May 6, 2017
fanyon pushed a commit to fanyon/flink that referenced this pull request May 15, 2017
Instead, allow the configuration with the following three new (more flexible)
parameters:
 * "taskmanager.network.memory.fraction": fraction of JVM memory to use for network buffers (default: 0.1)
 * "taskmanager.network.memory.min": minimum memory size for network buffers (default: 64 MB)
 * "taskmanager.network.memory.max": maximum memory size for network buffers (default: 1 GB)

This closes apache#3721
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants