Skip to content

Commit

Permalink
Fix Two Snapshot Clone State Machine Bugs (#65042)
Browse files Browse the repository at this point in the history
There are two separate but closely related bug fixes in this PR:
1. When two snapshot clones would initialize concurrently we could get into a state
where one is in front of the other in the queue snapshots array but its shard states
are in fact queued behind the other snapshot or clone. Tightly linked to this,
snapshot cloning would not account for snapshot deletes when queueing shard snapshots
which could lead to races where both delete and clone are running concurrently for a shard.
2. As a result of fixing the first issue and writing a test for it, it also became obvious
that a finished delete was not properly accounted for when it comes to starting snapshot
clones that could now queue behind a delete.
  • Loading branch information
original-brownbear committed Nov 17, 2020
1 parent fd675fd commit 12d412e
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.snapshots;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStatus;
Expand Down Expand Up @@ -313,6 +314,10 @@ public void testBackToBackClonesForIndexNotInCluster() throws Exception {

final int extraClones = randomIntBetween(1, 5);
final List<ActionFuture<AcknowledgedResponse>> extraCloneFutures = new ArrayList<>(extraClones);
final boolean slowInitClones = extraClones > 1 && randomBoolean();
if (slowInitClones) {
blockMasterOnReadIndexMeta(repoName);
}
for (int i = 0; i < extraClones; i++) {
extraCloneFutures.add(startClone(repoName, sourceSnapshot, "target-snapshot-" + i, indexBlocked));
}
Expand Down Expand Up @@ -571,6 +576,43 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw
assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
}

public void testStartCloneDuringRunningDelete() throws Exception {
final String masterName = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");

final String indexName = "test-idx";
createIndexWithContent(indexName);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final List<String> snapshotNames = createNSnapshots(repoName, randomIntBetween(1, 5));
blockMasterOnWriteIndexFile(repoName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, randomFrom(snapshotNames));
waitForBlock(masterName, repoName);
awaitNDeletionsInProgress(1);

final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, "target-snapshot", indexName);
logger.info("--> waiting for snapshot clone to be fully initialized");
awaitClusterState(state -> {
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.clones().isEmpty() == false) {
assertEquals(sourceSnapshot, entry.source().getName());
for (ObjectCursor<SnapshotsInProgress.ShardSnapshotStatus> value : entry.clones().values()) {
assertSame(value.value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED);
}
return true;
}
}
return false;
});
unblockNode(repoName, masterName);
assertAcked(deleteFuture.get());
assertAcked(cloneFuture.get());
}

private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
String... indices) {
return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1313,12 +1313,6 @@ private void corruptIndexN(Path repoPath, long generation) throws IOException {
Files.write(indexNBlob, randomByteArrayOfLength(1), StandardOpenOption.TRUNCATE_EXISTING);
}

private void awaitNDeletionsInProgress(int count) throws Exception {
logger.info("--> wait for [{}] deletions to show up in the cluster state", count);
awaitClusterState(state ->
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().size() == count);
}

private static List<SnapshotInfo> currentSnapshots(String repoName) {
return client().admin().cluster().prepareGetSnapshots(repoName).setSnapshots(GetSnapshotsRequest.CURRENT_SNAPSHOT)
.get().getSnapshots(repoName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -471,6 +472,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
}
builder.endArray();
if (isClone()) {
builder.field(SOURCE, source);
builder.startArray(CLONES);
{
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> shardEntry : clones) {
RepositoryShardId shardId = shardEntry.key;
ShardSnapshotStatus status = shardEntry.value;
builder.startObject();
{
builder.field(INDEX, shardId.index());
builder.field(SHARD, shardId.shardId());
builder.field(STATE, status.state());
builder.field(NODE, status.nodeId());
}
builder.endObject();
}
}
builder.endArray();
}
builder.array(DATA_STREAMS, dataStreams.toArray(new String[0]));
builder.endObject();
return builder;
Expand Down Expand Up @@ -692,13 +712,18 @@ public static State fromValue(byte value) {
private final List<Entry> entries;

private static boolean assertConsistentEntries(List<Entry> entries) {
final Map<String, Set<ShardId>> assignedShardsByRepo = new HashMap<>();
final Map<String, Set<Tuple<String, Integer>>> assignedShardsByRepo = new HashMap<>();
final Map<String, Set<Tuple<String, Integer>>> queuedShardsByRepo = new HashMap<>();
for (Entry entry : entries) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
if (shard.value.isActive()) {
assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(shard.key) :
"Found duplicate shard assignments in " + entries;
}
final ShardId sid = shard.key;
assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.getIndexName(), sid.id(),
shard.value);
}
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> shard : entry.clones()) {
final RepositoryShardId sid = shard.key;
assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.indexName(), sid.shardId(),
shard.value);
}
}
for (String repoName : assignedShardsByRepo.keySet()) {
Expand All @@ -708,6 +733,21 @@ private static boolean assertConsistentEntries(List<Entry> entries) {
return true;
}

private static boolean assertShardStateConsistent(List<Entry> entries, Map<String, Set<Tuple<String, Integer>>> assignedShardsByRepo,
Map<String, Set<Tuple<String, Integer>>> queuedShardsByRepo, Entry entry,
String indexName, int shardId, ShardSnapshotStatus shardSnapshotStatus) {
if (shardSnapshotStatus.isActive()) {
Tuple<String, Integer> plainShardId = Tuple.tuple(indexName, shardId);
assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>())
.add(plainShardId) : "Found duplicate shard assignments in " + entries;
assert queuedShardsByRepo.getOrDefault(entry.repository(), Collections.emptySet()).contains(plainShardId) == false
: "Found active shard assignments after queued shard assignments in " + entries;
} else if (shardSnapshotStatus.state() == ShardState.QUEUED) {
queuedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(Tuple.tuple(indexName, shardId));
}
return true;
}

public static SnapshotsInProgress of(List<Entry> entries) {
if (entries.isEmpty()) {
return EMPTY;
Expand Down Expand Up @@ -766,6 +806,8 @@ public void writeTo(StreamOutput out) throws IOException {
private static final String STATE = "state";
private static final String INDICES = "indices";
private static final String DATA_STREAMS = "data_streams";
private static final String SOURCE = "source";
private static final String CLONES = "clones";
private static final String START_TIME_MILLIS = "start_time_millis";
private static final String START_TIME = "start_time";
private static final String REPOSITORY_STATE_ID = "repository_state_id";
Expand Down

0 comments on commit 12d412e

Please sign in to comment.