-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
STORM-3335 allow timing out when scheduling a topology #2957
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks pretty good. Left a few comments.
@@ -78,6 +85,9 @@ public void prepare(Map<String, Object> conf) { | |||
configLoader = ConfigLoaderFactoryService.createConfigLoader(conf); | |||
maxSchedulingAttempts = ObjectReader.getInt( | |||
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_TOPOLOGY_SCHEDULING_ATTEMPTS), 5); | |||
schedulingTimeoutSeconds = ObjectReader.getInt( | |||
conf.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60); | |||
backgroundScheduling = Executors.newFixedThreadPool(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to add a method that allows cleanup of IScheduler
, similar to the cleanup
method we have on IBolt
. This executor needs to be shut down, otherwise the thread will leak if this class gets used in tests.
+ td.getId() + " using strategy " + rasStrategy.getClass().getName() + " timeout after " | ||
+ schedulingTimeoutSeconds + " seconds."); | ||
schedulingFuture.cancel(true); | ||
rasStrategy.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider dropping the stop
method and make the scheduler check for interrupts instead. You're already sending an interrupt to it via the cancel
call above.
try { | ||
result = schedulingFuture.get(schedulingTimeoutSeconds, TimeUnit.SECONDS); | ||
} catch (TimeoutException te) { | ||
markFailedTopology(topologySubmitter, cluster, td, "Scheduling took too long for " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Consider referencing the config parameter users should change if they want to raise the timeout, so it is obvious what users can do when they get this message.
@srdo - I made the changes you suggested. |
@@ -101,6 +101,10 @@ public void prepare(Map<String, Object> conf) { | |||
//noop | |||
} | |||
|
|||
@Override | |||
public void cleanup() { | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the cleanup() method seems a no-op, can we define a default implementation of it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@danny0405 - Updated. Please take a look.
@agresch Thanks, it looks great. I'm wondering if we can put the setup and cleanup in |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
somehow missed some test files, more changes to come... |
@srdo - please look at the latest couple commits for the test cleanup. |
+1, thanks for addressing comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
@@ -74,6 +85,7 @@ public void TestBadSupervisor() { | |||
Cluster cluster = new Cluster(iNimbus, resourceMetrics, supMap, new HashMap<>(), topologies, config); | |||
BlacklistScheduler bs = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry); | |||
bs.prepare(config); | |||
scheduler = bs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it doesn't matter. But we could just use scheduler = new BlacklistScheduler(new DefaultScheduler(), metricsRegistry);
directly
@@ -252,6 +270,7 @@ public void testTopologySetCpuAndMemLoad() { | |||
TopologyDetails topology1 = new TopologyDetails("topology1", config, stormTopology1, 0, executorMap1, 0, "user"); | |||
|
|||
ResourceAwareScheduler rs = new ResourceAwareScheduler(); | |||
scheduler = rs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. Could we use scheduler = new ResourceAwareScheduler();
directly?
@@ -723,7 +769,7 @@ public void testSubmitUsersWithNoGuarantees() { | |||
ResourceAwareScheduler rs = new ResourceAwareScheduler(); | |||
rs.prepare(config); | |||
rs.schedule(topologies, cluster); | |||
|
|||
scheduler = rs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above. scheduler = new ResourceAwareScheduler();
directly?
...java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
Show resolved
Hide resolved
@Ethanlm - please check again. Thanks. |
@agresch Thank you for your patience. Please squash to one commit, and I'll merge. |
b59d009
to
435e57b
Compare
@srdo - squashed |
This is based on a couple of internal PRs @govind-menon committed to our older version of storm.