diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java index 75ae8c76bfd..953d9341564 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java @@ -225,8 +225,7 @@ private void scheduleTopology(TopologyDetails td, Cluster cluster, final User to } //Only place we fall though to do the loop over again... } else { //Any other failure result - //The assumption is that the strategy set the status... - topologySubmitter.markTopoUnsuccess(td, cluster); + topologySubmitter.markTopoUnsuccess(td, cluster, result.toString()); return; } } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java index 94f24070025..fce65b8e81b 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/User.java @@ -79,15 +79,18 @@ public TreeSet getPendingTopologies(ISchedulingState cluster) { return ret; } - public void markTopoUnsuccess(TopologyDetails topo, Cluster cluster) { + public void markTopoUnsuccess(TopologyDetails topo, Cluster cluster, String msg) { unsuccess.add(topo); if (cluster != null) { - cluster.setStatus(topo.getId(), "Scheduling Attempted but topology is invalid"); + if (msg == null) { + msg = "Scheduling Attempted but topology is invalid"; + } + cluster.setStatus(topo.getId(), msg); } } public void markTopoUnsuccess(TopologyDetails topo) { - this.markTopoUnsuccess(topo, null); + this.markTopoUnsuccess(topo, null, null); } public double getResourcePoolAverageUtilization(ISchedulingState cluster) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java index 81994ba0275..b99bab258b5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java @@ -252,7 +252,10 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)); final int maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch); - final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1) * 1000L; + // expect to be killed by DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY seconds, terminate slightly before + int daemonMaxTimeSec = ObjectReader.getInt(td.getConf().get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60); + int confMaxTimeSec = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), daemonMaxTimeSec); + final long maxTimeMs = (confMaxTimeSec >= daemonMaxTimeSec) ? daemonMaxTimeSec * 1000L - 200L : confMaxTimeSec * 1000L; favoredNodeIds = makeHostToNodeIds((List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES)); unFavoredNodeIds = makeHostToNodeIds((List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES));