Add a timeout to local mapping change check #9575

Closed · wants to merge 2 commits
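This change submits the recovery-time mapping check as a cluster state update task with Priority.IMMEDIATE and a timeout, and any failure passed to onFailure (including a timeout) is now captured and rethrown so the recovery fails instead of the error only being logged. The standalone sketch below illustrates just that propagation pattern using plain java.util.concurrent types; the class and method names (FailurePropagationSketch, runMappingCheck) are illustrative and are not part of the Elasticsearch code in this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FailurePropagationSketch {

    public static void main(String[] args) throws Exception {
        ExecutorService clusterStateThread = Executors.newSingleThreadExecutor();
        final CountDownLatch latch = new CountDownLatch(1);
        // capture a failure from the async task so the waiting thread can rethrow it
        final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();

        clusterStateThread.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // stand-in for the mapping check executed on the cluster state thread
                    runMappingCheck();
                } catch (Throwable t) {
                    mappingCheckException.set(t); // remember the failure (e.g. a timeout)
                } finally {
                    latch.countDown();            // always release the waiting recovery thread
                }
            }
        });

        // the "recovery" thread waits, then fails loudly if the check failed
        if (!latch.await(30, TimeUnit.SECONDS)) {
            throw new IllegalStateException("mapping check never completed");
        }
        if (mappingCheckException.get() != null) {
            throw new RuntimeException("error during mapping check", mappingCheckException.get());
        }
        clusterStateThread.shutdown();
    }

    private static void runMappingCheck() {
        // hypothetical check; the real code inspects pending mapping updates
    }
}
```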
@@ -19,7 +19,6 @@

package org.elasticsearch.cluster;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

/**
@@ -36,7 +35,7 @@ abstract public class ClusterStateUpdateTask {
/**
* A callback called when execute fails.
*/
abstract public void onFailure(String source, @Nullable Throwable t);
abstract public void onFailure(String source, Throwable t);


/**
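The hunks above drop the @Nullable annotation from ClusterStateUpdateTask.onFailure, so callbacks can rely on a non-null cause. A minimal sketch of an implementer after this change, assuming execute and onFailure are the only abstract methods (as these hunks suggest); the class name and the wrapping exception are illustrative:

```java
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;

public class NonNullFailureTask extends ClusterStateUpdateTask {

    @Override
    public ClusterState execute(ClusterState currentState) throws Exception {
        return currentState; // no-op: leave the cluster state untouched
    }

    @Override
    public void onFailure(String source, Throwable t) {
        // after this change the Throwable is never null, so it can be wrapped directly
        // (previously the @Nullable annotation forced a defensive null check)
        throw new RuntimeException("cluster state update [" + source + "] failed", t);
    }
}
```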
@@ -30,13 +30,13 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateNonMasterUpdateTask;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -230,18 +230,18 @@ public void run() {
} catch (Throwable e) {
final CorruptIndexException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrap(e, CorruptIndexException.class)) != null) {
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
// if we are not the first exception, add ourselves as suppressed to the main one:
corruptedEngine.get().addSuppressed(e);
}
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occured on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK", corruptIndexException, shard.shardId(), md);
}
if (store.checkIntegrity(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
// if we are not the first exception, add ourselves as suppressed to the main one:
corruptedEngine.get().addSuppressed(e);
}
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occured on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} File corruption on recovery {} local checksum OK", corruptIndexException, shard.shardId(), md);
}
} else {
exceptions.add(0, e); // last exceptions first
}
@@ -310,7 +310,20 @@ private void updateMappingOnMaster() {
// while we're checking
final BlockingQueue<DocumentMapper> documentMappersToUpdate = ConcurrentCollections.newBlockingQueue();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("recovery_mapping_check", new ProcessedClusterStateNonMasterUpdateTask() {
final AtomicReference<Throwable> mappingCheckException = new AtomicReference<>();
// we use immediate as this is a very lightweight check and we don't want to delay recovery
clusterService.submitStateUpdateTask("recovery_mapping_check", Priority.IMMEDIATE, new TimeoutClusterStateUpdateTask() {

@Override
public boolean runOnlyOnMaster() {
return false;
}

@Override
public TimeValue timeout() {
return internalActionTimeout;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
@@ -337,13 +350,17 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}

@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("unexpected error while checking for pending mapping changes", t);
public void onFailure(String source, Throwable t) {
mappingCheckException.set(t);
latch.countDown();
}
});
try {
latch.await();
if (mappingCheckException.get() != null) {
logger.warn("error during mapping check, failing recovery", mappingCheckException.get());
throw new ElasticsearchException("error during mapping check", mappingCheckException.get());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
src/test/java/org/elasticsearch/recovery/RelocationTests.java: 93 additions, 1 deletion
@@ -33,14 +33,19 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
@@ -51,10 +56,12 @@
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@@ -417,4 +424,89 @@ public boolean apply(Object input) {
assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
}

}
@Test
@TestLogging("cluster.service:TRACE,indices.recovery:TRACE")
public void testRelocationWithBusyClusterUpdateThread() throws Exception {
final String indexName = "test";
final Settings settings = ImmutableSettings.builder()
.put("gateway.type", "local")
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s")
.put("indices.recovery.internal_action_timeout", "1s").build();
String master = internalCluster().startNode(settings);
ensureGreen();
List<String> nodes = internalCluster().startNodesAsync(2, settings).get();
final String node1 = nodes.get(0);
final String node2 = nodes.get(1);
ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
assertThat(response.isTimedOut(), is(false));


ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node1);


client().admin().indices().prepareCreate(indexName)
.setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
).get();

List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
}
indexRandom(true, requests);
ensureSearchable(indexName);

// capture the incoming cluster state to detect that the replicas have been assigned

final CountDownLatch allReplicasAssigned = new CountDownLatch(1);
final CountDownLatch releaseClusterState = new CountDownLatch(1);
try {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().routingNodes().hasUnassignedShards() == false) {
allReplicasAssigned.countDown();
try {
releaseClusterState.await();
} catch (InterruptedException e) {
//
}
}
}
});

logger.info("--> starting replica recovery");
// we don't expect this to be acknowledged by node1 where we block the cluster state thread
assertFalse(client().admin().indices().prepareUpdateSettings(indexName)
.setSettings(ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "_name", node1 + "," + node2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)

).setTimeout("200ms")
.get().isAcknowledged());

logger.info("--> waiting for node1 to process replica existence");
allReplicasAssigned.await();
logger.info("--> waiting for recovery to fail");
assertBusy(new Runnable() {
@Override
public void run() {
ClusterHealthResponse response = client().admin().cluster().prepareHealth().get();
assertThat(response.getUnassignedShards(), equalTo(1));
}
});
} finally {
logger.info("--> releasing cluster state update thread");
releaseClusterState.countDown();
}
logger.info("--> waiting for recovery to succeed");
// force a move.
client().admin().cluster().prepareReroute().get();
ensureGreen();
}

}