Skip to content

Commit

Permalink
API: Allow to control document shard routing, and search shard routing,
Browse files Browse the repository at this point in the history
closes elastic#470.
  • Loading branch information
kimchy committed Nov 2, 2010
1 parent 8e2e85f commit a62f1f3
Show file tree
Hide file tree
Showing 44 changed files with 680 additions and 98 deletions.
Expand Up @@ -61,7 +61,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct
}

@Override protected GroupShardsIterator shards(BroadcastPingRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), null);
}

@Override protected BroadcastPingResponse newResponse(BroadcastPingRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
Expand Down
Expand Up @@ -108,6 +108,7 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
String index = null;
String type = null;
String id = null;
String routing = null;
String opType = null;

String currentFieldName = null;
Expand All @@ -121,30 +122,32 @@ public BulkRequest add(byte[] data, int from, int length, boolean contentUnsafe)
type = parser.text();
} else if ("_id".equals(currentFieldName)) {
id = parser.text();
} else if ("_routing".equals(currentFieldName)) {
routing = parser.text();
} else if ("op_type".equals(currentFieldName) || "opType".equals(currentFieldName)) {
opType = parser.text();
}
}
}

if ("delete".equals(action)) {
add(new DeleteRequest(index, type, id));
add(new DeleteRequest(index, type, id).routing(routing));
} else {
nextMarker = findNextMarker(marker, from, data, length);
if (nextMarker == -1) {
break;
}
if ("index".equals(action)) {
if (opType == null) {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.source(data, from, nextMarker - from, contentUnsafe));
} else {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.create("create".equals(opType))
.source(data, from, nextMarker - from, contentUnsafe));
}
} else if ("create".equals(action)) {
add(new IndexRequest(index, type, id)
add(new IndexRequest(index, type, id).routing(routing)
.create(true)
.source(data, from, nextMarker - from, contentUnsafe));
}
Expand Down
Expand Up @@ -157,10 +157,10 @@ private void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bul
ShardId shardId = null;
if (request instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) request;
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id()).shardId();
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
} else if (request instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) request;
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id()).shardId();
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
}
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
Expand Down
Expand Up @@ -63,6 +63,7 @@ public class CountRequest extends BroadcastOperationRequest {
private float minScore = DEFAULT_MIN_SCORE;

@Nullable protected String queryHint;
@Nullable protected String routing;

private byte[] querySource;
private int querySourceOffset;
Expand Down Expand Up @@ -264,13 +265,39 @@ public CountRequest types(String... types) {
return this;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public CountRequest routing(String routing) {
this.routing = routing;
return this;
}

/**
* The routing values to control the shards that the search will be executed on.
*/
public CountRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}

@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
minScore = in.readFloat();

if (in.readBoolean()) {
queryHint = in.readUTF();
}
if (in.readBoolean()) {
routing = in.readUTF();
}

querySourceUnsafe = false;
querySourceOffset = 0;
Expand Down Expand Up @@ -300,6 +327,12 @@ public CountRequest types(String... types) {
out.writeBoolean(true);
out.writeUTF(queryHint);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}

out.writeVInt(querySourceLength);
out.writeBytes(querySource, querySourceOffset, querySourceLength);
Expand Down
Expand Up @@ -79,7 +79,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction<Coun
}

@Override protected GroupShardsIterator shards(CountRequest request, ClusterState clusterState) {
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint());
return clusterService.operationRouting().searchShards(clusterState, request.indices(), request.queryHint(), request.routing());
}

@Override protected void checkBlock(CountRequest request, ClusterState state) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;

import javax.annotation.Nullable;
import java.io.IOException;

import static org.elasticsearch.action.Actions.*;
Expand All @@ -48,6 +49,7 @@ public class DeleteRequest extends ShardReplicationOperationRequest {

private String type;
private String id;
@Nullable private String routing;
private boolean refresh;

/**
Expand Down Expand Up @@ -164,6 +166,23 @@ public DeleteRequest timeout(TimeValue timeout) {
return this;
}

/**
* Controls the shard routing of the request. Using this value to hash the shard
* and not the id.
*/
public DeleteRequest routing(String routing) {
this.routing = routing;
return this;
}

/**
* Controls the shard routing of the delete request. Using this value to hash the shard
* and not the id.
*/
public String routing() {
return this.routing;
}

/**
* Should a refresh be executed post this index operation causing the operation to
* be searchable. Note, heavy indexing should not set this to <tt>true</tt>. Defaults
Expand All @@ -182,13 +201,22 @@ public boolean refresh() {
super.readFrom(in);
type = in.readUTF();
id = in.readUTF();
if (in.readBoolean()) {
routing = in.readUTF();
}
refresh = in.readBoolean();
}

@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeUTF(type);
out.writeUTF(id);
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
out.writeBoolean(refresh);
}

Expand Down
Expand Up @@ -102,7 +102,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct

@Override protected DeleteResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
indexShard(shardRequest).delete(request.type(), request.id());
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
delete.refresh(request.refresh());
indexShard.delete(delete);
return new DeleteResponse(request.index(), request.type(), request.id());
}

Expand All @@ -116,6 +119,6 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct

@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
return clusterService.operationRouting()
.deleteShards(clusterService.state(), request.index(), request.type(), request.id());
.deleteShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
}
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {

private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;

/**
* Constructs a new delete by query request to run against the provided indices. No indices means
Expand Down Expand Up @@ -207,6 +209,29 @@ String[] types() {
return this.types;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public String routing() {
return this.routing;
}

/**
* A comma separated list of routing values to control the shards the search will be executed on.
*/
public DeleteByQueryRequest routing(String routing) {
this.routing = routing;
return this;
}

/**
* The routing values to control the shards that the search will be executed on.
*/
public DeleteByQueryRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}

/**
* The types of documents the query will run against. Defaults to all types.
*/
Expand Down Expand Up @@ -264,6 +289,10 @@ public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
queryParserName = in.readUTF();
}

if (in.readBoolean()) {
routing = in.readUTF();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -278,6 +307,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
out.writeUTF(queryParserName);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}

@Override public String toString() {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;

import javax.annotation.Nullable;
import java.io.IOException;

import static org.elasticsearch.action.Actions.*;
Expand All @@ -42,6 +43,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
private byte[] querySource;
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
@Nullable private String routing;

IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index) {
this.index = index;
Expand All @@ -51,6 +53,7 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
this.types = request.types();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
this.routing = request.routing();
}

IndexDeleteByQueryRequest() {
Expand Down Expand Up @@ -81,6 +84,10 @@ String queryParserName() {
return queryParserName;
}

String routing() {
return this.routing;
}

String[] types() {
return this.types;
}
Expand Down Expand Up @@ -109,6 +116,9 @@ public void readFrom(StreamInput in) throws IOException {
types[i] = in.readUTF();
}
}
if (in.readBoolean()) {
routing = in.readUTF();
}
}

public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -125,5 +135,11 @@ public void writeTo(StreamOutput out) throws IOException {
for (String type : types) {
out.writeUTF(type);
}
if (routing == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(routing);
}
}
}

0 comments on commit a62f1f3

Please sign in to comment.