Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort snapshots on a node that leaves the cluster #21084

Merged
merged 9 commits into from Oct 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -210,12 +210,9 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {


public static class ShardSnapshotStatus {
private State state;
private String nodeId;
private String reason;

private ShardSnapshotStatus() {
}
private final State state;
private final String nodeId;
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
Expand All @@ -231,6 +228,12 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
this.reason = reason;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public State state() {
return state;
}
Expand All @@ -243,18 +246,6 @@ public String reason() {
return reason;
}

public static ShardSnapshotStatus readShardSnapshotStatus(StreamInput in) throws IOException {
ShardSnapshotStatus shardSnapshotStatus = new ShardSnapshotStatus();
shardSnapshotStatus.readFrom(in);
return shardSnapshotStatus;
}

public void readFrom(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
Expand Down Expand Up @@ -282,6 +273,11 @@ public int hashCode() {
result = 31 * result + (reason != null ? reason.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
}
}

public enum State {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -399,9 +400,7 @@ public Delta delta(DiscoveryNodes other) {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
for (DiscoveryNode node : this) {
sb.append(node).append(',');
}
sb.append(Strings.collectionToDelimitedString(this, ","));
sb.append("}");
return sb.toString();
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexService.java
Expand Up @@ -411,7 +411,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
}
} finally {
try {
store.close();
if (store != null) {
store.close();
} else {
logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId);
}
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class IndexShardSnapshotStatus {
/**
* Snapshot stage
*/
public static enum Stage {
public enum Stage {
/**
* Snapshot hasn't started yet
*/
Expand Down Expand Up @@ -66,7 +66,7 @@ public static enum Stage {

private long indexVersion;

private boolean aborted;
private volatile boolean aborted;

private String failure;

Expand Down
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -113,10 +114,11 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService) {
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService);
nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService,
snapshotShardsService);
}

// for tests
Expand All @@ -128,9 +130,10 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService) {
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
super(settings);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService,
snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down