Skip to content

Commit

Permalink
Write Consistency Level for index/delete/delete_by_query/bulk with on…
Browse files Browse the repository at this point in the history
…e/quorum/all. Defaults to quorum, closes elastic#444.
  • Loading branch information
kimchy committed Oct 22, 2010
1 parent 5649df5 commit 5d1d927
Show file tree
Hide file tree
Showing 29 changed files with 437 additions and 60 deletions.
Expand Up @@ -25,9 +25,9 @@
/**
* @author kimchy (shay.banon)
*/
public class PrimaryNotStartedActionException extends ElasticSearchException {
public class UnavailableShardsException extends ElasticSearchException {

public PrimaryNotStartedActionException(ShardId shardId, String message) {
public UnavailableShardsException(ShardId shardId, String message) {
super(buildMessage(shardId, message));
}

Expand Down
@@ -0,0 +1,71 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action;

import org.elasticsearch.ElasticSearchIllegalArgumentException;

/**
* Write Consistency Level control how many replicas should be active for a write operation to occur (a write operation
* can be index, or delete).
*
* @author kimchy (shay.banon)
*/
public enum WriteConsistencyLevel {
DEFAULT((byte) 0),
ONE((byte) 1),
QUORUM((byte) 2),
ALL((byte) 3);

private final byte id;

WriteConsistencyLevel(byte id) {
this.id = id;
}

public byte id() {
return id;
}

public static WriteConsistencyLevel fromId(byte value) {
if (value == 0) {
return DEFAULT;
} else if (value == 1) {
return ONE;
} else if (value == 2) {
return QUORUM;
} else if (value == 3) {
return ALL;
}
throw new ElasticSearchIllegalArgumentException("No write consistency match [" + value + "]");
}

public static WriteConsistencyLevel fromString(String value) {
if (value.equals("default")) {
return DEFAULT;
} else if (value.equals("one")) {
return ONE;
} else if (value.equals("quorum")) {
return QUORUM;
} else if (value.equals("all")) {
return ALL;
}
throw new ElasticSearchIllegalArgumentException("No write consistency match [" + value + "]");
}
}
Expand Up @@ -31,7 +31,7 @@
import org.elasticsearch.transport.TransportService;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportShardReplicationPingAction extends TransportShardReplicationOperationAction<ShardReplicationPingRequest, ShardReplicationPingResponse> {

Expand All @@ -41,6 +41,10 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
}

@Override protected boolean checkWriteConsistency() {
return true;
}

@Override protected ShardReplicationPingRequest newRequestInstance() {
return new ShardReplicationPingRequest();
}
Expand Down
Expand Up @@ -21,8 +21,10 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -48,6 +50,9 @@ public class BulkRequest implements ActionRequest {

private boolean listenerThreaded = false;

private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
Expand Down Expand Up @@ -150,6 +155,30 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
return this;
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}

/**
* Set the replication type for this operation.
*/
public BulkRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}

public ReplicationType replicationType() {
return this.replicationType;
}

private int findNextMarker(byte marker, int from, byte[] data, int length) {
for (int i = from; i < length; i++) {
if (data[i] == marker) {
Expand Down Expand Up @@ -191,6 +220,8 @@ public int numberOfActions() {
}

@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
int size = in.readVInt();
for (int i = 0; i < size; i++) {
byte type = in.readByte();
Expand All @@ -207,6 +238,8 @@ public int numberOfActions() {
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
out.writeVInt(requests.size());
for (ActionRequest request : requests) {
if (request instanceof IndexRequest) {
Expand Down
Expand Up @@ -175,7 +175,10 @@ private void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bul
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
shardBulkAction.execute(new BulkShardRequest(shardId.index().name(), shardId.id(), requests.toArray(new BulkItemRequest[requests.size()])), new ActionListener<BulkShardResponse>() {
BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId.index().name(), shardId.id(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.replicationType(bulkRequest.replicationType());
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
@Override public void onResponse(BulkShardResponse bulkShardResponse) {
synchronized (responses) {
for (BulkItemResponse bulkItemResponse : bulkShardResponse.responses()) {
Expand Down
Expand Up @@ -66,6 +66,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
this.mappingUpdatedAction = mappingUpdatedAction;
}

@Override protected boolean checkWriteConsistency() {
return true;
}

@Override protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.options().withCompress(true);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Required;
Expand Down Expand Up @@ -117,6 +118,14 @@ public DeleteRequest() {
return this;
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
@Override public DeleteRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
super.consistencyLevel(consistencyLevel);
return this;
}

/**
* The type of the document to delete.
*/
Expand Down
Expand Up @@ -80,6 +80,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
}

@Override protected boolean checkWriteConsistency() {
return true;
}

@Override protected DeleteRequest newRequestInstance() {
return new DeleteRequest();
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Requests;
Expand Down Expand Up @@ -238,6 +239,11 @@ public DeleteByQueryRequest replicationType(ReplicationType replicationType) {
return this;
}

public DeleteByQueryRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
this.consistencyLevel = consistencyLevel;
return this;
}

/**
* The replication type to use with this operation.
*/
Expand Down
Expand Up @@ -50,9 +50,9 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
this.queryParserName = request.queryParserName();
this.types = request.types();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
}


IndexDeleteByQueryRequest() {
}

Expand Down
Expand Up @@ -55,6 +55,7 @@ public ShardDeleteByQueryRequest(String index, byte[] querySource, @Nullable Str
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
this(request.index(), request.querySource(), request.queryParserName(), request.types(), shardId);
replicationType(request.replicationType());
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
}

Expand Down
Expand Up @@ -44,6 +44,10 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
}

@Override protected boolean checkWriteConsistency() {
return true;
}

@Override protected ShardDeleteByQueryRequest newRequestInstance() {
return new ShardDeleteByQueryRequest();
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.common.Required;
Expand Down Expand Up @@ -366,6 +367,14 @@ public IndexRequest opType(String opType) throws ElasticSearchIllegalArgumentExc
return this;
}

/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/
@Override public IndexRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) {
super.consistencyLevel(consistencyLevel);
return this;
}

/**
* Set the replication type for this operation.
*/
Expand Down
Expand Up @@ -107,6 +107,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}

@Override protected boolean checkWriteConsistency() {
return true;
}

@Override protected IndexRequest newRequestInstance() {
return new IndexRequest();
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -41,6 +42,7 @@ public class IndexReplicationOperationRequest implements ActionRequest {
private boolean threadedListener = false;

protected ReplicationType replicationType = ReplicationType.DEFAULT;
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

public TimeValue timeout() {
return timeout;
Expand All @@ -63,6 +65,10 @@ public ReplicationType replicationType() {
return this.replicationType;
}

public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}

@Override public IndexReplicationOperationRequest listenerThreaded(boolean threadedListener) {
this.threadedListener = threadedListener;
return this;
Expand All @@ -78,12 +84,14 @@ public ReplicationType replicationType() {

@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readUTF();
}

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeUTF(index);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -39,6 +40,7 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
private boolean threadedListener = false;

protected ReplicationType replicationType = ReplicationType.DEFAULT;
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;

public TimeValue timeout() {
return timeout;
Expand Down Expand Up @@ -76,8 +78,13 @@ public ReplicationType replicationType() {
return this.replicationType;
}

public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}

@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
Expand All @@ -87,6 +94,7 @@ public ReplicationType replicationType() {

@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeVInt(indices.length);
for (String index : indices) {
Expand Down

0 comments on commit 5d1d927

Please sign in to comment.