Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
Expand Down Expand Up @@ -1067,6 +1068,215 @@ public void minCpuWorkerSplitFails() {
assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(topo1.getId()) != null);
}

/**
 * Accumulates scheduling timings, one entry per run, for the first and last
 * topology blocks of a timing test.
 */
protected static class TimeBlockResult {
    // Elapsed times recorded for the first block, one per run.
    List<Long> firstBlockTime = new ArrayList<>();
    // Elapsed times recorded for the last block, one per run.
    List<Long> lastBlockTime = new ArrayList<>();

    TimeBlockResult() {
        // Both lists start out empty (see the field initializers).
    }

    /**
     * Appends every timing held by {@code other} to this result, keeping the order of {@code other}.
     */
    void append(TimeBlockResult other) {
        firstBlockTime.addAll(other.firstBlockTime);
        lastBlockTime.addAll(other.lastBlockTime);
    }
}

/**
 * Returns the median of {@code values} without modifying the input list.
 *
 * <p>The element count must be odd: the median is taken as the middle element of a sorted copy.
 */
private long getMedianValue(List<Long> values) {
    final int count = values.size();
    assert (count % 2 == 1); // an odd count is required so that a single middle element exists

    // Sort a private copy so the caller's list keeps its order.
    List<Long> ordered = new ArrayList<>(values);
    Collections.sort(ordered);

    // Integer division already rounds down, giving the middle index for an odd count.
    return ordered.get(count / 2);
}

/**
* Check time to schedule a fragmented cluster using different strategies
*
* Simulate scheduling on a large production cluster. Find the ratio of time to schedule a set of topologies when
* the cluster is empty and when the cluster is nearly full. While the cluster has sufficient resources to schedule
* all topologies, when nearly full the cluster becomes fragmented and some topologies fail to schedule.
*/
@Test
public void TestLargeFragmentedClusterScheduling() {
/*
Without fragmentation, the cluster would be able to schedule both topologies on each node. Let's call each node
with both topologies scheduled as 100% scheduled.

We schedule the cluster in 3 blocks of topologies, measuring the time to schedule the blocks. The first, middle
and last blocks attempt to schedule the following 0-10%, 10%-90%, 90%-100%. The last block has a number of
scheduling failures due to cluster fragmentation and its time is dominated by attempting to evict topologies.

Timing results for scheduling are noisy. As a result, we do multiple runs and use median values for FirstBlock
and LastBlock times. (somewhere a statistician is crying). The ratio of LastBlock / FirstBlock remains fairly constant.


TestLargeFragmentedClusterScheduling took 91118 ms
DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1734.0 ratio 6.963855421686747
GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1673.0 ratio 7.78139534883721
ConstraintSolverStrategy, FirstBlock 279.0, LastBlock 2200.0 ratio 7.885304659498208

TestLargeFragmentedClusterScheduling took 98455 ms
DefaultResourceAwareStrategy, FirstBlock 266.0, LastBlock 1812.0 ratio 6.81203007518797
GenericResourceAwareStrategy, FirstBlock 235.0, LastBlock 1802.0 ratio 7.6680851063829785
ConstraintSolverStrategy, FirstBlock 304.0, LastBlock 2320.0 ratio 7.631578947368421

TestLargeFragmentedClusterScheduling took 97268 ms
DefaultResourceAwareStrategy, FirstBlock 251.0, LastBlock 1826.0 ratio 7.274900398406374
GenericResourceAwareStrategy, FirstBlock 220.0, LastBlock 1719.0 ratio 7.8136363636363635
ConstraintSolverStrategy, FirstBlock 296.0, LastBlock 2469.0 ratio 8.341216216216216

TestLargeFragmentedClusterScheduling took 97963 ms
DefaultResourceAwareStrategy, FirstBlock 249.0, LastBlock 1788.0 ratio 7.180722891566265
GenericResourceAwareStrategy, FirstBlock 240.0, LastBlock 1796.0 ratio 7.483333333333333
ConstraintSolverStrategy, FirstBlock 328.0, LastBlock 2544.0 ratio 7.7560975609756095

TestLargeFragmentedClusterScheduling took 93106 ms
DefaultResourceAwareStrategy, FirstBlock 258.0, LastBlock 1714.0 ratio 6.6434108527131785
GenericResourceAwareStrategy, FirstBlock 215.0, LastBlock 1692.0 ratio 7.869767441860465
ConstraintSolverStrategy, FirstBlock 309.0, LastBlock 2342.0 ratio 7.5792880258899675

Choose the median value of the values above
DefaultResourceAwareStrategy 6.96
GenericResourceAwareStrategy 7.78
ConstraintSolverStrategy 7.75
*/

final int numNodes = 500;
final int numRuns = 5;

Map<String, Config> strategyToConfigs = new HashMap<>();
strategyToConfigs.put(DefaultResourceAwareStrategy.class.getName(), createClusterConfig(10, 10, 0, null));
strategyToConfigs.put(GenericResourceAwareStrategy.class.getName(), createGrasClusterConfig(10, 10, 0, null, null));
strategyToConfigs.put(ConstraintSolverStrategy.class.getName(), createCSSClusterConfig(10, 10, 0, null));

Map<String, TimeBlockResult> strategyToTimeBlockResults = new HashMap<>();

// AcceptedBlockTimeRatios obtained by empirical testing (see comment block above)
Map<String, Double> strategyToAcceptedBlockTimeRatios = new HashMap<>();
strategyToAcceptedBlockTimeRatios.put(DefaultResourceAwareStrategy.class.getName(), 6.96);
strategyToAcceptedBlockTimeRatios.put(GenericResourceAwareStrategy.class.getName(), 7.78);
strategyToAcceptedBlockTimeRatios.put(ConstraintSolverStrategy.class.getName(), 7.75);

// Get first and last block times for multiple runs and strategies
long startTime = Time.currentTimeMillis();
for (Entry<String, Config> strategyConfig : strategyToConfigs.entrySet()) {
TimeBlockResult strategyTimeBlockResult = strategyToTimeBlockResults.computeIfAbsent(strategyConfig.getKey(), (k) -> new TimeBlockResult());
for (int run = 0; run < numRuns; ++run) {
TimeBlockResult result = testLargeClusterSchedulingTiming(numNodes, strategyConfig.getValue());
strategyTimeBlockResult.append(result);
}
}

// Log median ratios for different strategies
LOG.info("TestLargeFragmentedClusterScheduling took {} ms", Time.currentTimeMillis() - startTime);
for (Entry<String, TimeBlockResult> strategyResult : strategyToTimeBlockResults.entrySet()) {
TimeBlockResult strategyTimeBlockResult = strategyResult.getValue();
double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime);
double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime);
double ratio = medianLastBlockTime / medianFirstBlockTime;
LOG.info("{}, FirstBlock {}, LastBlock {} ratio {}", strategyResult.getKey(), medianFirstBlockTime, medianLastBlockTime, ratio);
}

