Skip to content

Commit

Permalink
Improve support for partial snapshots
Browse files Browse the repository at this point in the history
Fixes elastic#4701. Changes behavior of the snapshot operation. The operation now fails if not all primary shards are available at the beginning of the snapshot operation. The restore operation no longer tries to restore indices with shards that failed or were missing during snapshot operation.
  • Loading branch information
imotov authored and brusic committed Jan 19, 2014
1 parent 4e58d74 commit 15a0190
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 74 deletions.
3 changes: 2 additions & 1 deletion docs/reference/modules/snapshots.asciidoc
Expand Up @@ -101,7 +101,8 @@ supports <<search-multi-index-type,multi index syntax>>. The snapshot request al
`ignore_unavailable` option. Setting it to `true` will cause indices that do not exists to be ignored during snapshot
creation. By default, when `ignore_unavailable` option is not set and an index is missing the snapshot request will fail.
By setting `include_global_state` to false it's possible to prevent the cluster global state to be stored as part of
the snapshot.
the snapshot. By default, entire snapshot will fail if one or more indices participating in the snapshot don't have
all primary shards available. This behaviour can be changed by setting `partial` to `true`.

The index snapshot process is incremental. In the process of making the index snapshot Elasticsearch analyses
the list of the index files that are already stored in the repository and copies only files that were created or
Expand Down
Expand Up @@ -45,6 +45,7 @@
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

/**
* Create snapshot request
Expand All @@ -70,6 +71,8 @@ public class CreateSnapshotRequest extends MasterNodeOperationRequest<CreateSnap

private IndicesOptions indicesOptions = IndicesOptions.strict();

private boolean partial = false;

private Settings settings = EMPTY_SETTINGS;

private boolean includeGlobalState = true;
Expand Down Expand Up @@ -187,7 +190,7 @@ public CreateSnapshotRequest indices(List<String> indices) {
}

/**
* Retuns a list of indices that should be included into the snapshot
* Returns a list of indices that should be included into the snapshot
*
* @return list of indices
*/
Expand Down Expand Up @@ -215,6 +218,27 @@ public CreateSnapshotRequest indicesOptions(IndicesOptions indicesOptions) {
return this;
}


/**
* Returns true if indices with unavailable shards should be be partially snapshotted.
*
* @return the desired behaviour regarding indices options
*/
public boolean partial() {
return partial;
}

/**
* Set to true to allow indices with unavailable shards to be partially snapshotted.
*
* @param partial true if indices with unavailable shards should be be partially snapshotted.
* @return this request
*/
public CreateSnapshotRequest partial(boolean partial) {
this.partial = partial;
return this;
}

