@@ -155,6 +155,7 @@ public void allocate(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
balancer.allocateUnassigned();
balancer.moveShards();
balancer.moveNonPreferred();
balancer.balance();

// Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
@@ -711,6 +712,95 @@ protected int comparePivot(int j) {
return indices;
}

/**
* Move started shards that are in non-preferred allocations
*/
public void moveNonPreferred() {
boolean movedAShard;
do {
// reset each round; the loop continues only while a round actually moved a shard
movedAShard = false;
for (Iterator<AllocationDeciders.AllocationProblem> problemIterator = allocation.deciders()
.findAllocationProblems(allocation); problemIterator.hasNext();) {
AllocationDeciders.AllocationProblem problem = problemIterator.next();
if (tryResolve(problem)) {
movedAShard = true;
break;
}
logger.debug("Unable to resolve [{}]", problem);
}
// TODO: Update cluster info
} while (movedAShard);
}

private boolean tryResolve(AllocationDeciders.AllocationProblem problem) {
for (Iterator<ShardRouting> shardIterator = problem.preferredShardMovements(); shardIterator.hasNext();) {
ShardRouting shardRouting = shardIterator.next();
ProjectIndex index = projectIndex(shardRouting);
final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
sourceNode.removeShard(index, shardRouting);
Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
shardRouting,
targetNode.getNodeId(),
allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
problem.relocateReason(),
allocation.changes()
);
final ShardRouting shard = relocatingShards.v2();
targetNode.addShard(projectIndex(shard), shard);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
}
return true;
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
}
return false;
}

/**
* Makes a decision on whether to move a started shard to another node. The following rules apply
* to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard is both allowed and preferred to remain on its current node (the can-remain decision is neither NO nor
* NOT_PREFERRED), no attempt will be made to move the shard and {@link MoveDecision#getCanRemainDecision} will return that
* decision. All other fields in the object will be null.
* 3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain
* on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to
* another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return
* a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise
* {@link MoveDecision#getTargetNode} will return null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) {
NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
index.assertMatch(shardRouting);

if (shardRouting.started() == false) {
// we can only move started shards
return MoveDecision.NOT_TAKEN;
}

final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
RoutingNode routingNode = sourceNode.getRoutingNode();
Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) {
return MoveDecision.remain(canRemain);
}

sorter.reset(index);
/*
* The sorter holds the minimum weight node first for the shard's index.
* We now walk through the nodes until we find a node that can accept the shard.
* The result is not guaranteed to be balanced after this operation; we still make a best effort
* to allocate the shard on the minimal eligible node.
*/
return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly);
}
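
To make the rules above concrete, here is a minimal, purely illustrative caller sketch (the helper name is hypothetical and not part of this change) showing how the rules map onto the MoveDecision accessors; it mirrors the checks made in tryResolve above:

// Hypothetical illustration of the javadoc rules above; not part of this change.
private void traceMoveNonPreferredDecision(ProjectIndex index, ShardRouting shard) {
    final MoveDecision decision = decideMoveNonPreferred(index, shard);
    if (decision.isDecisionTaken() == false) {
        logger.trace("no decision taken: shard [{}] is not started", shard); // rule 1
    } else if (decision.forceMove()) {
        logger.trace("shard [{}] should move to [{}]", shard, decision.getTargetNode().getId()); // rule 3, target found
    } else if (decision.canRemain() == false) {
        logger.trace("shard [{}] cannot remain, but no node can accept it", shard); // rule 3, no target
    } else {
        logger.trace("shard [{}] remains on its current node", shard); // rule 2
    }
}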

/**
* Move started shards that can not be allocated to a node anymore
*
@@ -839,6 +929,15 @@ private MoveDecision decideMove(
);
}

private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
// when searching for a target for a shard that must move, a NOT_PREFERRED destination is treated as NO
if (decision.type() == Type.NOT_PREFERRED) {
return Decision.NO;
}
return decision;
}

private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
// don't use canRebalance as we want hard filtering rules to apply. See #17698
return allocation.deciders().canAllocate(shardRouting, target, allocation);
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;

@@ -153,4 +154,14 @@ public Decision canAllocateReplicaWhenThereIsRetentionLease(ShardRouting shardRo
public Optional<Set<String>> getForcedInitialShardAllocationToNodes(ShardRouting shardRouting, RoutingAllocation allocation) {
return Optional.empty();
}

/**
* Get any allocation problems that this decider believes can be fixed by moving some shards
*
* @param allocation the current routing allocation
* @return a collection of allocation problems this decider would like resolved by moving shards, or
*         {@link Optional#empty()} if there are none
*/
public Optional<Collection<? extends AllocationDeciders.AllocationProblem>> getAllocationProblems(RoutingAllocation allocation) {
return Optional.empty();
}
}
@@ -13,15 +13,19 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.set.Sets;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Function;

@@ -244,4 +248,34 @@ public Optional<Set<String>> getForcedInitialShardAllocationToNodes(ShardRouting
}
return result;
}

public Iterator<AllocationProblem> findAllocationProblems(RoutingAllocation routingAllocation) {
var problems = new ArrayList<AllocationProblem>();
for (AllocationDecider decider : deciders) {
decider.getAllocationProblems(routingAllocation).ifPresent(problems::addAll);
}
// sort a list rather than collecting into a TreeSet keyed on priority, which would silently drop problems that share a priority
problems.sort(Comparator.comparing(AllocationProblem::priority).reversed());
return problems.iterator();
}

public interface AllocationProblem {

/**
* The shard movements to attempt in order to resolve the problem, most preferred first.
*/
Iterator<ShardRouting> preferredShardMovements();

/**
* The reason for the relocation
*
* @see RoutingChangesObserver#relocationStarted(ShardRouting, ShardRouting, String)
*/
String relocateReason();

/**
* The relative priority of this problem; higher-priority problems are attempted first.
*/
default int priority() {
return 1;
}
}
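
For illustration only (this snippet is not part of the change, and java.util.Collections would need to be imported), a problem that overrides priority() is yielded by findAllocationProblems before problems that keep the default priority of 1:

// Hypothetical example: a priority-2 problem sorts ahead of default-priority (1) problems.
AllocationProblem urgentProblem = new AllocationProblem() {
    @Override
    public Iterator<ShardRouting> preferredShardMovements() {
        return Collections.emptyIterator(); // no candidate movements in this sketch
    }

    @Override
    public String relocateReason() {
        return "example-problem";
    }

    @Override
    public int priority() {
        return 2;
    }
};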
}
@@ -22,6 +22,11 @@
import org.elasticsearch.core.Strings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
* pool usage stats and any candidate shard's write load estimate.
@@ -109,6 +114,47 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented");
}

@Override
public Optional<Collection<? extends AllocationDeciders.AllocationProblem>> getAllocationProblems(RoutingAllocation allocation) {
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
return Optional.empty();
}

final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
final Collection<HotSpot> hotSpots = nodeUsageStatsForThreadPools.entrySet()
.stream()
.filter(entry -> entry.getValue().threadPoolUsageStatsMap().containsKey(ThreadPool.Names.WRITE))
.filter(entry -> {
long maxQueueLatency = entry.getValue()
.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE)
.maxThreadPoolQueueLatencyMillis();
return maxQueueLatency > writeLoadConstraintSettings.getQueueLatencyThreshold().millis();
})
.map(entry -> new HotSpot(entry.getKey()))
.collect(Collectors.toList());
return hotSpots.isEmpty() == false ? Optional.of(hotSpots) : Optional.empty();
}

private record HotSpot(String nodeId) implements AllocationDeciders.AllocationProblem {

@Override
public Iterator<ShardRouting> preferredShardMovements() {
// TODO: return shards in priority order; empty for now so callers need not handle null
return Collections.emptyIterator();
}

@Override
public String relocateReason() {
return "hot-spotting";
}

@Override
public String toString() {
return "Hot-spotting on node " + nodeId;
}
}

/**
* Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
* Returns the percent thread pool utilization change.
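
The body of the method this comment documents is collapsed in the diff view. Purely as a hedged sketch of the idea the comment describes (the helper name, parameters, and formula below are assumptions, not the actual implementation), the change could be estimated by spreading the shard's write load estimate over the node's write thread pool:

// Illustrative sketch only: assumes the shard's write load estimate is expressed as a number of
// write threads' worth of work, so dividing by the pool size yields a utilization fraction.
private static float estimatedUtilizationChangePercent(double shardWriteLoadEstimate, int writeThreadPoolSize) {
    return (float) (shardWriteLoadEstimate / writeThreadPoolSize) * 100.0f;
}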