Skip to content

Commit

Permalink
Check if node is still present when collecting attribute shard routings
Browse files Browse the repository at this point in the history
The node we need to lookup for attribute colelction might not be part
of the `DiscoveryNodes` anymore due to node failure or shutdown. This
commit adds a check and removes the shard from the iteration.

Closes elastic#4589
  • Loading branch information
s1monw committed Jan 3, 2014
1 parent 69a4896 commit a21434f
Showing 1 changed file with 25 additions and 31 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.UnmodifiableIterator;
import jsr166y.ThreadLocalRandom;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -430,22 +431,9 @@ private AttributesRoutings getActiveAttribute(AttributesKey key, DiscoveryNodes
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(activeShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : key.attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue == null) {
continue;
}
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
iterator.remove();
to.add(fromShard);
}
}
}
ImmutableList<ShardRouting> to = collectAttributeShards(key, nodes, from);

shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
shardRoutings = new AttributesRoutings(to, ImmutableList.copyOf(from));
activeShardsByAttributes = MapBuilder.newMapBuilder(activeShardsByAttributes).put(key, shardRoutings).immutableMap();
}
}
Expand All @@ -457,28 +445,34 @@ private AttributesRoutings getInitializingAttribute(AttributesKey key, Discovery
if (shardRoutings == null) {
synchronized (shardsByAttributeMutex) {
ArrayList<ShardRouting> from = new ArrayList<ShardRouting>(allInitializingShards);
ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (String attribute : key.attributes) {
String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue == null) {
continue;
}
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();
if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) {
iterator.remove();
to.add(fromShard);
}
}
}

shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from));
ImmutableList<ShardRouting> to = collectAttributeShards(key, nodes, from);
shardRoutings = new AttributesRoutings(to, ImmutableList.copyOf(from));
initializingShardsByAttributes = MapBuilder.newMapBuilder(initializingShardsByAttributes).put(key, shardRoutings).immutableMap();
}
}
return shardRoutings;
}

private static ImmutableList<ShardRouting> collectAttributeShards(AttributesKey key, DiscoveryNodes nodes, ArrayList<ShardRouting> from) {
final ArrayList<ShardRouting> to = new ArrayList<ShardRouting>();
for (final String attribute : key.attributes) {
final String localAttributeValue = nodes.localNode().attributes().get(attribute);
if (localAttributeValue != null) {
for (Iterator<ShardRouting> iterator = from.iterator(); iterator.hasNext(); ) {
ShardRouting fromShard = iterator.next();
final DiscoveryNode discoveryNode = nodes.get(fromShard.currentNodeId());
if (discoveryNode == null) {
iterator.remove(); // node is not present anymore - ignore shard
} else if (localAttributeValue.equals(discoveryNode.attributes().get(attribute))) {
iterator.remove();
to.add(fromShard);
}
}
}
}
return ImmutableList.copyOf(to);
}

public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) {
return preferAttributesActiveInitializingShardsIt(attributes, nodes, pickIndex());
}
Expand Down

0 comments on commit a21434f

Please sign in to comment.