diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index e7d5ecd8d648e..6ce8592879e08 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -19,6 +19,8 @@ package org.elasticsearch.action; +import java.util.function.Consumer; + /** * A listener for action responses or failures. */ @@ -33,4 +35,31 @@ public interface ActionListener { * A failure caused by an exception at some phase of the task. */ void onFailure(Exception e); + + /** + * Creates a listener that listens for a response (or failure) and executes the + * corresponding consumer when the response (or failure) is received. + * + * @param onResponse the consumer of the response, when the listener receives one + * @param onFailure the consumer of the failure, when the listener receives one + * @param the type of the response + * @return a listener that listens for responses and invokes the consumer when received + */ + static ActionListener wrap(Consumer onResponse, Consumer onFailure) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + try { + onResponse.accept(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 8d7e75b162610..7b3b2a0a2f0c6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final Set blocks = new HashSet<>(); + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { this.originalMessage = originalMessage; @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) { return this; } + public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + public TransportMessage originalMessage() { return originalMessage; } @@ -142,4 +150,8 @@ public Index shrinkFrom() { public boolean updateAllTypes() { return updateAllTypes; } + + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index b808484cef53d..38401fef18126 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest private boolean updateAllTypes = false; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexRequest() { } @@ -440,6 +443,30 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) { return this; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + /** + * Sets the number of shard copies that should be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -462,6 +489,7 @@ public void readFrom(StreamInput in) throws IOException { aliases.add(Alias.read(in)); } updateAllTypes = in.readBoolean(); + waitForActiveShards = ActiveShardCount.readFrom(in); } @Override @@ -486,5 +514,6 @@ public void writeTo(StreamOutput out) throws IOException { alias.writeTo(out); } out.writeBoolean(updateAllTypes); + waitForActiveShards.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java index 2de6d4d57a051..1c930b8951065 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -249,4 +250,23 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) { request.updateAllTypes(updateAllTypes); return this; } + + /** + * Sets the number of shard copies that should be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index b9282002349b3..35dd53276cd6d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -30,22 +31,41 @@ */ public class CreateIndexResponse extends AcknowledgedResponse { + private boolean shardsAcked; + protected CreateIndexResponse() { } - protected CreateIndexResponse(boolean acknowledged) { + protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) { super(acknowledged); + assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too + this.shardsAcked = shardsAcked; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); readAcknowledged(in); + shardsAcked = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); writeAcknowledged(out); + out.writeBoolean(shardsAcked); + } + + /** + * Returns true if the requisite number of shards were started before + * returning from the index creation operation. If {@link #isAcknowledged()} + * is false, then this also returns false. + */ + public boolean isShardsAcked() { + return shardsAcked; + } + + public void addCustomFields(XContentBuilder builder) throws IOException { + builder.field("shards_acknowledged", isShardsAcked()); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 444da8df082bf..d3ce1975e8917 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -31,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -77,24 +75,12 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) - .aliases(request.aliases()).customs(request.customs()); + .aliases(request.aliases()).customs(request.customs()) + .waitForActiveShards(request.waitForActiveShards()); - createIndexService.createIndex(updateRequest, new ActionListener() { - - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create", t, request.index()); - } else { - logger.debug("[{}] failed to create", t, request.index()); - } - listener.onFailure(t); - } - }); + createIndexService.createIndex(updateRequest, ActionListener.wrap(response -> + listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())), + listener::onFailure)); } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java index 9e98418f18428..481a375492a8c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -206,4 +207,22 @@ public void source(BytesReference source) { } } + /** + * Sets the number of shard copies that should be active for creation of the + * new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.createIndexRequest.waitForActiveShards(waitForActiveShards); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java index e9b4351fc5dea..edc7aaa92d6b9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.rollover; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; @@ -70,4 +71,23 @@ public RolloverRequestBuilder mapping(String type, String source) { this.request.getCreateIndexRequest().mapping(type, source); return this; } + + /** + * Sets the number of shard copies that should be active for creation of the + * new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.request.setWaitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java index 0f254e825da3f..b495e3c6a0f32 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java @@ -39,22 +39,28 @@ public final class RolloverResponse extends ActionResponse implements ToXContent private static final String DRY_RUN = "dry_run"; private static final String ROLLED_OVER = "rolled_over"; private static final String CONDITIONS = "conditions"; + private static final String ACKNOWLEDGED = "acknowledged"; + private static final String SHARDS_ACKED = "shards_acknowledged"; private String oldIndex; private String newIndex; private Set> conditionStatus; private boolean dryRun; private boolean rolledOver; + private boolean acknowledged; + private boolean shardsAcked; RolloverResponse() { } RolloverResponse(String oldIndex, String newIndex, Set conditionResults, - boolean dryRun, boolean rolledOver) { + boolean dryRun, boolean rolledOver, boolean acknowledged, boolean shardsAcked) { this.oldIndex = oldIndex; this.newIndex = newIndex; this.dryRun = dryRun; this.rolledOver = rolledOver; + this.acknowledged = acknowledged; + this.shardsAcked = shardsAcked; this.conditionStatus = conditionResults.stream() .map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched)) .collect(Collectors.toSet()); @@ -89,12 +95,31 @@ public boolean isDryRun() { } /** - * Returns if the rollover was not simulated and the conditions were met + * Returns true if the rollover was not simulated and the conditions were met */ public boolean isRolledOver() { return rolledOver; } + /** + * Returns true if the creation of the new rollover index and switching of the + * alias to the newly created index was successful, and returns false otherwise. + * If {@link #isDryRun()} is true, then this will also return false. If this + * returns false, then {@link #isShardsAcked()} will also return false. + */ + public boolean isAcknowledged() { + return acknowledged; + } + + /** + * Returns true if the requisite number of shards were started in the newly + * created rollover index before returning. If {@link #isAcknowledged()} is + * false, then this will also return false. + */ + public boolean isShardsAcked() { + return shardsAcked; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -110,6 +135,8 @@ public void readFrom(StreamInput in) throws IOException { conditionStatus = conditions; dryRun = in.readBoolean(); rolledOver = in.readBoolean(); + acknowledged = in.readBoolean(); + shardsAcked = in.readBoolean(); } @Override @@ -124,6 +151,8 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(dryRun); out.writeBoolean(rolledOver); + out.writeBoolean(acknowledged); + out.writeBoolean(shardsAcked); } @Override @@ -132,6 +161,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NEW_INDEX, newIndex); builder.field(ROLLED_OVER, rolledOver); builder.field(DRY_RUN, dryRun); + builder.field(ACKNOWLEDGED, acknowledged); + builder.field(SHARDS_ACKED, shardsAcked); builder.startObject(CONDITIONS); for (Map.Entry entry : conditionStatus) { builder.field(entry.getKey(), entry.getValue()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index af81e5cf8c908..d88cfe35f8f7e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -25,11 +25,12 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasAction; @@ -58,6 +59,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction result.matched)) { - createIndexService.createIndex(prepareCreateIndexRequest(rolloverIndexName, rolloverRequest), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - // switch the alias to point to the newly created index - indexAliasesService.indicesAliases( - prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, - rolloverRequest), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { - listener.onResponse( - new RolloverResponse(sourceIndexName, rolloverIndexName, - conditionResults, false, true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(rolloverIndexName, rolloverRequest); + createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> { + // switch the alias to point to the newly created index + indexAliasesService.indicesAliases( + prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, + rolloverRequest), + ActionListener.wrap(aliasClusterStateUpdateResponse -> { + if (aliasClusterStateUpdateResponse.isAcknowledged()) { + activeShardsObserver.waitForActiveShards(rolloverIndexName, + rolloverRequest.getCreateIndexRequest().waitForActiveShards(), + rolloverRequest.masterNodeTimeout(), + isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, + conditionResults, false, true, true, isShardsAcked)), + listener::onFailure); + } else { + listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, + false, true, false, false)); + } + }, listener::onFailure)); + }, listener::onFailure)); } else { // conditions not met listener.onResponse( - new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false) + new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false, false, false) ); } } @@ -216,6 +211,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Stri .masterNodeTimeout(createIndexRequest.masterNodeTimeout()) .settings(createIndexRequest.settings()) .aliases(createIndexRequest.aliases()) + .waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation .mappings(createIndexRequest.mappings()); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java index 791d35220e21e..9ba4acdefc60f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -36,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; -import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -126,6 +126,24 @@ public String getSourceIndex() { return sourceIndex; } + /** + * Sets the number of shard copies that should be active for creation of the + * new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards); + } + public void source(BytesReference source) { XContentType xContentType = XContentFactory.xContentType(source); if (xContentType != null) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java index ab392a7f8249d..5ec1a5066ebdc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.shrink; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; @@ -44,4 +45,23 @@ public ShrinkRequestBuilder setSettings(Settings settings) { this.request.getShrinkIndexRequest().settings(settings); return this; } + + /** + * Sets the number of shard copies that should be active for creation of the + * new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.request.setWaitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java index 4835471ae4c24..e7ad0afe3aa17 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java @@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse { ShrinkResponse() { } - ShrinkResponse(boolean acknowledged) { - super(acknowledged); + ShrinkResponse(boolean acknowledged, boolean shardsAcked) { + super(acknowledged, shardsAcked); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index 57dc3e82e1877..4667f1e98255a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -40,7 +39,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -93,22 +91,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); }, indexNameExpressionResolver); - createIndexService.createIndex(updateRequest, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new ShrinkResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create shrink index", t, updateRequest.index()); - } else { - logger.debug("[{}] failed to create shrink index", t, updateRequest.index()); - } - listener.onFailure(t); - } - }); + createIndexService.createIndex(updateRequest, ActionListener.wrap(response -> + listener.onResponse(new ShrinkResponse(response.isAcknowledged(), response.isShardsAcked())), listener::onFailure)); } @Override @@ -162,6 +146,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Shri .settings(targetIndex.settings()) .aliases(targetIndex.aliases()) .customs(targetIndex.customs()) + .waitForActiveShards(targetIndex.waitForActiveShards()) .shrinkFrom(metaData.getIndex()); } diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java new file mode 100644 index 0000000000000..90bd0450afb81 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A class whose instances represent a value for counting the number + * of active shard copies for a given shard in an index. + */ +public final class ActiveShardCount implements Writeable { + + private static final int ACTIVE_SHARD_COUNT_DEFAULT = -2; + private static final int ALL_ACTIVE_SHARDS = -1; + + public static final ActiveShardCount DEFAULT = new ActiveShardCount(ACTIVE_SHARD_COUNT_DEFAULT); + public static final ActiveShardCount ALL = new ActiveShardCount(ALL_ACTIVE_SHARDS); + public static final ActiveShardCount NONE = new ActiveShardCount(0); + public static final ActiveShardCount ONE = new ActiveShardCount(1); + + private final int value; + + private ActiveShardCount(final int value) { + this.value = value; + } + + /** + * Get an ActiveShardCount instance for the given value. The value is first validated to ensure + * it is a valid shard count and throws an IllegalArgumentException if validation fails. Valid + * values are any non-negative number. Directly use {@link ActiveShardCount#DEFAULT} for the + * default value (which is one shard copy) or {@link ActiveShardCount#ALL} to specify all the shards. + */ + public static ActiveShardCount from(final int value) { + if (value < 0) { + throw new IllegalArgumentException("shard count cannot be a negative value"); + } + return get(value); + } + + private static ActiveShardCount get(final int value) { + switch (validateValue(value)) { + case ACTIVE_SHARD_COUNT_DEFAULT: + return DEFAULT; + case ALL_ACTIVE_SHARDS: + return ALL; + case 1: + return ONE; + case 0: + return NONE; + default: + return new ActiveShardCount(value); + } + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeInt(value); + } + + public static ActiveShardCount readFrom(final StreamInput in) throws IOException { + return get(in.readInt()); + } + + private static int validateValue(final int value) { + if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) { + throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]"); + } + return value; + } + + /** + * Resolve this instance to an actual integer value for the number of active shard counts. + * If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is + * used to determine what the actual active shard count should be. The default value indicates + * one active shard. + */ + public int resolve(final IndexMetaData indexMetaData) { + if (this == ActiveShardCount.DEFAULT) { + return 1; + } else if (this == ActiveShardCount.ALL) { + return indexMetaData.getNumberOfReplicas() + 1; + } else { + return value; + } + } + + /** + * Parses the active shard count from the given string. Valid values are "all" for + * all shard copies, null for the default value (which defaults to one shard copy), + * or a numeric value greater than or equal to 0. Any other input will throw an + * IllegalArgumentException. + */ + public static ActiveShardCount parseString(final String str) { + if (str == null) { + return ActiveShardCount.DEFAULT; + } else if (str.equals("all")) { + return ActiveShardCount.ALL; + } else { + int val; + try { + val = Integer.parseInt(str); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("cannot parse ActiveShardCount[" + str + "]", e); + } + return ActiveShardCount.from(val); + } + } + + /** + * Returns true iff the given cluster state's routing table contains enough active + * shards to meet the required shard count represented by this instance. + */ + public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) { + if (this == ActiveShardCount.NONE) { + // not waiting for any active shards + return true; + } + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + if (indexMetaData == null) { + // its possible the index was deleted while waiting for active shard copies, + // in this case, we'll just consider it that we have enough active shard copies + // and we can stop waiting + return true; + } + final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName); + assert indexRoutingTable != null; + if (indexRoutingTable.allPrimaryShardsActive() == false) { + // all primary shards aren't active yet + return false; + } + for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { + if (enoughShardsActive(shardRouting.value, indexMetaData) == false) { + // not enough active shard copies yet + return false; + } + } + return true; + } + + /** + * Returns true iff the active shard count in the shard routing table is enough + * to meet the required shard count represented by this instance. + */ + public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { + if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) { + // not enough active shard copies yet + return false; + } + return true; + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") ActiveShardCount that = (ActiveShardCount) o; + return value == that.value; + } + + @Override + public String toString() { + final String valStr; + switch (value) { + case ALL_ACTIVE_SHARDS: + valStr = "ALL"; + break; + case ACTIVE_SHARD_COUNT_DEFAULT: + valStr = "DEFAULT"; + break; + default: + valStr = Integer.toString(value); + } + return "ActiveShardCount[" + valStr + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java new file mode 100644 index 0000000000000..7217961d899c4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.function.Consumer; + +/** + * This class provides primitives for waiting for a configured number of shards + * to become active before sending a response on an {@link ActionListener}. + */ +public class ActiveShardsObserver extends AbstractComponent { + + private final ClusterService clusterService; + private final ThreadPool threadPool; + + public ActiveShardsObserver(final Settings settings, final ClusterService clusterService, final ThreadPool threadPool) { + super(settings); + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + /** + * Waits on the specified number of active shards to be started before executing the + * + * @param indexName the index to wait for active shards on + * @param activeShardCount the number of active shards to wait on before returning + * @param timeout the timeout value + * @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first) + * @param onFailure a function that is executed in response to an error occurring during waiting for the active shards + */ + public void waitForActiveShards(final String indexName, + final ActiveShardCount activeShardCount, + final TimeValue timeout, + final Consumer onResult, + final Consumer onFailure) { + + // wait for the configured number of active shards to be allocated before executing the result consumer + if (activeShardCount == ActiveShardCount.NONE) { + // not waiting, so just run whatever we were to run when the waiting is + onResult.accept(true); + return; + } + + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); + if (activeShardCount.enoughShardsActive(observer.observedState(), indexName)) { + onResult.accept(true); + } else { + final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate = + new ClusterStateObserver.ValidationPredicate() { + @Override + protected boolean validate(final ClusterState newState) { + return activeShardCount.enoughShardsActive(newState, indexName); + } + }; + + final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + onResult.accept(true); + } + + @Override + public void onClusterServiceClose() { + logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName); + onFailure.accept(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + onResult.accept(false); + } + }; + + observer.waitForNextChange(observerListener, shardsAllocatedPredicate, timeout); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java b/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java new file mode 100644 index 0000000000000..4f0e99ae558fe --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * A cluster state update response with specific fields for index creation. + */ +public class CreateIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse { + + private final boolean shardsAcked; + + public CreateIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcked) { + super(acknowledged); + this.shardsAcked = shardsAcked; + } + + /** + * Returns whether the requisite number of shard copies started before the completion of the operation. + */ + public boolean isShardsAcked() { + return shardsAcked; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 4f7aa68d7de79..bab03febaae87 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -27,9 +27,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.ack.CreateIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -68,6 +70,7 @@ import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.threadpool.ThreadPool; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -108,13 +111,15 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final Environment env; private final NodeServicesProvider nodeServicesProvider; private final IndexScopedSettings indexScopedSettings; - + private final ActiveShardsObserver activeShardsObserver; @Inject public MetaDataCreateIndexService(Settings settings, ClusterService clusterService, IndicesService indicesService, AllocationService allocationService, AliasValidator aliasValidator, - Set indexTemplateFilters, Environment env, NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings) { + Set indexTemplateFilters, Environment env, + NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings, + ThreadPool threadPool) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; @@ -135,6 +140,7 @@ public MetaDataCreateIndexService(Settings settings, ClusterService clusterServi } this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters); } + this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool); } public void validateIndexName(String index, ClusterState state) { @@ -176,7 +182,38 @@ public void validateIndexName(String index, ClusterState state) { } } - public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener) { + /** + * Creates an index in the cluster state and waits for the specified number of shard copies to + * become active (as specified in {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()}) + * before sending the response on the listener. If the index creation was successfully applied on + * the cluster state, then {@link CreateIndexClusterStateUpdateResponse#isAcknowledged()} will return + * true, otherwise it will return false and no waiting will occur for started shards + * ({@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will also be false). If the index + * creation in the cluster state was successful and the requisite shard copies were started before + * the timeout, then {@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will + * return true, otherwise if the operation timed out, then it will return false. + * + * @param request the index creation cluster state update request + * @param listener the listener on which to send the index creation cluster state update response + */ + public void createIndex(final CreateIndexClusterStateUpdateRequest request, + final ActionListener listener) { + onlyCreateIndex(request, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(), + shardsAcked -> { + logger.debug("[{}] index created, but the operation timed out while waiting for " + + "enough shards to be started.", request.index()); + listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked)); + }, listener::onFailure); + } else { + listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false)); + } + }, listener::onFailure)); + } + + private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, + final ActionListener listener) { Settings.Builder updatedSettingsBuilder = Settings.builder(); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); indexScopedSettings.validate(updatedSettingsBuilder); @@ -308,6 +345,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); + if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) { + throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() + + "]: cannot be greater than number of shard copies [" + + (tmpImd.getNumberOfReplicas() + 1) + "]"); + } // create the index here (on the master) to validate it can be created, as well as adding the mapping final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); createdIndex = indexService.index(); @@ -408,6 +450,16 @@ public ClusterState execute(ClusterState currentState) throws Exception { } } } + + @Override + public void onFailure(String source, Exception e) { + if (e instanceof IndexAlreadyExistsException) { + logger.trace("[{}] failed to create", e, request.index()); + } else { + logger.debug("[{}] failed to create", e, request.index()); + } + super.onFailure(source, e); + } }); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java index a922edff484a2..021eff9ea684b 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; @@ -51,6 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false)); rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout())); rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout())); + rolloverIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java index 96b4abbe06331..11285238703ca 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java @@ -21,14 +21,19 @@ import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -56,6 +61,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, } shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout())); shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout())); - client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<>(channel)); + shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, ShrinkResponse response) throws IOException { + response.addCustomFields(builder); + } + }); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java index 46ee596286e20..bc45093e4bc8a 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java @@ -22,14 +22,18 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -52,6 +56,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, createIndexRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false)); createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout())); createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); - client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel)); + createIndexRequest.waitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException { + response.addCustomFields(builder); + } + }); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index eff436d3a0eca..3b26345148775 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -29,10 +30,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; -import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -54,11 +55,11 @@ public void testDelayShards() throws Exception { }); logger.info("--> creating 'test' index"); - prepareCreate("test").setSettings(Settings.builder() + assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m") .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).get(); - ensureGreen("test"); + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)) + .setWaitForActiveShards(ActiveShardCount.ALL).get()); logger.info("--> stopping a random node"); assertTrue(internalCluster().stopRandomDataNode()); @@ -89,6 +90,7 @@ public void testUnassignedShards() throws Exception { .setSettings(Settings.builder() .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) // wait on all shards .get(); client().admin().indices().prepareCreate("only-baz") @@ -96,6 +98,7 @@ public void testUnassignedShards() throws Exception { .put("index.routing.allocation.include.bar", "baz") .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) .get(); client().admin().indices().prepareCreate("only-foo") @@ -105,9 +108,6 @@ public void testUnassignedShards() throws Exception { .put("index.number_of_replicas", 1)) .get(); - ensureGreen("anywhere", "only-baz"); - ensureYellow("only-foo"); - ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain() .setIndex("only-foo") .setShard(0) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 428e859e34221..7231bee0bef12 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; @@ -289,7 +290,7 @@ public void testMappingConflictRootCause() throws Exception { public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get(); - client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(indexSettings()).get(); internalCluster().fullRestart(); ensureGreen("test"); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 7d163630afbca..8a4a62f97287b 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -166,6 +167,8 @@ public void testCreateIndexRequest() throws Exception { String alias = randomAsciiOfLength(10); String rolloverIndex = randomAsciiOfLength(10); final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAsciiOfLength(10)); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + rolloverRequest.setWaitForActiveShards(activeShardCount); final Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index 3236de4aaeb0d..3fcade05839f0 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexWriter; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -130,6 +131,8 @@ public void testShrinkIndexSettings() { int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000)); ShrinkRequest target = new ShrinkRequest("target", indexName); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + target.setWaitForActiveShards(activeShardCount); CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( target, clusterState, (i) -> stats, new IndexNameExpressionResolver(Settings.EMPTY)); @@ -137,6 +140,7 @@ public void testShrinkIndexSettings() { assertEquals(indexName, request.shrinkFrom().getName()); assertEquals("1", request.settings().get("index.number_of_shards")); assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), activeShardCount); } private DiscoveryNode newNode(String nodeId) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java index 58784fdb7dfba..8493c58729daa 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java @@ -160,7 +160,7 @@ private static List putTemplate(PutRequest request) { null, new HashSet<>(), null, - null, null); + null, null, null); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, new AliasValidator(Settings.EMPTY), null, null); final List throwables = new ArrayList<>(); @@ -191,6 +191,7 @@ private List putTemplateDetail(PutRequest request) throws Exception { new HashSet<>(), null, nodeServicesProvider, + null, null); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService( Settings.EMPTY, clusterService, createIndexService, new AliasValidator(Settings.EMPTY), indicesService, nodeServicesProvider); diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java new file mode 100644 index 0000000000000..83f0b1332c7da --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java @@ -0,0 +1,305 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the {@link ActiveShardCount} class + */ +public class ActiveShardCountTests extends ESTestCase { + + public void testFromIntValue() { + assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE); + final int value = randomIntBetween(1, 50); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1))); + } + + public void testResolve() { + // one shard + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + final int value = randomIntBetween(2, 20); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + + // more than one shard + final int numNewShards = randomIntBetween(1, 20); + indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(numNewShards) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + } + + public void testSerialization() throws IOException { + doWriteRead(ActiveShardCount.ALL); + doWriteRead(ActiveShardCount.DEFAULT); + doWriteRead(ActiveShardCount.NONE); + doWriteRead(ActiveShardCount.from(randomIntBetween(1, 50))); + } + + public void testParseString() { + assertSame(ActiveShardCount.parseString("all"), ActiveShardCount.ALL); + assertSame(ActiveShardCount.parseString(null), ActiveShardCount.DEFAULT); + assertSame(ActiveShardCount.parseString("0"), ActiveShardCount.NONE); + int value = randomIntBetween(1, 50); + assertEquals(ActiveShardCount.parseString(value + ""), ActiveShardCount.from(value)); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomAsciiOfLengthBetween(4, 8))); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-1")); // magic numbers not exposed through API + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-2")); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + "")); + } + + private void doWriteRead(ActiveShardCount activeShardCount) throws IOException { + final BytesStreamOutput out = new BytesStreamOutput(); + activeShardCount.writeTo(out); + final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes)); + ActiveShardCount readActiveShardCount = ActiveShardCount.readFrom(in); + if (activeShardCount == ActiveShardCount.DEFAULT + || activeShardCount == ActiveShardCount.ALL + || activeShardCount == ActiveShardCount.NONE) { + assertSame(activeShardCount, readActiveShardCount); + } else { + assertEquals(activeShardCount, readActiveShardCount); + } + } + + public void testEnoughShardsActiveZero() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + public void testEnoughShardsActiveLevelOne() { + runTestForOneActiveShard(ActiveShardCount.ONE); + } + + public void testEnoughShardsActiveLevelDefault() { + // default is 1 + runTestForOneActiveShard(ActiveShardCount.DEFAULT); + } + + public void testEnoughShardsActiveRandom() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas)); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + public void testEnoughShardsActiveLevelAll() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + // both values should represent "all" + final ActiveShardCount waitForActiveShards = randomBoolean() ? ActiveShardCount.from(numberOfReplicas + 1) : ActiveShardCount.ALL; + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + assert activeShardCount == ActiveShardCount.ONE || activeShardCount == ActiveShardCount.DEFAULT; + final ActiveShardCount waitForActiveShards = activeShardCount; + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + private ClusterState initializeWithNewIndex(final String indexName, final int numShards, final int numReplicas) { + // initial index creation and new routing table info + final IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())) + .numberOfShards(numShards) + .numberOfReplicas(numReplicas) + .build(); + final MetaData metaData = MetaData.builder().put(indexMetaData, true).build(); + final RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetaData).build(); + return ClusterState.builder(new ClusterName("test_cluster")).metaData(metaData).routingTable(routingTable).build(); + } + + private ClusterState startPrimaries(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + // want less than half, and primary is already started + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } else { + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startAllShards(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java new file mode 100644 index 0000000000000..ef371188a58e5 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Tests that the index creation operation waits for the appropriate + * number of active shards to be started before returning. + */ +public class ActiveShardsObserverIT extends ESIntegTestCase { + + public void testCreateIndexNoActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(randomBoolean() ? ActiveShardCount.from(1) : ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isShardsAcked()); + } + + public void testCreateIndexNoActiveShardsNoWaiting() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + CreateIndexResponse response = prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(0)) + .get(); + assertTrue(response.isAcknowledged()); + } + + public void testCreateIndexNotEnoughActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + final int numDataNodes = internalCluster().numDataNodes(); + final int numReplicas = numDataNodes + randomInt(4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1))) + .setTimeout("100ms") + .get() + .isShardsAcked()); + } + + public void testCreateIndexEnoughActiveShards() throws Exception { + final String indexName = "test-idx"; + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3)) + .build(); + ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes())); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get()); + } + + public void testCreateIndexWaitsForAllActiveShards() throws Exception { + final String indexName = "test-idx"; + // not enough data nodes, index creation times out + final int numReplicas = internalCluster().numDataNodes() + randomInt(4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isShardsAcked()); + if (client().admin().indices().prepareExists(indexName).get().isExists()) { + assertAcked(client().admin().indices().prepareDelete(indexName)); + } + + // enough data nodes, all shards are active + settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1) + .build(); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(ActiveShardCount.ALL).get()); + } + + public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception { + final String indexName = "test-idx"; + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1) + .build(); + + logger.info("--> start the index creation process"); + ListenableActionFuture responseListener = + prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.ALL) + .execute(); + + logger.info("--> wait until the cluster state contains the new index"); + assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().metaData().hasIndex(indexName))); + + logger.info("--> delete the index"); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + logger.info("--> ensure the create index request completes"); + assertAcked(responseListener.get()); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java index 43aa088a3b945..f411e00468eb7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -40,7 +41,7 @@ public class SimpleDataNodesIT extends ESIntegTestCase { public void testDataNodes() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); - client().admin().indices().create(createIndexRequest("test")).actionGet(); + client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet(); try { client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); fail("no allocation should happen"); diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index c86535e40c59c..70af580824118 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -91,7 +92,7 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception { final String node_2 = nodesIds.get(1); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -203,7 +204,7 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -253,14 +254,13 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING)); - healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - logger.info("--> get the state, verify shard 1 primary allocated"); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); - assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); - + final String nodeToCheck = node_1; + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + String nodeId = clusterState.nodes().resolveNode(nodeToCheck).getId(); + assertThat(clusterState.getRoutingNodes().node(nodeId).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); + }); } public void testRerouteExplain() { diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index ff31a72a5c6d9..f6f7aaf322800 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -216,6 +216,7 @@ private MetaDataCreateIndexService getCreateIndexService() { new HashSet<>(), null, null, + null, null); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index f990f382c8c50..f267af66dc643 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -108,7 +109,7 @@ private void createStaleReplicaScenario() throws Exception { logger.info("--> check that old primary shard does not get promoted to primary again"); // kick reroute and wait for all shard states to be fetched client(master).admin().cluster().prepareReroute().get(); - assertBusy(new Runnable() { + assertBusy(new Runnable() { @Override public void run() { assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)); @@ -157,7 +158,8 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { createStaleReplicaScenario(); logger.info("--> explicitly promote old primary shard"); - ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test"); + final String idxName = "test"; + ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName); ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute(); for (IntObjectCursor> shardStoreStatuses : storeStatuses) { int shardId = shardStoreStatuses.key; @@ -165,22 +167,30 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { logger.info("--> adding allocation command for shard {}", shardId); // force allocation based on node id if (useStaleReplica) { - rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } else { - rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } } rerouteBuilder.get(); logger.info("--> check that the stale primary shard gets allocated and that documents are available"); - ensureYellow("test"); + ensureYellow(idxName); + + if (useStaleReplica == false) { + // When invoking AllocateEmptyPrimaryAllocationCommand, due to the UnassignedInfo.Reason being changed to INDEX_CREATION, + // its possible that the shard has not completed initialization, even though the cluster health is yellow, so the + // search can throw an "all shards failed" exception. We will wait until the shard initialization has completed before + // verifying the search hit count. + assertBusy(() -> assertTrue(clusterService().state().routingTable().index(idxName).allPrimaryShardsActive())); - assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); + } + assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); } public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException { String node = internalCluster().startNode(); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.exclude._name", node) .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index df45b422843e7..0854d27e208f4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -351,7 +350,7 @@ public void testAllocationStatusSerialization() throws IOException { for (AllocationStatus allocationStatus : AllocationStatus.values()) { BytesStreamOutput out = new BytesStreamOutput(); allocationStatus.writeTo(out); - ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytes())); + ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes)); AllocationStatus readStatus = AllocationStatus.readFrom(in); assertThat(readStatus, equalTo(allocationStatus)); } diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index f86b56f105207..eaf9a351824af 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; @@ -205,7 +206,7 @@ public void testJustMasterNode() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); logger.info("--> create an index"); - client().admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).execute().actionGet(); logger.info("--> closing master node"); internalCluster().closeNonSharedNodes(false); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 09441f7011081..1a8caaa351481 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -158,7 +158,7 @@ public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, nodeServicesProvider); MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, allocationService, new AliasValidator(settings), Collections.emptySet(), environment, - nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); + nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, threadPool); transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool, indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index a26cf9124f2d9..c2ccb9cd4ab2a 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -140,7 +141,7 @@ public ClusterState randomlyUpdateClusterState(ClusterState state, CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - .build()); + .build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); } diff --git a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 5f7f26cd38cb4..7d950a73837cc 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; @@ -213,7 +214,7 @@ private void assertFlushResponseEqualsShardStats(ShardStats[] shardsStats, List< public void testUnallocatedShardsDoesNotHang() throws InterruptedException { // create an index but disallow allocation - prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); + prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); // this should not hang but instead immediately return with empty result set List shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test"); diff --git a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index ea3ebf5179b48..66687ea74fa81 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -98,7 +99,7 @@ public void testSimpleOpenClose() { public void testFastCloseAfterCreateContinuesCreateAfterOpen() { logger.info("--> creating test index that cannot be allocated"); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.include.tag", "no_such_node").build()).get(); ClusterHealthResponse health = client().admin().cluster().prepareHealth("test").setWaitForNodes(">=2").get(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 9d571c02c90c8..9731e3ce065f1 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; @@ -45,6 +47,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -443,9 +446,9 @@ public void testRestoreIndexWithMissingShards() throws Exception { logger.info("--> create an index that will have no allocated shards"); assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) .put("index.routing.allocation.include.tag", "nowhere") - .put("number_of_replicas", 0))); + .put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get()); + assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists()); - logger.info("--> create repository"); logger.info("--> creating repository"); PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index fb52d0f634bac..e59f349548400 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -755,7 +756,7 @@ public void testUnallocatedShards() throws Exception { .put("location", randomRepoPath()))); logger.info("--> creating index that cannot be allocated"); - prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).get(); + prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).setWaitForActiveShards(ActiveShardCount.NONE).get(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json index bdac6d9a9ab89..b0267eae3a3c7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json @@ -13,6 +13,10 @@ } }, "params": { + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for before the operation returns." + }, "timeout": { "type" : "time", "description" : "Explicit operation timeout" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json index 218871a976554..97580182ea1bd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json @@ -25,6 +25,10 @@ "master_timeout": { "type" : "time", "description" : "Specify timeout for connection to master" + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for on the newly created rollover index before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json index 633e9e160936b..5ef943eacba6c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json @@ -25,6 +25,10 @@ "master_timeout": { "type" : "time", "description" : "Specify timeout for connection to master" + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for on the shrunken index before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml index 17b86d647649e..3c6ad7a70519f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml @@ -30,6 +30,35 @@ - match: { test_index.settings.index.number_of_replicas: "0"} +--- +"Create index with too large wait_for_active_shards": + + - do: + indices.create: + index: test_index + timeout: 100ms + master_timeout: 100ms + wait_for_active_shards: 6 + body: + settings: + number_of_replicas: 5 + + - match: { shards_acknowledged: false } + +--- +"Create index with wait_for_active_shards set to all": + + - do: + indices.create: + index: test_index + wait_for_active_shards: all + body: + settings: + number_of_replicas: "0" + + - match: { acknowledged: true } + - match: { shards_acknowledged: true } + --- "Create index with aliases": diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml index 3a4821193e640..0152903b5fa72 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml @@ -4,6 +4,7 @@ - do: indices.create: index: logs-1 + wait_for_active_shards: 1 body: aliases: logs_index: {} @@ -30,11 +31,12 @@ # perform alias rollover - do: - indices.rollover: - alias: "logs_search" - body: - conditions: - max_docs: 1 + indices.rollover: + alias: "logs_search" + wait_for_active_shards: 1 + body: + conditions: + max_docs: 1 - match: { old_index: logs-1 } - match: { new_index: logs-2 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml index b1aac4952c402..cdfdaa806847f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml @@ -6,6 +6,7 @@ - do: indices.create: index: source + wait_for_active_shards: 1 body: settings: number_of_replicas: "0" @@ -53,6 +54,7 @@ indices.shrink: index: "source" target: "target" + wait_for_active_shards: 1 body: settings: index.number_of_replicas: 0 diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 24af2b72bd176..34653a69b3dd2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; @@ -123,6 +124,17 @@ public static void assertAcked(DeleteIndexResponse response) { assertVersionSerializable(response); } + /** + * Assert that an index creation was fully acknowledged, meaning that both the index creation cluster + * state update was successful and that the requisite number of shard copies were started before returning. + */ + public static void assertAcked(CreateIndexResponse response) { + assertThat(response.getClass().getSimpleName() + " failed - not acked", response.isAcknowledged(), equalTo(true)); + assertVersionSerializable(response); + assertTrue(response.getClass().getSimpleName() + " failed - index creation acked but not all shards were started", + response.isShardsAcked()); + } + /** * Executes the request and fails if the request has not been blocked. *