-
Notifications
You must be signed in to change notification settings - Fork 137
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1165] improvement(tez): Unregister shuffle data after completing the execution of a DAG. #1166
Conversation
@zhengchenyu Could you help me review this pr? |
// and there is no correlation between the DAGs. | ||
// Therefore, after completing the execution of a DAG, | ||
// you can unregister the relevant shuffle data. | ||
tezRemoteShuffleManager.unregisterShuffle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that we run two DAGs concurrently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Through analyzing the source code, you will find that this situation does not exist. Perhaps you can refer to the following links.
- https://github.com/apache/tez/blob/5beab4ced9d07bc813a8d79ded111b72af5a2f02/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java#L1315
- https://github.com/apache/tez/blob/5beab4ced9d07bc813a8d79ded111b72af5a2f02/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java#L1337
Its mechanism seems to ensure that the DAGs are executed in sequence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we register the app again after we unregister the app?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though only one current dag can run, I think we can not make sure the tez will not change in the future version.
As shuffleId is composed of dagId, source vertexId, vertexId. We should unregister all shuffle ids which are owned by dag when dag is finished, but not all shuffle id owned by app. I think this is more reasonable. Maybe the tez will support parallel dag in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhengchenyu hi, thank you for your reply, and I know what you mean. I have currently chosen a relatively simple way to implement it, but it can also be achieved by listening to the DAG_FINISHED
event. If you think the latter is better, I can make changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Let @zhengchenyu take another look.
Codecov Report
@@ Coverage Diff @@
## master #1166 +/- ##
============================================
+ Coverage 53.63% 54.71% +1.08%
- Complexity 2576 2581 +5
============================================
Files 391 371 -20
Lines 22359 20043 -2316
Branches 1875 1878 +3
============================================
- Hits 11992 10967 -1025
+ Misses 9660 8439 -1221
+ Partials 707 637 -70
... and 25 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@jerqi @zhengchenyu hi, could you review for my PR? thanks! |
// 9 send INTERNAL_ERROR to dispatcher | ||
dispatcher.getEventHandler().handle(new DAGEvent(dagImpl.getID(), DAGEventType.INTERNAL_ERROR)); | ||
|
||
// 10 wait DAGImpl transient to INITED state |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe some spell error in comments. wait DAGImpl transient to ERROR state?
appMaster, (OnStateChangedCallback) callbackMap.get(finalState)))); | ||
} | ||
|
||
static class DagFinalStateCallback implements OnStateChangedCallback<DAGState, DAGImpl> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think DagFinalStateCallback should be inherit from DAGImpl::DagStateChangedCallback.
What a pitty that DAGImpl::DagStateChangedCallback is private. But we can not ignore the logical in DAGImpl::DagStateChangedCallback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new DagFinalStateCallback(appMaster, (OnStateChangedCallback) callbackMap.get(finalState))
DagFinalStateCallback has field called callback
which got from callbackMap
, and the callback
will be called when dag state was changed. callback.onStateChanged(dag, dagState);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I miss it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @zhuyaogai @zhengchenyu , merged to master.
What changes were proposed in this pull request?
Generally, one application will execute multiple DAGs, and there is no correlation between the DAGs. Therefore, after completing the execution of a DAG, you can unregister the relevant shuffle data. Otherwise, when there are many DAGs, an application will occupy a large amount of resources for a long period of time.
Why are the changes needed?
Fix: #1165
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing test cases.