Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making snapshot deletes distributed across data nodes. #39656

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@

package org.elasticsearch.cluster;

import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;

import java.io.IOException;
Expand All @@ -48,7 +55,7 @@ public SnapshotDeletionsInProgress() {
this(Collections.emptyList());
}

private SnapshotDeletionsInProgress(List<Entry> entries) {
public SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = Collections.unmodifiableList(entries);
}

Expand Down Expand Up @@ -99,6 +106,16 @@ public boolean hasDeletionsInProgress() {
return entries.isEmpty() == false;
}

public Entry getEntry(final Snapshot snapshot) {
for (Entry entry : entries) {
final Snapshot curr = entry.getSnapshot();
if (curr.equals(snapshot)) {
return entry;
}
}
return null;
}

@Override
public String getWriteableName() {
return TYPE;
Expand Down Expand Up @@ -146,6 +163,32 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("snapshot", entry.snapshot.getSnapshotId().getName());
builder.humanReadableField("start_time_millis", "start_time", new TimeValue(entry.startTime));
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("version", entry.version);
builder.startArray("indices_to_cleanup");
{
for (IndexId index : entry.indicesToCleanup) {
index.toXContent(builder, params);
}
}
builder.endArray();
builder.startArray("shards");
{
for (ObjectObjectCursor<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> shardEntry : entry.shards) {
ShardId shardId = shardEntry.key.v1();
String snapshotIndexId = shardEntry.key.v2();
ShardSnapshotDeletionStatus status = shardEntry.value;
builder.startObject();
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("snapshot_index_id", snapshotIndexId);
builder.field("state", status.state());
builder.field("node", status.nodeId());
}
builder.endObject();
}
}
builder.endArray();
}
builder.endObject();
}
Expand All @@ -172,17 +215,57 @@ public static final class Entry implements Writeable {
private final Snapshot snapshot;
private final long startTime;
private final long repositoryStateId;
private final int version;
private final List<IndexId> indicesToCleanup;
private final ImmutableOpenMap<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> shards;

public Entry(Snapshot snapshot, long startTime, long repositoryStateId) {
this(snapshot, startTime, repositoryStateId, 0, null, null);
}

public Entry(final Snapshot snapshot,
final long startTime,
final long repositoryStateId,
final int version,
final List<IndexId> indicesToCleanup,
final ImmutableOpenMap<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> shards) {
this.snapshot = snapshot;
this.startTime = startTime;
this.repositoryStateId = repositoryStateId;
this.version = version;
this.indicesToCleanup = indicesToCleanup == null ? Collections.emptyList() : indicesToCleanup;
this.shards = shards == null ? ImmutableOpenMap.of() : shards;
}

public Entry(StreamInput in) throws IOException {
this.snapshot = new Snapshot(in);
this.startTime = in.readVLong();
this.repositoryStateId = in.readLong();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.version = in.readInt();
int indices = in.readVInt();
List<IndexId> indexBuilder = new ArrayList<>();
for (int i = 0; i < indices; i++) {
indexBuilder.add(new IndexId(in.readString(), in.readString()));
}
this.indicesToCleanup = indexBuilder;
ImmutableOpenMap.Builder<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> builder = ImmutableOpenMap.builder();
int shards = in.readVInt();
for (int i = 0; i < shards; i++) {
ShardId shardId = ShardId.readShardId(in);
String indexId = in.readString();
builder.put(new Tuple<>(shardId, indexId), new ShardSnapshotDeletionStatus(in));
}
this.shards = builder.build();
} else {
this.version = 0;
this.indicesToCleanup = Collections.emptyList();
this.shards = ImmutableOpenMap.of();
}
}

public Entry(Entry entry, ImmutableOpenMap<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> shards) {
this(entry.snapshot, entry.startTime, entry.repositoryStateId, entry.version, entry.indicesToCleanup, shards);
}

/**
Expand All @@ -199,13 +282,25 @@ public long getStartTime() {
return startTime;
}

public int getVersion() {
return version;
}

/**
* The repository state id at the time the snapshot deletion began.
*/
public long getRepositoryStateId() {
return repositoryStateId;
}

public List<IndexId> getIndicesToCleanup() {
return indicesToCleanup;
}

public ImmutableOpenMap<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> getShards() {
return shards;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -217,19 +312,166 @@ public boolean equals(Object o) {
Entry that = (Entry) o;
return snapshot.equals(that.snapshot)
&& startTime == that.startTime
&& repositoryStateId == that.repositoryStateId;
&& repositoryStateId == that.repositoryStateId
&& version == that.version
&& indicesToCleanup.equals(that.indicesToCleanup)
&& shards.equals(that.shards);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, startTime, repositoryStateId);
return Objects.hash(snapshot, startTime, repositoryStateId, version, indicesToCleanup, shards);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeVLong(startTime);
out.writeLong(repositoryStateId);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeInt(version);
out.writeVInt(indicesToCleanup.size());
for (IndexId index : indicesToCleanup) {
index.writeTo(out);
}
out.writeVInt(shards.size());
for (ObjectObjectCursor<Tuple<ShardId, String>, ShardSnapshotDeletionStatus> shardEntry : shards) {
shardEntry.key.v1().writeTo(out);
out.writeString(shardEntry.key.v2());
shardEntry.value.writeTo(out);
}
}
}
}

/**
* Checks if all shard deletions in the list have completed
*
* @param shards list of shard deletion statuses
* @return true if all shard deletions have completed (either successfully or failed), false otherwise
*/
public static boolean completed(ObjectContainer<ShardSnapshotDeletionStatus> shards) {
for (ObjectCursor<ShardSnapshotDeletionStatus> status : shards) {
if (status.value.state().completed() == false) {
return false;
}
}
return true;
}

public static class ShardSnapshotDeletionStatus {
private final State state;
private final String nodeId;
private final String reason;

public ShardSnapshotDeletionStatus(String nodeId, State state) {
this(nodeId, state, null);
}

public ShardSnapshotDeletionStatus(String nodeId, State state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}

public ShardSnapshotDeletionStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public State state() {
return state;
}

public String nodeId() {
return nodeId;
}

public String reason() {
return reason;
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
out.writeOptionalString(reason);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ShardSnapshotDeletionStatus status = (ShardSnapshotDeletionStatus) o;

if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
if (state != status.state) return false;

return true;
}

@Override
public int hashCode() {
int result = state != null ? state.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (reason != null ? reason.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotDeletionStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
}
}

public enum State {
INIT((byte) 0, false, false),
STARTED((byte) 1, false, false),
SUCCESS((byte) 2, true, false),
FAILED((byte) 3, true, true);

private byte value;

private boolean completed;

private boolean failed;

State(byte value, boolean completed, boolean failed) {
this.value = value;
this.completed = completed;
this.failed = failed;
}

public byte value() {
return value;
}

public boolean completed() {
return completed;
}

public boolean failed() {
return failed;
}

public static State fromValue(byte value) {
switch (value) {
case 0:
return INIT;
case 1:
return STARTED;
case 2:
return SUCCESS;
case 3:
return FAILED;
default:
throw new IllegalArgumentException("No snapshot deletion state for value [" + value + "]");
}
}
}

}
Loading