Skip to content

Commit

Permalink
Merge pull request #19454 from abeyad/remove-write-consistency-level
Browse files Browse the repository at this point in the history
Removes write consistency level across replication action APIs in favor of wait_for_active_shards
  • Loading branch information
Ali Beyad committed Aug 2, 2016
2 parents b99a482 + 4923da9 commit 3d2a105
Show file tree
Hide file tree
Showing 61 changed files with 523 additions and 443 deletions.
3 changes: 1 addition & 2 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]AutoCreateIndexTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptionsTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]TransportActionFilterChainTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
Expand Down Expand Up @@ -799,7 +800,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]cbor[/\\]JsonVsCborTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]smile[/\\]JsonVsSmileTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]xcontent[/\\]support[/\\]filtering[/\\]FilterPathGeneratorFilteringTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]consistencylevel[/\\]WriteConsistencyLevelIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]deps[/\\]joda[/\\]SimpleJodaTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]BlockingClusterStatePublishResponseHandlerTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]DiscoveryWithServiceDisruptionsIT.java" checks="LineLength" />
Expand Down Expand Up @@ -1025,7 +1025,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWhileRelocatingIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]SearchWithRandomIOExceptionsIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportSearchFailuresIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]basic[/\\]TransportTwoNodesSearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ChildQuerySearchIT.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]child[/\\]ParentFieldLoadingIT.java" checks="LineLength" />
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,15 @@ public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShar
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}


@Override
public void readFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,13 @@ public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitFor
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(shardId);
this.request = request;
this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default
}

public ShardFlushRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request) {
return new ReplicaResult();
}

@Override
protected boolean checkWriteConsistency() {
return false;
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
Expand Down Expand Up @@ -54,7 +55,9 @@ protected ReplicationResponse newShardResponse() {

@Override
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
return new BasicReplicationRequest(shardId);
BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId);
replicationRequest.waitForActiveShards(ActiveShardCount.NONE);
return replicationRequest;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;


public class TransportShardRefreshAction
extends TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> {

Expand Down Expand Up @@ -70,11 +71,6 @@ protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request)
return new ReplicaResult();
}

@Override
protected boolean checkWriteConsistency() {
return false;
}

@Override
protected ClusterBlockLevel globalBlockLevel() {
return ClusterBlockLevel.METADATA_WRITE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,13 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public void setWaitForActiveShards(final int waitForActiveShards) {
setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,13 @@ public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActive
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public RolloverRequestBuilder waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards);
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public void setWaitForActiveShards(final int waitForActiveShards) {
setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

public void source(BytesReference source) {
XContentType xContentType = XContentFactory.xContentType(source);
if (xContentType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiv
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public ShrinkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}
}
29 changes: 20 additions & 9 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
List<Object> payloads = null;

protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

private long sizeInBytes = 0;
Expand Down Expand Up @@ -432,15 +433,25 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
public BulkRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
/**
* A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public BulkRequest waitForActiveShards(final int waitForActiveShards) {
return waitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

public ActiveShardCount waitForActiveShards() {
return this.waitForActiveShards;
}

@Override
Expand Down Expand Up @@ -525,7 +536,7 @@ public ActionRequestValidationException validate() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
waitForActiveShards = ActiveShardCount.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
Expand All @@ -550,7 +561,7 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(consistencyLevel.id());
waitForActiveShards.writeTo(out);
out.writeVInt(requests.size());
for (ActionRequest<?> request : requests) {
if (request instanceof IndexRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
Expand Down Expand Up @@ -111,13 +112,23 @@ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable Strin
}

/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
* Sets the number of shard copies that must be active before proceeding with the write.
* See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details.
*/
public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);
public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}

/**
* A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical
* shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)}
* to get the ActiveShardCount.
*/
public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) {
return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards));
}

/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
if (task != null) {
bulkShardRequest.setParentTask(nodeId, task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void onFailure(Exception e) {

@Override
protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void onFailure(Exception e) {

@Override
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
super.resolveRequest(metaData, indexMetaData, request);
MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
request.resolveRouting(metaData);
request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ BulkRequest getBulkRequest() {
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());

int slot = 0;
Expand Down
Loading

0 comments on commit 3d2a105

Please sign in to comment.