Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ topology.max.replication.wait.time.sec: 60
nimbus.credential.renewers.freq.secs: 600
nimbus.queue.size: 100000
scheduler.display.resource: false
nimbus.even.rebalance.idle.supervisor.enabled: false
nimbus.even.rebalance.max.free.per.topology: 0
nimbus.even.rebalance.idle.supervisor.min.stable.rounds: 3
nimbus.local.assignments.backend.class: "org.apache.storm.assignments.InMemoryAssignmentBackend"
nimbus.assignments.service.threads: 10
nimbus.assignments.service.thread.queue.size: 100
Expand Down
31 changes: 31 additions & 0 deletions storm-server/src/main/java/org/apache/storm/DaemonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,37 @@ public class DaemonConfig implements Validated {
@IsBoolean
public static final String SCHEDULER_DISPLAY_RESOURCE = "scheduler.display.resource";

/**
* If true, {@link org.apache.storm.scheduler.EvenScheduler} may move already-assigned workers onto non-blacklisted supervisors
* with no slot in use. This lets a freshly returned supervisor pick up workers instead of staying idle. The number of workers
* freed per topology in a single scheduling round is capped by {@link #NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY}, so even
* distribution is approached gradually rather than rebuilt from scratch.
*/
@IsBoolean
public static final String NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED
= "nimbus.even.rebalance.idle.supervisor.enabled";

/**
* Optional upper bound on the number of currently-assigned workers a single topology may release in one scheduling round
* when the idle-supervisor rebalance defined by {@link #NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED} kicks in. The
* default budget already targets an even per-supervisor distribution (idle supervisors absorb roughly {@code numWorkers /
* numSupervisors} workers each in one round), capped by the idle side's free slot capacity. Setting this to a positive
* value tightens that budget; setting it to {@code 0} or a negative value leaves the even-distribution budget unbounded.
*/
@IsInteger
public static final String NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY
= "nimbus.even.rebalance.max.free.per.topology";

/**
* Minimum number of consecutive supervisor monitor rounds that a fully-idle supervisor must have been alive before
* {@link org.apache.storm.scheduler.EvenScheduler} can relocate workers onto it. A positive value avoids moving workers onto a
* supervisor that has only just returned and may still be flapping. Setting this to {@code 0} or a negative value disables the
* uptime guard.
*/
@IsInteger
public static final String NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS
= "nimbus.even.rebalance.idle.supervisor.min.stable.rounds";

/**
* The directory where storm's health scripts go.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,16 @@ private static Map<String, SupervisorDetails> basicSupervisorDetailsMap(IStormCl
String id = entry.getKey();
SupervisorInfo info = entry.getValue();
ret.put(id, new SupervisorDetails(id, info.get_server_port(), info.get_hostname(),
info.get_scheduler_meta(), null, info.get_resources_map()));
info.get_scheduler_meta(), null, info.get_resources_map(),
supervisorUptimeSecs(info)));
}
return ret;
}

private static long supervisorUptimeSecs(SupervisorInfo info) {
return info.is_set_uptime_secs() ? info.get_uptime_secs() : 0L;
}

/**
* NOTE: this can return false when a topology has just been activated. The topology may still be
* in the STORMS_SUBTREE.
Expand Down Expand Up @@ -2273,7 +2278,8 @@ private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<
List<SupervisorDetails> superDetails = new ArrayList<>();
for (Entry<String, SupervisorInfo> entry : superInfos.entrySet()) {
SupervisorInfo info = entry.getValue();
superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map()));
superDetails.add(new SupervisorDetails(entry.getKey(), info.get_meta(), info.get_resources_map(),
supervisorUptimeSecs(info)));
}
// Note that allSlotsAvailableForScheduling
// only uses the supervisor-details. The rest of the arguments
Expand Down Expand Up @@ -2306,7 +2312,7 @@ private Map<String, SupervisorDetails> readAllSupervisorDetails(Map<String, Set<
allPorts.removeAll(deadPorts);
}
ret.put(superId, new SupervisorDetails(superId, hostname, info.get_scheduler_meta(),
allPorts, info.get_resources_map()));
allPorts, info.get_resources_map(), supervisorUptimeSecs(info)));
}
return ret;
}
Expand Down Expand Up @@ -5526,4 +5532,3 @@ public void run() {
}
}
}

58 changes: 58 additions & 0 deletions storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,64 @@ public boolean needsScheduling(TopologyDetails topology) {
return desiredNumWorkers > assignedNumWorkers || getUnassignedExecutors(topology).size() > 0;
}

/**
* Returns true when there is at least one stable, non-blacklisted supervisor whose slots are all currently free and the
* topology is not already on that supervisor. Controlled by
* {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}; returns false when disabled. The check is
* binary by design -- a supervisor either has zero used slots or it does not -- so this never fires for "almost balanced"
* clusters. Topologies that cannot benefit from a move (e.g. only a single worker assigned) are filtered later by the
* drain-budget computation in {@link EvenScheduler}, which evaluates to zero whenever
* {@code floor(numWorkers / nonBlacklistedSupervisorCount)} is zero.
*/
public boolean hasIdleSupervisorReusableBy(TopologyDetails topology) {
if (!ObjectReader.getBoolean(
conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) {
return false;
}
Set<String> nodesUsedByTopology = new HashSet<>();
for (WorkerSlot slot : getUsedSlotsByTopologyId(topology.getId())) {
nodesUsedByTopology.add(slot.getNodeId());
}
for (SupervisorDetails s : supervisors.values()) {
String sid = s.getId();
if (!isIdleSupervisorAvailableForEvenRebalance(s)) {
continue;
}
if (nodesUsedByTopology.contains(sid)) {
continue;
}
return true;
}
return false;
}

public boolean isIdleSupervisorAvailableForEvenRebalance(SupervisorDetails supervisor) {
if (supervisor == null) {
return false;
}
if (isBlackListed(supervisor.getId())) {
return false;
}
if (supervisor.getAllPorts().isEmpty()) {
return false;
}
if (!getUsedPorts(supervisor).isEmpty()) {
return false;
}
return hasMinimumIdleSupervisorStability(supervisor);
}

private boolean hasMinimumIdleSupervisorStability(SupervisorDetails supervisor) {
int minStableRounds = ObjectReader.getInt(
conf.get(DaemonConfig.NIMBUS_EVEN_REBALANCE_IDLE_SUPERVISOR_MIN_STABLE_ROUNDS), 3);
if (minStableRounds <= 0) {
return true;
}
int monitorFrequencySecs = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS), 3);
long requiredUptimeSecs = (long) minStableRounds * Math.max(1, monitorFrequencySecs);
return supervisor.getUptimeSecs() >= requiredUptimeSecs;
}

