diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 4368099725c..d4c61bdffba 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -88,6 +88,9 @@ topology.max.replication.wait.time.sec: 60 nimbus.credential.renewers.freq.secs: 600 nimbus.queue.size: 100000 scheduler.display.resource: false +nimbus.even.rebalance.idle.supervisor.enabled: false +nimbus.even.rebalance.max.free.per.topology: 0 +nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3 nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend" nimbus.assignments.service.threads: 10 nimbus.assignments.service.thread.queue.size: 100 diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index aabf7283c0d..97182d1d166 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -175,6 +175,37 @@ public class DaemonConfig implements Validated { @IsBoolean public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource"; + /** + * If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors + * with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers + * freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even + * distribution is approached gradually rather than rebuilt from scratch. + */ + @IsBoolean + public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED + = "nimbus.even.rebalance.idle.supervisor.enabled"; + + /** + * Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round + * when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. 
The + * default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers / + * numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive + * value tightens that budget; setting it to {@code 0} or a negative value leaves the even-distribution budget unbounded. + */ + @IsInteger + public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY + = "nimbus.even.rebalance.max.free.per.topology"; + + /** + * Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before + * {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a + * supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the + * uptime guard. + */ + @IsInteger + public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS + = "nimbus.even.rebalance.idle.supervisor.min.stable.rounds"; + /** * The directory where storm's health scripts go. */ diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index d59a2527d21..bd209d03881 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -985,11 +985,16 @@ private static Map basicSupervisorDetailsMap(IStormCl String id = entry.getKey(); SupervisorInfo info = entry.getValue(); ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(), - info.get_scheduler_meta(), null, info.get_resources_map())); + info.get_scheduler_meta(), null, info.get_resources_map(), + supervisorUptimeSecs(info))); } return ret; } + private static long supervisorUptimeSecs(SupervisorInfo info) { + return info.is_set_uptime_secs() ? 
info.get_uptime_secs() : 0L; + } + /** * NOTE: this can return false when a topology has just been activated. The topology may still be * in the STORMS_SUBTREE. @@ -2273,7 +2278,8 @@ private Map readAllSupervisorDetails(Map superDetails = new ArrayList<>(); for (Entry entry : superInfos.entrySet()) { SupervisorInfo info = entry.getValue(); - superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map())); + superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(), + supervisorUptimeSecs(info))); } // Note that allSlotsAvailableForScheduling // only uses the supervisor-details. The rest of the arguments @@ -2306,7 +2312,7 @@ private Map readAllSupervisorDetails(Map assignedNumWorkers || getUnassignedExecutors(topology).size() > 0; } + /** + * Returns true when there is at least one stable, non-blacklisted supervisor whose slots are all currently free and the + * topology is not already on that supervisor. Controlled by + * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}; returns false when disabled. The check is + * binary by design -- a supervisor either has zero used slots or it does not -- so this never fires for "almost balanced" + * clusters. Topologies that cannot benefit from a move (e.g. only a single worker assigned) are filtered later by the + * drain-budget computation in {@link EvenScheduler}, which evaluates to zero whenever + * {@code floor(numWorkers / nonBlacklistedSupervisorCount)} is zero. 
+ */ + public boolean hasIdleSupervisorReusableBy(TopologyDetails topology) { + if (!ObjectReader.getBoolean( + conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) { + return false; + } + Set nodesUsedByTopology = new HashSet<>(); + for (WorkerSlot slot : getUsedSlotsByTopologyId(topology.getId())) { + nodesUsedByTopology.add(slot.getNodeId()); + } + for (SupervisorDetails s : supervisors.values()) { + String sid = s.getId(); + if (!isIdleSupervisorAvailableForEvenRebalance(s)) { + continue; + } + if (nodesUsedByTopology.contains(sid)) { + continue; + } + return true; + } + return false; + } + + public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) { + if (supervisor == null) { + return false; + } + if (isBlackListed(supervisor.getId())) { + return false; + } + if (supervisor.getAllPorts().isEmpty()) { + return false; + } + if (!getUsedPorts(supervisor).isEmpty()) { + return false; + } + return hasMinimumIdleSupervisorStability(supervisor); + } + + private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) { + int minStableRounds = ObjectReader.getInt( + conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS), 3); + if (minStableRounds <= 0) { + return true; + } + int monitorFrequencySecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), 3); + long requiredUptimeSecs = (long) minStableRounds * Math.max(1, monitorFrequencySecs); + return supervisor.getUptimeSecs() >= requiredUptimeSecs; + } + @Override public boolean needsSchedulingRas(TopologyDetails topology) { return getUnassignedExecutors(topology).size() > 0; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java index 81a0ad8abcf..2ab34914d2a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java +++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/DefaultScheduler.java @@ -72,7 +72,11 @@ public static Set slotsCanReassign(Cluster cluster, Set } public static void defaultSchedule(Topologies topologies, Cluster cluster) { + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + if (topologies.getById(topology.getId()) == null) { + continue; + } List availableSlots = cluster.getAvailableSlots(); Set allExecutors = topology.getExecutors(); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java index ccc2e34d4bb..d1cabb5ccda 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/EvenScheduler.java @@ -18,18 +18,23 @@ package org.apache.storm.scheduler; +import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.storm.DaemonConfig; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; import org.apache.storm.shade.com.google.common.collect.Sets; +import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; @@ -101,6 +106,151 @@ public static Map> getAliveAssignedWorkerSlotE return Utils.reverseMap(executorToSlot); } + /** + * Round-robin relocation of currently-assigned workers onto fully-idle supervisors. 
Each round-robin iteration moves + * at most one worker per topology, so multiple topologies share the idle slots and a single returning supervisor ends + * up hosting workers from several topologies — preserving the per-supervisor workload diversity that a fresh + * cluster has after submission. + * 
<p>
Per-topology cap in one scheduling round is + * {@code idleSupervisorCount * floor(numWorkers / nonBlacklistedSupervisorCount)}, further tightened by + * {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY} when set to a positive value. Topologies whose + * computed cap is zero (typically {@code numWorkers < numSupervisors}) are skipped entirely. The trigger remains + * binary — only fires when at least one supervisor has zero used slots — so a near-balanced cluster sees no + * movement. + * 
<p>
Workers are always pulled from the supervisor where this topology has the most workers, and only when that + * supervisor would still hold at least one worker afterward. Each pulled worker's executors are placed directly + * onto an idle slot, so the subsequent sortSlots / interleave pass cannot drop them back into the just-vacated + * slots. Ties between equally loaded source supervisors are resolved by supervisor id, lexicographically. + * 
<p>
Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the + * method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost. + */ + @VisibleForTesting + static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) { + if (!ObjectReader.getBoolean( + cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) { + return; + } + int nonBlacklistedSupervisorCount = 0; + int idleSupervisorCount = 0; + Deque idleTargets = new ArrayDeque<>(); + List supervisors = new ArrayList<>(cluster.getSupervisors().values()); + supervisors.sort(Comparator.comparing(SupervisorDetails::getId)); + for (SupervisorDetails s : supervisors) { + if (cluster.isBlackListed(s.getId())) { + continue; + } + if (s.getAllPorts().isEmpty()) { + continue; + } + nonBlacklistedSupervisorCount++; + if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) { + idleSupervisorCount++; + List ports = new ArrayList<>(s.getAllPorts()); + Collections.sort(ports); + for (Integer port : ports) { + idleTargets.add(new WorkerSlot(s.getId(), port)); + } + } + } + if (idleTargets.isEmpty() || nonBlacklistedSupervisorCount == 0 || idleSupervisorCount == 0) { + return; + } + + int maxFree = ObjectReader.getInt( + cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY), 0); + + List orderedTopos = new ArrayList<>(); + Map remainingBudget = new HashMap<>(); + for (TopologyDetails topo : topologies.getTopologies()) { + if (!cluster.hasIdleSupervisorReusableBy(topo)) { + continue; + } + int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount; + if (target <= 0) { + continue; + } + if (maxFree > 0) { + target = Math.min(target, maxFree); + } + orderedTopos.add(topo); + remainingBudget.put(topo.getId(), target); + } + if (orderedTopos.isEmpty()) { + return; + } + 
orderedTopos.sort(Comparator.comparing(TopologyDetails::getId)); + + int totalRelocated = 0; + while (!idleTargets.isEmpty()) { + boolean movedThisIteration = false; + for (TopologyDetails topo : orderedTopos) { + if (idleTargets.isEmpty()) { + break; + } + int remaining = remainingBudget.getOrDefault(topo.getId(), 0); + if (remaining <= 0) { + continue; + } + if (relocateOneWorkerOntoIdleSlot(topo, cluster, idleTargets)) { + remainingBudget.put(topo.getId(), remaining - 1); + totalRelocated++; + movedThisIteration = true; + } else { + remainingBudget.put(topo.getId(), 0); + } + } + if (!movedThisIteration) { + break; + } + } + if (totalRelocated > 0) { + LOG.info("EvenScheduler: relocated {} worker(s) onto idle supervisor(s) round-robin across {} topologies.", + totalRelocated, orderedTopos.size()); + } + } + + /** + * Pulls a single worker from the supervisor where {@code topology} currently has the most workers and reassigns its + * executors onto the next idle slot from {@code idleTargets}. Returns false (without consuming an idle target) if + * the topology has no eligible source supervisor — namely all of its supervisors host at most one of its workers, + * which would otherwise drain that supervisor to zero and turn it into the next round's idle. 
+ */ + private static boolean relocateOneWorkerOntoIdleSlot(TopologyDetails topology, Cluster cluster, + Deque idleTargets) { + Map> slotToExecutors = + getAliveAssignedWorkerSlotExecutors(cluster, topology.getId()); + Map> nodeToSlots = new HashMap<>(); + for (WorkerSlot slot : slotToExecutors.keySet()) { + nodeToSlots.computeIfAbsent(slot.getNodeId(), k -> new ArrayList<>()).add(slot); + } + List>> candidates = new ArrayList<>(nodeToSlots.entrySet()); + candidates.removeIf(e -> e.getValue().size() < 2); + candidates.sort(Comparator + .>>comparingInt(e -> e.getValue().size()) + .reversed() + .thenComparing(Map.Entry::getKey)); + if (candidates.isEmpty()) { + return false; + } + List slots = candidates.get(0).getValue(); + slots.sort(Comparator.comparingInt(WorkerSlot::getPort)); + WorkerSlot victim = slots.get(slots.size() - 1); + Collection execs = slotToExecutors.get(victim); + if (execs == null || execs.isEmpty()) { + return false; + } + if (idleTargets.isEmpty()) { + return false; + } + WorkerSlot target = idleTargets.poll(); + cluster.freeSlot(victim); + cluster.assign(target, topology.getId(), execs); + return true; + } + private static Map scheduleTopology(TopologyDetails topology, Cluster cluster) { List availableSlots = cluster.getAvailableSlots(); Set allExecutors = topology.getExecutors(); @@ -148,7 +298,11 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) { } public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) { + redistributeOntoIdleSupervisors(topologies, cluster); for (TopologyDetails topology : cluster.needsSchedulingTopologies()) { + if (topologies.getById(topology.getId()) == null) { + continue; + } String topologyId = topology.getId(); Map newAssignment = scheduleTopology(topology, cluster); Map> nodePortToExecutors = Utils.reverseMap(newAssignment); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java 
b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java index 188627353dd..509ff0f7e76 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/SupervisorDetails.java @@ -46,6 +46,7 @@ public class SupervisorDetails { * all the ports of the supervisor. */ private Set allPorts; + private final long uptimeSecs; /** * Create the details of a new supervisor. @@ -59,12 +60,18 @@ public class SupervisorDetails { */ public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta, Collection allPorts, Map totalResources) { + this(id, serverPort, host, meta, schedulerMeta, allPorts, totalResources, Long.MAX_VALUE); + } + + public SupervisorDetails(String id, Integer serverPort, String host, Object meta, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { this.id = id; this.serverPort = serverPort; this.host = host; this.meta = meta; this.schedulerMeta = schedulerMeta; + this.uptimeSecs = uptimeSecs; if (allPorts != null) { setAllPorts(allPorts); } else { @@ -82,6 +89,10 @@ public SupervisorDetails(String id, Object meta, Map totalResour this(id, null, null, meta, null, null, totalResources); } + public SupervisorDetails(String id, Object meta, Map totalResources, long uptimeSecs) { + this(id, null, null, meta, null, null, totalResources, uptimeSecs); + } + public SupervisorDetails(String id, Object meta, Collection allPorts) { this(id, null, null, meta, null, allPorts, null); } @@ -95,11 +106,21 @@ public SupervisorDetails(String id, String host, Object schedulerMeta, this(id, null, host, null, schedulerMeta, allPorts, totalResources); } + public SupervisorDetails(String id, String host, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { + this(id, null, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); + } + public SupervisorDetails(String 
id, int serverPort, String host, Object schedulerMeta, Collection allPorts, Map totalResources) { this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources); } + public SupervisorDetails(String id, int serverPort, String host, Object schedulerMeta, + Collection allPorts, Map totalResources, long uptimeSecs) { + this(id, serverPort, host, null, schedulerMeta, allPorts, totalResources, uptimeSecs); + } + @Override public String toString() { return getClass().getSimpleName() + " ID: " + id + " HOST: " + host + " META: " + meta @@ -126,6 +147,10 @@ public Set getAllPorts() { return allPorts; } + public long getUptimeSecs() { + return uptimeSecs; + } + private void setAllPorts(Collection allPorts) { this.allPorts = new HashSet<>(); if (allPorts != null) { diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java new file mode 100644 index 00000000000..b15a8335502 --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/scheduler/TestEvenSchedulerIdleSupervisor.java @@ -0,0 +1,624 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.DaemonConfig; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.blacklist.TestUtilsForBlacklistScheduler; +import org.apache.storm.scheduler.resource.normalization.ResourceMetrics; +import org.apache.storm.topology.TopologyBuilder; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for the idle-supervisor rebalance behavior added to {@link Cluster#hasIdleSupervisorReusableBy(TopologyDetails)} + * and {@link EvenScheduler#redistributeOntoIdleSupervisors(Topologies, Cluster)}. + * + *
<p>
Trigger condition is binary: at least one non-blacklisted supervisor with zero used slots must exist. The cluster + * being "almost balanced" never triggers the new logic, so a near-even distribution is preserved as-is. Each round only + * frees up to {@code nimbus.even.rebalance.max.free.per.topology} workers and never drains a supervisor down to zero. + */ +public class TestEvenSchedulerIdleSupervisor { + + private static final String TOPO_ID = "topo-1"; + + /** + * supA and supB host the topology; supC is freshly returned and idle. Topology has 2 workers on supA and 1 on supB. + */ + private Cluster buildClusterWithIdleSupervisor(boolean enableRebalance, int maxFreePerTopology) { + return buildClusterWithIdleSupervisor(TestUtilsForBlacklistScheduler.genSupervisors(3, 4), + evenRebalanceConf(enableRebalance, maxFreePerTopology)); + } + + private Cluster buildClusterWithIdleSupervisor(Map supMap, Map conf) { + // Build a topology and assign 3 workers: two on sup-0 and one on sup-1. sup-2 stays idle. + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s0p1 = new WorkerSlot("sup-0", 1); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + // Distribute the executors round-robin onto the three slots so each slot has at least one. 
+ Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s0p1, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + SchedulerAssignmentImpl assignment = new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null); + + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, assignment); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + return newCluster(supMap, assignments, topologies, conf); + } + + private Map evenRebalanceConf(boolean enableRebalance, int maxFreePerTopology) { + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, enableRebalance); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, maxFreePerTopology); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 0); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + return conf; + } + + private Map genSupervisorsWithUptime(int numSup, int numPorts, long uptimeSecs) { + Map supMap = new HashMap<>(); + for (int i = 0; i < numSup; i++) { + SupervisorDetails sup = supervisor("sup-" + i, "host-" + i, numPorts, uptimeSecs); + supMap.put(sup.getId(), sup); + } + return supMap; + } + + private SupervisorDetails supervisor(String id, String host, int numPorts, long uptimeSecs) { + List ports = new LinkedList<>(); + for (int port = 0; port < numPorts; port++) { + ports.add(port); + } + return new SupervisorDetails(id, host, null, ports, null, uptimeSecs); + } + + private Cluster newCluster(Map supMap, + Map assignments, + Topologies topologies, + Map conf) { + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + return new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, supMap, + assignments, topologies, conf); + } + + 
private TopologyDetails makeTopologyDetails(String id, int numWorkers, int parallelism) { + Config conf = new Config(); + conf.put(Config.TOPOLOGY_NAME, id); + conf.put(Config.TOPOLOGY_WORKERS, numWorkers); + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("spout-0", new TestUtilsForBlacklistScheduler.TestSpout(), parallelism); + builder.setBolt("bolt-0", new TestUtilsForBlacklistScheduler.TestBolt(), parallelism).shuffleGrouping("spout-0"); + StormTopology stormTopology = builder.createTopology(); + + Map execsAndComps = TestUtilsForBlacklistScheduler.genExecsAndComps( + stormTopology, parallelism, parallelism); + return new TopologyDetails(id, conf, stormTopology, numWorkers, execsAndComps, 0, "user"); + } + + private TopologyDetails makeTopologyDetails(String id, int numWorkers) { + return makeTopologyDetails(id, numWorkers, 3); + } + + private TopologyDetails firstTopology(Cluster cluster) { + return cluster.getTopologies().getById(TOPO_ID); + } + + private int usedSlotCount(Cluster cluster, String supervisorId) { + SupervisorDetails s = cluster.getSupervisorById(supervisorId); + return cluster.getUsedPorts(s).size(); + } + + @Test + public void disabledByDefault_doesNotTrigger() { + Cluster cluster = buildClusterWithIdleSupervisor(false, 1); + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "disabled flag must short-circuit the trigger even when an idle supervisor exists"); + assertFalse(cluster.needsScheduling(firstTopology(cluster)), + "needsScheduling must remain false when the new behavior is disabled and the topology is fully assigned"); + } + + @Test + public void enabledWithIdleSupervisor_doesNotChangeGenericNeedsScheduling() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + assertTrue(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + assertFalse(cluster.needsScheduling(firstTopology(cluster)), + "needsScheduling is used by schedulers other than EvenScheduler; the idle trigger 
stays out of that generic path"); + assertFalse(cluster.needsSchedulingRas(firstTopology(cluster)), + "ResourceAwareScheduler keeps using needsSchedulingRas, so this opt-in EvenScheduler feature is out of RAS scope"); + } + + @Test + public void noIdleSupervisor_doesNotTrigger() { + // Two supervisors, both serving the topology -> no idle supervisor present. + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(2, 4); + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 3); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s0p1 = new WorkerSlot("sup-0", 1); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s0p1, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 1); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + assertFalse(cluster.needsScheduling(firstTopology(cluster))); + } + + @Test + public void redistributeRelocatesAtMostMaxFreeWorkersPerTopology() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); 
+ assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + + EvenScheduler.redistributeOntoIdleSupervisors(cluster.getTopologies(), cluster); + + // max-free=1 caps the topology to a single relocation; pulled from the most-loaded supervisor (sup-0) + // and placed directly onto the idle supervisor. + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(1, usedSlotCount(cluster, "sup-2")); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + } + + @Test + public void redistributeNeverDrainsSupervisorToZero() { + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(TOPO_ID, 2); + + WorkerSlot s0p0 = new WorkerSlot("sup-0", 0); + WorkerSlot s1p0 = new WorkerSlot("sup-1", 0); + + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + WorkerSlot[] slotRing = new WorkerSlot[]{s0p0, s1p0}; + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slotRing[i % slotRing.length]); + } + Map assignments = new HashMap<>(); + assignments.put(TOPO_ID, new SchedulerAssignmentImpl(TOPO_ID, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(TOPO_ID, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 5); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + 
+ EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + // floor(2/3)=0 → topology gets a budget of 0 and is skipped entirely. No source supervisor is drained. + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + /** + * Runs the full {@code EvenScheduler.scheduleTopologiesEvenly} entry point on the cluster returned by + * {@code buildClusterWithIdleSupervisor(true, 1)} (helper defined outside this view) and checks that sup-2 ends up with + * exactly one used slot while the three supervisors together still use three slots. + */ + @Test + public void scheduleTopologiesEvenly_movesOneWorkerToIdleSupervisor() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + + Topologies topologies = cluster.getTopologies(); + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + // After scheduling: idle supervisor (sup-2) should now host exactly 1 worker. + assertEquals(1, usedSlotCount(cluster, "sup-2")); + // Total worker count is preserved (3) and respects the topology's numWorkers. + int total = usedSlotCount(cluster, "sup-0") + + usedSlotCount(cluster, "sup-1") + + usedSlotCount(cluster, "sup-2"); + assertEquals(3, total); + } + + /** + * A single-worker topology must not move even when idle supervisors exist: {@code floor(1 / N) = 0} for any + * {@code N >= 2}, so the drain budget evaluates to zero regardless of how many idle supervisors exist. Without this guard + * a single-worker topology would ping-pong between supervisors every monitor cycle.
+ */ + @Test + public void singleWorkerTopology_doesNotMoveDespiteIdleSupervisors() { + String topoId = "topo-single"; + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(topoId, 1, 1); + + WorkerSlot s0 = new WorkerSlot("sup-0", 0); + Map execToSlot = new HashMap<>(); + for (ExecutorDetails e : topology.getExecutors()) { + execToSlot.put(e, s0); + } + Map assignments = new HashMap<>(); + assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(topoId, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + assertEquals(1, usedSlotCount(cluster, "sup-0")); + assertEquals(0, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + /** + * 8-worker topology starts at distribution (4, 4, 0). With max-free unbounded the budget targets + * floor(numWorkers / numSupervisors) = 2 workers for the idle supervisor, and the round ends at (3, 3, 2)
+ — fully even — without disturbing topologies on the next round (no supervisor is idle anymore). + * NOTE: the assertions below only pin sup-2 at 2 used slots and the three-supervisor total at 8; the (3, 3) split across + * sup-0 and sup-1 is not asserted directly. + */ + @Test + public void evenDistributionInOneRound_unboundedMaxFree() { + String topoId = "topo-even"; + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + TopologyDetails topology = makeTopologyDetails(topoId, 8, 4); + + WorkerSlot[] slots = new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3), + }; + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map execToSlot = new HashMap<>(); + for (int i = 0; i < execs.size(); i++) { + execToSlot.put(execs.get(i), slots[i]); + } + Map assignments = new HashMap<>(); + assignments.put(topoId, new SchedulerAssignmentImpl(topoId, execToSlot, null, null)); + + Map topoMap = new HashMap<>(); + topoMap.put(topoId, topology); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + assertEquals(4, usedSlotCount(cluster, "sup-0")); + assertEquals(4, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + + EvenScheduler.scheduleTopologiesEvenly(topologies, cluster); + + // Idle supervisor absorbs exactly floor(8/3) = 2 workers in one round; total worker count is preserved.
+ assertEquals(2, usedSlotCount(cluster, "sup-2")); + assertEquals(8, usedSlotCount(cluster, "sup-0") + + usedSlotCount(cluster, "sup-1") + + usedSlotCount(cluster, "sup-2")); + // No supervisor is idle anymore — the trigger will not refire on the next round. + assertFalse(cluster.hasIdleSupervisorReusableBy(cluster.getTopologies().getById(topoId))); + } + + /** + * Two equally-sized topologies share the same returning supervisor round-robin: each contributes one worker, so + * sup-2 ends up hosting workers from both — restoring per-supervisor workload diversity the way a fresh submission + * would. + */ + @Test + public void multipleTopologies_shareIdleSlotsRoundRobin() { + Map supMap = TestUtilsForBlacklistScheduler.genSupervisors(3, 4); + + TopologyDetails topoA = makeTopologyDetails("topo-A", 4, 2); + TopologyDetails topoB = makeTopologyDetails("topo-B", 4, 2); + + Map assignments = new HashMap<>(); + assignments.put("topo-A", buildAssignment(topoA, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + assignments.put("topo-B", buildAssignment(topoB, new WorkerSlot[]{ + new WorkerSlot("sup-0", 2), new WorkerSlot("sup-0", 3), + new WorkerSlot("sup-1", 2), new WorkerSlot("sup-1", 3), + })); + + Map topoMap = new HashMap<>(); + topoMap.put("topo-A", topoA); + topoMap.put("topo-B", topoB); + Topologies topologies = new Topologies(topoMap); + + Map conf = new HashMap<>(); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED, true); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY, 0); + StormMetricsRegistry metricsRegistry = new StormMetricsRegistry(); + ResourceMetrics resourceMetrics = new ResourceMetrics(metricsRegistry); + Cluster cluster = new Cluster(new TestUtilsForBlacklistScheduler.INimbusTest(), resourceMetrics, + supMap, assignments, topologies, conf); + + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + //
floor(4/3)=1 per topology, two topologies → sup-2 hosts 1 worker from each, in round-robin order. + assertEquals(2, usedSlotCount(cluster, "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-A", "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-B", "sup-2")); + // Each topology kept its total worker count; only one worker per topology changed host. + assertEquals(4, cluster.getAssignedNumWorkers(topoA)); + assertEquals(4, cluster.getAssignedNumWorkers(topoB)); + } + + /** + * An idle supervisor that has not yet met the configured number of stable monitor rounds must not receive workers. + * sup-2 is replaced via {@code supervisor("sup-2", "host-2", 4, 8)} -- the last argument is presumably its uptime in + * seconds (helper defined outside this view; confirm) -- while the threshold is 3 rounds at a 3 second monitor + * frequency, so the used-slot distribution is expected to stay at (2, 1, 0). + */ + @Test + public void idleSupervisorYoungerThanStableRoundsDoesNotMoveWorkers() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 8)); + + Map conf = evenRebalanceConf(true, 1); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "3 stable rounds at a 3 second monitor frequency require at least 9 seconds of supervisor uptime"); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + /** + * Boundary counterpart of the younger-than-threshold case: sup-2 is built with 9 instead of 8 as the last + * {@code supervisor(...)} argument, which (assuming that argument is uptime in seconds) exactly meets 3 rounds at + * 3 seconds each, so sup-2 is reported as reusable and ends up with one used slot while the topology keeps 3 workers. + */ + @Test + public void idleSupervisorAtStableRoundThresholdCanReceiveWorker() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + supMap.put("sup-2", supervisor("sup-2", "host-2", 4, 9)); + + Map conf = evenRebalanceConf(true, 1); + conf.put(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS, 3); + conf.put(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS, 3); + Cluster cluster = buildClusterWithIdleSupervisor(supMap, conf); + + assertTrue(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster))); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(1, usedSlotCount(cluster,
"sup-2")); + assertEquals(3, cluster.getAssignedNumWorkers(firstTopology(cluster))); + } + + @Test + public void donorTieBreaksBySupervisorIdWhenWorkerCountsTie() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails topology = makeTopologyDetails("topo-tie", 4, 4); + + Map assignments = new HashMap<>(); + assignments.put("topo-tie", buildAssignment(topology, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + + Map topoMap = new HashMap<>(); + topoMap.put("topo-tie", topology); + Topologies topologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, topologies, evenRebalanceConf(true, 1)); + + EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster); + + assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-0"), + "sup-0 and sup-1 started with two workers each; lexicographic tie-break chooses sup-0 as donor"); + assertEquals(2, supervisorWorkerCount(cluster, "topo-tie", "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, "topo-tie", "sup-2")); + } + + @Test + public void blacklistedIdleSupervisorIsNotReusableTarget() { + Cluster cluster = buildClusterWithIdleSupervisor(true, 1); + cluster.blacklistHost("host-2"); + + assertFalse(cluster.hasIdleSupervisorReusableBy(firstTopology(cluster)), + "IsolationScheduler represents reserved hosts by blacklisting them before delegating to DefaultScheduler"); + + EvenScheduler.scheduleTopologiesEvenly(cluster.getTopologies(), cluster); + + assertEquals(2, usedSlotCount(cluster, "sup-0")); + assertEquals(1, usedSlotCount(cluster, "sup-1")); + assertEquals(0, usedSlotCount(cluster, "sup-2")); + } + + @Test + public void defaultSchedulerIdleRebalanceHonorsLeftoverTopologySubset() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 2, 2); + TopologyDetails regular =
makeTopologyDetails("topo-regular", 3, 3); + + Map assignments = new HashMap<>(); + assignments.put(isolated.getId(), buildAssignment(isolated, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + })); + assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies allTopologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, allTopologies, evenRebalanceConf(true, 0)); + + cluster.blacklistHost("host-0"); + DefaultScheduler.defaultSchedule(new Topologies(regular), cluster); + + assertEquals(2, supervisorWorkerCount(cluster, isolated.getId(), "sup-0"), + "the isolated topology is not in the leftover topology set, so DefaultScheduler must not move it"); + assertEquals(0, supervisorWorkerCount(cluster, isolated.getId(), "sup-2")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2")); + } + + @Test + public void isolationSchedulerOnlyRelocatesLeftoverTopologyOntoNonIsolatedIdleSupervisor() { + Map supMap = genSupervisorsWithUptime(3, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 1, 1); + TopologyDetails regular = makeTopologyDetails("topo-regular", 3, 3); + + Map assignments = new HashMap<>(); + assignments.put(isolated.getId(), buildAssignment(isolated, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), + })); + assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), new WorkerSlot("sup-1", 2), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies topologies = new Topologies(topoMap); + + Map
conf = evenRebalanceConf(true, 0); + conf.put(DaemonConfig.ISOLATION_SCHEDULER_MACHINES, Collections.singletonMap(isolated.getName(), 1)); + Cluster cluster = newCluster(supMap, assignments, topologies, conf); + + IsolationScheduler scheduler = new IsolationScheduler(); + scheduler.prepare(conf, new StormMetricsRegistry()); + scheduler.schedule(topologies, cluster); + + assertEquals(1, supervisorWorkerCount(cluster, isolated.getId(), "sup-0"), + "the already-isolated topology remains on its isolated host and is not selected as a donor"); + assertEquals(0, supervisorWorkerCount(cluster, isolated.getId(), "sup-2")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-2"), + "only the leftover regular topology is allowed to move onto the non-isolated idle supervisor"); + } + + /** + * IsolationScheduler reserves a host by blacklisting it before delegating the remaining (non-isolated) topologies to + * {@link DefaultScheduler#defaultSchedule(Topologies, Cluster)}. Here the isolated topology is down -- it has no + * assigned workers at all -- yet its reserved host (sup-2) must not be treated as an idle relocation target even + * though it has zero used slots. The leftover regular topology is rebalanced onto the genuinely idle, non-reserved + * sup-3 and never onto the reserved sup-2. + */ + @Test + public void reservedHostForDownIsolatedTopologyIsNotTreatedAsIdle() { + Map supMap = genSupervisorsWithUptime(4, 4, 100); + TopologyDetails isolated = makeTopologyDetails("topo-isolated", 1, 1); + TopologyDetails regular = makeTopologyDetails("topo-regular", 4, 4); + + Map assignments = new HashMap<>(); + // The isolated topology is down: it has no assignment at all.
+ assignments.put(regular.getId(), buildAssignment(regular, new WorkerSlot[]{ + new WorkerSlot("sup-0", 0), new WorkerSlot("sup-0", 1), + new WorkerSlot("sup-1", 0), new WorkerSlot("sup-1", 1), + })); + + Map topoMap = new HashMap<>(); + topoMap.put(isolated.getId(), isolated); + topoMap.put(regular.getId(), regular); + Topologies allTopologies = new Topologies(topoMap); + Cluster cluster = newCluster(supMap, assignments, allTopologies, evenRebalanceConf(true, 0)); + + // sup-2 is reserved for the (down) isolated topology -- IsolationScheduler represents this by blacklisting it. + cluster.blacklistHost("host-2"); + + assertFalse(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-2")), + "a blacklisted reserved host is never an even-rebalance target, even with zero used slots"); + assertTrue(cluster.isIdleSupervisorAvailableForEvenRebalance(cluster.getSupervisorById("sup-3")), + "the non-reserved idle supervisor is available"); + + // IsolationScheduler delegates the leftover (non-isolated) topologies to DefaultScheduler with the reserved + // host already blacklisted.
+ DefaultScheduler.defaultSchedule(new Topologies(regular), cluster); + + assertEquals(0, usedSlotCount(cluster, "sup-2"), + "the reserved host stays idle: the down isolated topology's machine is not repopulated by rebalance"); + assertEquals(0, supervisorWorkerCount(cluster, regular.getId(), "sup-2")); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-3"), + "the leftover regular topology rebalances onto the genuinely idle, non-reserved supervisor"); + assertEquals(1, supervisorWorkerCount(cluster, regular.getId(), "sup-0")); + assertEquals(2, supervisorWorkerCount(cluster, regular.getId(), "sup-1")); + assertEquals(0, cluster.getUsedSlotsByTopologyId(isolated.getId()).size(), + "the isolated topology is down and is never scheduled by the leftover path"); + } + + private SchedulerAssignmentImpl buildAssignment(TopologyDetails topology, WorkerSlot[] slots) { + List execs = new LinkedList<>(topology.getExecutors()); + Collections.sort(execs, (a, b) -> Integer.compare(a.getStartTask(), b.getStartTask())); + Map map = new HashMap<>(); + for (int i = 0; i < execs.size(); i++) { + map.put(execs.get(i), slots[i % slots.length]); + } + return new SchedulerAssignmentImpl(topology.getId(), map, null, null); + } + + private int supervisorWorkerCount(Cluster cluster, String topologyId, String supervisorId) { + int count = 0; + for (WorkerSlot slot : cluster.getUsedSlotsByTopologyId(topologyId)) { + if (slot.getNodeId().equals(supervisorId)) { + count++; + } + } + return count; + } +}