Recovery: add a timeout to local mapping change check
After phase1 of recovery completes, we check that all pending mapping changes have been sent to the master and processed by the other nodes. This is needed to make sure that the target node has the latest mapping (we just copied over the corresponding Lucene files). To make sure we do not miss updates, we do the check under a local cluster state update task. At the moment there is no timeout when waiting for that task to complete, so if the local cluster state update thread is very busy, the recovery may stall for too long. This commit adds a timeout (equal to `indices.recovery.internal_action_timeout`) and upgrades the task priority to `IMMEDIATE`. If the check fails, we fail the recovery.

Closes elastic#9575
bleskes committed Feb 9, 2015
1 parent 2e5cea4 commit 5ba4a12
Showing 2 changed files with 125 additions and 20 deletions.
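
For orientation, the core of the change is a lightweight cluster state update task that the recovery source submits and then waits on, now with IMMEDIATE priority and a bounded wait. Below is a condensed, self-contained sketch of that pattern, assembled from the commit message and the diff that follows. The class and method names (RecoveryMappingCheck, waitForPendingMappingChanges) are illustrative only, and the task body is kept as a no-op to focus on the new timeout, priority, and failure handling; in the actual change the task also inspects the current cluster state for pending mapping differences.

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class RecoveryMappingCheck {

    private final ClusterService clusterService;
    private final TimeValue internalActionTimeout; // indices.recovery.internal_action_timeout

    RecoveryMappingCheck(ClusterService clusterService, TimeValue internalActionTimeout) {
        this.clusterService = clusterService;
        this.internalActionTimeout = internalActionTimeout;
    }

    /**
     * Submits a cluster state update task and waits for it to be processed. Because cluster
     * state tasks are applied in order on the local node, once this task completes any mapping
     * updates queued before it have been processed as well.
     */
    void waitForPendingMappingChanges() {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        // IMMEDIATE: the check is very cheap and should not queue behind lower-priority work;
        // the timeout bounds how long a busy cluster state update thread can stall recovery
        clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE,
                new TimeoutClusterStateUpdateTask() {

                    @Override
                    public TimeValue timeout() {
                        return internalActionTimeout;
                    }

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        // no-op here: we only need the update thread to reach this task
                        return currentState;
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        latch.countDown();
                    }

                    @Override
                    public void onFailure(String source, Throwable t) {
                        failure.set(t); // also set when the task times out in the pending queue
                        latch.countDown();
                    }
                });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (failure.get() != null) {
            // surfacing the failure here is what causes the recovery itself to fail
            throw new ElasticsearchException("error during mapping check", failure.get());
        }
    }
}
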
@@ -30,13 +30,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -230,21 +230,21 @@ public void run() {
} catch (Throwable e) {
final CorruptIndexException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrap(e, CorruptIndexException.class)) != null) {
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
CorruptIndexException current = corruptedEngine.get();
if (current != null || corruptedEngine.compareAndSet(null, corruptIndexException)) {
current = corruptedEngine.get();
assert current != null;
current.addSuppressed(e);
}

} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK", corruptIndexException, shard.shardId(), md);
}
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
CorruptIndexException current = corruptedEngine.get();
if (current != null || corruptedEngine.compareAndSet(null, corruptIndexException)) {
current = corruptedEngine.get();
assert current != null;
current.addSuppressed(e);
}

} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK", corruptIndexException, shard.shardId(), md);
}
} else {
exceptions.add(0, e); // last exceptions first
}
@@ -313,7 +313,16 @@ private void updateMappingOnMaster() {
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateUpdateTask() {
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// we use immediate as this is a very lightweight check and we don't want to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {


@Override
public TimeValue timeout() {
return internalActionTimeout;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
@@ -340,13 +349,17 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
});
try {
latch.await();
if (mappingCheckException.get() != null) {
logger.warn("error during mapping check, failing recovery", mappingCheckException.get());
throw new ElasticsearchException("error during mapping check", mappingCheckException.get());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
92 changes: 92 additions & 0 deletions src/test/java/org/elasticsearch/recovery/RelocationTests.java
@@ -33,14 +33,19 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@@ -51,10 +56,12 @@
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@@ -417,4 +424,89 @@ public boolean apply(Object input) {
assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
}

@Test
@TestLogging("cluster.service:TRACE,indices.recovery:TRACE")
public void testRelocationWithBusyClusterUpdateThread() throws Exception {
final String indexName = "test";
final Settings settings = ImmutableSettings.builder()
.put("gateway.type", "local")
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s")
.put("indices.recovery.internal_action_timeout", "1s").build();
String master = internalCluster().startNode(settings);
ensureGreen();
List<String> nodes = internalCluster().startNodesAsync(2, settings).get();
final String node1 = nodes.get(0);
final String node2 = nodes.get(1);
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));


ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);


client().admin().indices().prepareCreate(indexName)
.setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
).get();

List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
}
indexRandom(true, requests);
ensureSearchable(indexName);

// capture the incoming cluster state that indicates the replica count has been upgraded and the replicas assigned

final CountDownLatch allReplicasAssigned = new CountDownLatch(1);
final CountDownLatch releaseClusterState = new CountDownLatch(1);
try {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().routingNodes().hasUnassignedShards() == false) {
allReplicasAssigned.countDown();
try {
releaseClusterState.await();
} catch (InterruptedException e) {
// ignore; the latch is released in the finally block of the test
}
}
}
});

logger.info("--> starting replica recovery");
// we don't expect this to be acknowledged by node1, where we block the cluster state thread
assertFalse(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1 + "," + node2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)

).setTimeout("200ms")
.get().isAcknowledged());

logger.info("--> waiting for node1 to process replica existence");
allReplicasAssigned.await();
logger.info("--> waiting for recovery to fail");
assertBusy(new Runnable() {
@Override
public void run() {
ClusterHealthResponse response = client().admin().cluster().prepareHealth().get();
assertThat(response.getUnassignedShards(), equalTo(1));
}
});
} finally {
logger.info("--> releasing cluster state update thread");
releaseClusterState.countDown();
}
logger.info("--> waiting for recovery to succeed");
// force a reroute so the unassigned replica is allocated again.
client().admin().cluster().prepareReroute().get();
ensureGreen();
}

}
