Skip to content

Commit

Permalink
Index creation waits for active shards before returning
Browse files Browse the repository at this point in the history
Before returning, index creation now waits for the configured number
of shard copies to be started. In the past, a client would create an
index and then potentially have to check the cluster health to wait
to execute write operations. With the cluster health semantics changing
so that index creation does not cause the cluster health to go RED,
this change enables waiting for the desired number of active shards
to be active before returning from index creation.

Relates elastic#9126
  • Loading branch information
Ali Beyad committed Jul 4, 2016
1 parent 0c74177 commit 9ef0dee
Show file tree
Hide file tree
Showing 38 changed files with 1,098 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private final Set<ClusterBlock> blocks = new HashSet<>();

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;


public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage;
Expand Down Expand Up @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
return this;
}

public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}
Expand Down Expand Up @@ -142,4 +150,8 @@ public Index shrinkFrom() {
public boolean updateAllTypes() {
return updateAllTypes;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>

private boolean updateAllTypes = false;

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CreateIndexRequest() {
}

Expand Down Expand Up @@ -440,6 +443,27 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -462,6 +486,7 @@ public void readFrom(StreamInput in) throws IOException {
aliases.add(Alias.read(in));
}
updateAllTypes = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
}

@Override
Expand All @@ -486,5 +511,6 @@ public void writeTo(StreamOutput out) throws IOException {
alias.writeTo(out);
}
out.writeBoolean(updateAllTypes);
waitForActiveShards.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -249,4 +250,20 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) {
request.updateAllTypes(updateAllTypes);
return this;
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

Expand All @@ -30,22 +31,43 @@
*/
public class CreateIndexResponse extends AcknowledgedResponse {

private boolean timedOutWaitingForShards;

protected CreateIndexResponse() {
}

protected CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged, boolean timedOutWaitingForShards) {
super(acknowledged);
this.timedOutWaitingForShards = timedOutWaitingForShards;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
timedOutWaitingForShards = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(timedOutWaitingForShards);
}

/**
* Returns true if the request timed out waiting for the required number
* of active shards to be started. If the index was not successfully
* created in the first place, then this value is meaningless.
*/
public boolean isTimedOutWaitingForShards() {
return timedOutWaitingForShards;
}

/**
* Adds custom fields to the AcknowledgedResponse's x-content.
*/
public void addCustomFields(XContentBuilder builder, CreateIndexResponse response) throws IOException {
builder.field("timed_out_waiting_for_shards", response.isTimedOutWaitingForShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WaitForActiveShardsMonitor;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
Expand All @@ -41,13 +42,15 @@
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetaDataCreateIndexService createIndexService;
private final WaitForActiveShardsMonitor shardsMonitor;

@Inject
public TransportCreateIndexAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, CreateIndexRequest::new);
this.createIndexService = createIndexService;
this.shardsMonitor = new WaitForActiveShardsMonitor(settings, clusterService, threadPool);
}

@Override
Expand Down Expand Up @@ -77,24 +80,47 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
.aliases(request.aliases()).customs(request.customs())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
if (response.isAcknowledged()) {
// the cluster state that includes the newly created index has been acknowledged,
// now wait for the configured number of active shards to be allocated before returning,
// as that is when indexing operations can take place on the newly created index
try {
shardsMonitor.waitOnShards(indexName, request.waitForActiveShards(), request.masterNodeTimeout(), listener,
(timedOut) -> {
if (timedOut) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", indexName);
}
listener.onResponse(new CreateIndexResponse(true, timedOut));
});

} catch (Exception ex) {
logger.debug("[{}] index creation failed on waiting for shards", indexName);
listener.onFailure(ex);
}
} else {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), true));
}
}

@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
logger.trace("[{}] failed to create", t, indexName);
} else {
logger.debug("[{}] failed to create", t, request.index());
logger.debug("[{}] failed to create", t, indexName);
}
listener.onFailure(t);
}
});
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -206,4 +207,19 @@ public void source(BytesReference source) {
}
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,25 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
private static final String DRY_RUN = "dry_run";
private static final String ROLLED_OVER = "rolled_over";
private static final String CONDITIONS = "conditions";
private static final String TIMED_OUT_WAITING_FOR_SHARDS = "timed_out_waiting_for_shards";

private String oldIndex;
private String newIndex;
private Set<Map.Entry<String, Boolean>> conditionStatus;
private boolean dryRun;
private boolean rolledOver;
private boolean timedOutWaitingForShards;

RolloverResponse() {
}

RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults,
boolean dryRun, boolean rolledOver) {
boolean dryRun, boolean rolledOver, boolean timedOutWaitingForShards) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
this.dryRun = dryRun;
this.rolledOver = rolledOver;
this.timedOutWaitingForShards = timedOutWaitingForShards;
this.conditionStatus = conditionResults.stream()
.map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched))
.collect(Collectors.toSet());
Expand Down Expand Up @@ -95,6 +98,16 @@ public boolean isRolledOver() {
return rolledOver;
}

/**
* Returns true if there was a timeout waiting for enough active shards
* on the newly created rollover index. Even if true, the rollover index
* itself could still be properly created, check {@link #isRolledOver()} to
* determine.
*/
public boolean isTimedOutWaitingForShards() {
return timedOutWaitingForShards;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -110,6 +123,7 @@ public void readFrom(StreamInput in) throws IOException {
conditionStatus = conditions;
dryRun = in.readBoolean();
rolledOver = in.readBoolean();
timedOutWaitingForShards = in.readBoolean();
}

@Override
Expand All @@ -124,6 +138,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeBoolean(dryRun);
out.writeBoolean(rolledOver);
out.writeBoolean(timedOutWaitingForShards);
}

@Override
Expand All @@ -132,6 +147,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(NEW_INDEX, newIndex);
builder.field(ROLLED_OVER, rolledOver);
builder.field(DRY_RUN, dryRun);
builder.field(TIMED_OUT_WAITING_FOR_SHARDS, timedOutWaitingForShards);
builder.startObject(CONDITIONS);
for (Map.Entry<String, Boolean> entry : conditionStatus) {
builder.field(entry.getKey(), entry.getValue());
Expand Down

0 comments on commit 9ef0dee

Please sign in to comment.