[FLINK-9028][flip6] perform parameters checking before starting cluster #5726
CC: @tillrohrmann
Thanks for the contribution @sihuazhou. The fix is a good idea. I think we can still improve it a bit by refactoring the verification logic and moving the check to AbstractYarnClusterDescriptor#deployInternal.
// aim to check the parameters
ContaineredTaskManagerParameters.create(
    effectiveConfiguration,
    effectiveConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY), 1);
I think we should pull the verification logic out of the ContaineredTaskManagerParameters#create method and place it somewhere else. I would also suggest moving this check to AbstractYarnClusterDescriptor#deployInternal and using the provided ClusterSpecification#getTaskManagerMemory for the memory.
👍 I will pull it out and add a test to guard it.
@tillrohrmann I made a fixup according to your comments. Could you please have a look again?
I had some more comments.
/**
 * Tests to guard {@link ResourceManagerRuntimeServices}.
 */
public class ResourceManagerRuntimeServicesTest {
missing extends TestLogger
@@ -423,6 +431,9 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // ------------------ Check if configuration is valid --------------------
    checkConfig(clusterSpecification);
Let's not abbreviate; name it validateClusterSpecification.
@@ -409,6 +411,12 @@ public void terminateCluster(ApplicationId applicationId) throws FlinkException
        }
    }

    private void checkConfig(ClusterSpecification clusterSpecification) {
This should throw a checked exception, something like InvalidConfigurationException, rather than just failing with an IllegalStateException.
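A minimal sketch of what this suggestion could look like, not the code that was merged. InvalidConfigurationException takes its name from the comment above; the ClusterSpecificationValidator class, its plain numeric parameters, and the cutoff rule (the larger of a minimum cutoff and a ratio of the container memory) are simplified assumptions standing in for the Flink configuration and helper methods.

```java
// Hypothetical checked exception, named after the reviewer's suggestion.
class InvalidConfigurationException extends Exception {
    InvalidConfigurationException(String message) {
        super(message);
    }
}

// Hypothetical helper; a simplified stand-in, not Flink's actual validation code.
final class ClusterSpecificationValidator {

    private ClusterSpecificationValidator() {}

    /**
     * Rejects a TaskManager memory size that would leave no room for the JVM
     * once the container cutoff is subtracted. The cutoff rule is an assumption.
     */
    static void validateClusterSpecification(
            long taskManagerMemoryMB, long minCutoffMB, double cutoffRatio)
            throws InvalidConfigurationException {

        if (cutoffRatio <= 0.0 || cutoffRatio >= 1.0) {
            throw new InvalidConfigurationException(
                "Cutoff ratio must be in (0, 1), but was " + cutoffRatio + ".");
        }

        // Assumed rule: take the larger of the minimum cutoff and the ratio-based cutoff.
        long cutoffMB = Math.max(minCutoffMB, (long) (taskManagerMemoryMB * cutoffRatio));

        if (cutoffMB >= taskManagerMemoryMB) {
            throw new InvalidConfigurationException(
                "TaskManager memory (" + taskManagerMemoryMB + " MB) does not exceed the cutoff ("
                    + cutoffMB + " MB).");
        }
    }
}
```

Because the exception is checked, a caller such as deployInternal has to either handle it or declare it, so an invalid specification cannot slip through as an unexpected runtime failure.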
private void checkConfig(ClusterSpecification clusterSpecification) {
    long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
    long cutoff = ResourceManagerRuntimeServices.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
    TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
I somehow don't like that we calculate some value just in order to check whether an unchecked exception is thrown. This should be imo more explicit. Or at least it should get a comment explaining what's going on here.
Hmm... This is a bit tricky. The calculation logic is hard to separate cleanly from the check logic, because performing the check also requires doing the calculation. Pulling the check logic out of the calculate function into a separate function would therefore mean some code duplication. So I'd like to add a comment here. If you still think we should pull out a new function, I will just do it ;).
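One way to reconcile the two positions above is to keep a single calculation but make its use as a check explicit, translating the unchecked exception into a checked one. This is only a sketch under assumptions: HeapSizeCheck and its simplified calculateHeapSizeMB are hypothetical stand-ins, not the Flink methods quoted in the diff.

```java
// Hypothetical checked exception, named after the reviewer's earlier suggestion.
class InvalidConfigurationException extends Exception {
    InvalidConfigurationException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical stand-in for the calculation and the check that reuses it.
final class HeapSizeCheck {

    private HeapSizeCheck() {}

    // Simplified stand-in for a calculation that rejects invalid input
    // with an unchecked exception.
    static long calculateHeapSizeMB(long containerMemoryMB, long cutoffMB) {
        long heapSizeMB = containerMemoryMB - cutoffMB;
        if (heapSizeMB <= 0) {
            throw new IllegalArgumentException(
                "Container memory (" + containerMemoryMB + " MB) must exceed the cutoff ("
                    + cutoffMB + " MB).");
        }
        return heapSizeMB;
    }

    // Reuses the calculation purely for its argument checks, so no logic is duplicated.
    static void validate(long containerMemoryMB, long cutoffMB)
            throws InvalidConfigurationException {
        try {
            // The result is discarded on purpose: only the validation side effect matters here.
            calculateHeapSizeMB(containerMemoryMB, cutoffMB);
        } catch (IllegalArgumentException e) {
            throw new InvalidConfigurationException(
                "The cluster memory configuration is invalid: " + e.getMessage(), e);
        }
    }
}
```

The trade-off is that the check still pays for a calculation whose result it throws away, but the intent is documented at the call site and callers see a checked exception.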
final long javaMemorySizeMB = containerMemoryMB - cutoff;
// (1) try to compute how much memory used by container
final long cutoffMB = ResourceManagerRuntimeServices.calculateCutoffMB(config, containerMemoryMB);
I would keep this method in the ContaineredTaskManagerParameters class.
@tillrohrmann I have addressed your comments. Could you please review it again?
Thanks for your contribution @sihuazhou. Changes look good to me. I will merge this PR once Travis gives the green light.
…cluster This closes apache#5726.
I found a checkstyle error in my local build and made a fixup.
I have seen it as well and fixed it, so no worries.
What is the purpose of the change
Perform parameter checking before starting the cluster to prevent setting up a problematic cluster.
Brief change log
Verifying this change
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation
no