Skip to content

Commit

Permalink
Index creation waits for active shard copies before returning (#18985)
Browse files Browse the repository at this point in the history
Before returning, index creation now waits for the configured number
of shard copies to be started. In the past, a client would create an
index and then potentially have to check the cluster health to wait
to execute write operations. With the cluster health semantics changing
so that index creation does not cause the cluster health to go RED,
this change enables waiting for the desired number of active shards
to be active before returning from index creation.

Relates #9126
  • Loading branch information
Ali Beyad committed Jul 15, 2016
1 parent 7759c23 commit d78f40f
Show file tree
Hide file tree
Showing 47 changed files with 1,282 additions and 121 deletions.
29 changes: 29 additions & 0 deletions core/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action;

import java.util.function.Consumer;

/**
* A listener for action responses or failures.
*/
Expand All @@ -33,4 +35,31 @@ public interface ActionListener<Response> {
* A failure caused by an exception at some phase of the task.
*/
void onFailure(Exception e);

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
*/
static <Response> ActionListener<Response> wrap(Consumer<Response> onResponse, Consumer<Exception> onFailure) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private final Set<ClusterBlock> blocks = new HashSet<>();

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;


public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage;
Expand Down Expand Up @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
return this;
}

public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}
Expand Down Expand Up @@ -142,4 +150,8 @@ public Index shrinkFrom() {
public boolean updateAllTypes() {
return updateAllTypes;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>

private boolean updateAllTypes = false;

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CreateIndexRequest() {
}

Expand Down Expand Up @@ -440,6 +443,30 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -462,6 +489,7 @@ public void readFrom(StreamInput in) throws IOException {
aliases.add(Alias.read(in));
}
updateAllTypes = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
}

@Override
Expand All @@ -486,5 +514,6 @@ public void writeTo(StreamOutput out) throws IOException {
alias.writeTo(out);
}
out.writeBoolean(updateAllTypes);
waitForActiveShards.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -249,4 +250,23 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) {
request.updateAllTypes(updateAllTypes);
return this;
}

/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

Expand All @@ -30,22 +31,41 @@
*/
public class CreateIndexResponse extends AcknowledgedResponse {

private boolean shardsAcked;

protected CreateIndexResponse() {
}

protected CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) {
super(acknowledged);
assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too
this.shardsAcked = shardsAcked;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
shardsAcked = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(shardsAcked);
}

/**
* Returns true if the requisite number of shards were started before
* returning from the index creation operation. If {@link #isAcknowledged()}
* is false, then this also returns false.
*/
public boolean isShardsAcked() {
return shardsAcked;
}

public void addCustomFields(XContentBuilder builder) throws IOException {
builder.field("shards_acknowledged", isShardsAcked());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -77,24 +75,12 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
.aliases(request.aliases()).customs(request.customs())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())),
listener::onFailure));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -206,4 +207,22 @@ public void source(BytesReference source) {
}
}

/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -70,4 +71,23 @@ public RolloverRequestBuilder mapping(String type, String source) {
this.request.getCreateIndexRequest().mapping(type, source);
return this;
}

/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}
}

0 comments on commit d78f40f

Please sign in to comment.