Permalink
Browse files

Use existing datastructures from RoutingNodes to elect unassigned pri…

…maries

Currently we trying to find a replica for a primary that is allocated by
running through all shards in the cluster while RoutingNodes already has
a datastructure keyed by shard ID for this. We should lookup this
directly rather than using linear probing. This improves shard allocation performance
by 5x.
  • Loading branch information...
s1monw committed Dec 18, 2013
1 parent 62104a1 commit 314499cee078e42353b3373901368152d7c4fa4e
@@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import org.elasticsearch.cluster.ClusterState;
@@ -97,7 +98,7 @@ public RoutingNodes(ClusterState clusterState) {
}
MutableShardRouting sr = new MutableShardRouting(shard);
entries.add(sr);
activeShardsAdd(sr);
assignedShardsAdd(sr);
if (shard.relocating()) {
entries = nodesToShards.get(shard.relocatingNodeId());
relocatingShards++;
@@ -110,7 +111,7 @@ public RoutingNodes(ClusterState clusterState) {
sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
entries.add(sr);
activeShardsAdd(sr);
assignedShardsAdd(sr);
} else if (!shard.active()) { // shards that are initializing without being relocated
if (shard.primary()) {
inactivePrimaryCount++;
@@ -119,7 +120,7 @@ public RoutingNodes(ClusterState clusterState) {
}
} else {
MutableShardRouting sr = new MutableShardRouting(shard);
activeShardsAdd(sr);
assignedShardsAdd(sr);
unassignedShards.add(sr);
}
}
@@ -251,22 +252,40 @@ public int getRelocatingShardCount() {
*/
public MutableShardRouting activePrimary(ShardRouting shard) {
assert !shard.primary();
for (MutableShardRouting shardRouting : activeShards(shard.shardId())) {
if (shardRouting.primary()) {
if (shardRouting.active()) {
return shardRouting;
}
break;
for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (shardRouting.primary() && shardRouting.active()) {
return shardRouting;
}
}
return null;
}
/**
* Returns one active replica shard for the given ShardRouting shard ID or <code>null</code> if
* no active replica is found.
*/
public MutableShardRouting activeReplica(ShardRouting shard) {
for (MutableShardRouting shardRouting : assignedShards(shard.shardId())) {
if (!shardRouting.primary() && shardRouting.active()) {
return shardRouting;
}
}
return null;
}
/**
* Returns all shards that are not in the state UNASSIGNED with the same shard
* ID as the given shard.
*/
public Iterable<MutableShardRouting> assignedShards(ShardRouting shard) {
return Iterables.unmodifiableIterable(assignedShards(shard.shardId()));
}
/**
* Returns <code>true</code> iff all replicas are active for the given shard routing. Otherwise <code>false</code>
*/
public boolean allReplicasActive(ShardRouting shardRouting) {
final Set<MutableShardRouting> shards = activeShards(shardRouting.shardId());
final Set<MutableShardRouting> shards = assignedShards(shardRouting.shardId());
if (shards.isEmpty() || shards.size() < this.routingTable.index(shardRouting.index()).shard(shardRouting.id()).size()) {
return false; // if we are empty nothing is active if we have less than total at least one is unassigned
}
@@ -352,7 +371,7 @@ public void assign(MutableShardRouting shard, String nodeId) {
if (shard.state() == ShardRoutingState.RELOCATING) {
relocatingShards++;
}
activeShardsAdd(shard);
assignedShardsAdd(shard);
}
/**
@@ -410,7 +429,7 @@ public void swapPrimaryFlag(MutableShardRouting... shards) {
private static final Set<MutableShardRouting> EMPTY = Collections.emptySet();
private Set<MutableShardRouting> activeShards(ShardId shardId) {
private Set<MutableShardRouting> assignedShards(ShardId shardId) {
final Set<MutableShardRouting> replicaSet = assignedShards.get(shardId);
return replicaSet == null ? EMPTY : Collections.unmodifiableSet(replicaSet);
}
@@ -430,10 +449,10 @@ private void remove(MutableShardRouting shard) {
} else if (shard.relocating()) {
cancelRelocation(shard);
}
activeShardsRemove(shard);
assignedShardsRemove(shard);
}
private void activeShardsAdd(MutableShardRouting shard) {
private void assignedShardsAdd(MutableShardRouting shard) {
if (shard.unassigned()) {
// no unassigned
return;
@@ -446,7 +465,7 @@ private void activeShardsAdd(MutableShardRouting shard) {
replicaSet.add(shard);
}
private void activeShardsRemove(MutableShardRouting shard) {
private void assignedShardsRemove(MutableShardRouting shard) {
Set<MutableShardRouting> replicaSet = assignedShards.get(shard.shardId());
if (replicaSet != null) {
if (replicaSet.contains(shard)) {
@@ -488,6 +507,7 @@ public RoutingNodeIterator routingNodeIter(String nodeId) {
public final static class UnassignedShards implements Iterable<MutableShardRouting> {
private final List<MutableShardRouting> unassigned;
private int primaries = 0;
private long transactionId = 0;
private final UnassignedShards source;
@@ -581,10 +601,6 @@ public UnassignedShards transactionBegin() {
return new UnassignedShards(this);
}
public void copyAll(Collection<MutableShardRouting> others) {
others.addAll(unassigned);
}
public MutableShardRouting[] drain() {
MutableShardRouting[] mutableShardRoutings = unassigned.toArray(new MutableShardRouting[unassigned.size()]);
unassigned.clear();
@@ -649,7 +665,7 @@ public static boolean assertShardStats(RoutingNodes routingNodes) {
}
}
}
Set<MutableShardRouting> mutableShardRoutings = routingNodes.activeShards(new ShardId(index, i));
Set<MutableShardRouting> mutableShardRoutings = routingNodes.assignedShards(new ShardId(index, i));
for (MutableShardRouting r : mutableShardRoutings) {
assert shards.contains(r);
shards.remove(r);
@@ -246,65 +246,52 @@ private boolean moveShards(RoutingAllocation allocation) {
private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
if (!routingNodes.hasUnassignedPrimaries()) {
// move out if we don't have unassigned primaries
return changed;
}
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) {
boolean elected = false;
// primary and not assigned, go over and find a replica that is assigned and active (since it might be relocating)
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting shardEntry2 : routingNode) {
if (shardEntry.shardId().equals(shardEntry2.shardId()) && shardEntry2.active()) {
assert shardEntry2.assignedToNode();
assert !shardEntry2.primary();
changed = true;
routingNodes.swapPrimaryFlag(shardEntry, shardEntry2);
if (shardEntry2.relocatingNodeId() != null) {
// its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(shardEntry2.relocatingNodeId());
if (node != null) {
for (MutableShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(shardEntry2.shardId()) && !shardRouting.primary()) {
routingNodes.swapPrimaryFlag(shardRouting);
break;
}
}
if (shardEntry.primary()) {
MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
if (candidate != null) {
routingNodes.swapPrimaryFlag(shardEntry, candidate);
if (candidate.relocatingNodeId() != null) {
changed = true;
// its also relocating, make sure to move the other routing to primary
RoutingNode node = routingNodes.node(candidate.relocatingNodeId());
if (node != null) {
for (MutableShardRouting shardRouting : node) {
if (shardRouting.shardId().equals(candidate.shardId()) && !shardRouting.primary()) {
routingNodes.swapPrimaryFlag(shardRouting);
break;
}
}
elected = true;
break;
}
}
if (elected) {
break;
}
}
}
}
// go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
List<ShardRouting> shardsToFail = null;
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary() && !shardEntry.assignedToNode()) {
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting shardEntry2 : routingNode) {
if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) {
if (routingNodes.hasUnassignedPrimaries()) {
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for(MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary()) {
changed = true;
if (shardsToFail == null) {
shardsToFail = new ArrayList<ShardRouting>();
}
shardsToFail.add(shardEntry2);
shardsToFail.add(routing);
}
}
}
}
}
if (shardsToFail != null) {
for (ShardRouting shardToFail : shardsToFail) {
applyFailedShard(allocation, shardToFail, false);
if (shardsToFail != null) {
for (ShardRouting shardToFail : shardsToFail) {
applyFailedShard(allocation, shardToFail, false);
}
}
}
return changed;

0 comments on commit 314499c

Please sign in to comment.