Allocation: Shard Started messages should be matched using an exact match

When a node sends a shard started message to the master, the master goes through the routing table looking for the shard to start. At the moment we validate the indexUUID, the node the shard is assigned to, and the fact that the shard is initializing. This check goes wrong if a relocating replica shard finishes recovery just as the source node leaves the cluster: the master cancels the recovery and will likely assign a new initializing replica to the same target node, and the shard started message from the cancelled relocation can then wrongfully activate the new replica.

Also, the logic for deciding whether an incoming shard started message should be applied was split between ShardStateAction and the AllocationService.
This commit does the following:
1) Let ShardStateAction only filter basic stuff like index existence and indexUUID.
2) Move the trickier shard started matching logic to the AllocationService and make it stricter.
3) Unify ShardStateAction filtering logic for both shard started and shard failed.
4) Add unit tests for all of the above.

For an example test failure see: http://build-us-00.elastic.co/job/es_core_16_centos/388/

Closes #11999
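
The race is easiest to see in a small, self-contained sketch. This is plain Java, not the real Elasticsearch classes: SimpleShardRouting and its version field are stand-ins invented for illustration, and looseMatch/exactMatch mirror the old and new matching rules only in spirit.

import java.util.Objects;

public class ExactMatchSketch {

    enum State { INITIALIZING, STARTED, RELOCATING }

    // Stand-in for ShardRouting: identity is the whole routing entry,
    // not just (index, shardId, nodeId).
    record SimpleShardRouting(String index, int shardId, String nodeId,
                              String relocatingNodeId, State state, long version) {}

    // Old, loose rule: any initializing copy of the shard on the same node matches.
    static boolean looseMatch(SimpleShardRouting inTable, SimpleShardRouting message) {
        return inTable.shardId() == message.shardId()
                && Objects.equals(inTable.nodeId(), message.nodeId())
                && inTable.state() == State.INITIALIZING;
    }

    // New, strict rule: the message must describe exactly the entry in the routing table.
    static boolean exactMatch(SimpleShardRouting inTable, SimpleShardRouting message) {
        return inTable.equals(message);
    }

    public static void main(String[] args) {
        // A replica was relocating to node2; the source node left, the master cancelled
        // the recovery and assigned a fresh initializing replica to the same node.
        SimpleShardRouting staleMessage =
                new SimpleShardRouting("test", 0, "node2", "node1", State.INITIALIZING, 1);
        SimpleShardRouting freshReplica =
                new SimpleShardRouting("test", 0, "node2", null, State.INITIALIZING, 2);

        // true: the stale message from the cancelled relocation would wrongly start
        // the new replica.
        System.out.println("loose match: " + looseMatch(freshReplica, staleMessage));
        // false: with exact matching the stale message is ignored.
        System.out.println("exact match: " + exactMatch(freshReplica, staleMessage));
    }
}

Under the loose rule the stale message is indistinguishable from a legitimate start of the fresh replica; requiring full equality on the AllocationService side lets the master drop it.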
bleskes committed Jul 10, 2015
1 parent 5b4e863 commit 28090b3
Showing 5 changed files with 288 additions and 73 deletions.
@@ -35,6 +35,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.threadpool.ThreadPool;
@@ -145,26 +146,17 @@ public ClusterState execute(ClusterState currentState) {
return currentState;
}

MetaData metaData = currentState.getMetaData();
final MetaData metaData = currentState.getMetaData();


List<FailedRerouteAllocation.FailedShard> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
continue;
}
if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard failed, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
continue;
}
for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "failed", metaData, logger)) {
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(entry.shardRouting, entry.reason));
}

logger.debug("{} will apply shard failed {}", shardRouting.shardId(), shardRoutingEntry);
shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(shardRouting, shardRoutingEntry.reason));
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
}

RoutingAllocation.Result routingResult = allocationService.applyFailedShards(currentState, shardRoutingsToBeApplied);
@@ -189,6 +181,31 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

static List<ShardRoutingEntry> extractShardsToBeApplied(List<ShardRoutingEntry> shardRoutingEntries, String type, MetaData metaData, ESLogger logger) {
List<ShardRoutingEntry> shardRoutingsToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
// if there is no metadata or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
logger.debug("{} ignoring shard {}, unknown index in {}", shardRouting.shardId(), type, shardRoutingEntry);
continue;
}
if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard {}, different index uuid, current {}, got {}", shardRouting.shardId(), type, indexMetaData.getUUID(), shardRoutingEntry);
continue;
}

// more debug info will be logged by the allocation service
logger.trace("{} will apply shard {} {}", shardRouting.shardId(), type, shardRoutingEntry);
shardRoutingsToBeApplied.add(shardRoutingEntry);
}
return shardRoutingsToBeApplied;

}

private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) {
logger.debug("received shard started for {}", shardRoutingEntry);
// buffer shard started requests, and the state update tasks will simply drain it
@@ -217,56 +234,15 @@ public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
MetaData metaData = currentState.getMetaData();


List<ShardRouting> shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size());
for (ShardRoutingEntry entry : extractShardsToBeApplied(shardRoutingEntries, "started", metaData, logger)) {
shardRoutingToBeApplied.add(entry.shardRouting);
}

for (int i = 0; i < shardRoutingEntries.size(); i++) {
ShardRoutingEntry shardRoutingEntry = shardRoutingEntries.get(i);
shardRoutingEntry.processed = true;
ShardRouting shardRouting = shardRoutingEntry.shardRouting;
try {
IndexMetaData indexMetaData = metaData.index(shardRouting.index());
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
// if there is no metadata, no routing table or the current index is not of the right uuid, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexMetaData == null) {
continue;
}
if (indexRoutingTable == null) {
continue;
}

if (!indexMetaData.isSameUUID(shardRoutingEntry.indexUUID)) {
logger.debug("{} ignoring shard started, different index uuid, current {}, got {}", shardRouting.shardId(), indexMetaData.getUUID(), shardRoutingEntry);
continue;
}

// find the one that maps to us, if its already started, no need to do anything...
// the shard might already be started since the nodes that is starting the shards might get cluster events
// with the shard still initializing, and it will try and start it again (until the verification comes)

IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardRouting.id());

boolean applyShardEvent = true;

for (ShardRouting entry : indexShardRoutingTable) {
if (shardRouting.currentNodeId().equals(entry.currentNodeId())) {
// we found the same shard that exists on the same node id
if (!entry.initializing()) {
// shard is in initialized state, skipping event (probable already started)
logger.debug("{} ignoring shard started event for {}, current state: {}", shardRouting.shardId(), shardRoutingEntry, entry.state());
applyShardEvent = false;
}
}
}

if (applyShardEvent) {
shardRoutingToBeApplied.add(shardRouting);
logger.debug("{} will apply shard started {}", shardRouting.shardId(), shardRoutingEntry);
}

} catch (Throwable t) {
logger.error("{} unexpected failure while processing shard started [{}]", t, shardRouting.shardId(), shardRouting);
}
// mark all entries as processed
for (ShardRoutingEntry entry : shardRoutingEntries) {
entry.processed = true;
}

if (shardRoutingToBeApplied.isEmpty()) {
@@ -307,18 +283,18 @@ public void messageReceived(ShardRoutingEntry request, TransportChannel channel)

static class ShardRoutingEntry extends TransportRequest {

private ShardRouting shardRouting;
ShardRouting shardRouting;

private String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;
String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE;

private String reason;
String reason;

volatile boolean processed; // state field, no need to serialize

ShardRoutingEntry() {
}

private ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String reason) {
this.shardRouting = shardRouting;
this.reason = reason;
this.indexUUID = indexUUID;
@@ -247,7 +247,7 @@ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation al
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing"));
}

// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
@@ -331,14 +331,20 @@ private boolean applyStartedShards(RoutingNodes routingNodes, Iterable<? extends
if (currentRoutingNode != null) {
for (ShardRouting shard : currentRoutingNode) {
if (shard.shardId().equals(startedShard.shardId())) {
relocatingNodeId = shard.relocatingNodeId();
if (!shard.started()) {
if (shard.equals(startedShard)) {
relocatingNodeId = shard.relocatingNodeId();
dirty = true;
routingNodes.started(shard);
logger.trace("{} marked as started", shard);
} else {
logger.debug("failed to find shard [{}] in order to start it [no matching shard on node], ignoring", startedShard);
}
break;
}
}
} else {
logger.debug("failed to find shard [{}] in order to start it [failed to find node], ignoring", startedShard);

}

// startedShard is the current state of the shard (post relocation for example)
@@ -404,6 +410,7 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
}
if (dirty) {
logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
// now, find the node that we are relocating *from*, and cancel its relocation
RoutingNode relocatingFromNode = routingNodes.node(failedShard.relocatingNodeId());
if (relocatingFromNode != null) {
@@ -440,6 +447,7 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
}
if (dirty) {
logger.debug("failed shard {} found in routingNodes, failing it", failedShard);
// next, we need to find the target initializing shard that is recovering from, and remove it...
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId());
if (initializingNode != null) {
@@ -490,7 +498,9 @@ private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting fail
}
}
}
if (!dirty) {
if (dirty) {
logger.debug("failed shard {} found in routingNodes and failed", failedShard);
} else {
logger.debug("failed shard {} not found in routingNodes, ignoring it", failedShard);
}
}
@@ -45,6 +45,14 @@ public FailedShard(ShardRouting shard, String details) {
this.shard = shard;
this.details = details;
}

@Override
public String toString() {
return "FailedShard{" +
"shard=" + shard +
", details='" + details + '\'' +
'}';
}
}

private final List<FailedShard> failedShards;
@@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.action.shard;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;

import java.util.ArrayList;
import java.util.List;

import static org.hamcrest.Matchers.equalTo;


public class ShardStateActionTest extends ElasticsearchTestCase {

public void testShardFiltering() {
final IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_UUID, "test_uuid"))
.numberOfShards(2).numberOfReplicas(0)
.build();
ClusterState.Builder stateBuilder = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder()
.put(new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT)).masterNodeId("node1")
.put(new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT))
)
.metaData(MetaData.builder().put(indexMetaData, false));

final ShardRouting initShard = TestShardRouting.newShardRouting("test", 0, "node1", randomBoolean(), ShardRoutingState.INITIALIZING, 1);
final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build())
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build())));

ClusterState state = stateBuilder.build();

ArrayList<ShardStateAction.ShardRoutingEntry> listToFilter = new ArrayList<>();
ArrayList<ShardStateAction.ShardRoutingEntry> expectedToBeApplied = new ArrayList<>();

listToFilter.add(new ShardStateAction.ShardRoutingEntry(initShard, indexMetaData.uuid() + "_suffix", "wrong_uuid"));

listToFilter.add(new ShardStateAction.ShardRoutingEntry(relocatingShard.buildTargetRelocatingShard(), indexMetaData.uuid(), "relocating_to_node"));
expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));

listToFilter.add(new ShardStateAction.ShardRoutingEntry(startedShard, indexMetaData.uuid(), "started shard"));
expectedToBeApplied.add(listToFilter.get(listToFilter.size() - 1));

listToFilter.add(new ShardStateAction.ShardRoutingEntry(TestShardRouting.newShardRouting(initShard.index() + "_NA", initShard.id(),
initShard.currentNodeId(), initShard.primary(), initShard.state(), initShard.version()), indexMetaData.uuid(), "wrong_uuid"));

List<ShardStateAction.ShardRoutingEntry> toBeApplied = ShardStateAction.extractShardsToBeApplied(listToFilter, "for testing", state.metaData(), logger);
if (toBeApplied.size() != expectedToBeApplied.size()) {
fail("size mismatch.\n Got: \n [" + toBeApplied + "], \n expected: \n [" + expectedToBeApplied + "]");
}
for (int i = 0; i < toBeApplied.size(); i++) {
final ShardStateAction.ShardRoutingEntry found = toBeApplied.get(i);
final ShardStateAction.ShardRoutingEntry expected = expectedToBeApplied.get(i);
assertThat(found, equalTo(expected));
}
}
}
