From 45f9d0b12f01fcc628c4016a72e232d786d868c5 Mon Sep 17 00:00:00 2001 From: dandsager Date: Tue, 16 Jul 2019 15:16:42 -0500 Subject: [PATCH] STORM-3474 Large fragmented cluster scheduling time test --- .../resource/TestResourceAwareScheduler.java | 210 ++++++++++++++++++ .../TestUtilsForResourceAwareScheduler.java | 8 + 2 files changed, 218 insertions(+) diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java index 405e2aeb5c2..17b9fa5bb64 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java @@ -43,6 +43,7 @@ import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.resource.normalization.NormalizedResources; import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy; +import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy; @@ -1067,6 +1068,215 @@ public void minCpuWorkerSplitFails() { assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null); } + protected static class TimeBlockResult { + List firstBlockTime; + List lastBlockTime; + + TimeBlockResult() { + firstBlockTime = new ArrayList<>(); + lastBlockTime = new ArrayList<>(); + } + + void append(TimeBlockResult other) { + this.firstBlockTime.addAll(other.firstBlockTime); + this.lastBlockTime.addAll(other.lastBlockTime); + } + } + + private long getMedianValue(List values) { + final int numValues = values.size(); + assert(numValues % 2 == 1); // number of values must be odd to compute median as below + List sortedValues = new ArrayList(); + sortedValues.addAll(values); + Collections.sort(sortedValues); + + final int medianIndex = (int) Math.floor(numValues / 2); + return sortedValues.get(medianIndex); + } + + /** + * Check time to schedule a fragmented cluster using different strategies + * + * Simulate scheduling on a large production cluster. Find the ratio of time to schedule a set of topologies when + * the cluster is empty and when the cluster is nearly full. While the cluster has sufficient resources to schedule + * all topologies, when nearly full the cluster becomes fragmented and some topologies fail to schedule. + */ + @Test + public void TestLargeFragmentedClusterScheduling() { + /* + Without fragmentation, the cluster would be able to schedule both topologies on each node. Let's call each node + with both topologies scheduled as 100% scheduled. + + We schedule the cluster in 3 blocks of topologies, measuring the time to schedule the blocks. The first, middle + and last blocks attempt to schedule the following 0-10%, 10%-90%, 90%-100%. The last block has a number of + scheduling failures due to cluster fragmentation and its time is dominated by attempting to evict topologies. + + Timing results for scheduling are noisy. As a result, we do multiple runs and use median values for FirstBlock + and LastBlock times. (somewhere a statistician is crying). The ratio of LastBlock / FirstBlock remains fairly constant. + + + TestLargeFragmentedClusterScheduling took 91118 ms + DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1734.0 ratio 6.963855421686747 + GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1673.0 ratio 7.78139534883721 + ConstraintSolverStrategy, FirstBlock 279.0, LastBlock 2200.0 ratio 7.885304659498208 + + TestLargeFragmentedClusterScheduling took 98455 ms + DefaultResourceAwareStrategy, FirstBlock 266.0, LastBlock 1812.0 ratio 6.81203007518797 + GenericResourceAwareStrategy, FirstBlock 235.0, LastBlock 1802.0 ratio 7.6680851063829785 + ConstraintSolverStrategy, FirstBlock 304.0, LastBlock 2320.0 ratio 7.631578947368421 + + TestLargeFragmentedClusterScheduling took 97268 ms + DefaultResourceAwareStrategy, FirstBlock 251.0, LastBlock 1826.0 ratio 7.274900398406374 + GenericResourceAwareStrategy, FirstBlock 220.0, LastBlock 1719.0 ratio 7.8136363636363635 + ConstraintSolverStrategy, FirstBlock 296.0, LastBlock 2469.0 ratio 8.341216216216216 + + TestLargeFragmentedClusterScheduling took 97963 ms + DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1788.0 ratio 7.180722891566265 + GenericResourceAwareStrategy, FirstBlock 240.0, LastBlock 1796.0 ratio 7.483333333333333 + ConstraintSolverStrategy, FirstBlock 328.0, LastBlock 2544.0 ratio 7.7560975609756095 + + TestLargeFragmentedClusterScheduling took 93106 ms + DefaultResourceAwareStrategy, FirstBlock 258.0, LastBlock 1714.0 ratio 6.6434108527131785 + GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1692.0 ratio 7.869767441860465 + ConstraintSolverStrategy, FirstBlock 309.0, LastBlock 2342.0 ratio 7.5792880258899675 + + Choose the median value of the values above + DefaultResourceAwareStrategy 6.96 + GenericResourceAwareStrategy 7.78 + ConstraintSolverStrategy 7.75 + */ + + final int numNodes = 500; + final int numRuns = 5; + + Map strategyToConfigs = new HashMap<>(); + strategyToConfigs.put(DefaultResourceAwareStrategy.class.getName(), createClusterConfig(10, 10, 0, null)); + strategyToConfigs.put(GenericResourceAwareStrategy.class.getName(), createGrasClusterConfig(10, 10, 0, null, null)); + strategyToConfigs.put(ConstraintSolverStrategy.class.getName(), createCSSClusterConfig(10, 10, 0, null)); + + Map strategyToTimeBlockResults = new HashMap<>(); + + // AcceptedBlockTimeRatios obtained by empirical testing (see comment block above) + Map strategyToAcceptedBlockTimeRatios = new HashMap<>(); + strategyToAcceptedBlockTimeRatios.put(DefaultResourceAwareStrategy.class.getName(), 6.96); + strategyToAcceptedBlockTimeRatios.put(GenericResourceAwareStrategy.class.getName(), 7.78); + strategyToAcceptedBlockTimeRatios.put(ConstraintSolverStrategy.class.getName(), 7.75); + + // Get first and last block times for multiple runs and strategies + long startTime = Time.currentTimeMillis(); + for (Entry strategyConfig : strategyToConfigs.entrySet()) { + TimeBlockResult strategyTimeBlockResult = strategyToTimeBlockResults.computeIfAbsent(strategyConfig.getKey(), (k) -> new TimeBlockResult()); + for (int run = 0; run < numRuns; ++run) { + TimeBlockResult result = testLargeClusterSchedulingTiming(numNodes, strategyConfig.getValue()); + strategyTimeBlockResult.append(result); + } + } + + // Log median ratios for different strategies + LOG.info("TestLargeFragmentedClusterScheduling took {} ms", Time.currentTimeMillis() - startTime); + for (Entry strategyResult : strategyToTimeBlockResults.entrySet()) { + TimeBlockResult strategyTimeBlockResult = strategyResult.getValue(); + double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime); + double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime); + double ratio = medianLastBlockTime / medianFirstBlockTime; + LOG.info("{}, FirstBlock {}, LastBlock {} ratio {}", strategyResult.getKey(), medianFirstBlockTime, medianLastBlockTime, ratio); + } + + // Check last block scheduling time does not get significantly slower + for (Entry strategyResult : strategyToTimeBlockResults.entrySet()) { + TimeBlockResult strategyTimeBlockResult = strategyResult.getValue(); + double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime); + double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime); + double ratio = medianLastBlockTime / medianFirstBlockTime; + + double slowSchedulingThreshold = 1.5; + String msg = "Strategy " + strategyResult.getKey() + " scheduling is significantly slower for mostly full fragmented cluster\n"; + msg += "Ratio was " + ratio + " Max allowed is " + (slowSchedulingThreshold * ratio); + assertTrue(msg, ratio < slowSchedulingThreshold * strategyToAcceptedBlockTimeRatios.get(strategyResult.getKey())); + } + } + + // Create multiple copies of a test topology + private void addTopologyBlockToMap(Map topologyMap, String baseName, Config config, + double spoutMemoryLoad, int[] blockIndices) { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("testSpout", new TestSpout(), 1).setMemoryLoad(spoutMemoryLoad); + StormTopology stormTopology = builder.createTopology(); + Map executorMap = genExecsAndComps(stormTopology); + + for (int i = blockIndices[0]; i <= blockIndices[1]; ++i) { + TopologyDetails topo = new TopologyDetails(baseName + i, config, stormTopology, 0, executorMap, 0, "user"); + topologyMap.put(topo.getId(), topo); + } + } + + /* + * Test time to schedule large cluster scheduling with fragmentation + */ + private TimeBlockResult testLargeClusterSchedulingTiming(int numNodes, Config config) { + // Attempt to schedule multiple copies of 2 different topologies (topo-t0 and topo-t1) in 3 blocks. + // Without fragmentation it is possible to schedule all topologies, but fragmentation causes topologies to not + // schedule for the last block. + + // Get start/end indices for blocks + int numTopologyPairs = numNodes; + int increment = (int) Math.floor(numTopologyPairs * 0.1); + int firstBlockIndices[] = {0, increment - 1}; + int midBlockIndices[] = {increment, numTopologyPairs - increment - 1}; + int lastBlockIndices[] = {numTopologyPairs - increment, numTopologyPairs - 1}; + + // Memory is the constraining resource. + double t0Mem = 70; // memory required by topo-t0 + double t1Mem = 20; // memory required by topo-t1 + double nodeMem = 100; + + // first block (0% - 10%) + Map topologyMap = new HashMap<>(); + addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, firstBlockIndices); + addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, firstBlockIndices); + Topologies topologies = new Topologies(topologyMap); + + Map supMap = genSupervisors(numNodes, 7, 3500, nodeMem); + Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap, new HashMap(), topologies, config); + TimeBlockResult timeBlockResult = new TimeBlockResult(); + + // schedule first block (0% - 10%) + { + scheduler = new ResourceAwareScheduler(); + scheduler.prepare(config); + + long time = Time.currentTimeMillis(); + scheduler.schedule(topologies, cluster); + timeBlockResult.firstBlockTime.add(Time.currentTimeMillis() - time); + } + + // schedule mid block (10% - 90%) + { + addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, midBlockIndices); + addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, midBlockIndices); + + topologies = new Topologies(topologyMap); + cluster = new Cluster(cluster, topologies); + + scheduler.schedule(topologies, cluster); + } + + // schedule last block (90% to 100%) + { + addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, lastBlockIndices); + addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, lastBlockIndices); + + topologies = new Topologies(topologyMap); + cluster = new Cluster(cluster, topologies); + + long time = Time.currentTimeMillis(); + scheduler.schedule(topologies, cluster); + timeBlockResult.lastBlockTime.add(Time.currentTimeMillis() - time); + } + + return timeBlockResult; + } + /** * Test multiple spouts and cyclic topologies */ diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java index e29830ed3bf..276cc46fddb 100644 --- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java +++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/TestUtilsForResourceAwareScheduler.java @@ -31,6 +31,7 @@ import org.apache.storm.scheduler.TopologyDetails; import org.apache.storm.scheduler.WorkerSlot; import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy; +import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy; import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy; import org.apache.storm.spout.SpoutOutputCollector; @@ -102,6 +103,13 @@ public static Map> userResourcePool(TestUserResource return ret; } + public static Config createCSSClusterConfig(double compPcore, double compOnHeap, double compOffHeap, + Map> pools) { + Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools); + config.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, ConstraintSolverStrategy.class.getName()); + return config; + } + public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap, Map> pools, Map genericResourceMap) { Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);