// Check last block scheduling time does not get significantly slower
for (Entry<String, TimeBlockResult> strategyResult : strategyToTimeBlockResults.entrySet()) {
TimeBlockResult strategyTimeBlockResult = strategyResult.getValue();
double medianFirstBlockTime = getMedianValue(strategyTimeBlockResult.firstBlockTime);
double medianLastBlockTime = getMedianValue(strategyTimeBlockResult.lastBlockTime);
double ratio = medianLastBlockTime / medianFirstBlockTime;

double slowSchedulingThreshold = 1.5;
String msg = "Strategy " + strategyResult.getKey() + " scheduling is significantly slower for mostly full fragmented cluster\n";
msg += "Ratio was " + ratio + " Max allowed is " + (slowSchedulingThreshold * ratio);
assertTrue(msg, ratio < slowSchedulingThreshold * strategyToAcceptedBlockTimeRatios.get(strategyResult.getKey()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not something you have to change, but as you mentioned you are new to Java, I thought I'd mention this as you may be unfamiliar with JUnit and Hamcrest:

There's an assertThat method you can use to do this kind of assertion in a way where the error message will be useful automatically. Writing this as assertThat(ratio, lessThan(slowSchedulingThreshold * strategyToAcceptedBlockTimeRatios.get(strategyResult.getKey())) gets you an automatic error message similar to what you've written manually here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the info. I'll be sure to use assertThat in the future.

}
}

/**
 * Adds several copies of one test topology (a single "testSpout" spout) to {@code topologyMap}.
 *
 * <p>One copy is created for each index in the inclusive range {@code blockIndices[0]..blockIndices[1]}; each copy
 * is named {@code baseName + index} and stored under its topology id.
 *
 * @param topologyMap     map that receives the new topologies
 * @param baseName        name prefix for the copies
 * @param config          config passed to each {@code TopologyDetails}
 * @param spoutMemoryLoad memory load set on the spout
 * @param blockIndices    two-element array holding the first and last index (both inclusive)
 */
private void addTopologyBlockToMap(Map<String, TopologyDetails> topologyMap, String baseName, Config config,
                                   double spoutMemoryLoad, int[] blockIndices) {
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    topologyBuilder.setSpout("testSpout", new TestSpout(), 1).setMemoryLoad(spoutMemoryLoad);
    StormTopology sharedTopology = topologyBuilder.createTopology();
    Map<ExecutorDetails, String> execToComponent = genExecsAndComps(sharedTopology);

    // All copies share the same StormTopology and executor map; only the name differs.
    final int firstIndex = blockIndices[0];
    final int lastIndex = blockIndices[1];
    for (int index = firstIndex; index <= lastIndex; index++) {
        TopologyDetails copy =
            new TopologyDetails(baseName + index, config, sharedTopology, 0, execToComponent, 0, "user");
        topologyMap.put(copy.getId(), copy);
    }
}

/*
 * Time the scheduling of a large cluster that becomes fragmented as it fills up.
 *
 * Multiple copies of 2 different topologies (topo_t0 and topo_t1) are scheduled in 3 blocks covering
 * 0-10%, 10-90% and 90-100% of the topology pairs. Without fragmentation it is possible to schedule all
 * topologies, but fragmentation causes topologies to not schedule for the last block.
 *
 * Returns a TimeBlockResult holding one elapsed time for the first block and one for the last block;
 * the middle block is scheduled but not timed.
 */
private TimeBlockResult testLargeClusterSchedulingTiming(int numNodes, Config config) {
    // Inclusive [start, end] index pairs for the three blocks; one topology pair per node.
    final int pairCount = numNodes;
    final int blockSize = (int) Math.floor(pairCount * 0.1);
    final int[] firstBlockRange = {0, blockSize - 1};
    final int[] midBlockRange = {blockSize, pairCount - blockSize - 1};
    final int[] lastBlockRange = {pairCount - blockSize, pairCount - 1};

    // Memory is the constraining resource.
    final double t0Mem = 70; // memory required by topo-t0
    final double t1Mem = 20; // memory required by topo-t1
    final double nodeMem = 100;

    // Build the first block (0% - 10%) and the initial cluster.
    Map<String, TopologyDetails> topologyMap = new HashMap<>();
    addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, firstBlockRange);
    addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, firstBlockRange);
    Topologies topologies = new Topologies(topologyMap);

    Map<String, SupervisorDetails> supMap = genSupervisors(numNodes, 7, 3500, nodeMem);
    Cluster cluster = new Cluster(new INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), supMap,
        new HashMap<String, SchedulerAssignmentImpl>(), topologies, config);
    TimeBlockResult timings = new TimeBlockResult();

    // Schedule and time the first block (0% - 10%).
    scheduler = new ResourceAwareScheduler();
    scheduler.prepare(config);

    long firstBlockStart = Time.currentTimeMillis();
    scheduler.schedule(topologies, cluster);
    timings.firstBlockTime.add(Time.currentTimeMillis() - firstBlockStart);

    // Schedule the mid block (10% - 90%); not timed.
    addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, midBlockRange);
    addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, midBlockRange);

    topologies = new Topologies(topologyMap);
    cluster = new Cluster(cluster, topologies);

    scheduler.schedule(topologies, cluster);

    // Schedule and time the last block (90% - 100%).
    addTopologyBlockToMap(topologyMap, "topo_t0-", config, t0Mem, lastBlockRange);
    addTopologyBlockToMap(topologyMap, "topo_t1-", config, t1Mem, lastBlockRange);

    topologies = new Topologies(topologyMap);
    cluster = new Cluster(cluster, topologies);

    long lastBlockStart = Time.currentTimeMillis();
    scheduler.schedule(topologies, cluster);
    timings.lastBlockTime.add(Time.currentTimeMillis() - lastBlockStart);

    return timings;
}

/**
* Test multiple spouts and cyclic topologies
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.spout.SpoutOutputCollector;
Expand Down Expand Up @@ -102,6 +103,13 @@ public static Map<String, Map<String, Number>> userResourcePool(TestUserResource
return ret;
}

/**
 * Creates a cluster config through {@code createClusterConfig} and sets
 * {@code Config.TOPOLOGY_SCHEDULER_STRATEGY} to the {@code ConstraintSolverStrategy} class name.
 *
 * <p>All four arguments are forwarded unchanged to {@code createClusterConfig}.
 */
public static Config createCSSClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
                                            Map<String, Map<String, Number>> pools) {
    final String strategyClassName = ConstraintSolverStrategy.class.getName();
    final Config cssConfig = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
    cssConfig.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, strategyClassName);
    return cssConfig;
}

public static Config createGrasClusterConfig(double compPcore, double compOnHeap, double compOffHeap,
Map<String, Map<String, Number>> pools, Map<String, Double> genericResourceMap) {
Config config = createClusterConfig(compPcore, compOnHeap, compOffHeap, pools);
Expand Down