Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel recovery if shard on the target node closes during recovery operation #6645

Merged
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -378,14 +378,7 @@ public String executor() {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

onGoingRecovery.indexShard.performRecoveryPrepareForTranslog();
onGoingRecovery.stage(RecoveryState.Stage.TRANSLOG);
@@ -409,14 +402,7 @@ public String executor() {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

onGoingRecovery.stage(RecoveryState.Stage.FINALIZE);
onGoingRecovery.indexShard.performRecoveryFinalization(false, onGoingRecovery);
@@ -442,21 +428,11 @@ public String executor() {
@Override
public void messageReceived(RecoveryTranslogOperationsRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
for (Translog.Operation operation : request.operations()) {
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());
shard.performRecoveryOperation(operation);
onGoingRecovery.recoveryState.getTranslog().incrementTranslogOperations();
}
@@ -479,14 +455,7 @@ public String executor() {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

onGoingRecovery.recoveryState().getIndex().addFileDetails(request.phase1FileNames, request.phase1FileSizes);
onGoingRecovery.recoveryState().getIndex().addReusedFileDetails(request.phase1ExistingFileNames, request.phase1ExistingFileSizes);
@@ -513,14 +482,7 @@ public String executor() {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

final Store store = onGoingRecovery.indexShard.store();
store.incRef();
@@ -586,14 +548,7 @@ public String executor() {
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
RecoveryStatus onGoingRecovery = onGoingRecoveries.get(request.recoveryId());
if (onGoingRecovery == null) {
// shard is getting closed on us
throw new IndexShardClosedException(request.shardId());
}
if (onGoingRecovery.isCanceled()) {
onGoingRecovery.sentCanceledToSource = true;
throw new IndexShardClosedException(request.shardId());
}
validateRecoveryStatus(onGoingRecovery, request.shardId());

Store store = onGoingRecovery.indexShard.store();
store.incRef();
@@ -671,4 +626,20 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
}
}
}

/**
 * Verifies that an incoming recovery request may still be applied on this (target) node.
 *
 * The request is rejected with an {@link IndexShardClosedException} when no recovery is
 * registered for it, when the target shard reports the CLOSED state, or when the recovery
 * has already been canceled. In the latter two cases {@code sentCanceledToSource} is set
 * before throwing; for a closed shard the recovery is additionally canceled first.
 *
 * @param onGoingRecovery the recovery status looked up for the request, may be {@code null}
 * @param shardId         the shard the request refers to, used to build the exception
 * @throws IndexShardClosedException if the recovery is missing, its shard is closed, or it is canceled
 */
private void validateRecoveryStatus(RecoveryStatus onGoingRecovery, ShardId shardId) {
    if (onGoingRecovery == null) {
        // shard is getting closed on us
        throw new IndexShardClosedException(shardId);
    }

    final boolean shardClosed = onGoingRecovery.indexShard.state() == IndexShardState.CLOSED;
    if (shardClosed) {
        // the shard went away underneath the recovery: cancel it before reporting back
        cancelRecovery(onGoingRecovery.indexShard);
    }

    // short-circuit keeps isCanceled() from being consulted when the shard is already closed
    if (shardClosed || onGoingRecovery.isCanceled()) {
        onGoingRecovery.sentCanceledToSource = true;
        throw new IndexShardClosedException(shardId);
    }
}
}
@@ -21,22 +21,31 @@

import com.carrotsearch.hppc.IntOpenHashSet;
import com.carrotsearch.hppc.procedures.IntProcedure;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.BackgroundIndexer;
@@ -52,8 +61,10 @@
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

/**
*/
@@ -321,4 +332,89 @@ public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardSt
}
}

/**
 * Relocates the shards of an index from a "blue" node towards a "red" node and, once a
 * recovery for the index is reported, redirects them to a "green" node. Recovery is
 * throttled in between to make a cancellation of the first relocation more likely.
 * The test finally checks that all documents are still searchable and that the blue
 * node no longer holds any shard.
 */
@Test
public void testMoveShardsWhileRelocation() throws Exception {
final String indexName = "test";

// Start three nodes, each tagged with a distinct "node.color" attribute.
ListenableFuture<String> blueFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "blue").build());
internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "green").build());
ListenableFuture<String> redFuture = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.color", "red").build());

ClusterHealthResponse response = client().admin().cluster().prepareHealth().setWaitForNodes(">=3").get();
assertThat(response.isTimedOut(), is(false));

String blueNodeName = blueFuture.get();
final String redNodeName = redFuture.get();

// Create the index without replicas and restrict its allocation to the blue node.
client().admin().indices().prepareCreate(indexName)
.setSettings(
ImmutableSettings.builder()
.put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "blue")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
).get();

// Index a random number of empty documents.
List<IndexRequestBuilder> requests = new ArrayList<>();
int numDocs = scaledRandomIntBetween(25, 250);
for (int i = 0; i < numDocs; i++) {
requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
}
indexRandom(true, requests);
ensureSearchable(indexName);

ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
String blueNodeId = internalCluster().getInstance(DiscoveryService.class, blueNodeName).localNode().id();

// Sanity check: the blue node holds shards before any relocation is requested.
assertFalse(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());

SearchResponse searchResponse = client().prepareSearch(indexName).get();
assertHitCount(searchResponse, numDocs);

// Slow down recovery in order to make recovery cancellations more likely
IndicesStatsResponse statsResponse = client().admin().indices().prepareStats(indexName).get();
// A tenth of the first shard's store size is used both as chunk size and as bytes-per-second limit.
long chunkSize = statsResponse.getIndex(indexName).getShards()[0].getStats().getStore().size().bytes() / 10;
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
// one chunk per sec..
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize)
.put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize)
)
.get().isAcknowledged());

// Switch the allocation filter from blue to red, which requests the first relocation.
client().admin().indices().prepareUpdateSettings(indexName).setSettings(
ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "red")
).get();

// Let's wait a bit and then move again to hopefully trigger recovery cancellations.
boolean applied = awaitBusy(

This comment has been minimized.

Copy link
@bleskes

bleskes Jul 1, 2014

Member

I think this may miss a recovery if it is completed already. I think you should look for a recovery on the red node and not care if it's active or not. Also, maybe slow down the recovery as well? see - https://github.com/elasticsearch/elasticsearch/blob/c907ce325e89b25d0ff29288f018adda35bbdeed/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryTests.java#L104

new Predicate<Object>() {
@Override
public boolean apply(Object input) {
// Ask, through the red node's client, for the recoveries of the index;
// the predicate holds once the response lists at least one shard recovery.
RecoveryResponse recoveryResponse = internalCluster().client(redNodeName).admin().indices().prepareRecoveries(indexName)
.get();
return !recoveryResponse.shardResponses().get(indexName).isEmpty();
}
}
);
assertTrue(applied);
// Redirect the index to the green node after a recovery has been observed.
client().admin().indices().prepareUpdateSettings(indexName).setSettings(
ImmutableSettings.builder().put(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "color", "green")
).get();

// Restore the recovery speed to not timeout cluster health call
assertTrue(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb")
.put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb")
)
.get().isAcknowledged());

// this also waits for all ongoing recoveries to complete:
ensureSearchable(indexName);
searchResponse = client().prepareSearch(indexName).get();
assertHitCount(searchResponse, numDocs);

// After the moves no shard may remain on the blue node.
stateResponse = client().admin().cluster().prepareState().get();
assertTrue(stateResponse.getState().readOnlyRoutingNodes().node(blueNodeId).isEmpty());
}

}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.