@Override
public boolean needsSchedulingRas(TopologyDetails topology) {
return getUnassignedExecutors(topology).size() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ public static Set<WorkerSlot> slotsCanReassign(Cluster cluster, Set<WorkerSlot>
}

public static void defaultSchedule(Topologies topologies, Cluster cluster) {
EvenScheduler.redistributeOntoIdleSupervisors(topologies, cluster);
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
if (topologies.getById(topology.getId()) == null) {
continue;
}
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
Set<ExecutorDetails> allExecutors = topology.getExecutors();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,23 @@

package org.apache.storm.scheduler;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -101,6 +106,151 @@ public static Map<WorkerSlot, List<ExecutorDetails>> getAliveAssignedWorkerSlotE
return Utils.reverseMap(executorToSlot);
}

/**
* Round-robin relocation of currently-assigned workers onto fully-idle supervisors. Each round-robin iteration moves
* at most one worker per topology, so multiple topologies share the idle slots and a single returning supervisor ends
* up hosting workers from several topologies — preserving the per-supervisor workload diversity that a fresh
* cluster has after submission.
*
* <p>Per-topology cap in one scheduling round is
* {@code idleSupervisorCount * floor(numWorkers / nonBlacklistedSupervisorCount)}, further tightened by
* {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY} when set to a positive value. Topologies whose
* computed cap is zero (typically {@code numWorkers < numSupervisors}) are skipped entirely. The trigger remains
* binary — only fires when at least one supervisor has zero used slots — so a near-balanced cluster sees no
* movement.
*
* <p>Workers are always pulled from the supervisor where this topology has the most workers, and only when that
* supervisor would still hold at least one worker afterward. Each pulled worker's executors are placed directly
* onto an idle slot, so the subsequent sortSlots / interleave pass cannot drop them back into the just-vacated
* slots. Ties between equally loaded source supervisors are resolved by supervisor id, lexicographically.
*
* <p>Gated by {@link DaemonConfig#NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED}: when disabled (the default) the
* method returns before scanning any supervisor, so a cluster that has not opted in pays no per-scheduling-round cost.
*/
@VisibleForTesting
static void redistributeOntoIdleSupervisors(Topologies topologies, Cluster cluster) {
if (!ObjectReader.getBoolean(
cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_ON_IDLE_SUPERVISOR_ENABLED), false)) {
return;
}
int nonBlacklistedSupervisorCount = 0;
int idleSupervisorCount = 0;
Deque<WorkerSlot> idleTargets = new ArrayDeque<>();
List<SupervisorDetails> supervisors = new ArrayList<>(cluster.getSupervisors().values());
supervisors.sort(Comparator.comparing(SupervisorDetails::getId));
for (SupervisorDetails s : supervisors) {
if (cluster.isBlackListed(s.getId())) {
continue;
}
if (s.getAllPorts().isEmpty()) {
continue;
}
nonBlacklistedSupervisorCount++;
if (cluster.isIdleSupervisorAvailableForEvenRebalance(s)) {
idleSupervisorCount++;
List<Integer> ports = new ArrayList<>(s.getAllPorts());
Collections.sort(ports);
for (Integer port : ports) {
idleTargets.add(new WorkerSlot(s.getId(), port));
}
}
}
if (idleTargets.isEmpty() || nonBlacklistedSupervisorCount == 0 || idleSupervisorCount == 0) {
return;
}

int maxFree = ObjectReader.getInt(
cluster.getConf().get(DaemonConfig.NIMBUS_EVEN_REBALANCE_MAX_FREE_PER_TOPOLOGY), 0);

List<TopologyDetails> orderedTopos = new ArrayList<>();
Map<String, Integer> remainingBudget = new HashMap<>();
for (TopologyDetails topo : topologies.getTopologies()) {
if (!cluster.hasIdleSupervisorReusableBy(topo)) {
continue;
}
int target = (topo.getNumWorkers() / nonBlacklistedSupervisorCount) * idleSupervisorCount;
if (target <= 0) {
continue;
}
if (maxFree > 0) {
target = Math.min(target, maxFree);
}
orderedTopos.add(topo);
remainingBudget.put(topo.getId(), target);
}
if (orderedTopos.isEmpty()) {
return;
}
orderedTopos.sort(Comparator.comparing(TopologyDetails::getId));

int totalRelocated = 0;
while (!idleTargets.isEmpty()) {
boolean movedThisIteration = false;
for (TopologyDetails topo : orderedTopos) {
if (idleTargets.isEmpty()) {
break;
}
int remaining = remainingBudget.getOrDefault(topo.getId(), 0);
if (remaining <= 0) {
continue;
}
if (relocateOneWorkerOntoIdleSlot(topo, cluster, idleTargets)) {
remainingBudget.put(topo.getId(), remaining - 1);
totalRelocated++;
movedThisIteration = true;
} else {
remainingBudget.put(topo.getId(), 0);
}
}
if (!movedThisIteration) {
break;
}
}
if (totalRelocated > 0) {
LOG.info("EvenScheduler: relocated {} worker(s) onto idle supervisor(s) round-robin across {} topologies.",
totalRelocated, orderedTopos.size());
}
}

