-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism. #4904
Conversation
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
// Generate parallel Callable tasks | ||
for (SubClusterId subClusterId : subClusterIds) { | ||
callables.add(() -> { | ||
ApplicationClientProtocol protocol = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your help reviewing the code, I will fix it.
callables.add(() -> { | ||
ApplicationClientProtocol protocol = | ||
getClientRMProxyForSubCluster(subClusterId); | ||
Method method = ApplicationClientProtocol.class |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
new Class[]{GetClusterNodesRequest.class}, new Object[]{request}); | ||
Collection<GetClusterNodesResponse> clusterNodes = null; | ||
try { | ||
clusterNodes = invokeConcurrent(remoteMethod, GetClusterNodesResponse.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we define and return inside the try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will refactor the code.
Assert.assertTrue(clusterMetrics.isEmpty()); | ||
Collection<GetClusterMetricsResponse> clusterMetrics = interceptor. | ||
invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class); | ||
Assert.assertTrue(!clusterMetrics.isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertFalse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
Assert.assertTrue(clusterMetrics.isEmpty()); | ||
Collection<GetClusterMetricsResponse> clusterMetrics = interceptor. | ||
invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class); | ||
Assert.assertTrue(!clusterMetrics.isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now the expectation is the opposite? What has changed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invokeConcurrent
This function originally contains 3 parameters
- ArrayList clusterIds
- ClientMethod request
- Class clazz
The first parameter is the SubClusters currently active.
In order to get the first parameter, all calling functions need to call the following function
Map<SubClusterId, SubClusterInfo> subclusters = federationFacade.getSubClusters(true);
I think we can put the fetching of the 1st parameter inside invokeConcurrent
, which is more reasonable.
This unit test tests a situation where there is no Subcluster available, so an empty result is returned.
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
I removed the 1st parameter, it is a little difficult to simulate the situation where all Subclusters are unavailable.
I try to ensure that the original test logic can be completed, and I will refactor part of the code.
* @throws YarnException if the call to get active subClusterIds is unsuccessful | ||
*/ | ||
public Collection<SubClusterId> getActiveSubClusterIds() throws YarnException { | ||
Map<SubClusterId, SubClusterInfo> activeSubClusters = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Single line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
*/ | ||
public Collection<SubClusterId> getActiveSubClusterIds() throws YarnException { | ||
Map<SubClusterId, SubClusterInfo> activeSubClusters = | ||
getSubClusters(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put the true in a variable to make it easier to understand what the true represents?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will use the getSubClusters
method. The way of using parameters (true) is better for code understanding.
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@goiri Please help to review this pr again, thank you very much! |
@goiri Can you help review this pr? Thank you very much! |
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
throws YarnException { | ||
|
||
// Get Active SubClusters | ||
Map<SubClusterId, SubClusterInfo> subClusterInfo = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your help reviewing the code, I will fix it.
ClientMethod remoteMethod = new ClientMethod("getClusterMetrics", | ||
new Class[] {GetClusterMetricsRequest.class}, | ||
new Object[] {GetClusterMetricsRequest.newInstance()}); | ||
Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor. | ||
invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class); | ||
Collection<GetClusterMetricsResponse> clusterMetrics = interceptor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.invokeConcurrent(
remoteMethod, GetClusterMetricsResponse.class);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
@goiri Thank you very much for your help reviewing the code! |
JIRA: YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism.