Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up shard balancer by reusing shard model while moving shards that can no longer be allocated to a node #16926

Merged
merged 1 commit into from Mar 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -310,7 +310,7 @@ private boolean reroute(RoutingAllocation allocation) {
}

// move shards that no longer can be allocated
changed |= moveShards(allocation);
changed |= shardsAllocators.moveShards(allocation);

// rebalance
changed |= shardsAllocators.rebalance(allocation);
Expand All @@ -327,46 +327,6 @@ public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocatio
}
}

private boolean moveShards(RoutingAllocation allocation) {
boolean changed = false;

// create a copy of the shards interleaving between nodes, and check if they can remain
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
final RoutingNodes routingNodes = allocation.routingNodes();
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue;
}
found = true;
shards.add(routingNode.get(index));
}
index++;
}
for (int i = 0; i < shards.size(); i++) {
ShardRouting shardRouting = shards.get(i);
// we can only move started shards...
if (!shardRouting.started()) {
continue;
}
final RoutingNode routingNode = routingNodes.node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
if (!moved) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} else {
changed = true;
}
}
}
return changed;
}

private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
Expand Down
Expand Up @@ -42,13 +42,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.PriorityComparator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -119,9 +121,9 @@ public boolean rebalance(RoutingAllocation allocation) {
}

@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
return balancer.move(shardRouting, node);
return balancer.moveShards();
}

/**
Expand Down Expand Up @@ -489,56 +491,93 @@ protected int comparePivot(int j) {
}

/**
* This function executes a move operation moving the given shard from
* the given node to the minimal eligible node with respect to the
* weight function. Iff the shard is moved the shard will be set to
* Move started shards that can not be allocated to a node anymore
*
* For each shard to be moved this function executes a move operation
* to the minimal eligible node with respect to the
* weight function. If a shard is moved the shard will be set to
* {@link ShardRoutingState#RELOCATING} and a shadow instance of this
* shard is created with an incremented version in the state
* {@link ShardRoutingState#INITIALIZING}.
*
* @return <code>true</code> iff the shard has successfully been moved.
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
public boolean move(ShardRouting shard, RoutingNode node ) {
if (nodes.isEmpty() || !shard.started()) {
/* with no nodes or a not started shard this is pointless */
public boolean moveShards() {
if (nodes.isEmpty()) {
/* with no nodes this is pointless */
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Try moving shard [{}] from [{}]", shard, node);

// Create a copy of the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.
List<ShardRouting> shards = new ArrayList<>();
int index = 0;
boolean found = true;
while (found) {
found = false;
for (RoutingNode routingNode : routingNodes) {
if (index >= routingNode.size()) {
continue;
}
found = true;
ShardRouting shardRouting = routingNode.get(index);
// we can only move started shards...
if (shardRouting.started()) {
shards.add(shardRouting);
}
}
index++;
}
if (shards.isEmpty()) {
return false;
}

final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (!changed) {
final ModelNode sourceNode = nodes.get(node.nodeId());
assert sourceNode != null;
if (changed == false) {
final NodeSorter sorter = newNodeSorter();
sorter.reset(shard.getIndexName());
final ModelNode[] nodes = sorter.modelNodes;
assert sourceNode.containsShard(shard);
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/

for (ModelNode currentNode : nodes) {
if (currentNode.getNodeId().equals(node.nodeId())) {
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shard, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation);
Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
if (decision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
sourceNode.removeShard(shard);
ShardRouting targetRelocatingShard = routingNodes.relocate(shard, target.nodeId(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
currentNode.addShard(targetRelocatingShard, decision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shard, currentNode.getNodeId());
final ModelNode[] modelNodes = sorter.modelNodes;
for (ShardRouting shardRouting : shards) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
assert sourceNode != null && sourceNode.containsShard(shardRouting);
final RoutingNode routingNode = sourceNode.getRoutingNode(routingNodes);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName());
/*
* the sorter holds the minimum weight node first for the shards index.
* We now walk through the nodes until we find a node to allocate the shard.
* This is not guaranteed to be balanced after this operation we still try best effort to
* allocate on the minimal eligible node.
*/
boolean moved = false;
for (ModelNode currentNode : modelNodes) {
if (currentNode == sourceNode) {
continue;
}
RoutingNode target = currentNode.getRoutingNode(routingNodes);
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(shardRouting, allocation);
if (allocationDecision.type() == Type.YES && rebalanceDecision.type() == Type.YES) { // TODO maybe we can respect throttling here too?
Decision sourceDecision = sourceNode.removeShard(shardRouting);
ShardRouting targetRelocatingShard = routingNodes.relocate(shardRouting, target.nodeId(), allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
// re-add (now relocating shard) to source node
sourceNode.addShard(shardRouting, sourceDecision);
Decision targetDecision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
currentNode.addShard(targetRelocatingShard, targetDecision);
if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, routingNode.node());
}
moved = true;
changed = true;
break;
}
}
if (moved == false) {
logger.debug("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
}
changed = true;
break;
}
}
}
Expand Down
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
Expand All @@ -36,42 +35,40 @@
public interface ShardsAllocator {

/**
* Applies changes on started nodes based on the implemented algorithm. For example if a
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
* Applies changes on started nodes based on the implemented algorithm. For example if a
* shard has changed to {@link ShardRoutingState#STARTED} from {@link ShardRoutingState#RELOCATING}
* this allocator might apply some cleanups on the node that used to hold the shard.
* @param allocation all started {@link ShardRouting shards}
*/
void applyStartedShards(StartedRerouteAllocation allocation);

/**
* Applies changes on failed nodes based on the implemented algorithm.
* Applies changes on failed nodes based on the implemented algorithm.
* @param allocation all failed {@link ShardRouting shards}
*/
void applyFailedShards(FailedRerouteAllocation allocation);

/**
* Assign all unassigned shards to nodes
*
* Assign all unassigned shards to nodes
*
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean allocateUnassigned(RoutingAllocation allocation);

/**
* Rebalancing number of shards on all nodes
*
*
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean rebalance(RoutingAllocation allocation);

/**
* Moves a shard from the given node to other node.
*
* @param shardRouting the shard to move
* @param node A node containing the shard
* Move started shards that can not be allocated to a node anymore
*
* @param allocation current node allocation
* @return <code>true</code> if the allocation has changed, otherwise <code>false</code>
*/
boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation);
boolean moveShards(RoutingAllocation allocation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this way more anyway

}
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
Expand Down Expand Up @@ -96,7 +94,7 @@ public boolean rebalance(RoutingAllocation allocation) {
}

@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return allocator.move(shardRouting, node, allocation);
public boolean moveShards(RoutingAllocation allocation) {
return allocator.moveShards(allocation);
}
}
Expand Up @@ -60,7 +60,7 @@ public boolean rebalance(RoutingAllocation allocation) {
return false;
}
@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
return false;
}
}
Expand Down
Expand Up @@ -320,7 +320,7 @@ public boolean rebalance(RoutingAllocation allocation) {
}

@Override
public boolean move(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public boolean moveShards(RoutingAllocation allocation) {
return false;
}

Expand Down