/**
* Pulls a single worker from the supervisor where {@code topology} currently has the most workers and reassigns its
* executors onto the next idle slot from {@code idleTargets}. Returns false (without consuming an idle target) if
* the topology has no eligible source supervisor — namely all of its supervisors host at most one of its workers,
* which would otherwise drain that supervisor to zero and turn it into the next round's idle.
*/
private static boolean relocateOneWorkerOntoIdleSlot(TopologyDetails topology, Cluster cluster,
Deque<WorkerSlot> idleTargets) {
Map<WorkerSlot, List<ExecutorDetails>> slotToExecutors =
getAliveAssignedWorkerSlotExecutors(cluster, topology.getId());
Map<String, List<WorkerSlot>> nodeToSlots = new HashMap<>();
for (WorkerSlot slot : slotToExecutors.keySet()) {
nodeToSlots.computeIfAbsent(slot.getNodeId(), k -> new ArrayList<>()).add(slot);
}
List<Map.Entry<String, List<WorkerSlot>>> candidates = new ArrayList<>(nodeToSlots.entrySet());
candidates.removeIf(e -> e.getValue().size() < 2);
candidates.sort(Comparator
.<Map.Entry<String, List<WorkerSlot>>>comparingInt(e -> e.getValue().size())
.reversed()
.thenComparing(Map.Entry::getKey));
if (candidates.isEmpty()) {
return false;
}
List<WorkerSlot> slots = candidates.get(0).getValue();
slots.sort(Comparator.comparingInt(WorkerSlot::getPort));
WorkerSlot victim = slots.get(slots.size() - 1);
Collection<ExecutorDetails> execs = slotToExecutors.get(victim);
if (execs == null || execs.isEmpty()) {
return false;
}
if (idleTargets.isEmpty()) {
return false;
}
WorkerSlot target = idleTargets.poll();
cluster.freeSlot(victim);
cluster.assign(target, topology.getId(), execs);
return true;
}

private static Map<ExecutorDetails, WorkerSlot> scheduleTopology(TopologyDetails topology, Cluster cluster) {
List<WorkerSlot> availableSlots = cluster.getAvailableSlots();
Set<ExecutorDetails> allExecutors = topology.getExecutors();
Expand Down Expand Up @@ -148,7 +298,11 @@ public int compare(ExecutorDetails o1, ExecutorDetails o2) {
}

public static void scheduleTopologiesEvenly(Topologies topologies, Cluster cluster) {
redistributeOntoIdleSupervisors(topologies, cluster);
for (TopologyDetails topology : cluster.needsSchedulingTopologies()) {
if (topologies.getById(topology.getId()) == null) {
continue;
}
String topologyId = topology.getId();
Map<ExecutorDetails, WorkerSlot> newAssignment = scheduleTopology(topology, cluster);
Map<WorkerSlot, List<ExecutorDetails>> nodePortToExecutors = Utils.reverseMap(newAssignment);
Expand Down
Loading
Loading