/**
* If set to true the request should wait for the snapshot completion before returning.
*
Expand Down Expand Up @@ -315,6 +339,7 @@ public CreateSnapshotRequest includeGlobalState(boolean includeGlobalState) {

/**
* Returns true if global state should be stored as part of the snapshot
*
* @return true if global state should be stored as part of the snapshot
*/
public boolean includeGlobalState() {
Expand Down Expand Up @@ -353,17 +378,15 @@ public CreateSnapshotRequest source(Map source) {
throw new ElasticsearchIllegalArgumentException("malformed indices section, should be an array of strings");
}
} else if (name.equals("ignore_unavailable") || name.equals("ignoreUnavailable")) {
assert entry.getValue() instanceof String;
ignoreUnavailable = Boolean.valueOf(entry.getValue().toString());
ignoreUnavailable = nodeBooleanValue(entry.getValue());
} else if (name.equals("allow_no_indices") || name.equals("allowNoIndices")) {
assert entry.getValue() instanceof String;
allowNoIndices = Boolean.valueOf(entry.getValue().toString());
allowNoIndices = nodeBooleanValue(entry.getValue());
} else if (name.equals("expand_wildcards_open") || name.equals("expandWildcardsOpen")) {
assert entry.getValue() instanceof String;
expandWildcardsOpen = Boolean.valueOf(entry.getValue().toString());
expandWildcardsOpen = nodeBooleanValue(entry.getValue());
} else if (name.equals("expand_wildcards_closed") || name.equals("expandWildcardsClosed")) {
assert entry.getValue() instanceof String;
expandWildcardsClosed = Boolean.valueOf(entry.getValue().toString());
expandWildcardsClosed = nodeBooleanValue(entry.getValue());
} else if (name.equals("partial")) {
partial(nodeBooleanValue(entry.getValue()));
} else if (name.equals("settings")) {
if (!(entry.getValue() instanceof Map)) {
throw new ElasticsearchIllegalArgumentException("malformed settings section, should indices an inner object");
Expand Down Expand Up @@ -450,6 +473,7 @@ public void readFrom(StreamInput in) throws IOException {
settings = readSettingsFromStream(in);
includeGlobalState = in.readBoolean();
waitForCompletion = in.readBoolean();
partial = in.readBoolean();
}

@Override
Expand All @@ -462,5 +486,6 @@ public void writeTo(StreamOutput out) throws IOException {
writeSettingsToStream(settings, out);
out.writeBoolean(includeGlobalState);
out.writeBoolean(waitForCompletion);
out.writeBoolean(partial);
}
}
Expand Up @@ -112,6 +112,17 @@ public CreateSnapshotRequestBuilder setWaitForCompletion(boolean waitForCompleti
return this;
}

/**
* If set to true the request should snapshot indices with unavailable shards
*
* @param partial true if request should snapshot indices with unavailable shards
* @return this builder
*/
public CreateSnapshotRequestBuilder setPartial(boolean partial) {
request.partial(partial);
return this;
}

/**
* Sets repository-specific snapshot settings.
* <p/>
Expand Down
Expand Up @@ -78,6 +78,7 @@ protected void masterOperation(final CreateSnapshotRequest request, ClusterState
new SnapshotsService.SnapshotRequest("create_snapshot[" + request.snapshot() + "]", request.snapshot(), request.repository())
.indices(request.indices())
.indicesOptions(request.indicesOptions())
.partial(request.partial())
.settings(request.settings())
.includeGlobalState(request.includeGlobalState())
.masterNodeTimeout(request.masterNodeTimeout());
Expand Down
Expand Up @@ -44,6 +44,7 @@
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.common.settings.ImmutableSettings.readSettingsFromStream;
import static org.elasticsearch.common.settings.ImmutableSettings.writeSettingsToStream;
import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;

/**
* Restore snapshot request
Expand Down Expand Up @@ -397,17 +398,13 @@ public RestoreSnapshotRequest source(Map source) {
throw new ElasticsearchIllegalArgumentException("malformed indices section, should be an array of strings");
}
} else if (name.equals("ignore_unavailable") || name.equals("ignoreUnavailable")) {
assert entry.getValue() instanceof String;
ignoreUnavailable = Boolean.valueOf(entry.getValue().toString());
ignoreUnavailable = nodeBooleanValue(entry.getValue());
} else if (name.equals("allow_no_indices") || name.equals("allowNoIndices")) {
assert entry.getValue() instanceof String;
allowNoIndices = Boolean.valueOf(entry.getValue().toString());
allowNoIndices = nodeBooleanValue(entry.getValue());
} else if (name.equals("expand_wildcards_open") || name.equals("expandWildcardsOpen")) {
assert entry.getValue() instanceof String;
expandWildcardsOpen = Boolean.valueOf(entry.getValue().toString());
expandWildcardsOpen = nodeBooleanValue(entry.getValue());
} else if (name.equals("expand_wildcards_closed") || name.equals("expandWildcardsClosed")) {
assert entry.getValue() instanceof String;
expandWildcardsClosed = Boolean.valueOf(entry.getValue().toString());
expandWildcardsClosed = nodeBooleanValue(entry.getValue());
} else if (name.equals("settings")) {
if (!(entry.getValue() instanceof Map)) {
throw new ElasticsearchIllegalArgumentException("malformed settings section, should indices an inner object");
Expand Down
Expand Up @@ -203,7 +203,8 @@ public static enum State {
STARTED((byte) 1),
SUCCESS((byte) 2),
FAILED((byte) 3),
ABORTED((byte) 4);
ABORTED((byte) 4),
MISSING((byte) 5);

private byte value;

Expand All @@ -216,7 +217,43 @@ public byte value() {
}

public boolean completed() {
return this == SUCCESS || this == FAILED;
switch (this) {
case INIT:
return false;
case STARTED:
return false;
case SUCCESS:
return true;
case FAILED:
return true;
case ABORTED:
return false;
case MISSING:
return true;
default:
assert false;
return true;
}
}

public boolean failed() {
switch (this) {
case INIT:
return false;
case STARTED:
return false;
case SUCCESS:
return false;
case FAILED:
return true;
case ABORTED:
return true;
case MISSING:
return true;
default:
assert false;
return false;
}
}

public static State fromValue(byte value) {
Expand All @@ -231,6 +268,8 @@ public static State fromValue(byte value) {
return FAILED;
case 4:
return ABORTED;
case 5:
return MISSING;
default:
throw new ElasticsearchIllegalArgumentException("No snapshot state for value [" + value + "]");
}
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
Expand Up @@ -162,6 +162,10 @@ public ClusterState execute(ClusterState currentState) {
ImmutableMap.Builder<ShardId, RestoreMetaData.ShardRestoreStatus> shards = ImmutableMap.builder();
for (Map.Entry<String, String> indexEntry : renamedIndices.entrySet()) {
String index = indexEntry.getValue();
// Make sure that index was fully snapshotted - don't restore
if (failed(snapshot, index)) {
throw new SnapshotRestoreException(snapshotId, "index [" + index + "] wasn't fully snapshotted - cannot restore");
}
RestoreSource restoreSource = new RestoreSource(snapshotId, index);
String renamedIndex = indexEntry.getKey();
IndexMetaData snapshotIndexMetaData = metaData.index(index);
Expand Down Expand Up @@ -391,6 +395,24 @@ private void processDeletedIndices(ClusterChangedEvent event) {
}
}

private boolean failed(Snapshot snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
return true;
}
}
return false;
}

private boolean failed(Snapshot snapshot, String index, int shard) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index()) && shard == failure.shardId()) {
return true;
}
}
return false;
}

/**
* Adds restore completion listener
* <p/>
Expand Down Expand Up @@ -427,16 +449,17 @@ public void clusterChanged(ClusterChangedEvent event) {

/**
* Checks if a repository is currently in use by one of the snapshots
*
* @param clusterState cluster state
* @param repository repository id
* @param repository repository id
* @return true if repository is currently in use by one of the running snapshots
*/
public static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
MetaData metaData = clusterState.metaData();
RestoreMetaData snapshots = metaData.custom(RestoreMetaData.TYPE);
if (snapshots != null) {
for(RestoreMetaData.Entry snapshot : snapshots.entries()) {
if(repository.equals(snapshot.snapshotId().getRepository())) {
for (RestoreMetaData.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshotId().getRepository())) {
return true;
}
}
Expand Down
24 changes: 22 additions & 2 deletions src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.Joda;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
Expand All @@ -43,6 +42,8 @@ public class SnapshotInfo implements ToXContent, Streamable {

private SnapshotState state;

private String reason;

private ImmutableList<String> indices;

private long startTime;
Expand All @@ -67,6 +68,7 @@ public class SnapshotInfo implements ToXContent, Streamable {
public SnapshotInfo(Snapshot snapshot) {
name = snapshot.name();
state = snapshot.state();
reason = snapshot.reason();
indices = snapshot.indices();
startTime = snapshot.startTime();
endTime = snapshot.endTime();
Expand All @@ -93,6 +95,15 @@ public SnapshotState state() {
return state;
}

/**
* Returns snapshot failure reason
*
* @return snapshot failure reason
*/
public String reason() {
return reason;
}

/**
* Returns indices that were included into this snapshot
*
Expand Down Expand Up @@ -137,7 +148,7 @@ public int totalShards() {
* @return number of failed shards
*/
public int failedShards() {
return totalShards - successfulShards;
return totalShards - successfulShards;
}

/**
Expand All @@ -162,6 +173,9 @@ public ImmutableList<SnapshotShardFailure> shardFailures() {
* Returns snapshot REST status
*/
public RestStatus status() {
if (state == SnapshotState.FAILED) {
return RestStatus.INTERNAL_SERVER_ERROR;
}
if (shardFailures.size() == 0) {
return RestStatus.OK;
}
Expand All @@ -179,6 +193,7 @@ public RestStatus status() {
static final class Fields {
static final XContentBuilderString INDICES = new XContentBuilderString("indices");
static final XContentBuilderString STATE = new XContentBuilderString("state");
static final XContentBuilderString REASON = new XContentBuilderString("reason");
static final XContentBuilderString START_TIME = new XContentBuilderString("start_time");
static final XContentBuilderString START_TIME_IN_MILLIS = new XContentBuilderString("start_time_in_millis");
static final XContentBuilderString END_TIME = new XContentBuilderString("end_time");
Expand All @@ -202,6 +217,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();
builder.field(Fields.STATE, state);
if (reason != null) {
builder.field(Fields.REASON, reason);
}
if (startTime != 0) {
builder.field(Fields.START_TIME, DATE_TIME_FORMATTER.printer().print(startTime));
builder.field(Fields.START_TIME_IN_MILLIS, startTime);
Expand Down Expand Up @@ -235,6 +253,7 @@ public void readFrom(StreamInput in) throws IOException {
}
indices = indicesListBuilder.build();
state = SnapshotState.fromValue(in.readByte());
reason = in.readOptionalString();
startTime = in.readVLong();
endTime = in.readVLong();
totalShards = in.readVInt();
Expand All @@ -259,6 +278,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
}
out.writeByte(state.value());
out.writeOptionalString(reason);
out.writeVLong(startTime);
out.writeVLong(endTime);
out.writeVInt(totalShards);
Expand Down

0 comments on commit 15a0190

Please sign in to comment.