Snapshot/Restore: delete operation should ignore finalizing shards on nodes that no longer exist

Related to #9924
imotov committed Mar 11, 2015
1 parent 2c4aaa0 commit 55f2a54
Showing 2 changed files with 90 additions and 13 deletions.
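For orientation before reading the diff: the heart of the change is that deleting an in-progress snapshot no longer waits on shards whose assigned node has left the cluster. Below is a minimal, self-contained sketch of that decision under hypothetical names (OrphanSnapshotCheckSketch, ShardStatus, mustWaitForFinalization are illustrations, not Elasticsearch APIs); the real logic lives in SnapshotsService and uses ShardSnapshotStatus and the cluster state, as shown in the diff that follows.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-alone model of the check introduced by this commit: a delete of an
// in-progress snapshot only has to wait when some shard is still incomplete AND is assigned
// to a node that is still part of the cluster. Shards stuck on departed nodes are ignored.
public class OrphanSnapshotCheckSketch {

    // Minimal stand-in for a shard's snapshot status (not the real ShardSnapshotStatus class).
    static class ShardStatus {
        final String nodeId;
        final boolean completed;

        ShardStatus(String nodeId, boolean completed) {
            this.nodeId = nodeId;
            this.completed = completed;
        }
    }

    static boolean mustWaitForFinalization(Collection<ShardStatus> shards, Set<String> liveNodeIds) {
        for (ShardStatus shard : shards) {
            if (!shard.completed && shard.nodeId != null && liveNodeIds.contains(shard.nodeId)) {
                return true; // a live node is still finalizing this shard, so the delete must wait
            }
        }
        return false; // every incomplete shard sits on a missing node, so it is safe to delete now
    }

    public static void main(String[] args) {
        Collection<ShardStatus> shards = Arrays.asList(
                new ShardStatus("unknown-node", false),
                new ShardStatus("unknown-node", false));
        Set<String> liveNodeIds = new HashSet<>(Arrays.asList("node-1", "node-2"));
        // Prints false: the only incomplete shards belong to a node that no longer exists,
        // which is exactly the "orphan snapshot" case the delete operation can now clean up.
        System.out.println(mustWaitForFinalization(shards, liveNodeIds));
    }
}

This mirrors the hasUncompletedShards loop added to SnapshotsService in the hunk below: when nothing is left to wait for, the delete aborts and finishes the snapshot immediately instead of blocking forever.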
26 changes: 20 additions & 6 deletions src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -24,7 +24,6 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
@@ -747,8 +746,7 @@ private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
return true;
}
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards().entrySet()) {
ShardSnapshotStatus shardStatus = shardEntry.getValue();
for (ShardSnapshotStatus shardStatus : snapshot.shards().values()) {
if (!shardStatus.state().completed() && node.getId().equals(shardStatus.nodeId())) {
// At least one shard was running on the removed node - we need to fail it
return true;
@@ -1121,9 +1119,25 @@ public ClusterState execute(ClusterState currentState) throws Exception {
shards = snapshot.shards();
endSnapshot(snapshot);
} else {
// snapshot is being finalized - wait for it
logger.trace("trying to delete completed snapshot - save to delete");
return currentState;
boolean hasUncompletedShards = false;
// Clean up in case a node has gone missing and the snapshot wasn't updated for some reason
for (ShardSnapshotStatus shardStatus : snapshot.shards().values()) {
// Check if we still have shards running on existing nodes
if (shardStatus.state().completed() == false && shardStatus.nodeId() != null && currentState.nodes().get(shardStatus.nodeId()) != null) {
hasUncompletedShards = true;
break;
}
}
if (hasUncompletedShards) {
// snapshot is being finalized - wait for shards to complete finalization process
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for - finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshot.shards();
endSnapshot(snapshot);
}
}
SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshot, State.ABORTED, shards);
snapshots = new SnapshotMetaData(newSnapshot);
@@ -22,6 +22,7 @@
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
@@ -39,31 +40,32 @@
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.*;
import org.elasticsearch.cluster.metadata.SnapshotMetaData.State;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.mockstore.MockRepositoryModule;
import org.junit.Test;

import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

@@ -882,7 +884,7 @@ public void snapshotClosedIndexTest() throws Exception {
logger.info("--> closing index test-idx-closed");
assertAcked(client.admin().indices().prepareClose("test-idx-closed"));
ClusterStateResponse stateResponse = client.admin().cluster().prepareState().get();
assertThat(stateResponse.getState().metaData().index("test-idx-closed").state(), equalTo(State.CLOSE));
assertThat(stateResponse.getState().metaData().index("test-idx-closed").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.getState().routingTable().index("test-idx-closed"), nullValue());

logger.info("--> snapshot");
@@ -1665,6 +1667,67 @@ public void deleteIndexDuringSnapshotTest() throws Exception {
}
}


@Test
public void deleteOrphanSnapshotTest() throws Exception {
Client client = client();

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDirPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000))
));

createIndex("test-idx");
ensureGreen();

ClusterService clusterService = internalCluster().getInstance(ClusterService.class, internalCluster().getMasterName());

final CountDownLatch countDownLatch = new CountDownLatch(1);

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

logger.info("--> emulate an orphan snapshot");

clusterService.submitStateUpdateTask("orphan snapshot test", new ProcessedClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
// Simulate orphan snapshot
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
shards.put(new ShardId("test-idx", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId("test-idx", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId("test-idx", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
ImmutableList.Builder<Entry> entries = ImmutableList.builder();
entries.add(new Entry(new SnapshotId("test-repo", "test-snap"), true, State.ABORTED, ImmutableList.of("test-idx"), System.currentTimeMillis(), shards.build()));
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
mdBuilder.putCustom(SnapshotMetaData.TYPE, new SnapshotMetaData(entries.build()));
return ClusterState.builder(currentState).metaData(mdBuilder).build();
}

@Override
public void onFailure(String source, Throwable t) {
fail();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
countDownLatch.countDown();
}
});

countDownLatch.await();
logger.info("--> try deleting the orphan snapshot");

assertAcked(client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").get("10s"));

}

private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException {
return awaitBusy(new Predicate<Object>() {
@Override
