Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify handling of ignored unassigned shards #12339

Merged
merged 1 commit into from
Jul 20, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 96 additions & 41 deletions core/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {

private final Map<String, RoutingNode> nodesToShards = newHashMap();

private final UnassignedShards unassignedShards = new UnassignedShards();

private final List<ShardRouting> ignoredUnassignedShards = newArrayList();
private final UnassignedShards unassignedShards = new UnassignedShards(this);

private final Map<ShardId, List<ShardRouting>> assignedShards = newHashMap();

Expand Down Expand Up @@ -185,10 +183,6 @@ public boolean hasUnassigned() {
return !unassignedShards.isEmpty();
}

public List<ShardRouting> ignoredUnassigned() {
return this.ignoredUnassignedShards;
}

/**
 * Returns the container tracking the unassigned shards of these routing nodes.
 * The ignored unassigned shards are reachable through the returned object as well.
 */
public UnassignedShards unassigned() {
    return unassignedShards;
}
Expand Down Expand Up @@ -526,24 +520,30 @@ public void reinitShadowPrimary(ShardRouting candidate) {

}

public final static class UnassignedShards implements Iterable<ShardRouting> {
public static final class UnassignedShards implements Iterable<ShardRouting> {

private final RoutingNodes nodes;
private final List<ShardRouting> unassigned;
private final List<ShardRouting> ignored;

private int primaries = 0;
private long transactionId = 0;
private final UnassignedShards source;
private final long sourceTransactionId;

/**
 * Copy constructor: builds a working copy of {@code other}.
 * <p>
 * The copy remembers the instance it was taken from and that instance's transaction id
 * at copy time, so the two can be checked against each other when the copy is handed
 * back. Both shard lists are copied, so changes to the copy do not touch {@code other}.
 *
 * @param other the instance to copy from
 */
public UnassignedShards(UnassignedShards other) {
    // where this copy came from, and the transaction id observed at copy time
    this.source = other;
    this.sourceTransactionId = other.transactionId;
    this.nodes = other.nodes;
    // independent list copies; the ShardRouting elements themselves are shared
    this.unassigned = new ArrayList<>(other.unassigned);
    this.ignored = new ArrayList<>(other.ignored);
    this.primaries = other.primaries;
}

public UnassignedShards() {
public UnassignedShards(RoutingNodes nodes) {
this.nodes = nodes;
unassigned = new ArrayList<>();
ignored = new ArrayList<>();
source = null;
sourceTransactionId = -1;
}
Expand All @@ -556,12 +556,6 @@ public void add(ShardRouting shardRouting) {
transactionId++;
}

public void addAll(Collection<ShardRouting> mutableShardRoutings) {
for (ShardRouting r : mutableShardRoutings) {
add(r);
}
}

public void sort(Comparator<ShardRouting> comparator) {
CollectionUtil.timSort(unassigned, comparator);
}
Expand All @@ -575,29 +569,87 @@ public int numPrimaries() {
}

@Override
public Iterator<ShardRouting> iterator() {
final Iterator<ShardRouting> iterator = unassigned.iterator();
return new Iterator<ShardRouting>() {
private ShardRouting current;
@Override
public boolean hasNext() {
return iterator.hasNext();
}
public UnassignedIterator iterator() {
return new UnassignedIterator();
}

@Override
public ShardRouting next() {
return current = iterator.next();
}
/**
 * Returns a read-only view of the ignored unassigned shards. These shards are not part
 * of the formal unassigned list, but they are kept around and used to build back the
 * list of unassigned shards as part of the routing table.
 *
 * @return an unmodifiable view over the ignored list
 */
public List<ShardRouting> ignored() {
    final List<ShardRouting> readOnlyView = Collections.unmodifiableList(ignored);
    return readOnlyView;
}

@Override
public void remove() {
iterator.remove();
if (current.primary()) {
primaries--;
}
transactionId++;
/**
 * Adds a shard to the ignored unassigned list and advances the transaction id.
 * Should be used with caution: typically the correct usage is to call
 * {@code removeAndIgnore} on the iterator instead.
 *
 * @param shard the shard to record as ignored
 */
public void ignoreShard(ShardRouting shard) {
    this.ignored.add(shard);
    this.transactionId += 1;
}

/**
* Iterator over the unassigned shard list. Plain {@link #remove()} is rejected; callers
* must instead either {@link #initialize(String)} the current shard on a node or
* {@link #removeAndIgnore()} it. Both operations remove the shard from the unassigned
* list and keep the primaries counter and transaction id of the enclosing instance in sync.
*/
public class UnassignedIterator implements Iterator<ShardRouting> {

// iterator over the enclosing instance's unassigned list
private final Iterator<ShardRouting> iterator;
// the shard most recently returned by next(); the target of initialize / removeAndIgnore
private ShardRouting current;

public UnassignedIterator() {
this.iterator = unassigned.iterator();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ShardRouting next() {
// remember the returned shard so the removal operations know what to act on
return current = iterator.next();
}

/**
* Initializes the current unassigned shard on the given node, keeping the shard's
* current version, and removes it from the unassigned list.
*
* @param nodeId id of the node to initialize the shard on
*/
public void initialize(String nodeId) {
initialize(nodeId, current.version());
}

/**
* Initializes the current unassigned shard on the given node and removes it from the
* unassigned list.
*
* @param nodeId id of the node to initialize the shard on
* @param version version passed to the {@code ShardRouting} copy that gets initialized
*/
public void initialize(String nodeId, long version) {
innerRemove();
nodes.initialize(new ShardRouting(current, version), nodeId);
}

/**
* Removes and ignores the unassigned shard (will be ignored for this run, but
* will be added back to unassigned once the metadata is constructed again).
*/
public void removeAndIgnore() {
innerRemove();
ignoreShard(current);
}

/**
* Unsupported operation, just there for the interface. Use {@link #removeAndIgnore()} or
* {@link #initialize(String)}.
*
* @throws UnsupportedOperationException always
*/
@Override
public void remove() {
throw new UnsupportedOperationException("remove is not supported in unassigned iterator, use removeAndIgnore or initialize");
}

/**
* Removes the current shard from the underlying unassigned list, decrements the
* primaries counter when that shard is a primary, and advances the transaction id.
*/
private void innerRemove() {
iterator.remove();
if (current.primary()) {
primaries--;
}
// NOTE(review): the "};" below looks like a leftover line from the removed anonymous
// iterator in this diff rendering rather than part of innerRemove() — confirm against the real file.
};
transactionId++;
}
}

public boolean isEmpty() {
Expand All @@ -611,16 +663,19 @@ public void shuffle() {
/**
 * Empties both the unassigned and the ignored list, resets the primaries counter to
 * zero and advances the transaction id.
 */
public void clear() {
    this.transactionId += 1;
    this.unassigned.clear();
    this.ignored.clear();
    this.primaries = 0;
}

public void transactionEnd(UnassignedShards shards) {
assert shards.source == this && shards.sourceTransactionId == transactionId :
"Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
transactionId++;
this.unassigned.clear();
this.unassigned.addAll(shards.unassigned);
this.primaries = shards.primaries;
assert shards.source == this && shards.sourceTransactionId == transactionId :
"Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
transactionId++;
this.unassigned.clear();
this.unassigned.addAll(shards.unassigned);
this.ignored.clear();
this.ignored.addAll(shards.ignored);
this.primaries = shards.primaries;
}

public UnassignedShards transactionBegin() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public Builder updateNodes(RoutingNodes routingNodes) {
indexBuilder.addShard(refData, shardRoutingEntry);
}
}
for (ShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.ignoredUnassigned())) {
for (ShardRouting shardRoutingEntry : Iterables.concat(routingNodes.unassigned(), routingNodes.unassigned().ignored())) {
String index = shardRoutingEntry.index();
IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index);
if (indexBuilder == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,22 +460,6 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
}

// move all the shards matching the failed shard to the end of the unassigned list
// so we give a chance for other allocations and won't create poison failed allocations
// that can keep other shards from being allocated (because of limits applied on how many
// shards we can start per node)
List<ShardRouting> shardsToMove = Lists.newArrayList();
for (Iterator<ShardRouting> unassignedIt = routingNodes.unassigned().iterator(); unassignedIt.hasNext(); ) {
ShardRouting unassignedShardRouting = unassignedIt.next();
if (unassignedShardRouting.shardId().equals(failedShard.shardId())) {
unassignedIt.remove();
shardsToMove.add(unassignedShardRouting);
}
}
if (!shardsToMove.isEmpty()) {
routingNodes.unassigned().addAll(shardsToMove);
}

matchedNode.moveToUnassigned(unassignedInfo);
}
assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@

import java.util.*;

import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;

/**
Expand Down Expand Up @@ -292,7 +291,7 @@ private boolean initialize(RoutingNodes routing, RoutingNodes.UnassignedShards u
}
indices.addAll(allocation.routingTable().indicesRouting().keySet());
buildModelFromAssigned(routing.shards(assignedFilter));
return allocateUnassigned(unassigned, routing.ignoredUnassigned());
return allocateUnassigned(unassigned);
}

private static float absDelta(float lower, float higher) {
Expand Down Expand Up @@ -551,7 +550,7 @@ private void buildModelFromAssigned(Iterable<ShardRouting> shards) {
* Allocates all given shards on the minimal eligible node for the shard's index
* with respect to the weight function. All given shards must be unassigned.
*/
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned, List<ShardRouting> ignoredUnassigned) {
private boolean allocateUnassigned(RoutingNodes.UnassignedShards unassigned) {
assert !nodes.isEmpty();
if (logger.isTraceEnabled()) {
logger.trace("Start allocating unassigned shards");
Expand Down Expand Up @@ -600,9 +599,9 @@ public int compare(ShardRouting o1,
if (!shard.primary()) {
boolean drop = deciders.canAllocate(shard, allocation).type() == Type.NO;
if (drop) {
ignoredUnassigned.add(shard);
unassigned.ignoreShard(shard);
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
ignoredUnassigned.add(primary[++i]);
unassigned.ignoreShard(primary[++i]);
}
continue;
} else {
Expand Down Expand Up @@ -706,10 +705,10 @@ public int compare(ShardRouting o1,
} else if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
}
ignoredUnassigned.add(shard);
unassigned.ignoreShard(shard);
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
ignoredUnassigned.add(secondary[--secondaryLength]);
unassigned.ignoreShard(secondary[--secondaryLength]);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,11 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
// go over and remove it from the unassigned
for (Iterator<ShardRouting> it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
if (it.next() != shardRouting) {
continue;
}
it.remove();
routingNodes.initialize(shardRouting, routingNode.nodeId());
it.initialize(routingNode.nodeId());
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since it's an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
final MetaData metaData = routingNodes.metaData();

final Iterator<ShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();

Expand All @@ -69,8 +69,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
unassignedIterator.removeAndIgnore();
continue;
}

Expand All @@ -83,8 +82,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
// if we are restoring this shard we still can allocate
if (shard.restoreSource() == null) {
// we can't really allocate, so ignore it and continue
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
unassignedIterator.removeAndIgnore();
logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
} else {
logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
Expand All @@ -97,19 +95,16 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
unassignedIterator.remove();
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
} else if (nodesToAllocate.throttleNodes.isEmpty() == true && nodesToAllocate.noNodes.isEmpty() == false) {
DiscoveryNode node = nodesToAllocate.noNodes.get(0);
logger.debug("[{}][{}]: forcing allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
changed = true;
routingNodes.initialize(new ShardRouting(shard, nodesAndVersions.highestVersion), node.id());
unassignedIterator.remove();
unassignedIterator.initialize(node.id(), nodesAndVersions.highestVersion);
} else {
// we are throttling this, but we have enough to allocate to this node, ignore it for now
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, nodesToAllocate.throttleNodes);
unassignedIterator.remove();
routingNodes.ignoredUnassigned().add(shard);
unassignedIterator.removeAndIgnore();
}
}
return changed;
Expand Down
Loading