diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 2866b03eff6..a2aa58a81fb 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -315,13 +315,6 @@ public class Config extends HashMap { @IsInteger @IsPositiveNumber public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH = "topology.ras.constraint.max.state.search"; - /** - * The maximum number of states that will be searched looking for a solution in the constraint solver strategy. - * Backward compatibility config value for old topologies - */ - @IsInteger - @IsPositiveNumber - public static final String TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL = "topology.ras.constraint.max.state.traversal"; /** * The maximum number of seconds to spend scheduling a topology using the constraint solver. Null means no limit. */ diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java index 0a6b2791fda..9f8265db43d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java @@ -248,24 +248,11 @@ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { Map> workerCompAssignment = new HashMap<>(); Map> nodeCompAssignment = new HashMap<>(); - //set max number of states to search maintaining backward compatibility for old topologies - String stormVersionString = td.getTopology().get_storm_version(); - boolean is2xTopology = stormVersionString != null && stormVersionString.startsWith("2"); - - Object confMaxStateSearch = null; - if (is2xTopology == false) { - //backward compatibility - confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_TRAVERSAL); - } - if (confMaxStateSearch == null) { - //new topology or old topology using new config - confMaxStateSearch = td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH); - } - int daemonMaxStateSearch = ObjectReader.getInt(td.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)); - final int maxStateSearch = Math.min(daemonMaxStateSearch, ObjectReader.getInt(confMaxStateSearch)); + int confMaxStateSearch = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH)); + int daemonMaxStateSearch = ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)); + final int maxStateSearch = Math.min(daemonMaxStateSearch, confMaxStateSearch); - final long maxTimeMs = - ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1).intValue() * 1000L; + final long maxTimeMs = ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS), -1) * 1000L; favoredNodeIds = makeHostToNodeIds((List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES)); unFavoredNodeIds = makeHostToNodeIds((List) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES)); diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java index 2bad23a355e..4ae5c88db75 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java @@ -87,10 +87,16 @@ public TopologyDetails makeTopology(Map config, int boltParallel return genTopology("testTopo", config, 1, 4, 4, boltParallel, 0, 0, "user"); } - public Cluster makeCluster(TopologyDetails topo) { - Topologies topologies = new Topologies(topo); - Map supMap = genSupervisors(4, 2, 120, 1200); - return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, new Config()); + public Cluster makeCluster(Topologies topologies) { + return makeCluster(topologies, null); + } + + public Cluster makeCluster(Topologies topologies, Map supMap) { + if (supMap == null) { + supMap = genSupervisors(4, 2, 120, 1200); + } + Map config = Utils.readDefaultConfig(); + return new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config); } public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int boltParallel) { @@ -98,7 +104,8 @@ public void basicUnitTestWithKillAndRecover(ConstraintSolverStrategy cs, int bol cs.prepare(config); TopologyDetails topo = makeTopology(config, boltParallel); - Cluster cluster = makeCluster(topo); + Topologies topologies = new Topologies(topo); + Cluster cluster = makeCluster(topologies); LOG.info("Scheduling..."); SchedulingResult result = cs.schedule(cluster, topo); @@ -164,7 +171,8 @@ public void basicFailureTest(String confKey, Object confValue, ConstraintSolverS cs.prepare(config); TopologyDetails topo = makeTopology(config, NORMAL_BOLT_PARALLEL); - Cluster cluster = makeCluster(topo); + Topologies topologies = new Topologies(topo); + Cluster cluster = makeCluster(topologies); LOG.info("Scheduling..."); SchedulingResult result = cs.schedule(cluster, topo); @@ -196,8 +204,6 @@ protected SolverResult backtrackSearch(SearcherState state) { @Test public void testIntegrationWithRAS() { - Map supMap = genSupervisors(30, 16, 400, 1024 * 4); - List> constraints = new LinkedList<>(); addContraints("spout-0", "bolt-0", constraints); addContraints("bolt-1", "bolt-1", constraints); @@ -206,8 +212,6 @@ public void testIntegrationWithRAS() { spread.add("spout-0"); Map config = Utils.readDefaultConfig(); - config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, DefaultSchedulingPriorityStrategy.class.getName()); - config.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH, MAX_TRAVERSAL_DEPTH); config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName()); config.put(Config.TOPOLOGY_SPREAD_COMPONENTS, spread); config.put(Config.TOPOLOGY_RAS_CONSTRAINTS, constraints); @@ -222,7 +226,8 @@ public void testIntegrationWithRAS() { Map topoMap = new HashMap<>(); topoMap.put(topo.getId(), topo); Topologies topologies = new Topologies(topoMap); - Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap<>(), topologies, config); + Map supMap = genSupervisors(30, 16, 400, 1024 * 4); + Cluster cluster = makeCluster(topologies, supMap); ResourceAwareScheduler rs = new ResourceAwareScheduler(); rs.prepare(config); try {