Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removes write consistency level across replication action APIs in favor of wait_for_active_shards #19454

Merged
merged 5 commits into from
Aug 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]AutoCreateIndexTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptionsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
Expand Down Expand Up @@ -799,7 +800,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]cbor[/\\]JsonVsCborTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]smile[/\\]JsonVsSmileTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]consistencylevel[/\\]WriteConsistencyLevelIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
Expand Down Expand Up @@ -1025,7 +1025,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWhileRelocatingIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomIOExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportSearchFailuresIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportTwoNodesSearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ChildQuerySearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ParentFieldLoadingIT.java" checks="LineLength" />
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,15 @@ public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShar
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}


@Override
public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,13 @@ public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitFor
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}

public ShardFlushRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request) {
return new ReplicaResult();
}

@Override
protected boolean checkWriteConsistency() {
return false;
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
Expand Down Expand Up @@ -54,7 +55,9 @@ protected ReplicationResponse newShardResponse() {

@Override
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
return new BasicReplicationRequest(shardId);
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
return replicationRequest;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;


public class TransportShardRefreshAction
extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {

Expand Down Expand Up @@ -70,11 +71,6 @@ protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request)
return new ReplicaResult();
}

@Override
protected boolean checkWriteConsistency() {
return false;
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,13 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public void setWaitForActiveShards(final int waitForActiveShards) {
setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,13 @@ public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActive
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public RolloverRequestBuilder waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public void setWaitForActiveShards(final int waitForActiveShards) {
setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiv
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public ShrinkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
29 changes: 20 additions & 9 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
List<Object> payloads = null;

protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

private long sizeInBytes = 0;
Expand Down Expand Up @@ -432,15 +433,25 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public BulkRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public BulkRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

public ActiveShardCount waitForActiveShards() {
return this.waitForActiveShards;
}

@Override
Expand Down Expand Up @@ -525,7 +536,7 @@ public ActionRequestValidationException validate() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
Expand All @@ -550,7 +561,7 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(consistencyLevel.id());
waitForActiveShards.writeTo(out);
out.writeVInt(requests.size());
for (ActionRequest<?> request : requests) {
if (request instanceof IndexRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -111,13 +112,23 @@ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable Strin
}

/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void onFailure(Exception e) {

@Override
protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void onFailure(Exception e) {

@Override
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
request.resolveRouting(metaData);
request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ BulkRequest getBulkRequest() {
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());

int slot = 0;
Expand Down
Loading