Skip to content

Commit

Permalink
Add wait_for_active_shards parameter to index open command (#26682)
Browse files Browse the repository at this point in the history
Adds the wait_for_active_shards parameter to the index open command. Similar to the index creation command, the index open command will now, by default, wait until the primaries have been allocated.

Closes #20937
  • Loading branch information
alexshadow007 authored and ywelsch committed Sep 22, 2017
1 parent 109c6c2 commit ff737a8
Show file tree
Hide file tree
Showing 17 changed files with 278 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,26 @@
*/
package org.elasticsearch.action.admin.indices.open;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;

/**
* Cluster state update request that allows to open one or more indices
*/
public class OpenIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<OpenIndexClusterStateUpdateRequest> {

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

OpenIndexClusterStateUpdateRequest() {

}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

public OpenIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.elasticsearch.action.admin.indices.open;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -38,6 +40,7 @@ public class OpenIndexRequest extends AcknowledgedRequest<OpenIndexRequest> impl

private String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, true, false, true);
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public OpenIndexRequest() {
}
Expand Down Expand Up @@ -101,17 +104,55 @@ public OpenIndexRequest indicesOptions(IndicesOptions indicesOptions) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the number of shard copies that should be active for indices opening to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Indices opening will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link OpenIndexResponse#isShardsAcknowledged()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public OpenIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public OpenIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
waitForActiveShards = ActiveShardCount.readFrom(in);
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
waitForActiveShards.writeTo(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.open;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -58,4 +59,32 @@ public OpenIndexRequestBuilder setIndicesOptions(IndicesOptions indicesOptions)
request.indicesOptions(indicesOptions);
return this;
}

/**
* Sets the number of shard copies that should be active for indices opening to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Indices opening will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link OpenIndexResponse#isShardsAcknowledged()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public OpenIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public OpenIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.open;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -30,22 +31,41 @@
*/
public class OpenIndexResponse extends AcknowledgedResponse {

private boolean shardsAcknowledged;

OpenIndexResponse() {
}

OpenIndexResponse(boolean acknowledged) {
OpenIndexResponse(boolean acknowledged, boolean shardsAcknowledged) {
super(acknowledged);
assert acknowledged || shardsAcknowledged == false; // if its not acknowledged, then shards acked should be false too
this.shardsAcknowledged = shardsAcknowledged;
}

/**
* Returns true if the requisite number of shards were started before
* returning from the indices opening operation. If {@link #isAcknowledged()}
* is false, then this also returns false.
*/
public boolean isShardsAcknowledged() {
return shardsAcknowledged;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
shardsAcknowledged = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(shardsAcknowledged);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -84,18 +83,18 @@ protected ClusterBlockException checkBlock(OpenIndexRequest request, ClusterStat
protected void masterOperation(final OpenIndexRequest request, final ClusterState state, final ActionListener<OpenIndexResponse> listener) {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new OpenIndexResponse(true));
listener.onResponse(new OpenIndexResponse(true, true));
return;
}
OpenIndexClusterStateUpdateRequest updateRequest = new OpenIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
.indices(concreteIndices).waitForActiveShards(request.waitForActiveShards());

indexStateService.openIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
indexStateService.openIndex(updateRequest, new ActionListener<OpenIndexClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new OpenIndexResponse(response.isAcknowledged()));
public void onResponse(OpenIndexClusterStateUpdateResponse response) {
listener.onResponse(new OpenIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void onResponse(IndicesStatsResponse statsResponse) {
rolloverRequest),
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(rolloverIndexName,
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcked -> listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,36 +138,40 @@ public boolean enoughShardsActive(final int activeShardCount) {

/**
* Returns true iff the given cluster state's routing table contains enough active
* shards for the given index to meet the required shard count represented by this instance.
* shards for the given indices to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final ClusterState clusterState, final String indexName) {
public boolean enoughShardsActive(final ClusterState clusterState, final String... indices) {
if (this == ActiveShardCount.NONE) {
// not waiting for any active shards
return true;
}
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
if (indexMetaData == null) {
// its possible the index was deleted while waiting for active shard copies,
// in this case, we'll just consider it that we have enough active shard copies
// and we can stop waiting
return true;
}
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
assert indexRoutingTable != null;
if (indexRoutingTable.allPrimaryShardsActive() == false) {
// all primary shards aren't active yet
return false;
}
ActiveShardCount waitForActiveShards = this;
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
}
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
// not enough active shard copies yet

for (final String indexName : indices) {
final IndexMetaData indexMetaData = clusterState.metaData().index(indexName);
if (indexMetaData == null) {
// its possible the index was deleted while waiting for active shard copies,
// in this case, we'll just consider it that we have enough active shard copies
// and we can stop waiting
continue;
}
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
assert indexRoutingTable != null;
if (indexRoutingTable.allPrimaryShardsActive() == false) {
// all primary shards aren't active yet
return false;
}
ActiveShardCount waitForActiveShards = this;
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings());
}
for (final IntObjectCursor<IndexShardRoutingTable> shardRouting : indexRoutingTable.getShards()) {
if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) {
// not enough active shard copies yet
return false;
}
}
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -50,13 +51,13 @@ public ActiveShardsObserver(final Settings settings, final ClusterService cluste
/**
* Waits on the specified number of active shards to be started before executing the
*
* @param indexName the index to wait for active shards on
* @param indexNames the indices to wait for active shards on
* @param activeShardCount the number of active shards to wait on before returning
* @param timeout the timeout value
* @param onResult a function that is executed in response to the requisite shards becoming active or a timeout (whichever comes first)
* @param onFailure a function that is executed in response to an error occurring during waiting for the active shards
*/
public void waitForActiveShards(final String indexName,
public void waitForActiveShards(final String[] indexNames,
final ActiveShardCount activeShardCount,
final TimeValue timeout,
final Consumer<Boolean> onResult,
Expand All @@ -71,10 +72,10 @@ public void waitForActiveShards(final String indexName,

final ClusterState state = clusterService.state();
final ClusterStateObserver observer = new ClusterStateObserver(state, clusterService, null, logger, threadPool.getThreadContext());
if (activeShardCount.enoughShardsActive(state, indexName)) {
if (activeShardCount.enoughShardsActive(state, indexNames)) {
onResult.accept(true);
} else {
final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexName);
final Predicate<ClusterState> shardsAllocatedPredicate = newState -> activeShardCount.enoughShardsActive(newState, indexNames);

final ClusterStateObserver.Listener observerListener = new ClusterStateObserver.Listener() {
@Override
Expand All @@ -84,7 +85,7 @@ public void onNewClusterState(ClusterState state) {

@Override
public void onClusterServiceClose() {
logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", indexName);
logger.debug("[{}] cluster service closed while waiting for enough shards to be started.", Arrays.toString(indexNames));
onFailure.accept(new NodeClosedException(clusterService.localNode()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.ack;

/**
* A cluster state update response with specific fields for index opening.
*/
public class OpenIndexClusterStateUpdateResponse extends ClusterStateUpdateResponse {

private final boolean shardsAcknowledged;

public OpenIndexClusterStateUpdateResponse(boolean acknowledged, boolean shardsAcknowledged) {
super(acknowledged);
this.shardsAcknowledged = shardsAcknowledged;
}

/**
* Returns whether the requisite number of shard copies started before the completion of the operation.
*/
public boolean isShardsAcknowledged() {
return shardsAcknowledged;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void createIndex(final CreateIndexClusterStateUpdateRequest request,
final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
onlyCreateIndex(request, ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(request.index(), request.waitForActiveShards(), request.ackTimeout(),
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
shardsAcked -> {
if (shardsAcked == false) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
Expand Down
Loading

0 comments on commit ff737a8

Please sign in to comment.