From 9ef0deeb4db78487798c83b131a981377426e915 Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Mon, 4 Jul 2016 00:07:06 -0400 Subject: [PATCH] Index creation waits for active shards before returning Before returning, index creation now waits for the configured number of shard copies to be started. In the past, a client would create an index and then potentially have to check the cluster health to wait to execute write operations. With the cluster health semantics changing so that index creation does not cause the cluster health to go RED, this change enables waiting for the desired number of active shards to be active before returning from index creation. Relates #9126 --- .../CreateIndexClusterStateUpdateRequest.java | 12 + .../indices/create/CreateIndexRequest.java | 26 ++ .../create/CreateIndexRequestBuilder.java | 17 + .../indices/create/CreateIndexResponse.java | 24 +- .../create/TransportCreateIndexAction.java | 34 +- .../indices/rollover/RolloverRequest.java | 16 + .../indices/rollover/RolloverResponse.java | 18 +- .../rollover/TransportRolloverAction.java | 57 ++-- .../admin/indices/shrink/ShrinkRequest.java | 17 +- .../admin/indices/shrink/ShrinkResponse.java | 4 +- .../indices/shrink/TransportShrinkAction.java | 30 +- .../action/delete/TransportDeleteAction.java | 5 +- .../action/index/TransportIndexAction.java | 2 +- .../action/support/ActiveShardCount.java | 221 +++++++++++++ .../support/WaitForActiveShardsMonitor.java | 111 +++++++ .../action/update/TransportUpdateAction.java | 5 +- .../metadata/MetaDataCreateIndexService.java | 6 +- .../common/settings/IndexScopedSettings.java | 2 + .../elasticsearch/index/IndexSettings.java | 12 + .../indices/RestRolloverIndexAction.java | 2 + .../admin/indices/RestShrinkIndexAction.java | 13 +- .../indices/create/RestCreateIndexAction.java | 12 +- .../ClusterAllocationExplainIT.java | 11 +- .../admin/indices/create/CreateIndexIT.java | 6 +- .../indices/create/CreateIndexWaitsIT.java | 129 ++++++++ .../TransportRolloverActionTests.java | 4 + .../shrink/TransportShrinkActionTests.java | 4 + .../action/support/ActiveShardCountTests.java | 294 ++++++++++++++++++ .../cluster/SimpleDataNodesIT.java | 3 +- .../cluster/allocation/ClusterRerouteIT.java | 18 +- .../cluster/routing/PrimaryAllocationIT.java | 24 +- .../gateway/GatewayIndexStateIT.java | 3 +- ...ClusterStateServiceRandomUpdatesTests.java | 3 +- .../elasticsearch/indices/flush/FlushIT.java | 3 +- .../indices/state/SimpleIndexStateIT.java | 3 +- .../DedicatedClusterSnapshotRestoreIT.java | 12 +- .../SharedClusterSnapshotRestoreIT.java | 3 +- .../rest-api-spec/api/indices.create.json | 4 + 38 files changed, 1098 insertions(+), 72 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java create mode 100644 core/src/main/java/org/elasticsearch/action/support/WaitForActiveShardsMonitor.java create mode 100644 core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexWaitsIT.java create mode 100644 core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java index 8d7e75b162610..7b3b2a0a2f0c6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexClusterStateUpdateRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ private final Set blocks = new HashSet<>(); + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) { this.originalMessage = originalMessage; @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) { return this; } + public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + public TransportMessage originalMessage() { return originalMessage; } @@ -142,4 +150,8 @@ public Index shrinkFrom() { public boolean updateAllTypes() { return updateAllTypes; } + + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index 43d45672aec00..b70c85c85b42b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest private boolean updateAllTypes = false; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; + public CreateIndexRequest() { } @@ -440,6 +443,27 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) { return this; } + public ActiveShardCount waitForActiveShards() { + return waitForActiveShards; + } + + /** + * Sets the number of shard copies that must be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * ensure all shards (primary and all replicas) are active before proceeding with the + * write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; + return this; + } + + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -462,6 +486,7 @@ public void readFrom(StreamInput in) throws IOException { aliases.add(Alias.read(in)); } updateAllTypes = in.readBoolean(); + waitForActiveShards = ActiveShardCount.readFrom(in); } @Override @@ -486,5 +511,6 @@ public void writeTo(StreamOutput out) throws IOException { alias.writeTo(out); } out.writeBoolean(updateAllTypes); + waitForActiveShards.writeTo(out); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java index 2de6d4d57a051..6786c5f1e65c7 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.create; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -249,4 +250,20 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) { request.updateAllTypes(updateAllTypes); return this; } + + /** + * Sets the number of shard copies that must be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * ensure all shards (primary and all replicas) are active before proceeding with the + * write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); + return this; + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java index b9282002349b3..5e0903bdc9516 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexResponse.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; @@ -30,22 +31,43 @@ */ public class CreateIndexResponse extends AcknowledgedResponse { + private boolean timedOutWaitingForShards; + protected CreateIndexResponse() { } - protected CreateIndexResponse(boolean acknowledged) { + protected CreateIndexResponse(boolean acknowledged, boolean timedOutWaitingForShards) { super(acknowledged); + this.timedOutWaitingForShards = timedOutWaitingForShards; } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); readAcknowledged(in); + timedOutWaitingForShards = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); writeAcknowledged(out); + out.writeBoolean(timedOutWaitingForShards); + } + + /** + * Returns true if the request timed out waiting for the required number + * of active shards to be started. If the index was not successfully + * created in the first place, then this value is meaningless. + */ + public boolean isTimedOutWaitingForShards() { + return timedOutWaitingForShards; + } + + /** + * Adds custom fields to the AcknowledgedResponse's x-content. + */ + public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException { + builder.field("timed_out_waiting_for_shards", response.isTimedOutWaitingForShards()); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index 7b47a46a236c2..6e5df96606e2f 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.WaitForActiveShardsMonitor; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; @@ -41,6 +42,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction { private final MetaDataCreateIndexService createIndexService; + private final WaitForActiveShardsMonitor shardsMonitor; @Inject public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService, @@ -48,6 +50,7 @@ public TransportCreateIndexAction(Settings settings, TransportService transportS ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, CreateIndexRequest::new); this.createIndexService = createIndexService; + this.shardsMonitor = new WaitForActiveShardsMonitor(settings, clusterService, threadPool); } @Override @@ -77,24 +80,47 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) - .aliases(request.aliases()).customs(request.customs()); + .aliases(request.aliases()).customs(request.customs()) + .waitForActiveShards(request.waitForActiveShards()); createIndexService.createIndex(updateRequest, new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); + if (response.isAcknowledged()) { + // the cluster state that includes the newly created index has been acknowledged, + // now wait for the configured number of active shards to be allocated before returning, + // as that is when indexing operations can take place on the newly created index + try { + shardsMonitor.waitOnShards(indexName, request.waitForActiveShards(), request.masterNodeTimeout(), listener, + (timedOut) -> { + if (timedOut) { + logger.debug("[{}] index created, but the operation timed out while waiting for " + + "enough shards to be started.", indexName); + } + listener.onResponse(new CreateIndexResponse(true, timedOut)); + }); + + } catch (Exception ex) { + logger.debug("[{}] index creation failed on waiting for shards", indexName); + listener.onFailure(ex); + } + } else { + listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), true)); + } } @Override public void onFailure(Throwable t) { if (t instanceof IndexAlreadyExistsException) { - logger.trace("[{}] failed to create", t, request.index()); + logger.trace("[{}] failed to create", t, indexName); } else { - logger.debug("[{}] failed to create", t, request.index()); + logger.debug("[{}] failed to create", t, indexName); } listener.onFailure(t); } }); } + + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java index 9e98418f18428..710f8cc8fa0de 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -206,4 +207,19 @@ public void source(BytesReference source) { } } + /** + * Sets the number of shard copies that must be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * ensure all shards (primary and all replicas) are active before proceeding with the + * write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.createIndexRequest.waitForActiveShards(waitForActiveShards); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java index 0f254e825da3f..f3dfc12793471 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponse.java @@ -39,22 +39,25 @@ public final class RolloverResponse extends ActionResponse implements ToXContent private static final String DRY_RUN = "dry_run"; private static final String ROLLED_OVER = "rolled_over"; private static final String CONDITIONS = "conditions"; + private static final String TIMED_OUT_WAITING_FOR_SHARDS = "timed_out_waiting_for_shards"; private String oldIndex; private String newIndex; private Set> conditionStatus; private boolean dryRun; private boolean rolledOver; + private boolean timedOutWaitingForShards; RolloverResponse() { } RolloverResponse(String oldIndex, String newIndex, Set conditionResults, - boolean dryRun, boolean rolledOver) { + boolean dryRun, boolean rolledOver, boolean timedOutWaitingForShards) { this.oldIndex = oldIndex; this.newIndex = newIndex; this.dryRun = dryRun; this.rolledOver = rolledOver; + this.timedOutWaitingForShards = timedOutWaitingForShards; this.conditionStatus = conditionResults.stream() .map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched)) .collect(Collectors.toSet()); @@ -95,6 +98,16 @@ public boolean isRolledOver() { return rolledOver; } + /** + * Returns true if there was a timeout waiting for enough active shards + * on the newly created rollover index. Even if true, the rollover index + * itself could still be properly created, check {@link #isRolledOver()} to + * determine. + */ + public boolean isTimedOutWaitingForShards() { + return timedOutWaitingForShards; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -110,6 +123,7 @@ public void readFrom(StreamInput in) throws IOException { conditionStatus = conditions; dryRun = in.readBoolean(); rolledOver = in.readBoolean(); + timedOutWaitingForShards = in.readBoolean(); } @Override @@ -124,6 +138,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeBoolean(dryRun); out.writeBoolean(rolledOver); + out.writeBoolean(timedOutWaitingForShards); } @Override @@ -132,6 +147,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(NEW_INDEX, newIndex); builder.field(ROLLED_OVER, rolledOver); builder.field(DRY_RUN, dryRun); + builder.field(TIMED_OUT_WAITING_FOR_SHARDS, timedOutWaitingForShards); builder.startObject(CONDITIONS); for (Map.Entry entry : conditionStatus) { builder.field(entry.getKey(), entry.getValue()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java index 952448af5e7ab..a3a07fc5ca559 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -25,7 +25,9 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WaitForActiveShardsMonitor; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -43,7 +45,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.DocsStats; -import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -60,6 +61,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction result.matched)) { @@ -119,23 +122,36 @@ public void onResponse(IndicesStatsResponse statsResponse) { new ActionListener() { @Override public void onResponse(ClusterStateUpdateResponse response) { - // switch the alias to point to the newly created index - indexAliasesService.indicesAliases( - prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, - rolloverRequest), - new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { - listener.onResponse( - new RolloverResponse(sourceIndexName, rolloverIndexName, - conditionResults, false, true)); - } - - @Override - public void onFailure(Throwable e) { - listener.onFailure(e); - } - }); + // wait for the configured number of active shards to be allocated before returning, + // as that is when indexing operations can take place on the newly created index + try { + shardsMonitor.waitOnShards(rolloverIndexName, + rolloverRequest.getCreateIndexRequest().waitForActiveShards(), + rolloverRequest.masterNodeTimeout(), + listener, + (timedOut) -> { + // switch the alias to point to the newly created index + indexAliasesService.indicesAliases( + prepareRolloverAliasesUpdateRequest(sourceIndexName, rolloverIndexName, + rolloverRequest), + new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + listener.onResponse( + new RolloverResponse(sourceIndexName, rolloverIndexName, + conditionResults, false, true, timedOut)); + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + }); + } catch (Exception ex) { + logger.debug("[{}] rollover index failed on waiting for shards", rolloverIndexName); + listener.onFailure(ex); + } } @Override @@ -146,7 +162,7 @@ public void onFailure(Throwable t) { } else { // conditions not met listener.onResponse( - new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false) + new RolloverResponse(sourceIndexName, sourceIndexName, conditionResults, false, false, false) ); } } @@ -217,6 +233,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Stri .masterNodeTimeout(createIndexRequest.masterNodeTimeout()) .settings(createIndexRequest.settings()) .aliases(createIndexRequest.aliases()) + .waitForActiveShards(createIndexRequest.waitForActiveShards()) .mappings(createIndexRequest.mappings()); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java index 791d35220e21e..1deb2bf874ba6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.common.ParseField; @@ -36,7 +37,6 @@ import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; -import java.util.Map; import java.util.Objects; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -126,6 +126,21 @@ public String getSourceIndex() { return sourceIndex; } + /** + * Sets the number of shard copies that must be active for index creation to return. + * Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy + * (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to + * ensure all shards (primary and all replicas) are active before proceeding with the + * write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any + * non-negative integer, up to the number of copies per shard (number of replicas + 1), + * to wait for the desired amount of shard copies to become active before returning. + * + * @param waitForActiveShards number of active shard copies to wait on + */ + public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards); + } + public void source(BytesReference source) { XContentType xContentType = XContentFactory.xContentType(source); if (xContentType != null) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java index 4835471ae4c24..11f8eba63a5c8 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkResponse.java @@ -25,7 +25,7 @@ public final class ShrinkResponse extends CreateIndexResponse { ShrinkResponse() { } - ShrinkResponse(boolean acknowledged) { - super(acknowledged); + ShrinkResponse(boolean acknowledged, boolean timedOutWaitingForShards) { + super(acknowledged, timedOutWaitingForShards); } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java index a00dd23b76423..b3bbc9d3acb5a 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkAction.java @@ -26,6 +26,8 @@ import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.WaitForActiveShardsMonitor; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; @@ -54,6 +56,7 @@ public class TransportShrinkAction extends TransportMasterNodeAction() { @Override public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new ShrinkResponse(response.isAcknowledged())); + if (response.isAcknowledged()) { + // wait for the configured number of active shards to be allocated before returning, + // as that is when indexing operations can take place on the newly created index + final CreateIndexRequest createIndexRequest = shrinkRequest.getShrinkIndexRequest(); + try { + shardsMonitor.waitOnShards(createIndexRequest.index(), + createIndexRequest.waitForActiveShards(), + createIndexRequest.masterNodeTimeout(), + listener, + (timedOut) -> { + if (timedOut) { + logger.debug("[{}] shrink index created, but the operation timed out while waiting " + + "for enough shards to be started.", createIndexRequest.index()); + } + listener.onResponse(new ShrinkResponse(true, timedOut)); + }); + + } catch (Exception ex) { + logger.debug("[{}] shrink index failed on waiting for shards", createIndexRequest.index()); + listener.onFailure(ex); + } + } else { + listener.onResponse(new ShrinkResponse(response.isAcknowledged(), true)); + } } @Override @@ -162,6 +189,7 @@ static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final Shri .settings(targetIndex.settings()) .aliases(targetIndex.aliases()) .customs(targetIndex.customs()) + .waitForActiveShards(targetIndex.waitForActiveShards()) .shrinkFrom(metaData.getIndex()); } diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index beced23c338a8..6eadf0e7130f8 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -70,7 +70,10 @@ public TransportDeleteAction(Settings settings, TransportService transportServic protected void doExecute(Task task, final DeleteRequest request, final ActionListener listener) { ClusterState state = clusterService.state(); if (autoCreateIndex.shouldAutoCreate(request.index(), state)) { - createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest() + .index(request.index()).cause("auto(delete api)") + .masterNodeTimeout(request.timeout()); + createIndexAction.execute(task, createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { innerExecute(task, request, listener); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 00be64757ae26..d9c92e2f73d07 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -196,7 +196,7 @@ public static WriteResult executeIndexRequestOnPrimary(IndexReque assert request.versionType().validateVersionForWrites(request.version()); - IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), created); + IndexResponse response = new IndexResponse(shardId, request.type(), request.id(), request.version(), created); return new WriteResult<>(response, operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java new file mode 100644 index 0000000000000..a81b30d55cac7 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java @@ -0,0 +1,221 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexSettings; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.index.IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING; + +/** + * A class whose instances represent a value for counting the number + * of active shard copies for a given shard in an index. + */ +public final class ActiveShardCount implements Writeable { + + private static final int ACTIVE_SHARD_COUNT_DEFAULT = -2; + private static final int ALL_ACTIVE_SHARDS = -1; + + public static final ActiveShardCount DEFAULT = new ActiveShardCount(ACTIVE_SHARD_COUNT_DEFAULT); + public static final ActiveShardCount ALL = new ActiveShardCount(ALL_ACTIVE_SHARDS); + public static final ActiveShardCount NONE = new ActiveShardCount(0); + public static final ActiveShardCount ONE = new ActiveShardCount(1); + + private final int value; + + private ActiveShardCount(final int value) { + this.value = value; + } + + /** + * Get an ActiveShardCount instance for the given value. The value is first validated to ensure + * it is a valid shard count and throws an IllegalArgumentException if validation fails. Valid + * values are any non-negative number. Directly use {@link ActiveShardCount#DEFAULT} for the + * default value (which is half the shards) or {@link ActiveShardCount#ALL} to specify all the shards. + */ + public static ActiveShardCount from(final int value) { + if (value < 0) { + throw new IllegalArgumentException("shard count cannot be a negative value"); + } + return get(value); + } + + private static ActiveShardCount get(final int value) { + switch (validateValue(value)) { + case ACTIVE_SHARD_COUNT_DEFAULT: + return DEFAULT; + case ALL_ACTIVE_SHARDS: + return ALL; + case 0: + return NONE; + default: + return new ActiveShardCount(value); + } + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeInt(value); + } + + public static ActiveShardCount readFrom(final StreamInput in) throws IOException { + return get(in.readInt()); + } + + private static int validateValue(final int value) { + if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) { + throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]"); + } + return value; + } + + /** + * Resolve this instance to an actual integer value for the number of active shard counts. + * If {@link ActiveShardCount#ALL} are specified, then the given {@link IndexMetaData} is + * used to determine what the actual active shard count should be. The default value indicates + * one active shard. + */ + public int resolve(final IndexMetaData indexMetaData) { + Objects.requireNonNull(indexMetaData); + if (this == ActiveShardCount.DEFAULT) { + return 1; + } else if (this == ActiveShardCount.ALL) { + return indexMetaData.getNumberOfReplicas() + 1; + } else { + return value; + } + } + + /** + * Parses the active shard count from the given string. Valid values are "all" for + * all shard copies, null for the default value set by {@link IndexSettings#WAIT_FOR_ACTIVE_SHARDS_SETTING} + * (which defaults to half the shard copies, or a numeric value greater than or equal to 0. + * Any other input will throw an IllegalArgumentException. + */ + public static ActiveShardCount parseString(final String str) { + if (str == null) { + return ActiveShardCount.DEFAULT; + } else if (str.equals("all")) { + return ActiveShardCount.ALL; + } else { + int val; + try { + val = Integer.parseInt(str); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("cannot parse ActiveShardCount[" + str + "]"); + } + return ActiveShardCount.from(val); + } + } + + /** + * Returns true iff the given cluster state's routing table contains enough active + * shards to meet the required shard count represented by this instance. + */ + public boolean enoughShardsActive(final ClusterState clusterState, final String indexName, final Settings settings) { + Objects.requireNonNull(clusterState); + Objects.requireNonNull(indexName); + Objects.requireNonNull(settings); + if (this == ActiveShardCount.NONE) { + // not waiting for any active shards + return true; + } + final ActiveShardCount waitForActiveShards; + if (this == ActiveShardCount.DEFAULT) { + waitForActiveShards = WAIT_FOR_ACTIVE_SHARDS_SETTING.get(settings); + } else { + waitForActiveShards = this; + } + final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName); + assert indexRoutingTable != null; + if (indexRoutingTable.allPrimaryShardsActive() == false) { + // all primary shards aren't active yet + return false; + } + final IndexMetaData indexMetaData = clusterState.metaData().index(indexName); + assert indexMetaData != null; + for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { + if (waitForActiveShards.checkWaitForActiveShards(shardRouting.value, indexMetaData) == false) { + // not enough active shard copies yet + return false; + } + } + return true; + } + + /** + * Returns true iff the active shard count in the shard routing table is enough + * to meet the required shard count represented by this instance. + */ + public boolean checkWaitForActiveShards(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { + Objects.requireNonNull(shardRoutingTable); + Objects.requireNonNull(indexMetaData); + if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) { + // not enough active shard copies yet + return false; + } + return true; + } + + @Override + public int hashCode() { + return Integer.hashCode(value); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + @SuppressWarnings("unchecked") ActiveShardCount that = (ActiveShardCount) o; + return value == that.value; + } + + @Override + public String toString() { + final String valStr; + switch (value) { + case ALL_ACTIVE_SHARDS: + valStr = "ALL"; + break; + case ACTIVE_SHARD_COUNT_DEFAULT: + valStr = "DEFAULT"; + break; + default: + valStr = Integer.toString(value); + } + return "ActiveShardCount[" + valStr + "]"; + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/support/WaitForActiveShardsMonitor.java b/core/src/main/java/org/elasticsearch/action/support/WaitForActiveShardsMonitor.java new file mode 100644 index 0000000000000..d9f9fcae99470 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/support/WaitForActiveShardsMonitor.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.function.Consumer; + +/** + * This class enables waiting for a configured number of shards + * to become active before proceeding with an action. + */ +public class WaitForActiveShardsMonitor extends AbstractComponent { + + private final ClusterService clusterService; + private final ThreadPool threadPool; + + public WaitForActiveShardsMonitor(final Settings settings, + final ClusterService clusterService, + final ThreadPool threadPool) { + super(settings); + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + /** + * Waits for the specified number of shards to become active in the given index, + * or times out with the specified timeout value. + * + * @param indexName the index to wait on active shards for + * @param waitForActiveShards the number of active shards to wait to be started on the index + * @param timeout the timeout value + * @param listener the response listener + * @param onPredicateAppliedOrTimeout the consumer function to execute upon a matching cluster state predicate, or on timeout; + * the boolean input value represents if a timeout occurred (if false, then it was a successful + * matching predicate) + */ + public void waitOnShards(final String indexName, + final ActiveShardCount waitForActiveShards, + final TimeValue timeout, + final ActionListener listener, + final Consumer onPredicateAppliedOrTimeout) { + + if (waitForActiveShards == ActiveShardCount.NONE) { + // not waiting, so just run whatever we were to run when the waiting is + onPredicateAppliedOrTimeout.accept(false); + } + + // wait for the configured number of active shards to be allocated before returning + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); + final ClusterStateObserver.ChangePredicate shardsAllocatedPredicate = new ClusterStateObserver.ChangePredicate() { + @Override + public boolean apply(ClusterState previousState, ClusterState.ClusterStateStatus previousStatus, + ClusterState newState, ClusterState.ClusterStateStatus newStatus) { + return waitForActiveShards.enoughShardsActive(newState, indexName, settings); + } + + @Override + public boolean apply(ClusterChangedEvent changedEvent) { + return waitForActiveShards.enoughShardsActive(changedEvent.state(), indexName, settings); + } + }; + + final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + onPredicateAppliedOrTimeout.accept(false); + } + + @Override + public void onClusterServiceClose() { + logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName); + listener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + onPredicateAppliedOrTimeout.accept(true); + } + }; + + observer.waitForNextChange(observerListener, shardsAllocatedPredicate, timeout); + } + +} diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index ae6a15411f54a..05de6ce72803a 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -118,7 +118,10 @@ public static void resolveAndValidateRouting(MetaData metaData, String concreteI protected void doExecute(final UpdateRequest request, final ActionListener listener) { // if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { - createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener() { + CreateIndexRequest createIndexRequest = new CreateIndexRequest() + .index(request.index()).cause("auto(update api)") + .masterNodeTimeout(request.timeout()); + createIndexAction.execute(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { innerExecute(request, listener); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 456675abcf197..3621f5503d38b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -109,7 +109,6 @@ public class MetaDataCreateIndexService extends AbstractComponent { private final NodeServicesProvider nodeServicesProvider; private final IndexScopedSettings indexScopedSettings; - @Inject public MetaDataCreateIndexService(Settings settings, ClusterService clusterService, IndicesService indicesService, AllocationService allocationService, @@ -308,6 +307,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); + if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) { + throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() + + "]: cannot be greater than number of shard copies [" + + (tmpImd.getNumberOfReplicas() + 1) + "]"); + } // create the index here (on the master) to validate it can be created, as well as adding the mapping final IndexService indexService = indicesService.createIndex(nodeServicesProvider, tmpImd, Collections.emptyList()); createdIndex = indexService.index(); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index bbbbe57618b50..f63b1cb9b0d02 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.common.settings; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -140,6 +141,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING, FsDirectoryService.INDEX_LOCK_FACTOR_SETTING, EngineConfig.INDEX_CODEC_SETTING, + IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 2c20697d757e1..6848e8e5f63fc 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -20,6 +20,7 @@ import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.logging.ESLogger; @@ -127,6 +128,17 @@ public final class IndexSettings { public static final Setting MAX_SLICES_PER_SCROLL = Setting.intSetting("index.max_slices_per_scroll", 1024, 1, Property.Dynamic, Property.IndexScope); + /** + * The number of shard copies required to be active before proceed with the + * write operation or returning from an index creation operation. + */ + public static final Setting WAIT_FOR_ACTIVE_SHARDS_SETTING = + new Setting<>("index.wait_for_active_shards", + (String) null, + ActiveShardCount::parseString, + Setting.Property.Dynamic, + Setting.Property.IndexScope); + private final Index index; private final Version version; private final ESLogger logger; diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java index 7eecaaa738c9a..66d230f3667dd 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRolloverIndexAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -51,6 +52,7 @@ public void handleRequest(final RestRequest request, final RestChannel channel, rolloverIndexRequest.dryRun(request.paramAsBoolean("dry_run", false)); rolloverIndexRequest.timeout(request.paramAsTime("timeout", rolloverIndexRequest.timeout())); rolloverIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", rolloverIndexRequest.masterNodeTimeout())); + rolloverIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); client.admin().indices().rolloverIndex(rolloverIndexRequest, new RestToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java index f145bcbf02a22..0bbffcaf379e3 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/RestShrinkIndexAction.java @@ -20,15 +20,20 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.shrink.ShrinkRequest; +import org.elasticsearch.action.admin.indices.shrink.ShrinkResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -56,6 +61,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, } shrinkIndexRequest.timeout(request.paramAsTime("timeout", shrinkIndexRequest.timeout())); shrinkIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", shrinkIndexRequest.masterNodeTimeout())); - client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener<>(channel)); + shrinkIndexRequest.setWaitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().shrinkIndex(shrinkIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, ShrinkResponse response) throws IOException { + response.addCustomFields(builder, response); + } + }); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java index 46bc9388972a6..3d2a2ac280090 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java @@ -21,15 +21,19 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.AcknowledgedRestListener; +import java.io.IOException; + /** * */ @@ -52,6 +56,12 @@ public void handleRequest(final RestRequest request, final RestChannel channel, createIndexRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false)); createIndexRequest.timeout(request.paramAsTime("timeout", createIndexRequest.timeout())); createIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", createIndexRequest.masterNodeTimeout())); - client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel)); + createIndexRequest.waitForActiveShards(ActiveShardCount.parseString(request.param("wait_for_active_shards"))); + client.admin().indices().create(createIndexRequest, new AcknowledgedRestListener(channel) { + @Override + public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException { + response.addCustomFields(builder, response); + } + }); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java index 9b19a34b2ff23..770d913425cea 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplainIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -29,7 +30,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,8 +60,8 @@ public void run() { prepareCreate("test").setSettings(Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "1m") .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 5) - .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).get(); - ensureGreen("test"); + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)) + .setWaitForActiveShards(ActiveShardCount.ALL).get(); logger.info("--> stopping a random node"); assertTrue(internalCluster().stopRandomDataNode()); @@ -92,6 +92,7 @@ public void testUnassignedShards() throws Exception { .setSettings(Settings.builder() .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) // wait on all shards .get(); client().admin().indices().prepareCreate("only-baz") @@ -99,6 +100,7 @@ public void testUnassignedShards() throws Exception { .put("index.routing.allocation.include.bar", "baz") .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)) + .setWaitForActiveShards(ActiveShardCount.ALL) .get(); client().admin().indices().prepareCreate("only-foo") @@ -108,9 +110,6 @@ public void testUnassignedShards() throws Exception { .put("index.number_of_replicas", 1)) .get(); - ensureGreen("anywhere", "only-baz"); - ensureYellow("only-foo"); - ClusterAllocationExplainResponse resp = client().admin().cluster().prepareAllocationExplain() .setIndex("only-foo") .setShard(0) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 3e7323dceeb16..0ce96172ad300 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; @@ -41,14 +42,11 @@ import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; -import org.elasticsearch.node.service.NodeService; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; -import org.junit.Ignore; import java.util.HashMap; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -292,7 +290,7 @@ public void testMappingConflictRootCause() throws Exception { public void testRestartIndexCreationAfterFullClusterRestart() throws Exception { client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get(); - client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(indexSettings()).get(); internalCluster().fullRestart(); ensureGreen("test"); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexWaitsIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexWaitsIT.java new file mode 100644 index 0000000000000..b53936fea4f99 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexWaitsIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.admin.indices.create; + +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.elasticsearch.cluster.metadata.IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Tests that the index creation operation waits for the appropriate + * number of active shards to be started before returning. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class CreateIndexWaitsIT extends ESIntegTestCase { + + public void testCreateIndexNoActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + assertTrue(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(randomBoolean() ? ActiveShardCount.from(1) : ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isTimedOutWaitingForShards()); + } + + public void testCreateIndexNoActiveShardsNoWaiting() throws Exception { + final String indexName = "test-idx"; + Settings.Builder settingsBuilder = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0); + if (internalCluster().getNodeNames().length > 0) { + String exclude = String.join(",", internalCluster().getNodeNames()); + settingsBuilder.put("index.routing.allocation.exclude._name", exclude); + } + Settings settings = settingsBuilder.build(); + CreateIndexResponse response = prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(0)) + .get(); + assertTrue(response.isAcknowledged()); + } + + public void testCreateIndexNotEnoughActiveShardsTimesOut() throws Exception { + final String indexName = "test-idx"; + final int numReplicas = internalCluster().numDataNodes() + randomIntBetween(2, 4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertTrue(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.from(internalCluster().numDataNodes() + 1)) + .setTimeout("100ms") + .get() + .isTimedOutWaitingForShards()); + } + + public void testCreateIndexEnoughActiveShards() throws Exception { + final String indexName = "test-idx"; + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3)) + .build(); + ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes() - 1)); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get()); + } + + public void testCreateIndexWaitsForAllActiveShards() throws Exception { + final String indexName = "test-idx"; + // not enough data nodes, index creation times out + final int numReplicas = internalCluster().numDataNodes() + randomIntBetween(2, 4); + Settings settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 5)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertTrue(prepareCreate(indexName) + .setSettings(settings) + .setWaitForActiveShards(ActiveShardCount.ALL) + .setTimeout("100ms") + .get() + .isTimedOutWaitingForShards()); + if (client().admin().indices().prepareExists(indexName).get().isExists()) { + assertAcked(client().admin().indices().prepareDelete(indexName)); + } + + // enough data nodes, all shards are active + settings = Settings.builder() + .put(indexSettings()) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(2, 7)) + .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() - 1) + .build(); + assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(ActiveShardCount.ALL).get()); + } + +} diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java index 7d163630afbca..4bbe7e097512e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/rollover/TransportRolloverActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.metadata.AliasAction; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -166,6 +167,8 @@ public void testCreateIndexRequest() throws Exception { String alias = randomAsciiOfLength(10); String rolloverIndex = randomAsciiOfLength(10); final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAsciiOfLength(10)); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + rolloverRequest.setWaitForActiveShards(activeShardCount); final Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()) @@ -178,5 +181,6 @@ public void testCreateIndexRequest() throws Exception { assertThat(createIndexRequest.settings(), equalTo(settings)); assertThat(createIndexRequest.index(), equalTo(rolloverIndex)); assertThat(createIndexRequest.cause(), equalTo("rollover_index")); + assertThat(createIndexRequest.waitForActiveShards(), equalTo(activeShardCount)); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java index d78374d446fb7..cecf180f02406 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportShrinkActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.index.IndexWriter; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; @@ -130,6 +131,8 @@ public void testShrinkIndexSettings() { int numSourceShards = clusterState.metaData().index(indexName).getNumberOfShards(); DocsStats stats = new DocsStats(randomIntBetween(0, (IndexWriter.MAX_DOCS) / numSourceShards), randomIntBetween(1, 1000)); ShrinkRequest target = new ShrinkRequest("target", indexName); + final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE; + target.setWaitForActiveShards(ActiveShardCount.ONE); CreateIndexClusterStateUpdateRequest request = TransportShrinkAction.prepareCreateIndexRequest( target, clusterState, (i) -> stats, new IndexNameExpressionResolver(Settings.EMPTY)); @@ -137,6 +140,7 @@ public void testShrinkIndexSettings() { assertEquals(indexName, request.shrinkFrom().getName()); assertEquals("1", request.settings().get("index.number_of_shards")); assertEquals("shrink_index", request.cause()); + assertEquals(request.waitForActiveShards(), activeShardCount); } private DiscoveryNode newNode(String nodeId) { diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java new file mode 100644 index 0000000000000..252d68485cccd --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java @@ -0,0 +1,294 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import com.carrotsearch.hppc.cursors.IntObjectCursor; +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.ByteBufferStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +/** + * Tests for the {@link ActiveShardCount} class + */ +public class ActiveShardCountTests extends ESTestCase { + + public void testFromIntValue() { + assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE); + final int value = randomIntBetween(1, 50); + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1))); + } + + public void testResolve() { + // one shard + IndexMetaData indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + final int value = randomIntBetween(2, 20); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + + // more than one shard + final int numNewShards = randomIntBetween(1, 20); + indexMetaData = IndexMetaData.builder("test") + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(numNewShards) + .build(); + assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1)); + assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); + assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); + assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); + } + + public void testSerialization() throws IOException { + doWriteRead(ActiveShardCount.ALL); + doWriteRead(ActiveShardCount.DEFAULT); + doWriteRead(ActiveShardCount.NONE); + doWriteRead(ActiveShardCount.from(randomIntBetween(1, 50))); + } + + public void testParseString() { + assertSame(ActiveShardCount.parseString("all"), ActiveShardCount.ALL); + assertSame(ActiveShardCount.parseString(null), ActiveShardCount.DEFAULT); + assertSame(ActiveShardCount.parseString("0"), ActiveShardCount.NONE); + int value = randomIntBetween(1, 50); + assertEquals(ActiveShardCount.parseString(value + ""), ActiveShardCount.from(value)); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomAsciiOfLengthBetween(4, 8))); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-1")); // magic numbers not exposed through API + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString("-2")); + expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + "")); + } + + private void doWriteRead(ActiveShardCount activeShardCount) throws IOException { + final BytesStreamOutput out = new BytesStreamOutput(); + activeShardCount.writeTo(out); + final ByteBufferStreamInput in = new ByteBufferStreamInput(ByteBuffer.wrap(out.bytes().toBytes())); + ActiveShardCount readActiveShardCount = ActiveShardCount.readFrom(in); + if (activeShardCount == ActiveShardCount.DEFAULT + || activeShardCount == ActiveShardCount.ALL + || activeShardCount == ActiveShardCount.NONE) { + assertSame(activeShardCount, readActiveShardCount); + } else { + assertEquals(activeShardCount, readActiveShardCount); + } + } + + public void testEnoughShardsActiveZero() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startPrimaries(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + } + + public void testEnoughShardsActiveLevelOne() { + runTestForOneActiveShard(ActiveShardCount.from(1)); + } + + public void testEnoughShardsActiveLevelDefault() { + // default is 1 + runTestForOneActiveShard(ActiveShardCount.DEFAULT); + } + + public void testEnoughShardsActiveRandom() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas)); + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + } + + public void testEnoughShardsActiveLevelAll() { + final String indexName = "test-idx"; + final int numberOfShards = randomIntBetween(1, 5); + final int numberOfReplicas = randomIntBetween(4, 7); + // both values should represent "all" + final ActiveShardCount waitForActiveShards = randomBoolean() ? ActiveShardCount.from(numberOfReplicas + 1) : ActiveShardCount.ALL; + ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startPrimaries(clusterState, indexName); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + clusterState = startAllShards(clusterState, indexName); + assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName, Settings.EMPTY)); + } + + private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) { + + } + + private ClusterState initializeWithNewIndex(final String indexName, final int numShards, final int numReplicas) { + // initial index creation and new routing table info + final IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())) + .numberOfShards(numShards) + .numberOfReplicas(numReplicas) + .build(); + final MetaData metaData = MetaData.builder().put(indexMetaData, true).build(); + final RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetaData).build(); + return ClusterState.builder(new ClusterName("test_cluster")).metaData(metaData).routingTable(routingTable).build(); + } + + private ClusterState startPrimaries(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + // want less than half, and primary is already started + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, + final ActiveShardCount waitForActiveShards) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + assert shardRoutingTable.getSize() > 2; + int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + if (numToStart > 0) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + numToStart--; + } + } else { + numToStart--; + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + + private ClusterState startAllShards(final ClusterState clusterState, final String indexName) { + RoutingTable routingTable = clusterState.routingTable(); + IndexRoutingTable indexRoutingTable = routingTable.index(indexName); + IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); + for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { + final IndexShardRoutingTable shardRoutingTable = shardEntry.value; + for (ShardRouting shardRouting : shardRoutingTable.getShards()) { + if (shardRouting.primary()) { + assertTrue(shardRouting.active()); + } else { + if (shardRouting.active() == false) { + shardRouting = shardRouting.initialize(randomAsciiOfLength(8), null, shardRouting.getExpectedShardSize()) + .moveToStarted(); + } + } + newIndexRoutingTable.addShard(shardRouting); + } + } + routingTable = RoutingTable.builder(routingTable).add(newIndexRoutingTable).build(); + return ClusterState.builder(clusterState).routingTable(routingTable).build(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java index 43aa088a3b945..f411e00468eb7 100644 --- a/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/SimpleDataNodesIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -40,7 +41,7 @@ public class SimpleDataNodesIT extends ESIntegTestCase { public void testDataNodes() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); - client().admin().indices().create(createIndexRequest("test")).actionGet(); + client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet(); try { client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test")).timeout(timeValueSeconds(1))).actionGet(); fail("no allocation should happen"); diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index c86535e40c59c..70af580824118 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -91,7 +92,7 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception { final String node_2 = nodesIds.get(1); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -203,7 +204,7 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(healthResponse.isTimedOut(), equalTo(false)); logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate"); - client().admin().indices().prepareCreate("test") + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE) .setSettings(Settings.builder().put("index.number_of_shards", 1)) .execute().actionGet(); @@ -253,14 +254,13 @@ private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exc assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.INITIALIZING)); - healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet(); - assertThat(healthResponse.isTimedOut(), equalTo(false)); - logger.info("--> get the state, verify shard 1 primary allocated"); - state = client().admin().cluster().prepareState().execute().actionGet().getState(); - assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1)); - assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); - + final String nodeToCheck = node_1; + assertBusy(() -> { + ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState(); + String nodeId = clusterState.nodes().resolveNode(nodeToCheck).getId(); + assertThat(clusterState.getRoutingNodes().node(nodeId).iterator().next().state(), equalTo(ShardRoutingState.STARTED)); + }); } public void testRerouteExplain() { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index f990f382c8c50..f267af66dc643 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -108,7 +109,7 @@ private void createStaleReplicaScenario() throws Exception { logger.info("--> check that old primary shard does not get promoted to primary again"); // kick reroute and wait for all shard states to be fetched client(master).admin().cluster().prepareReroute().get(); - assertBusy(new Runnable() { + assertBusy(new Runnable() { @Override public void run() { assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)); @@ -157,7 +158,8 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { createStaleReplicaScenario(); logger.info("--> explicitly promote old primary shard"); - ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test"); + final String idxName = "test"; + ImmutableOpenIntMap> storeStatuses = client().admin().indices().prepareShardStores(idxName).get().getStoreStatuses().get(idxName); ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute(); for (IntObjectCursor> shardStoreStatuses : storeStatuses) { int shardId = shardStoreStatuses.key; @@ -165,22 +167,30 @@ public void testForceStaleReplicaToBePromotedToPrimary() throws Exception { logger.info("--> adding allocation command for shard {}", shardId); // force allocation based on node id if (useStaleReplica) { - rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } else { - rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand("test", shardId, storeStatus.getNode().getId(), true)); + rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(idxName, shardId, storeStatus.getNode().getId(), true)); } } rerouteBuilder.get(); logger.info("--> check that the stale primary shard gets allocated and that documents are available"); - ensureYellow("test"); + ensureYellow(idxName); + + if (useStaleReplica == false) { + // When invoking AllocateEmptyPrimaryAllocationCommand, due to the UnassignedInfo.Reason being changed to INDEX_CREATION, + // its possible that the shard has not completed initialization, even though the cluster health is yellow, so the + // search can throw an "all shards failed" exception. We will wait until the shard initialization has completed before + // verifying the search hit count. + assertBusy(() -> assertTrue(clusterService().state().routingTable().index(idxName).allPrimaryShardsActive())); - assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); + } + assertHitCount(client().prepareSearch(idxName).setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1L : 0L); } public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException { String node = internalCluster().startNode(); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.exclude._name", node) .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get(); diff --git a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index d5dee3c1bdc59..21622a13472ae 100644 --- a/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; @@ -205,7 +206,7 @@ public void testJustMasterNode() throws Exception { internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build()); logger.info("--> create an index"); - client().admin().indices().prepareCreate("test").execute().actionGet(); + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).execute().actionGet(); logger.info("--> closing master node"); internalCluster().closeNonSharedNodes(false); diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 78ef13dde568c..64052d640f1d0 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -140,7 +141,7 @@ public ClusterState randomlyUpdateClusterState(ClusterState state, CreateIndexRequest request = new CreateIndexRequest(name, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)) .put(SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - .build()); + .build()).waitForActiveShards(ActiveShardCount.NONE); state = cluster.createIndex(state, request); assertTrue(state.metaData().hasIndex(name)); } diff --git a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 8c724efdfc7b4..32928bb92818c 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRouting; @@ -213,7 +214,7 @@ private void assertFlushResponseEqualsShardStats(ShardStats[] shardsStats, List< public void testUnallocatedShardsDoesNotHang() throws InterruptedException { // create an index but disallow allocation - prepareCreate("test").setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); + prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder().put("index.routing.allocation.include._name", "nonexistent")).get(); // this should not hang but instead immediately return with empty result set List shardsResult = client().admin().indices().prepareSyncedFlush("test").get().getShardsResultPerIndex().get("test"); diff --git a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java index ea3ebf5179b48..66687ea74fa81 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -98,7 +99,7 @@ public void testSimpleOpenClose() { public void testFastCloseAfterCreateContinuesCreateAfterOpen() { logger.info("--> creating test index that cannot be allocated"); - client().admin().indices().prepareCreate("test").setSettings(Settings.builder() + client().admin().indices().prepareCreate("test").setWaitForActiveShards(ActiveShardCount.NONE).setSettings(Settings.builder() .put("index.routing.allocation.include.tag", "no_such_node").build()).get(); ClusterHealthResponse health = client().admin().cluster().prepareHealth("test").setWaitForNodes(">=2").get(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 3ba6c875b68e9..35d74f59e2e79 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -44,6 +46,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -437,11 +440,12 @@ public void testRestoreIndexWithMissingShards() throws Exception { assertAcked(client().admin().indices().prepareClose("test-idx-closed")); logger.info("--> create an index that will have no allocated shards"); - assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) + CreateIndexResponse createIndexResponse = prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6) .put("index.routing.allocation.include.tag", "nowhere") - .put("number_of_replicas", 0))); + .put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get(); + assertTrue(createIndexResponse.isAcknowledged()); + assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists()); - logger.info("--> create repository"); logger.info("--> creating repository"); PutRepositoryResponse putRepositoryResponse = client().admin().cluster().preparePutRepository("test-repo") .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet(); @@ -683,7 +687,7 @@ public void sendResponse(RestResponse response) { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621") public void testChaosSnapshot() throws Exception { final List indices = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("action.write_consistency", "one").build(); + Settings settings = Settings.builder().put(IndexSettings.WAIT_FOR_ACTIVE_SHARDS_SETTING.getKey(), "1").build(); int initialNodes = between(1, 3); logger.info("--> start {} nodes", initialNodes); for (int i = 0; i < initialNodes; i++) { diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 19b46710fea0a..46ed602cb1583 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -755,7 +756,7 @@ public void testUnallocatedShards() throws Exception { .put("location", randomRepoPath()))); logger.info("--> creating index that cannot be allocated"); - prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).get(); + prepareCreate("test-idx", 2, Settings.builder().put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + ".tag", "nowhere").put("index.number_of_shards", 3)).setWaitForActiveShards(ActiveShardCount.NONE).get(); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").get(); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json index bdac6d9a9ab89..b0cf3f6247ac2 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.create.json @@ -13,6 +13,10 @@ } }, "params": { + "wait_for_active_shards": { + "type" : "string", + "description" : "Set the number of active shards to wait for before proceeding with the operation." + }, "timeout": { "type" : "time", "description" : "Explicit operation timeout"