Redesigned the percolator engine to execute in a distributed manner.
With this design, the percolate queries will be stored in a special `_percolator` type with its own mapping, either in the same index as the actual data or in a separate index (a dedicated percolation index, which might require different sharding behavior than the index that holds the actual data and is being searched on). This approach allows percolate requests to scale to the number of primary shards an index has been configured with and effectively distributes the percolate execution.
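
As an illustrative sketch of the design described above (the index name, query id, and field are placeholders, not taken from this commit), registering a percolate query now amounts to indexing it into the `_percolator` type of the index whose documents should be percolated, or into a dedicated percolation index:

    # Register a percolate query in the `_percolator` type of the data index
    # (or of a dedicated percolation index); names and field are hypothetical.
    curl -XPUT 'localhost:9200/my-index/_percolator/my-query-1' -d '{
      "query": {
        "term": { "message": "error" }
      }
    }'

Because the registered queries live in an ordinary type of the target index, they are spread across that index's primary shards, so a percolate request can fan out to those shards in the same way a search does.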

This commit doesn't add new percolate features other than scaling. The response remains similar, with the exception that a header similar to the search API's has been added to the percolate response.
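
As a hedged sketch of that change (the endpoint shape follows the existing percolate API; the exact response fields and values here are illustrative, not copied from this commit), percolating a document and reading the new search-style header might look roughly like:

    # Percolate a single document against the registered queries.
    curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{
      "doc": { "message": "an error occurred" }
    }'

    # Illustrative response: the matches are reported as before, while a
    # search-like shard header now describes the distributed execution.
    {
      "took": 3,
      "_shards": { "total": 5, "successful": 5, "failed": 0 },
      "matches": [ "my-query-1" ]
    }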

Closes #3173
martijnvg committed Jul 18, 2013
1 parent f38103a commit c222ce2
Showing 55 changed files with 2,157 additions and 2,053 deletions.
@@ -20,18 +20,12 @@
*/

/**
* This class overwrites {@link MemoryIndex} to make the reuse constructor
* visible.
* This class overwrites {@link MemoryIndex} to make the reuse constructor visible.
*/
public final class ReusableMemoryIndex extends MemoryIndex {
private final long maxReuseBytes;
public ReusableMemoryIndex(boolean storeOffsets, long maxReusedBytes) {
public final class ExtendedMemoryIndex extends MemoryIndex {

public ExtendedMemoryIndex(boolean storeOffsets, long maxReusedBytes) {
super(storeOffsets, maxReusedBytes);
this.maxReuseBytes = maxReusedBytes;
}

public long getMaxReuseBytes() {
return maxReuseBytes;
}

}
@@ -22,19 +22,15 @@
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.TransportDeleteMappingAction;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

@@ -116,23 +112,7 @@ protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, final
@Override
public void onResponse(MetaDataDeleteIndexService.Response response) {
responseRef.set(new DeleteIndexResponse(response.acknowledged()));
// YACK, but here we go: If this index is also percolated, make sure to delete all percolated queries from the _percolator index
IndexMetaData percolatorMetaData = state.metaData().index(PercolatorService.INDEX_NAME);
if (percolatorMetaData != null && percolatorMetaData.mappings().containsKey(index)) {
deleteMappingAction.execute(new DeleteMappingRequest(PercolatorService.INDEX_NAME).type(index), new ActionListener<DeleteMappingResponse>() {
@Override
public void onResponse(DeleteMappingResponse deleteMappingResponse) {
latch.countDown();
}

@Override
public void onFailure(Throwable e) {
latch.countDown();
}
});
} else {
latch.countDown();
}
latch.countDown();
}

@Override
14 changes: 4 additions & 10 deletions src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
@@ -311,8 +311,6 @@ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable Str
version = parser.longValue();
} else if ("_version_type".equals(currentFieldName) || "_versionType".equals(currentFieldName) || "version_type".equals(currentFieldName) || "versionType".equals(currentFieldName)) {
versionType = VersionType.fromString(parser.text());
} else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) {
percolate = parser.textOrNull();
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
}
@@ -332,24 +330,20 @@ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable Str
if ("index".equals(action)) {
if (opType == null) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate), payload);
.source(data.slice(from, nextMarker - from), contentUnsafe), payload);
} else {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create("create".equals(opType))
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate), payload);
.source(data.slice(from, nextMarker - from), contentUnsafe), payload);
}
} else if ("create".equals(action)) {
internalAdd(new IndexRequest(index, type, id).routing(routing).parent(parent).timestamp(timestamp).ttl(ttl).version(version).versionType(versionType)
.create(true)
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate), payload);
.source(data.slice(from, nextMarker - from), contentUnsafe), payload);
} else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from))
.percolate(percolate), payload);
.source(data.slice(from, nextMarker - from)), payload);
}
// move pointers
from = nextMarker + 1;
@@ -42,7 +42,6 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@@ -55,8 +54,6 @@
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
@@ -232,9 +229,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(),
indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
updateResponse.setMatches(indexResponse.getMatches());
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
@@ -402,11 +397,6 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}

// if we are going to percolate, then we need to keep this op for the postPrimary operation
if (!Strings.hasLength(indexRequest.percolate())) {
op = null;
}

IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version, created);
return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op);
}
@@ -511,39 +501,6 @@ private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRe
}
}

@Override
protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse, BulkShardRequest> response) {
IndexService indexService = indicesService.indexServiceSafe(request.index());
Engine.IndexingOperation[] ops = (Engine.IndexingOperation[]) response.payload();
if (ops == null) {
return;
}
for (int i = 0; i < ops.length; i++) {
BulkItemRequest itemRequest = request.items()[i];
BulkItemResponse itemResponse = response.response().getResponses()[i];
if (itemResponse.isFailed()) {
// failure, continue
continue;
}
Engine.IndexingOperation op = ops[i];
if (op == null) {
continue; // failed / no matches requested
}
if (itemRequest.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) itemRequest.request();
if (!Strings.hasLength(indexRequest.percolate())) {
continue;
}
try {
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(op.parsedDoc(), indexRequest.percolate()));
((IndexResponse) itemResponse.getResponse()).setMatches(percolate.matches());
} catch (Exception e) {
logger.warn("failed to percolate [{}]", e, itemRequest.request());
}
}
}
}

@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
17 changes: 0 additions & 17 deletions src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -133,7 +133,6 @@ public static OpType fromId(byte id) {
private boolean refresh = false;
private long version = Versions.MATCH_ANY;
private VersionType versionType = VersionType.INTERNAL;
private String percolate;

private XContentType contentType = Requests.INDEX_CONTENT_TYPE;

@@ -549,20 +548,6 @@ public VersionType versionType() {
return this.versionType;
}

/**
* Causes the index request document to be percolated. The parameter is the percolate query
* to use to reduce the percolated queries that are going to run against this doc. Can be
* set to <tt>*</tt> to indicate that all percolate queries should be run.
*/
public IndexRequest percolate(String percolate) {
this.percolate = percolate;
return this;
}

public String percolate() {
return this.percolate;
}

public void process(MetaData metaData, String aliasOrIndex, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration) throws ElasticSearchException {
// resolve the routing if needed
routing(metaData.resolveIndexRouting(routing, aliasOrIndex));
@@ -635,7 +620,6 @@ public void readFrom(StreamInput in) throws IOException {
opType = OpType.fromId(in.readByte());
refresh = in.readBoolean();
version = in.readLong();
percolate = in.readOptionalString();
versionType = VersionType.fromValue(in.readByte());
}

@@ -652,7 +636,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeByte(opType.id());
out.writeBoolean(refresh);
out.writeLong(version);
out.writeOptionalString(percolate);
out.writeByte(versionType.getValue());
}

@@ -294,16 +294,6 @@ public IndexRequestBuilder setVersionType(VersionType versionType) {
return this;
}

/**
* Causes the index request document to be percolated. The parameter is the percolate query
* to use to reduce the percolated queries that are going to run against this doc. Can be
* set to <tt>*</tt> to indicate that all percolate queries should be run.
*/
public IndexRequestBuilder setPercolate(String percolate) {
request.percolate(percolate);
return this;
}

/**
* Sets the timestamp either as millis since the epoch, or, in the configured date format.
*/
48 changes: 0 additions & 48 deletions src/main/java/org/elasticsearch/action/index/IndexResponse.java
@@ -19,14 +19,11 @@

package org.elasticsearch.action.index;

import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* A response of an index operation,
@@ -41,7 +38,6 @@ public class IndexResponse extends ActionResponse {
private String type;
private long version;
private boolean created;
private List<String> matches;

public IndexResponse() {

@@ -90,20 +86,6 @@ public boolean isCreated() {
return this.created;
}

/**
* Returns the percolate queries matches. <tt>null</tt> if no percolation was requested.
*/
public List<String> getMatches() {
return this.matches;
}

/**
* Internal.
*/
public void setMatches(List<String> matches) {
this.matches = matches;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@@ -112,27 +94,6 @@ public void readFrom(StreamInput in) throws IOException {
type = in.readString();
version = in.readLong();
created = in.readBoolean();
if (in.readBoolean()) {
int size = in.readVInt();
if (size == 0) {
matches = ImmutableList.of();
} else if (size == 1) {
matches = ImmutableList.of(in.readString());
} else if (size == 2) {
matches = ImmutableList.of(in.readString(), in.readString());
} else if (size == 3) {
matches = ImmutableList.of(in.readString(), in.readString(), in.readString());
} else if (size == 4) {
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString());
} else if (size == 5) {
matches = ImmutableList.of(in.readString(), in.readString(), in.readString(), in.readString(), in.readString());
} else {
matches = new ArrayList<String>();
for (int i = 0; i < size; i++) {
matches.add(in.readString());
}
}
}
}

@Override
Expand All @@ -143,14 +104,5 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeLong(version);
out.writeBoolean(created);
if (matches == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(matches.size());
for (String match : matches) {
out.writeString(match);
}
}
}
}
@@ -36,15 +36,12 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.percolator.PercolatorExecutor;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
@@ -236,21 +233,6 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op);
}

@Override
protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse, IndexRequest> response) {
Engine.IndexingOperation op = (Engine.IndexingOperation) response.payload();
if (!Strings.hasLength(request.percolate())) {
return;
}
IndexService indexService = indicesService.indexServiceSafe(request.index());
try {
PercolatorExecutor.Response percolate = indexService.percolateService().percolate(new PercolatorExecutor.DocAndSourceQueryRequest(op.parsedDoc(), request.percolate()));
response.response().setMatches(percolate.matches());
} catch (Exception e) {
logger.warn("failed to percolate [{}]", e, request);
}
}

@Override
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);