Skip to content

Commit

Permalink
Index creation waits for active shards before returning
Browse files Browse the repository at this point in the history
Before returning, index creation now waits for the configured number
of shard copies to be started. In the past, a client would create an
index and then potentially have to check the cluster health to wait
to execute write operations. With the cluster health semantics changing
so that index creation does not cause the cluster health to go RED,
this change enables waiting for the desired number of active shards
to be active before returning from index creation.

Relates elastic#9126
  • Loading branch information
Ali Beyad committed Jul 11, 2016
1 parent 7759c23 commit af48cb9
Show file tree
Hide file tree
Showing 44 changed files with 1,117 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private final Set<ClusterBlock> blocks = new HashSet<>();

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;


public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage;
Expand Down Expand Up @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
return this;
}

public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}
Expand Down Expand Up @@ -142,4 +150,8 @@ public Index shrinkFrom() {
public boolean updateAllTypes() {
return updateAllTypes;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>

private boolean updateAllTypes = false;

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CreateIndexRequest() {
}

Expand Down Expand Up @@ -440,6 +443,27 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -462,6 +486,7 @@ public void readFrom(StreamInput in) throws IOException {
aliases.add(Alias.read(in));
}
updateAllTypes = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
}

@Override
Expand All @@ -486,5 +511,6 @@ public void writeTo(StreamOutput out) throws IOException {
alias.writeTo(out);
}
out.writeBoolean(updateAllTypes);
waitForActiveShards.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -249,4 +250,20 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) {
request.updateAllTypes(updateAllTypes);
return this;
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

Expand All @@ -30,22 +31,40 @@
*/
public class CreateIndexResponse extends AcknowledgedResponse {

private boolean timedOutWaitingForShards;

protected CreateIndexResponse() {
}

protected CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged, boolean timedOutWaitingForShards) {
super(acknowledged);
this.timedOutWaitingForShards = timedOutWaitingForShards;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
timedOutWaitingForShards = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(timedOutWaitingForShards);
}

/**
* Returns true if the request timed out waiting for the required number
* of active shards. If the index was not successfully created in the
* first place, then this value is meaningless.
*/
public boolean isTimedOutWaitingForShards() {
return timedOutWaitingForShards;
}

public void addCustomFields(XContentBuilder builder) throws IOException {
builder.field("timed_out_waiting_for_shards", isTimedOutWaitingForShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -77,24 +75,11 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
.aliases(request.aliases()).customs(request.customs())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
createIndexService.createIndexAndWaitForActiveShards(updateRequest, listener, CreateIndexResponse::new);
}

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -206,4 +207,19 @@ public void source(BytesReference source) {
}
}

/**
* Sets the number of shard copies that must be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* ensure all shards (primary and all replicas) are active before proceeding with the
* write. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,25 @@ public final class RolloverResponse extends ActionResponse implements ToXContent
private static final String DRY_RUN = "dry_run";
private static final String ROLLED_OVER = "rolled_over";
private static final String CONDITIONS = "conditions";
private static final String TIMED_OUT_WAITING_FOR_SHARDS = "timed_out_waiting_for_shards";

private String oldIndex;
private String newIndex;
private Set<Map.Entry<String, Boolean>> conditionStatus;
private boolean dryRun;
private boolean rolledOver;
private boolean timedOutWaitingForShards;

RolloverResponse() {
}

RolloverResponse(String oldIndex, String newIndex, Set<Condition.Result> conditionResults,
boolean dryRun, boolean rolledOver) {
boolean dryRun, boolean rolledOver, boolean timedOutWaitingForShards) {
this.oldIndex = oldIndex;
this.newIndex = newIndex;
this.dryRun = dryRun;
this.rolledOver = rolledOver;
this.timedOutWaitingForShards = timedOutWaitingForShards;
this.conditionStatus = conditionResults.stream()
.map(result -> new AbstractMap.SimpleEntry<>(result.condition.toString(), result.matched))
.collect(Collectors.toSet());
Expand Down Expand Up @@ -95,6 +98,16 @@ public boolean isRolledOver() {
return rolledOver;
}

/**
* Returns true if there was a timeout waiting for enough active shards
* on the newly created rollover index. Even if true, the rollover index
* itself could still be properly created, check {@link #isRolledOver()} to
* determine.
*/
public boolean isTimedOutWaitingForShards() {
return timedOutWaitingForShards;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -110,6 +123,7 @@ public void readFrom(StreamInput in) throws IOException {
conditionStatus = conditions;
dryRun = in.readBoolean();
rolledOver = in.readBoolean();
timedOutWaitingForShards = in.readBoolean();
}

@Override
Expand All @@ -124,6 +138,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeBoolean(dryRun);
out.writeBoolean(rolledOver);
out.writeBoolean(timedOutWaitingForShards);
}

@Override
Expand All @@ -132,6 +147,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(NEW_INDEX, newIndex);
builder.field(ROLLED_OVER, rolledOver);
builder.field(DRY_RUN, dryRun);
builder.field(TIMED_OUT_WAITING_FOR_SHARDS, timedOutWaitingForShards);
builder.startObject(CONDITIONS);
for (Map.Entry<String, Boolean> entry : conditionStatus) {
builder.field(entry.getKey(), entry.getValue());
Expand Down
Loading

0 comments on commit af48cb9

Please sign in to comment.