Skip to content

Commit

Permalink
Introduce ShardState Enum + Slight Cleanup SnapshotsInProgress (#41940)
Browse files Browse the repository at this point in the history
* Added separate enum for the state of each shard, it was really
confusing that we used the same enum for the state of the snapshot
overall and the state of each individual shard
   * relates #40943 (comment)
* Shortened some obvious spots in equals method and saved a few lines
via `computeIfAbsent` to make up for adding 50 new lines to this class
  • Loading branch information
original-brownbear committed May 22, 2019
1 parent 464f769 commit 2ddd39a
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
break;
case INIT:
case WAITING:
case STARTED:
stage = SnapshotIndexShardStage.STARTED;
break;
case SUCCESS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Meta data about snapshots that are currently executing
Expand All @@ -53,12 +54,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SnapshotsInProgress that = (SnapshotsInProgress) o;

if (!entries.equals(that.entries)) return false;

return true;
return entries.equals(((SnapshotsInProgress) o).entries);
}

@Override
Expand Down Expand Up @@ -208,18 +204,11 @@ public String toString() {
return snapshot.toString();
}

// package private for testing
ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.WAITING) {
final String indexName = entry.key.getIndexName();
List<ShardId> waitingShards = waitingIndicesMap.get(indexName);
if (waitingShards == null) {
waitingShards = new ArrayList<>();
waitingIndicesMap.put(indexName, waitingShards);
}
waitingShards.add(entry.key);
if (entry.value.state() == ShardState.WAITING) {
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
}
}
if (waitingIndicesMap.isEmpty()) {
Expand All @@ -241,28 +230,27 @@ ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<Shar
*/
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
if (status.value.state().completed() == false) {
if (status.value.state().completed == false) {
return false;
}
}
return true;
}


public static class ShardSnapshotStatus {
private final State state;
private final ShardState state;
private final String nodeId;
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
this(nodeId, ShardState.INIT);
}

public ShardSnapshotStatus(String nodeId, State state) {
public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
}

public ShardSnapshotStatus(String nodeId, State state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
Expand All @@ -272,11 +260,11 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
state = ShardState.fromValue(in.readByte());
reason = in.readOptionalString();
}

public State state() {
public ShardState state() {
return state;
}

Expand All @@ -298,14 +286,9 @@ public void writeTo(StreamOutput out) throws IOException {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
if (state != status.state) return false;

return true;
}

@Override
Expand All @@ -331,11 +314,11 @@ public enum State {
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);

private byte value;
private final byte value;

private boolean completed;
private final boolean completed;

private boolean failed;
private final boolean failed;

State(byte value, boolean completed, boolean failed) {
this.value = value;
Expand Down Expand Up @@ -379,7 +362,6 @@ public static State fromValue(byte value) {

private final List<Entry> entries;


public SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
}
Expand Down Expand Up @@ -534,4 +516,52 @@ public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params p
builder.endArray();
builder.endObject();
}

public enum ShardState {
INIT((byte) 0, false, false),
SUCCESS((byte) 2, true, false),
FAILED((byte) 3, true, true),
ABORTED((byte) 4, false, true),
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);

private final byte value;

private final boolean completed;

private final boolean failed;

ShardState(byte value, boolean completed, boolean failed) {
this.value = value;
this.completed = completed;
this.failed = failed;
}

public boolean completed() {
return completed;
}

public boolean failed() {
return failed;
}

public static ShardState fromValue(byte value) {
switch (value) {
case 0:
return INIT;
case 2:
return SUCCESS;
case 3:
return FAILED;
case 4:
return ABORTED;
case 5:
return MISSING;
case 6:
return WAITING;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -248,7 +249,8 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
// Add all new shards to start processing on
final ShardId shardId = shard.key;
final ShardSnapshotStatus shardSnapshotStatus = shard.value;
if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT
if (localNodeId.equals(shardSnapshotStatus.nodeId())
&& shardSnapshotStatus.state() == ShardState.INIT
&& snapshotShards.containsKey(shardId) == false) {
logger.trace("[{}] - Adding shard to the queue", shardId);
if (startedShards == null) {
Expand Down Expand Up @@ -286,7 +288,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
} else {
// due to CS batching we might have missed the INIT state and straight went into ABORTED
// notify master that abort has completed by moving to FAILED
if (shard.value.state() == State.ABORTED) {
if (shard.value.state() == ShardState.ABORTED) {
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
}
}
Expand Down Expand Up @@ -480,12 +482,14 @@ public String toString() {

/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS));
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
}

/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure));
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
}

/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -776,7 +777,7 @@ public ClusterState execute(ClusterState currentState) {
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
shardEntry.key, shardStatus.nodeId());
shards.put(shardEntry.key,
new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
}
}
}
Expand Down Expand Up @@ -872,7 +873,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards) {
ShardSnapshotStatus shardStatus = shardEntry.value;
ShardId shardId = shardEntry.key;
if (shardStatus.state() == State.WAITING) {
if (shardStatus.state() == ShardState.WAITING) {
IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
if (indexShardRoutingTable != null) {
IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id());
Expand All @@ -893,7 +894,7 @@ private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShar
// Shard that we were waiting for went into unassigned state or disappeared - giving up
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned"));
} else {
shards.put(shardId, shardStatus);
}
Expand Down Expand Up @@ -943,7 +944,7 @@ private static Tuple<Set<String>, Set<String>> indicesWithMissingShards(
Set<String> missing = new HashSet<>();
Set<String> closed = new HashSet<>();
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.MISSING) {
if (entry.value.state() == ShardState.MISSING) {
if (metaData.hasIndex(entry.key.getIndex().getName()) &&
metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
closed.add(entry.key.getIndex().getName());
Expand Down Expand Up @@ -1195,7 +1196,7 @@ public ClusterState execute(ClusterState currentState) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion");
}
shardsBuilder.put(shardEntry.key, status);
}
Expand Down Expand Up @@ -1385,7 +1386,7 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
if (indexMetaData == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index"));
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
Expand All @@ -1394,18 +1395,18 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
if (primary == null || !primary.assignedToNode()) {
builder.put(shardId,
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated"));
} else if (primary.relocating() || primary.initializing()) {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING));
} else if (!primary.started()) {
builder.put(shardId,
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING,
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
"primary shard hasn't been started yet"));
} else {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
}
} else {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING,
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
"missing routing table"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -55,11 +56,11 @@ public void testWaitingIndices() {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();

// test more than one waiting shard in an index
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test exactly one waiting shard in an index
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test no waiting shards in an index
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
Expand All @@ -72,7 +73,7 @@ public void testWaitingIndices() {
assertFalse(waitingIndices.containsKey(idx3Name));
}

private State randomNonWaitingState() {
return randomFrom(Arrays.stream(State.values()).filter(s -> s != State.WAITING).collect(Collectors.toSet()));
private ShardState randomNonWaitingState() {
return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet()));
}
}
Loading

0 comments on commit 2ddd39a

Please sign in to comment.