Skip to content

Commit

Permalink
Fix Concurrent Snapshot Ending And Stabilize Snapshot Finalization (e…
Browse files Browse the repository at this point in the history
…lastic#38368)

* The problem in elastic#38226 is that in some corner cases multiple calls to `endSnapshot` were made concurrently, leading to non-deterministic behavior (`beginSnapshot` was triggering a repository finalization while one that was triggered by a `deleteSnapshot` was already in progress)
   * Fixed by:
      * Making all `endSnapshot` calls originate from the cluster state being in a "completed" state (apart from on short-circuit on initializing an empty snapshot). This forced putting the failure string into `SnapshotsInProgress.Entry`.
      * Adding deduplication logic to `endSnapshot`
* Also:
  * Streamlined the init behavior to work the same way (keep state on the `SnapshotsService` to decide which snapshot entries are stale)
* closes elastic#38226
  • Loading branch information
original-brownbear authored and dimitris-athanasiou committed Feb 12, 2019
1 parent 678d6d5 commit 5931eab
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 258 deletions.
Expand Up @@ -24,6 +24,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -87,9 +88,11 @@ public static class Entry {
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
@Nullable private final String failure;

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
Expand All @@ -104,15 +107,26 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
this.waitingIndices = findWaitingIndices(shards);
}
this.repositoryStateId = repositoryStateId;
this.failure = failure;
}

public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards);
entry.repositoryStateId, shards, entry.failure);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure);
}

public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards);
this(entry, entry.state, shards, entry.failure);
}

public Snapshot snapshot() {
Expand Down Expand Up @@ -151,6 +165,10 @@ public long getRepositoryStateId() {
return repositoryStateId;
}

public String failure() {
return failure;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -427,14 +445,21 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
}
}
long repositoryStateId = in.readLong();
final String failure;
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
failure = in.readOptionalString();
} else {
failure = null;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
state,
Collections.unmodifiableList(indexBuilder),
startTime,
repositoryStateId,
builder.build());
builder.build(),
failure);
}
this.entries = Arrays.asList(entries);
}
Expand Down Expand Up @@ -463,6 +488,9 @@ public void writeTo(StreamOutput out) throws IOException {
}
}
out.writeLong(entry.repositoryStateId);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(entry.failure);
}
}
}

Expand Down
Expand Up @@ -591,8 +591,6 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIn
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
// Finalize snapshot in the repository
snapshotsService.endSnapshot(updatedEntry);
}
} else {
entries.add(entry);
Expand Down

0 comments on commit 5931eab

Please sign in to comment.