From d78f40fb1e88c78ce4466fe145d365c205441e43 Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Fri, 15 Jul 2016 11:19:27 -0400 Subject: [PATCH] Index creation waits for active shard copies before returning (#18985) Before returning, index creation now waits for the configured number of shard copies to be started. In the past, a client would create an index and then potentially have to check the cluster health to wait to execute write operations. With the cluster health semantics changing so that index creation does not cause the cluster health to go RED, this change enables waiting for the desired number of active shards to be active before returning from index creation. Relates #9126 --- .../elasticsearch/action/ActionListener.java | 29 ++ .../CreateIndexClusterStateUpdateRequest.java | 12 + .../indices/create/CreateIndexRequest.java | 29 ++ .../create/CreateIndexRequestBuilder.java | 20 ++ .../indices/create/CreateIndexResponse.java | 22 +- .../create/TransportCreateIndexAction.java | 26 +- .../indices/rollover/RolloverRequest.java | 19 ++ .../rollover/RolloverRequestBuilder.java | 20 ++ .../indices/rollover/RolloverResponse.java | 35 +- .../rollover/TransportRolloverAction.java | 58 ++-- .../admin/indices/shrink/ShrinkRequest.java | 20 +- .../indices/shrink/ShrinkRequestBuilder.java | 20 ++ .../admin/indices/shrink/ShrinkResponse.java | 4 +- .../indices/shrink/TransportShrinkAction.java | 21 +- .../action/support/ActiveShardCount.java | 211 ++++++++++++ .../action/support/ActiveShardsObserver.java | 105 ++++++ ...CreateIndexClusterStateUpdateResponse.java | 40 +++ .../metadata/MetaDataCreateIndexService.java | 58 +++- .../indices/RestRolloverIndexAction.java | 2 + .../admin/indices/RestShrinkIndexAction.java | 13 +- .../indices/create/RestCreateIndexAction.java | 12 +- .../ClusterAllocationExplainIT.java | 14 +- .../admin/indices/create/CreateIndexIT.java | 3 +- .../TransportRolloverActionTests.java | 3 + .../shrink/TransportShrinkActionTests.java | 4 + .../MetaDataIndexTemplateServiceTests.java | 3 +- .../action/support/ActiveShardCountTests.java | 305 ++++++++++++++++++ .../support/ActiveShardsObserverIT.java | 155 +++++++++ .../cluster/SimpleDataNodesIT.java | 3 +- .../cluster/allocation/ClusterRerouteIT.java | 18 +- .../MetaDataCreateIndexServiceTests.java | 1 + .../cluster/routing/PrimaryAllocationIT.java | 24 +- .../cluster/routing/UnassignedInfoTests.java | 3 +- .../gateway/GatewayIndexStateIT.java | 3 +- .../indices/cluster/ClusterStateChanges.java | 2 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../elasticsearch/indices/flush/FlushIT.java | 3 +- .../indices/state/SimpleIndexStateIT.java | 3 +- .../DedicatedClusterSnapshotRestoreIT.java | 7 +- .../SharedClusterSnapshotRestoreIT.java | 3 +- .../rest-api-spec/api/indices.create.json | 4 + .../rest-api-spec/api/indices.rollover.json | 4 + .../rest-api-spec/api/indices.shrink.json | 4 + .../test/indices.create/10_basic.yaml | 29 ++ .../test/indices.rollover/10_basic.yaml | 12 +- .../test/indices.shrink/10_basic.yaml | 2 + .../hamcrest/ElasticsearchAssertions.java | 12 + 47 files changed, 1282 insertions(+), 121 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java create mode 100644 core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java create mode 100644 core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java create mode 100644 core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java create mode 100644 core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java diff --git a/core/src/main/java/org/elasticsearch/action/ActionListener.java b/core/src/main/java/org/elasticsearch/action/ActionListener.java index e7d5ecd8d648e..6ce8592879e08 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListener.java @@ -19,6 +19,8 @@ package org.elasticsearch.action; +import java.util.function.Consumer; + /** * A listener for action responses or failures. */ @@ -33,4 +35,31 @@ public interface ActionListener { * A failure caused by an exception at some phase of the task. */ void onFailure(Exception e); + + /** + * Creates a listener that listens for a response (or failure) and executes the + * corresponding consumer when the response (or failure) is received. + * + * @param onResponse the consumer of the response, when the listener receives one + * @param onFailure the consumer of the failure, when the listener receives one + * @param the type of the response + * @return a listener that listens for responses and invokes the consumer when received + */ + static ActionListener wrap(Consumer onResponse, Consumer onFailure) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + try { + onResponse.accept(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 8d7e75b162610..7b3b2a0a2f0c6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final Set blocks = new HashSet<>(); + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { this.originalMessage = originalMessage; @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) { return this; } + public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + public TransportMessage originalMessage() { return originalMessage; } @@ -142,4 +150,8 @@ public Index shrinkFrom() { public boolean updateAllTypes() { return updateAllTypes; } + + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index b808484cef53d..38401fef18126 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest private boolean updateAllTypes = false; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexRequest() { } @@ -440,6 +443,30 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) { return this; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + /** + * Sets the number of shard copies that should be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -462,6 +489,7 @@ public void readFrom(StreamInput in) throws IOException { aliases.add(Alias.read(in)); } updateAllTypes = in.readBoolean(); + waitForActiveShards = ActiveShardCount.readFrom(in); } @Override @@ -486,5 +514,6 @@ public void writeTo(StreamOutput out) throws IOException { alias.writeTo(out); } out.writeBoolean(updateAllTypes); + waitForActiveShards.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java index 2de6d4d57a051..1c930b8951065 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -249,4 +250,23 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) { request.updateAllTypes(updateAllTypes); return this; } + + /** + * Sets the number of shard copies that should be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * wait for all shards (primary and all replicas) to be active before returning. + * Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index b9282002349b3..35dd53276cd6d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -30,22 +31,41 @@ */ public class CreateIndexResponse extends AcknowledgedResponse { + private boolean shardsAcked; + protected CreateIndexResponse() { } - protected CreateIndexResponse(boolean acknowledged) { + protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) { super(acknowledged); + assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too + this.shardsAcked = shardsAcked; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); readAcknowledged(in); + shardsAcked = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); writeAcknowledged(out); + out.writeBoolean(shardsAcked); + } + + /** + * Returns true if the requisite number of shards were started before + * returning from the index creation operation. If {@link #isAcknowledged()} + * is false, then this also returns false. + */ + public boolean isShardsAcked() { + return shardsAcked; + } + + public void addCustomFields(XContentBuilder builder) throws IOException { + builder.field("shards_acknowledged", isShardsAcked()); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 444da8df082bf..d3ce1975e8917 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -31,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -77,24 +75,12 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) - .aliases(request.aliases()).customs(request.customs()); + .aliases(request.aliases()).customs(request.customs()) + .waitForActiveShards(request.waitForActiveShards()); - createIndexService.createIndex(updateRequest, new ActionListener() { - - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create", t, request.index()); - } else { - logger.debug("[{}] failed to create", t, request.index()); - } - listener.onFailure(t); - } - }); + createIndexService.createIndex(updateRequest, ActionListener.wrap(response -> + listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())), + listener::onFailure)); } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java index 9e98418f18428..481a375492a8c 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -206,4 +207,22 @@ public void source(BytesReference source) { } } + /** + * Sets the number of shard copies that should be active for creation of the + * new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.createIndexRequest.waitForActiveShards(waitForActiveShards); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java index e9b4351fc5dea..edc7aaa92d6b9 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.rollover; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; @@ -70,4 +71,23 @@ public RolloverRequestBuilder mapping(String type, String source) { this.request.getCreateIndexRequest().mapping(type, source); return this; } + + /** + * Sets the number of shard copies that should be active for creation of the + * new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.request.setWaitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java index 0f254e825da3f..b495e3c6a0f32 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java @@ -39,22 +39,28 @@ public final class RolloverResponse extends ActionResponse implements ToXContent private static final String DRY_RUN = "dry_run"; private static final String ROLLED_OVER = "rolled_over"; private static final String CONDITIONS = "conditions"; + private static final String ACKNOWLEDGED = "acknowledged"; + private static final String SHARDS_ACKED = "shards_acknowledged"; private String oldIndex; private String newIndex; private Set> conditionStatus; private boolean dryRun; private boolean rolledOver; + private boolean acknowledged; + private boolean shardsAcked; RolloverResponse() { } RolloverResponse(String oldIndex, String newIndex, Set conditionResults, - boolean dryRun, boolean rolledOver) { + boolean dryRun, boolean rolledOver, boolean acknowledged, boolean shardsAcked) { this.oldIndex = oldIndex; this.newIndex = newIndex; this.dryRun = dryRun; this.rolledOver = rolledOver; + this.acknowledged = acknowledged; + this.shardsAcked = shardsAcked; this.conditionStatus = conditionResults.stream() .map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched)) .collect(Collectors.toSet()); @@ -89,12 +95,31 @@ public boolean isDryRun() { } /** - * Returns if the rollover was not simulated and the conditions were met + * Returns true if the rollover was not simulated and the conditions were met */ public boolean isRolledOver() { return rolledOver; } + /** + * Returns true if the creation of the new rollover index and switching of the + * alias to the newly created index was successful, and returns false otherwise. + * If {@link #isDryRun()} is true, then this will also return false. If this + * returns false, then {@link #isShardsAcked()} will also return false. + */ + public boolean isAcknowledged() { + return acknowledged; + } + + /** + * Returns true if the requisite number of shards were started in the newly + * created rollover index before returning. If {@link #isAcknowledged()} is + * false, then this will also return false. + */ + public boolean isShardsAcked() { + return shardsAcked; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -110,6 +135,8 @@ public void readFrom(StreamInput in) throws IOException { conditionStatus = conditions; dryRun = in.readBoolean(); rolledOver = in.readBoolean(); + acknowledged = in.readBoolean(); + shardsAcked = in.readBoolean(); } @Override @@ -124,6 +151,8 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(dryRun); out.writeBoolean(rolledOver); + out.writeBoolean(acknowledged); + out.writeBoolean(shardsAcked); } @Override @@ -132,6 +161,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NEW_INDEX, newIndex); builder.field(ROLLED_OVER, rolledOver); builder.field(DRY_RUN, dryRun); + builder.field(ACKNOWLEDGED, acknowledged); + builder.field(SHARDS_ACKED, shardsAcked); builder.startObject(CONDITIONS); for (Map.Entry entry : conditionStatus) { builder.field(entry.getKey(), entry.getValue()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index af81e5cf8c908..d88cfe35f8f7e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -25,11 +25,12 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.AliasAction; @@ -58,6 +59,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction result.matched)) { - createIndexService.createIndex(prepareCreateIndexRequest(rolloverIndexName, rolloverRequest), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - // switch the alias to point to the newly created index - indexAliasesService.indicesAliases( - prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, - rolloverRequest), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { - listener.onResponse( - new RolloverResponse(sourceIndexName, rolloverIndexName, - conditionResults, false, true)); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } - - @Override - public void onFailure(Exception t) { - listener.onFailure(t); - } - }); + CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(rolloverIndexName, rolloverRequest); + createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> { + // switch the alias to point to the newly created index + indexAliasesService.indicesAliases( + prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, + rolloverRequest), + ActionListener.wrap(aliasClusterStateUpdateResponse -> { + if (aliasClusterStateUpdateResponse.isAcknowledged()) { + activeShardsObserver.waitForActiveShards(rolloverIndexName, + rolloverRequest.getCreateIndexRequest().waitForActiveShards(), + rolloverRequest.masterNodeTimeout(), + isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, + conditionResults, false, true, true, isShardsAcked)), + listener::onFailure); + } else { + listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, + false, true, false, false)); + } + }, listener::onFailure)); + }, listener::onFailure)); } else { // conditions not met listener.onResponse( - new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false) + new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false, false, false) ); } } @@ -216,6 +211,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Stri .masterNodeTimeout(createIndexRequest.masterNodeTimeout()) .settings(createIndexRequest.settings()) .aliases(createIndexRequest.aliases()) + .waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation .mappings(createIndexRequest.mappings()); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java index 791d35220e21e..9ba4acdefc60f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -36,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; -import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -126,6 +126,24 @@ public String getSourceIndex() { return sourceIndex; } + /** + * Sets the number of shard copies that should be active for creation of the + * new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards); + } + public void source(BytesReference source) { XContentType xContentType = XContentFactory.xContentType(source); if (xContentType != null) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java index ab392a7f8249d..5ec1a5066ebdc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.shrink; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.settings.Settings; @@ -44,4 +45,23 @@ public ShrinkRequestBuilder setSettings(Settings settings) { this.request.getShrinkIndexRequest().settings(settings); return this; } + + /** + * Sets the number of shard copies that should be active for creation of the + * new shrunken index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will + * wait for one shard copy (the primary) to become active. Set this value to + * {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active + * before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * Index creation will only wait up until the timeout value for the number of shard copies + * to be active before returning. Check {@link ShrinkResponse#isShardsAcked()} to + * determine if the requisite shard copies were all started before returning or timing out. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.request.setWaitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java index 4835471ae4c24..e7ad0afe3aa17 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java @@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse { ShrinkResponse() { } - ShrinkResponse(boolean acknowledged) { - super(acknowledged); + ShrinkResponse(boolean acknowledged, boolean shardsAcked) { + super(acknowledged, shardsAcked); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index 57dc3e82e1877..4667f1e98255a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -40,7 +39,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -93,22 +91,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i); return shard == null ? null : shard.getPrimary().getDocs(); }, indexNameExpressionResolver); - createIndexService.createIndex(updateRequest, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new ShrinkResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception t) { - if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create shrink index", t, updateRequest.index()); - } else { - logger.debug("[{}] failed to create shrink index", t, updateRequest.index()); - } - listener.onFailure(t); - } - }); + createIndexService.createIndex(updateRequest, ActionListener.wrap(response -> + listener.onResponse(new ShrinkResponse(response.isAcknowledged(), response.isShardsAcked())), listener::onFailure)); } @Override @@ -162,6 +146,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Shri .settings(targetIndex.settings()) .aliases(targetIndex.aliases()) .customs(targetIndex.customs()) + .waitForActiveShards(targetIndex.waitForActiveShards()) .shrinkFrom(metaData.getIndex()); } diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java new file mode 100644 index 0000000000000..90bd0450afb81 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A class whose instances represent a value for counting the number + * of active shard copies for a given shard in an index. + */ +public final class ActiveShardCount implements Writeable { + + private static final int ACTIVE_SHARD_COUNT_DEFAULT = -2; + private static final int ALL_ACTIVE_SHARDS = -1; + + public static final ActiveShardCount DEFAULT = new ActiveShardCount(ACTIVE_SHARD_COUNT_DEFAULT); + public static final ActiveShardCount ALL = new ActiveShardCount(ALL_ACTIVE_SHARDS); + public static final ActiveShardCount NONE = new ActiveShardCount(0); + public static final ActiveShardCount ONE = new ActiveShardCount(1); + + private final int value; + + private ActiveShardCount(final int value) { + this.value = value; + } + + /** + * Get an ActiveShardCount instance for the given value. The value is first validated to ensure + * it is a valid shard count and throws an IllegalArgumentException if validation fails. Valid + * values are any non-negative number. Directly use {@link ActiveShardCount#DEFAULT} for the + * default value (which is one shard copy) or {@link ActiveShardCount#ALL} to specify all the shards. + */ + public static ActiveShardCount from(final int value) { + if (value < 0) { + throw new IllegalArgumentException("shard count cannot be a negative value"); + } + return get(value); + } + + private static ActiveShardCount get(final int value) { + switch (validateValue(value)) { + case ACTIVE_SHARD_COUNT_DEFAULT: + return DEFAULT; + case ALL_ACTIVE_SHARDS: + return ALL; + case 1: + return ONE; + case 0: + return NONE; + default: + return new ActiveShardCount(value); + } + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeInt(value); + } + + public static ActiveShardCount readFrom(final StreamInput in) throws IOException { + return get(in.readInt()); + } + + private static int validateValue(final int value) { + if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) { + throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]"); + } + return value; + } + + /** + * Resolve this instance to an actual integer value for the number of active shard counts. + * If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is + * used to determine what the actual active shard count should be. The default value indicates + * one active shard. + */ + public int resolve(final IndexMetaData indexMetaData) { + if (this == ActiveShardCount.DEFAULT) { + return 1; + } else if (this == ActiveShardCount.ALL) { + return indexMetaData.getNumberOfReplicas() + 1; + } else { + return value; + } + } + + /** + * Parses the active shard count from the given string. Valid values are "all" for + * all shard copies, null for the default value (which defaults to one shard copy), + * or a numeric value greater than or equal to 0. Any other input will throw an + * IllegalArgumentException. + */ + public static ActiveShardCount parseString(final String str) { + if (str == null) { + return ActiveShardCount.DEFAULT; + } else if (str.equals("all")) { + return ActiveShardCount.ALL; + } else { + int val; + try { + val = Integer.parseInt(str); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("cannot parse ActiveShardCount[" + str + "]", e); + } + return ActiveShardCount.from(val); + } + } + + /** + * Returns true iff the given cluster state's routing table contains enough active + * shards to meet the required shard count represented by this instance. + */ + public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) { + if (this == ActiveShardCount.NONE) { + // not waiting for any active shards + return true; + } + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + if (indexMetaData == null) { + // its possible the index was deleted while waiting for active shard copies, + // in this case, we'll just consider it that we have enough active shard copies + // and we can stop waiting + return true; + } + final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName); + assert indexRoutingTable != null; + if (indexRoutingTable.allPrimaryShardsActive() == false) { + // all primary shards aren't active yet + return false; + } + for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { + if (enoughShardsActive(shardRouting.value, indexMetaData) == false) { + // not enough active shard copies yet + return false; + } + } + return true; + } + + /** + * Returns true iff the active shard count in the shard routing table is enough + * to meet the required shard count represented by this instance. + */ + public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { + if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) { + // not enough active shard copies yet + return false; + } + return true; + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") ActiveShardCount that = (ActiveShardCount) o; + return value == that.value; + } + + @Override + public String toString() { + final String valStr; + switch (value) { + case ALL_ACTIVE_SHARDS: + valStr = "ALL"; + break; + case ACTIVE_SHARD_COUNT_DEFAULT: + valStr = "DEFAULT"; + break; + default: + valStr = Integer.toString(value); + } + return "ActiveShardCount[" + valStr + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java new file mode 100644 index 0000000000000..7217961d899c4 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardsObserver.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.function.Consumer; + +/** + * This class provides primitives for waiting for a configured number of shards + * to become active before sending a response on an {@link ActionListener}. + */ +public class ActiveShardsObserver extends AbstractComponent { + + private final ClusterService clusterService; + private final ThreadPool threadPool; + + public ActiveShardsObserver(final Settings settings, final ClusterService clusterService, final ThreadPool threadPool) { + super(settings); + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + /** + * Waits on the specified number of active shards to be started before executing the + * + * @param indexName the index to wait for active shards on + * @param activeShardCount the number of active shards to wait on before returning + * @param timeout the timeout value + * @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first) + * @param onFailure a function that is executed in response to an error occurring during waiting for the active shards + */ + public void waitForActiveShards(final String indexName, + final ActiveShardCount activeShardCount, + final TimeValue timeout, + final Consumer onResult, + final Consumer onFailure) { + + // wait for the configured number of active shards to be allocated before executing the result consumer + if (activeShardCount == ActiveShardCount.NONE) { + // not waiting, so just run whatever we were to run when the waiting is + onResult.accept(true); + return; + } + + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); + if (activeShardCount.enoughShardsActive(observer.observedState(), indexName)) { + onResult.accept(true); + } else { + final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate = + new ClusterStateObserver.ValidationPredicate() { + @Override + protected boolean validate(final ClusterState newState) { + return activeShardCount.enoughShardsActive(newState, indexName); + } + }; + + final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + onResult.accept(true); + } + + @Override + public void onClusterServiceClose() { + logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName); + onFailure.accept(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + onResult.accept(false); + } + }; + + observer.waitForNextChange(observerListener, shardsAllocatedPredicate, timeout); + } + } + +} diff --git a/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java b/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java new file mode 100644 index 0000000000000..4f0e99ae558fe --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/ack/CreateIndexClusterStateUpdateResponse.java @@ -0,0 +1,40 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.ack; + +/** + * A cluster state update response with specific fields for index creation. + */ +public class CreateIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse { + + private final boolean shardsAcked; + + public CreateIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcked) { + super(acknowledged); + this.shardsAcked = shardsAcked; + } + + /** + * Returns whether the requisite number of shard copies started before the completion of the operation. + */ + public boolean isShardsAcked() { + return shardsAcked; + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 4f7aa68d7de79..bab03febaae87 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -27,9 +27,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; +import org.elasticsearch.cluster.ack.CreateIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -68,6 +70,7 @@ import org.elasticsearch.indices.IndexCreationException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.threadpool.ThreadPool; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -108,13 +111,15 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final Environment env; private final NodeServicesProvider nodeServicesProvider; private final IndexScopedSettings indexScopedSettings; - + private final ActiveShardsObserver activeShardsObserver; @Inject public MetaDataCreateIndexService(Settings settings, ClusterService clusterService, IndicesService indicesService, AllocationService allocationService, AliasValidator aliasValidator, - Set indexTemplateFilters, Environment env, NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings) { + Set indexTemplateFilters, Environment env, + NodeServicesProvider nodeServicesProvider, IndexScopedSettings indexScopedSettings, + ThreadPool threadPool) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; @@ -135,6 +140,7 @@ public MetaDataCreateIndexService(Settings settings, ClusterService clusterServi } this.indexTemplateFilter = new IndexTemplateFilter.Compound(templateFilters); } + this.activeShardsObserver = new ActiveShardsObserver(settings, clusterService, threadPool); } public void validateIndexName(String index, ClusterState state) { @@ -176,7 +182,38 @@ public void validateIndexName(String index, ClusterState state) { } } - public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener listener) { + /** + * Creates an index in the cluster state and waits for the specified number of shard copies to + * become active (as specified in {@link CreateIndexClusterStateUpdateRequest#waitForActiveShards()}) + * before sending the response on the listener. If the index creation was successfully applied on + * the cluster state, then {@link CreateIndexClusterStateUpdateResponse#isAcknowledged()} will return + * true, otherwise it will return false and no waiting will occur for started shards + * ({@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will also be false). If the index + * creation in the cluster state was successful and the requisite shard copies were started before + * the timeout, then {@link CreateIndexClusterStateUpdateResponse#isShardsAcked()} will + * return true, otherwise if the operation timed out, then it will return false. + * + * @param request the index creation cluster state update request + * @param listener the listener on which to send the index creation cluster state update response + */ + public void createIndex(final CreateIndexClusterStateUpdateRequest request, + final ActionListener listener) { + onlyCreateIndex(request, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(), + shardsAcked -> { + logger.debug("[{}] index created, but the operation timed out while waiting for " + + "enough shards to be started.", request.index()); + listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcked)); + }, listener::onFailure); + } else { + listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false)); + } + }, listener::onFailure)); + } + + private void onlyCreateIndex(final CreateIndexClusterStateUpdateRequest request, + final ActionListener listener) { Settings.Builder updatedSettingsBuilder = Settings.builder(); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); indexScopedSettings.validate(updatedSettingsBuilder); @@ -308,6 +345,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); + if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) { + throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() + + "]: cannot be greater than number of shard copies [" + + (tmpImd.getNumberOfReplicas() + 1) + "]"); + } // create the index here (on the master) to validate it can be created, as well as adding the mapping final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); createdIndex = indexService.index(); @@ -408,6 +450,16 @@ public ClusterState execute(ClusterState currentState) throws Exception { } } } + + @Override + public void onFailure(String source, Exception e) { + if (e instanceof IndexAlreadyExistsException) { + logger.trace("[{}] failed to create", e, request.index()); + } else { + logger.debug("[{}] failed to create", e, request.index()); + } + super.onFailure(source, e); + } }); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java index a922edff484a2..021eff9ea684b 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; @@ -51,6 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false)); rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout())); rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout())); + rolloverIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java index 96b4abbe06331..11285238703ca 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java @@ -21,14 +21,19 @@ import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -56,6 +61,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, } shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout())); shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout())); - client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<>(channel)); + shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, ShrinkResponse response) throws IOException { + response.addCustomFields(builder); + } + }); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java index 46ee596286e20..bc45093e4bc8a 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java @@ -22,14 +22,18 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -52,6 +56,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, createIndexRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false)); createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout())); createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); - client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel)); + createIndexRequest.waitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException { + response.addCustomFields(builder); + } + }); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index eff436d3a0eca..3b26345148775 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -29,10 +30,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; -import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -54,11 +55,11 @@ public void testDelayShards() throws Exception { }); logger.info("--> creating 'test' index"); - prepareCreate("test").setSettings(Settings.builder() + assertAcked(prepareCreate("test").setSettings(Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m") .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).get(); - ensureGreen("test"); + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)) + .setWaitForActiveShards(ActiveShardCount.ALL).get()); logger.info("--> stopping a random node"); assertTrue(internalCluster().stopRandomDataNode()); @@ -89,6 +90,7 @@ public void testUnassignedShards() throws Exception { .setSettings(Settings.builder() .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) // wait on all shards .get(); client().admin().indices().prepareCreate("only-baz") @@ -96,6 +98,7 @@ public void testUnassignedShards() throws Exception { .put("index.routing.allocation.include.bar", "baz") .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) .get(); client().admin().indices().prepareCreate("only-foo") @@ -105,9 +108,6 @@ public void testUnassignedShards() throws Exception { .put("index.number_of_replicas", 1)) .get(); - ensureGreen("anywhere", "only-baz"); - ensureYellow("only-foo"); - ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain() .setIndex("only-foo") .setShard(0) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 428e859e34221..7231bee0bef12 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; @@ -289,7 +290,7 @@ public void testMappingConflictRootCause() throws Exception { public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get(); - client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(indexSettings()).get(); internalCluster().fullRestart(); ensureGreen("test"); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 7d163630afbca..8a4a62f97287b 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -166,6 +167,8 @@ public void testCreateIndexRequest() throws Exception { String alias = randomAsciiOfLength(10); String rolloverIndex = randomAsciiOfLength(10); final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAsciiOfLength(10)); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + rolloverRequest.setWaitForActiveShards(activeShardCount); final Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index 3236de4aaeb0d..3fcade05839f0 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexWriter; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -130,6 +131,8 @@ public void testShrinkIndexSettings() { int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000)); ShrinkRequest target = new ShrinkRequest("target", indexName); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + target.setWaitForActiveShards(activeShardCount); CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( target, clusterState, (i) -> stats, new IndexNameExpressionResolver(Settings.EMPTY)); @@ -137,6 +140,7 @@ public void testShrinkIndexSettings() { assertEquals(indexName, request.shrinkFrom().getName()); assertEquals("1", request.settings().get("index.number_of_shards")); assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), activeShardCount); } private DiscoveryNode newNode(String nodeId) { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java index 58784fdb7dfba..8493c58729daa 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/template/put/MetaDataIndexTemplateServiceTests.java @@ -160,7 +160,7 @@ private static List putTemplate(PutRequest request) { null, new HashSet<>(), null, - null, null); + null, null, null); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService(Settings.EMPTY, null, createIndexService, new AliasValidator(Settings.EMPTY), null, null); final List throwables = new ArrayList<>(); @@ -191,6 +191,7 @@ private List putTemplateDetail(PutRequest request) throws Exception { new HashSet<>(), null, nodeServicesProvider, + null, null); MetaDataIndexTemplateService service = new MetaDataIndexTemplateService( Settings.EMPTY, clusterService, createIndexService, new AliasValidator(Settings.EMPTY), indicesService, nodeServicesProvider); diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java new file mode 100644 index 0000000000000..83f0b1332c7da --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java @@ -0,0 +1,305 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests for the {@link ActiveShardCount} class + */ +public class ActiveShardCountTests extends ESTestCase { + + public void testFromIntValue() { + assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE); + final int value = randomIntBetween(1, 50); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1))); + } + + public void testResolve() { + // one shard + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + final int value = randomIntBetween(2, 20); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + + // more than one shard + final int numNewShards = randomIntBetween(1, 20); + indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(numNewShards) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + } + + public void testSerialization() throws IOException { + doWriteRead(ActiveShardCount.ALL); + doWriteRead(ActiveShardCount.DEFAULT); + doWriteRead(ActiveShardCount.NONE); + doWriteRead(ActiveShardCount.from(randomIntBetween(1, 50))); + } + + public void testParseString() { + assertSame(ActiveShardCount.parseString("all"), ActiveShardCount.ALL); + assertSame(ActiveShardCount.parseString(null), ActiveShardCount.DEFAULT); + assertSame(ActiveShardCount.parseString("0"), ActiveShardCount.NONE); + int value = randomIntBetween(1, 50); + assertEquals(ActiveShardCount.parseString(value + ""), ActiveShardCount.from(value)); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomAsciiOfLengthBetween(4, 8))); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-1")); // magic numbers not exposed through API + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-2")); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + "")); + } + + private void doWriteRead(ActiveShardCount activeShardCount) throws IOException { + final BytesStreamOutput out = new BytesStreamOutput(); + activeShardCount.writeTo(out); + final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes)); + ActiveShardCount readActiveShardCount = ActiveShardCount.readFrom(in); + if (activeShardCount == ActiveShardCount.DEFAULT + || activeShardCount == ActiveShardCount.ALL + || activeShardCount == ActiveShardCount.NONE) { + assertSame(activeShardCount, readActiveShardCount); + } else { + assertEquals(activeShardCount, readActiveShardCount); + } + } + + public void testEnoughShardsActiveZero() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + public void testEnoughShardsActiveLevelOne() { + runTestForOneActiveShard(ActiveShardCount.ONE); + } + + public void testEnoughShardsActiveLevelDefault() { + // default is 1 + runTestForOneActiveShard(ActiveShardCount.DEFAULT); + } + + public void testEnoughShardsActiveRandom() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas)); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + public void testEnoughShardsActiveLevelAll() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + // both values should represent "all" + final ActiveShardCount waitForActiveShards = randomBoolean() ? ActiveShardCount.from(numberOfReplicas + 1) : ActiveShardCount.ALL; + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + assert activeShardCount == ActiveShardCount.ONE || activeShardCount == ActiveShardCount.DEFAULT; + final ActiveShardCount waitForActiveShards = activeShardCount; + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startPrimaries(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); + } + + private ClusterState initializeWithNewIndex(final String indexName, final int numShards, final int numReplicas) { + // initial index creation and new routing table info + final IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())) + .numberOfShards(numShards) + .numberOfReplicas(numReplicas) + .build(); + final MetaData metaData = MetaData.builder().put(indexMetaData, true).build(); + final RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetaData).build(); + return ClusterState.builder(new ClusterName("test_cluster")).metaData(metaData).routingTable(routingTable).build(); + } + + private ClusterState startPrimaries(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + // want less than half, and primary is already started + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } else { + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startAllShards(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java new file mode 100644 index 0000000000000..ef371188a58e5 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Tests that the index creation operation waits for the appropriate + * number of active shards to be started before returning. + */ +public class ActiveShardsObserverIT extends ESIntegTestCase { + + public void testCreateIndexNoActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(randomBoolean() ? ActiveShardCount.from(1) : ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isShardsAcked()); + } + + public void testCreateIndexNoActiveShardsNoWaiting() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + CreateIndexResponse response = prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(0)) + .get(); + assertTrue(response.isAcknowledged()); + } + + public void testCreateIndexNotEnoughActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + final int numDataNodes = internalCluster().numDataNodes(); + final int numReplicas = numDataNodes + randomInt(4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1))) + .setTimeout("100ms") + .get() + .isShardsAcked()); + } + + public void testCreateIndexEnoughActiveShards() throws Exception { + final String indexName = "test-idx"; + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3)) + .build(); + ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes())); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get()); + } + + public void testCreateIndexWaitsForAllActiveShards() throws Exception { + final String indexName = "test-idx"; + // not enough data nodes, index creation times out + final int numReplicas = internalCluster().numDataNodes() + randomInt(4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertFalse(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isShardsAcked()); + if (client().admin().indices().prepareExists(indexName).get().isExists()) { + assertAcked(client().admin().indices().prepareDelete(indexName)); + } + + // enough data nodes, all shards are active + settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1) + .build(); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(ActiveShardCount.ALL).get()); + } + + public void testCreateIndexStopsWaitingWhenIndexDeleted() throws Exception { + final String indexName = "test-idx"; + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1) + .build(); + + logger.info("--> start the index creation process"); + ListenableActionFuture responseListener = + prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.ALL) + .execute(); + + logger.info("--> wait until the cluster state contains the new index"); + assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().metaData().hasIndex(indexName))); + + logger.info("--> delete the index"); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + logger.info("--> ensure the create index request completes"); + assertAcked(responseListener.get()); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java index 43aa088a3b945..f411e00468eb7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -40,7 +41,7 @@ public class SimpleDataNodesIT extends ESIntegTestCase { public void testDataNodes() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); - client().admin().indices().create(createIndexRequest("test")).actionGet(); + client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet(); try { client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); fail("no allocation should happen"); diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index c86535e40c59c..70af580824118 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -91,7 +92,7 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception { final String node_2 = nodesIds.get(1); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -203,7 +204,7 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -253,14 +254,13 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING)); - healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - logger.info("--> get the state, verify shard 1 primary allocated"); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); - assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); - + final String nodeToCheck = node_1; + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + String nodeId = clusterState.nodes().resolveNode(nodeToCheck).getId(); + assertThat(clusterState.getRoutingNodes().node(nodeId).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); + }); } public void testRerouteExplain() { diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java index ff31a72a5c6d9..f6f7aaf322800 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexServiceTests.java @@ -216,6 +216,7 @@ private MetaDataCreateIndexService getCreateIndexService() { new HashSet<>(), null, null, + null, null); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index f990f382c8c50..f267af66dc643 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -108,7 +109,7 @@ private void createStaleReplicaScenario() throws Exception { logger.info("--> check that old primary shard does not get promoted to primary again"); // kick reroute and wait for all shard states to be fetched client(master).admin().cluster().prepareReroute().get(); - assertBusy(new Runnable() { + assertBusy(new Runnable() { @Override public void run() { assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)); @@ -157,7 +158,8 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { createStaleReplicaScenario(); logger.info("--> explicitly promote old primary shard"); - ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test"); + final String idxName = "test"; + ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName); ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute(); for (IntObjectCursor> shardStoreStatuses : storeStatuses) { int shardId = shardStoreStatuses.key; @@ -165,22 +167,30 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { logger.info("--> adding allocation command for shard {}", shardId); // force allocation based on node id if (useStaleReplica) { - rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } else { - rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } } rerouteBuilder.get(); logger.info("--> check that the stale primary shard gets allocated and that documents are available"); - ensureYellow("test"); + ensureYellow(idxName); + + if (useStaleReplica == false) { + // When invoking AllocateEmptyPrimaryAllocationCommand, due to the UnassignedInfo.Reason being changed to INDEX_CREATION, + // its possible that the shard has not completed initialization, even though the cluster health is yellow, so the + // search can throw an "all shards failed" exception. We will wait until the shard initialization has completed before + // verifying the search hit count. + assertBusy(() -> assertTrue(clusterService().state().routingTable().index(idxName).allPrimaryShardsActive())); - assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); + } + assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); } public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException { String node = internalCluster().startNode(); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.exclude._name", node) .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index df45b422843e7..0854d27e208f4 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -34,7 +34,6 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -351,7 +350,7 @@ public void testAllocationStatusSerialization() throws IOException { for (AllocationStatus allocationStatus : AllocationStatus.values()) { BytesStreamOutput out = new BytesStreamOutput(); allocationStatus.writeTo(out); - ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytes())); + ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytesRef().bytes)); AllocationStatus readStatus = AllocationStatus.readFrom(in); assertThat(readStatus, equalTo(allocationStatus)); } diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index f86b56f105207..eaf9a351824af 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; @@ -205,7 +206,7 @@ public void testJustMasterNode() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); logger.info("--> create an index"); - client().admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).execute().actionGet(); logger.info("--> closing master node"); internalCluster().closeNonSharedNodes(false); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 09441f7011081..1a8caaa351481 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -158,7 +158,7 @@ public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) { allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, nodeServicesProvider); MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(settings, clusterService, indicesService, allocationService, new AliasValidator(settings), Collections.emptySet(), environment, - nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); + nodeServicesProvider, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, threadPool); transportCloseIndexAction = new TransportCloseIndexAction(settings, transportService, clusterService, threadPool, indexStateService, clusterSettings, actionFilters, indexNameExpressionResolver, destructiveOperations); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index a26cf9124f2d9..c2ccb9cd4ab2a 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -140,7 +141,7 @@ public ClusterState randomlyUpdateClusterState(ClusterState state, CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - .build()); + .build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); } diff --git a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 5f7f26cd38cb4..7d950a73837cc 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; @@ -213,7 +214,7 @@ private void assertFlushResponseEqualsShardStats(ShardStats[] shardsStats, List< public void testUnallocatedShardsDoesNotHang() throws InterruptedException { // create an index but disallow allocation - prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); + prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); // this should not hang but instead immediately return with empty result set List shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test"); diff --git a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index ea3ebf5179b48..66687ea74fa81 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -98,7 +99,7 @@ public void testSimpleOpenClose() { public void testFastCloseAfterCreateContinuesCreateAfterOpen() { logger.info("--> creating test index that cannot be allocated"); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.include.tag", "no_such_node").build()).get(); ClusterHealthResponse health = client().admin().cluster().prepareHealth("test").setWaitForNodes(">=2").get(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 9d571c02c90c8..9731e3ce065f1 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; @@ -45,6 +47,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -443,9 +446,9 @@ public void testRestoreIndexWithMissingShards() throws Exception { logger.info("--> create an index that will have no allocated shards"); assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) .put("index.routing.allocation.include.tag", "nowhere") - .put("number_of_replicas", 0))); + .put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get()); + assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists()); - logger.info("--> create repository"); logger.info("--> creating repository"); PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index fb52d0f634bac..e59f349548400 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -755,7 +756,7 @@ public void testUnallocatedShards() throws Exception { .put("location", randomRepoPath()))); logger.info("--> creating index that cannot be allocated"); - prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).get(); + prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).setWaitForActiveShards(ActiveShardCount.NONE).get(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json index bdac6d9a9ab89..b0267eae3a3c7 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json @@ -13,6 +13,10 @@ } }, "params": { + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for before the operation returns." + }, "timeout": { "type" : "time", "description" : "Explicit operation timeout" diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json index 218871a976554..97580182ea1bd 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.rollover.json @@ -25,6 +25,10 @@ "master_timeout": { "type" : "time", "description" : "Specify timeout for connection to master" + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for on the newly created rollover index before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json index 633e9e160936b..5ef943eacba6c 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.shrink.json @@ -25,6 +25,10 @@ "master_timeout": { "type" : "time", "description" : "Specify timeout for connection to master" + }, + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for on the shrunken index before the operation returns." } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml index 17b86d647649e..3c6ad7a70519f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.create/10_basic.yaml @@ -30,6 +30,35 @@ - match: { test_index.settings.index.number_of_replicas: "0"} +--- +"Create index with too large wait_for_active_shards": + + - do: + indices.create: + index: test_index + timeout: 100ms + master_timeout: 100ms + wait_for_active_shards: 6 + body: + settings: + number_of_replicas: 5 + + - match: { shards_acknowledged: false } + +--- +"Create index with wait_for_active_shards set to all": + + - do: + indices.create: + index: test_index + wait_for_active_shards: all + body: + settings: + number_of_replicas: "0" + + - match: { acknowledged: true } + - match: { shards_acknowledged: true } + --- "Create index with aliases": diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml index 3a4821193e640..0152903b5fa72 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.rollover/10_basic.yaml @@ -4,6 +4,7 @@ - do: indices.create: index: logs-1 + wait_for_active_shards: 1 body: aliases: logs_index: {} @@ -30,11 +31,12 @@ # perform alias rollover - do: - indices.rollover: - alias: "logs_search" - body: - conditions: - max_docs: 1 + indices.rollover: + alias: "logs_search" + wait_for_active_shards: 1 + body: + conditions: + max_docs: 1 - match: { old_index: logs-1 } - match: { new_index: logs-2 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml index b1aac4952c402..cdfdaa806847f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.shrink/10_basic.yaml @@ -6,6 +6,7 @@ - do: indices.create: index: source + wait_for_active_shards: 1 body: settings: number_of_replicas: "0" @@ -53,6 +54,7 @@ indices.shrink: index: "source" target: "target" + wait_for_active_shards: 1 body: settings: index.number_of_replicas: 0 diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 24af2b72bd176..34653a69b3dd2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; @@ -123,6 +124,17 @@ public static void assertAcked(DeleteIndexResponse response) { assertVersionSerializable(response); } + /** + * Assert that an index creation was fully acknowledged, meaning that both the index creation cluster + * state update was successful and that the requisite number of shard copies were started before returning. + */ + public static void assertAcked(CreateIndexResponse response) { + assertThat(response.getClass().getSimpleName() + " failed - not acked", response.isAcknowledged(), equalTo(true)); + assertVersionSerializable(response); + assertTrue(response.getClass().getSimpleName() + " failed - index creation acked but not all shards were started", + response.isShardsAcked()); + } + /** * Executes the request and fails if the request has not been blocked. *