-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side #8556
[FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side #8556
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit c568988 (Tue Aug 06 15:53:58 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
74bdb6d
to
3796381
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed this sounds like a bug:
TaskManagerServices#calculateHeapSizeMB
allows network buffer fractions > 0.5, butNettyShuffleEnvironmentConfiguration#calculateNewNetworkBufferMemory()
does not (so far)
However, the whole memory calculation thing is quite fragile and we need to take care that it is right everywhere we expect it to be. Apparently, there were no unit tests for the scenario you were fixing. These should be added. Playing around with it a bit on my side, I came up with some initial tests and changes (to improve the floating point arithmetic) - maybe you also need to add something to TaskManagerHeapSizeCalculationJavaBashTest
:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index daccd3e6e8..d0e5d46f7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -235,8 +235,10 @@ public class NettyShuffleEnvironmentConfiguration {
// = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
- long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
- return calculateNewNetworkBufferMemory(config, networkBufSize, Long.MAX_VALUE);
+ double heapFraction = 1.0 - Double.valueOf(Float.toString(networkBufFraction));
+ long totalJavaMemorySize = (long) (jvmHeapNoNet / heapFraction);
+ long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
+ return calculateNewNetworkBufferMemory(config, networkBufSize, Long.MAX_VALUE);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index 84a08c125b..1a17fee200 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -238,6 +238,10 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
+ config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
+ config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
+ assertEquals(400, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 39b9d7407c..1823455b43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -42,27 +42,37 @@ public class NetworkBufferCalculationTest extends TestLogger {
public void calculateNetworkBufFromHeapSize() {
Configuration config;
+ final long networkBufMin = 64L << 20; // 64MB
+ final long networkBufMax = 1L << 30; // 1GB
+
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
- 0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ 0.1f, networkBufMin, networkBufMax, MemoryType.HEAP);
+ assertEquals((100L << 20),
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 900L << 20)); // 900MB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
- 0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
- assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
+ 0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
+ assertEquals((600L << 20),
+ NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 800MB
+
+ config = getConfig(
+ Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
+ TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+ 0.2f, networkBufMin, networkBufMax, MemoryType.HEAP);
+ assertEquals((200L << 20),
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
- 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
+ assertEquals((100L << 20),
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
- config = getConfig(0, 0.1f, 0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ config = getConfig(0, 0.1f, 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
+ assertEquals((100L << 20),
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 810L << 20)); // 810MB
}
// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction) | ||
// jvmHeapNoNet = jvmTotal - networkBufBytes | ||
// = jvmTotal - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction) | ||
// jvmTotal = jvmHeapNoNet / (1.0 - networkBufFraction) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Terminology wise, maybe, we should go with the names of TaskManagerServices#calculateHeapSizeMB
which this function should be the inverse of (also adapt in the lines above):
final long totalProcessMemory = megabytesToBytes(totalJavaMemorySizeMB);
final long networkReservedMemory = getReservedNetworkMemory(config, totalProcessMemory);
final long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Totally agree with that unified terminology should be more clear. I have modified the terminology used in NettyShuffleEnvironmentConfiguration
.
|
||
// Do not need to check the maximum allowed memory since the computed total memory should always | ||
// be larger than the computed network buffer memory as long as the fraction is less than 1. | ||
return calculateNewNetworkBufferMemory(config, networkBufSize, Long.MAX_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that you can change this without a single test failing indicates no test coverage for it. Please add some tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very sorry for missing the tests, I have complemented the tests according to the comments.
Very thanks @NicoK for the review and sorry for missing the tests. I will address the comments and complement the tests right now. |
f701635
to
4e2a56f
Compare
Hi @NicoK , very thanks for the review and the suggestions! The modification truly increase the precision of the computation, I have updated the PR to address the issues in the comments and include the proposed enhancement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update - code looks good now with two minor things:
- the two
FLINK-12171
commits should be merged together (test and implementation belong together - one cannot stand without the other) - an indentation (see inline)
...src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
Outdated
Show resolved
Hide resolved
83fda20
to
8f34663
Compare
Very thanks @NicoK review the codes! I have merged the two related commits, and fixed the indentation problem. |
8f34663
to
c568988
Compare
What is the purpose of the change
This pull request fixes the bug that limits the network buffer size with the heap size on the TM side. In fact, network buffer occupies a part of direct memory and is independent with the heap.
To fix this problem, The limitation on the TM side is removed. Although we may want to compare the network memory size with the total memory size on the TM side, currently we can only compute the total memory with heap + computed network memory and the computed total memory should be always larger than the computed network memory.
To remove the limitation, the max allowed memory used to check the network memory size on TM side is changed to Long.MAX_VALUE. Another option is to move the checking to the caller function on the RM side. however, it is not easy to achieve since the checking relies on the configured values of MIN and MAX, and it is not accessible outside of the current function.
Brief change log
NettyShuffleEnvironmentConfiguration
Verifying this change
This change added tests and can be verified as follows:
fraction > 0.5
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation