Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STORM-3650] Add topology config to attempt to schedule system component evenly throughout workers #3283

Closed
wants to merge 1 commit into from

Conversation

RuiLi8080
Copy link
Contributor

What is the purpose of the change

Add topology config for RAS scheduler to attempt to schedule system components/executors across all workers evenly instead of scheduling them all together after having finished all topology components.

How was the change tested

  1. test with even distribution option:
    storm jar /home/y/lib64/jars/storm-starter.jar org.apache.storm.starter.WordCountTopology wc1 -c topology..system.components.even.distribution=true -c topology.acker.executors=8 -c topology.component.resources.onheap.memory.mb=100 -c topology.acker.resources.onheap.memory.mb=150 -c topology.workers=4 -c experimental.topology.ras.order.executors.by.proximity.needs=true -c topology.worker.max.heap.size.mb=1000

image

  1. test without even distribution option storm jar /home/y/lib64/jars/storm-starter.jar org.apache.storm.starter.WordCountTopology wc-no-even-distribution -c topology.acker.executors=8 -c topology.component.resources.onheap.memory.mb=100 -c topology.acker.resources.onheap.memory.mb=150 -c topology.workers=4 -c experimental.topology.ras.order.executors.by.proximity.needs=true -c topology.worker.max.heap.size.mb=1000
    image

@RuiLi8080
Copy link
Contributor Author

Travis has a ExecutorTransferMultiThreadingTest failed. Re-run the test locally and it passed.

@@ -1695,10 +1695,10 @@ public HeartbeatCache getHeartbeatsCache() {

private Set<List<Integer>> getOrUpdateExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
StormTopology topology)
throws IOException, AuthorizationException, InvalidTopologyException, KeyNotFoundException {
throws InvalidTopologyException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the Nimbus cleanup of parameter (topoId) and throws clause be part of a separate Jira?

@@ -84,6 +89,8 @@ public TopologyDetails(String topologyId, Map<String, Object> topologyConf, Stor
initConfigs();
this.launchTime = launchTime;
this.topoName = (String) topologyConf.get(Config.TOPOLOGY_NAME);
this.distributeSysCompEvenly
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This property is more useful in Scheduler code where is can be used to sort executors in a manner conducive to even distribution.

* with the list of its parents, children and execs assigned to that component.
*
* @return a map of components
*/
public Map<String, Component> getComponents() {
public Map<String, Component> getTopoComponents() {
if (topologyComponentsMap == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TopologyDetails gets instantiated a large number of times. It was my observation that adding members or such cacheing in TopologyDetails was counterproductive. topologyComponentsMap and its sister cache allComponentsMap should not be added unless we actually gain from cacheing.

* with the list of its parents, children and execs assigned to that component.
*
* @return a map of components
*/
public Map<String, Component> getComponents() {
public Map<String, Component> getTopoComponents() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method retain the old name? And the getAllComponents be an overloaded getComponents()?

@@ -539,30 +541,34 @@ protected String nodeToRack(RasNode node) {
* @return a list of executors in sorted order
*/
private List<ExecutorDetails> orderExecutorsDefault(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be convenient if the even-distribution executor re-sorting was a single method that could be applied to executor list.
This way the upcoming refactoring of the scheduling strategies would be easier.

@@ -40,5 +42,5 @@
* @return returns a SchedulingResult object containing SchedulingStatus object to indicate whether scheduling is
* successful.
*/
SchedulingResult schedule(Cluster schedulingState, TopologyDetails td);
SchedulingResult schedule(Cluster schedulingState, TopologyDetails td) throws InvalidTopologyException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is InvalidTopologyException being thrown in schedule.

Copy link
Contributor

@bipinprasad bipinprasad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general comment for review: Does it make sense to detect ackers source/target component and use that in the executor sorting?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants