Skip to content

Commit

Permalink
Added support for percolating an existing document.
Browse files Browse the repository at this point in the history
The percolate an existing document feature executes an internal get operation to get the source of the document to percolate.

All options for percolating an existing document:
* `id` - The id of the document to percolate.
* `type` - The type of the document to percolate.
* `index` - The index to fetch the document to percolate from.
* `routing` - The routing value to use to retrieve the document to percolate.
* `preference` - Which shard to prefer (defaults to `_local`).
* `version` - Enables a version check. If the fetched document's version isn't equal to the specified version then the request fails with a version conflict and the percolation request is aborted.

All the option can be specified inside the `get` body part or via query string arguments.
Internally the percolate api will issue a get request for fetching the`_source` of the document to percolate.

For this feature to work the `_source` for documents to percolate need to be stored.

Closes #3380
  • Loading branch information
martijnvg committed Jul 26, 2013
1 parent cc5998b commit ebad9e5
Show file tree
Hide file tree
Showing 10 changed files with 830 additions and 122 deletions.
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.Required;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -41,12 +41,16 @@
*/
public class PercolateRequest extends BroadcastOperationRequest<PercolateRequest> {

public static final XContentType contentType = Requests.CONTENT_TYPE;

private String documentType;
private String routing;
private String preference;

private BytesReference documentSource;
private boolean documentUnsafe;
private BytesReference source;
private boolean unsafe;

private BytesReference fetchedDoc;

// Used internally in order to compute tookInMillis, TransportBroadcastOperationAction itself doesn't allow
// to hold it temporarily in an easy way
Expand All @@ -60,6 +64,15 @@ public PercolateRequest(String index, String documentType) {
this.documentType = documentType;
}

public PercolateRequest(PercolateRequest request, BytesReference fetchedDoc) {
super(request.indices());
this.documentType = request.documentType();
this.routing = request.routing();
this.preference = request.preference();
this.source = request.source;
this.fetchedDoc = fetchedDoc;
}

public String documentType() {
return documentType;
}
Expand Down Expand Up @@ -91,67 +104,70 @@ public PercolateRequest preference(String preference) {
*/
@Override
public void beforeLocalFork() {
if (documentUnsafe) {
documentSource = documentSource.copyBytesArray();
documentUnsafe = false;
if (unsafe) {
source = source.copyBytesArray();
unsafe = false;
}
}

public BytesReference documentSource() {
return documentSource;
public BytesReference source() {
return source;
}

@Required
public PercolateRequest documentSource(Map document) throws ElasticSearchGenerationException {
return documentSource(document, XContentType.SMILE);
public PercolateRequest source(Map document) throws ElasticSearchGenerationException {
return source(document, contentType);
}

@Required
public PercolateRequest documentSource(Map document, XContentType contentType) throws ElasticSearchGenerationException {
public PercolateRequest source(Map document, XContentType contentType) throws ElasticSearchGenerationException {
try {
XContentBuilder builder = XContentFactory.contentBuilder(contentType);
builder.map(document);
return documentSource(builder);
return source(builder);
} catch (IOException e) {
throw new ElasticSearchGenerationException("Failed to generate [" + document + "]", e);
}
}

@Required
public PercolateRequest documentSource(String document) {
this.documentSource = new BytesArray(document);
this.documentUnsafe = false;
public PercolateRequest source(String document) {
this.source = new BytesArray(document);
this.unsafe = false;
return this;
}

@Required
public PercolateRequest documentSource(XContentBuilder documentBuilder) {
documentSource = documentBuilder.bytes();
documentUnsafe = false;
public PercolateRequest source(XContentBuilder documentBuilder) {
source = documentBuilder.bytes();
unsafe = false;
return this;
}

public PercolateRequest documentSource(byte[] document) {
return documentSource(document, 0, document.length);
public PercolateRequest source(byte[] document) {
return source(document, 0, document.length);
}

@Required
public PercolateRequest documentSource(byte[] source, int offset, int length) {
return documentSource(source, offset, length, false);
public PercolateRequest source(byte[] source, int offset, int length) {
return source(source, offset, length, false);
}

@Required
public PercolateRequest documentSource(byte[] source, int offset, int length, boolean unsafe) {
return documentSource(new BytesArray(source, offset, length), unsafe);
public PercolateRequest source(byte[] source, int offset, int length, boolean unsafe) {
return source(new BytesArray(source, offset, length), unsafe);
}

@Required
public PercolateRequest documentSource(BytesReference source, boolean unsafe) {
this.documentSource = source;
this.documentUnsafe = unsafe;
public PercolateRequest source(BytesReference source, boolean unsafe) {
this.source = source;
this.unsafe = unsafe;
return this;
}

public PercolateRequest source(PercolateSourceBuilder sourceBuilder) {
this.source = sourceBuilder.buildAsBytes(contentType);
this.unsafe = false;
return this;
}

BytesReference fetchedDoc() {
return fetchedDoc;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
Expand All @@ -161,8 +177,8 @@ public ActionRequestValidationException validate() {
if (documentType == null) {
validationException = addValidationError("type is missing", validationException);
}
if (documentSource == null) {
validationException = addValidationError("documentSource is missing", validationException);
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
Expand All @@ -173,8 +189,8 @@ public void readFrom(StreamInput in) throws IOException {
documentType = in.readString();
routing = in.readOptionalString();
preference = in.readOptionalString();
documentUnsafe = false;
documentSource = in.readBytesReference();
unsafe = false;
source = in.readBytesReference();
startTime = in.readVLong();
}

Expand All @@ -184,7 +200,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(documentType);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeBytesReference(documentSource);
out.writeBytesReference(source);
out.writeVLong(startTime);
}
}
Expand Up @@ -27,6 +27,8 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;

import java.util.Map;

Expand All @@ -35,6 +37,8 @@
*/
public class PercolateRequestBuilder extends BroadcastOperationRequestBuilder<PercolateRequest, PercolateResponse, PercolateRequestBuilder> {

private PercolateSourceBuilder sourceBuilder;

public PercolateRequestBuilder(Client client, String index, String type) {
super((InternalClient) client, new PercolateRequest(index, type));
}
Expand Down Expand Up @@ -85,97 +89,88 @@ public PercolateRequestBuilder setPreference(String preference) {
return this;
}

/**
* Index the Map as a JSON.
*
* @param source The map to index
*/
public PercolateRequestBuilder setSource(PercolateSourceBuilder source) {
sourceBuilder = source;
return this;
}

public PercolateRequestBuilder setSource(Map<String, Object> source) {
request.documentSource(source);
request.source(source);
return this;
}

/**
* Index the Map as the provided content type.
*
* @param source The map to index
*/
public PercolateRequestBuilder setSource(Map<String, Object> source, XContentType contentType) {
request.documentSource(source, contentType);
request.source(source, contentType);
return this;
}

/**
* Sets the document source to index.
* <p/>
* <p>Note, its preferable to either set it using {@link #setSource(org.elasticsearch.common.xcontent.XContentBuilder)}
* or using the {@link #setSource(byte[])}.
*/
public PercolateRequestBuilder setSource(String source) {
request.documentSource(source);
request.source(source);
return this;
}

/**
* Sets the content source to index.
*/
public PercolateRequestBuilder setSource(XContentBuilder sourceBuilder) {
request.documentSource(sourceBuilder);
request.source(sourceBuilder);
return this;
}

/**
* Sets the document to index in bytes form.
*/
public PercolateRequestBuilder setSource(BytesReference source) {
request.documentSource(source, false);
request.source(source, false);
return this;
}

/**
* Sets the document to index in bytes form.
*/
public PercolateRequestBuilder setSource(BytesReference source, boolean unsafe) {
request.documentSource(source, unsafe);
request.source(source, unsafe);
return this;
}

/**
* Sets the document to index in bytes form.
*/
public PercolateRequestBuilder setSource(byte[] source) {
request.documentSource(source);
request.source(source);
return this;
}

/**
* Sets the document to index in bytes form (assumed to be safe to be used from different
* threads).
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
*/
public PercolateRequestBuilder setSource(byte[] source, int offset, int length) {
request.documentSource(source, offset, length);
request.source(source, offset, length);
return this;
}

/**
* Sets the document to index in bytes form.
*
* @param source The source to index
* @param offset The offset in the byte array
* @param length The length of the data
* @param unsafe Is the byte array safe to be used form a different thread
*/
public PercolateRequestBuilder setSource(byte[] source, int offset, int length, boolean unsafe) {
request.documentSource(source, offset, length, unsafe);
request.source(source, offset, length, unsafe);
return this;
}

public PercolateRequestBuilder setPercolateDoc(PercolateSourceBuilder.DocBuilder docBuilder) {
sourceBuilder().setDoc(docBuilder);
return this;
}

public PercolateRequestBuilder setPercolateGet(PercolateSourceBuilder.GetBuilder getBuilder) {
sourceBuilder().setGet(getBuilder);
return this;
}

public PercolateRequestBuilder setPercolateQuery(QueryBuilder queryBuilder) {
sourceBuilder().setQueryBuilder(queryBuilder);
return this;
}

public PercolateRequestBuilder setPercolateFilter(FilterBuilder filterBuilder) {
sourceBuilder().setFilterBuilder(filterBuilder);
return this;
}

private PercolateSourceBuilder sourceBuilder() {
if (sourceBuilder == null) {
sourceBuilder = new PercolateSourceBuilder();
}
return sourceBuilder;
}

@Override
protected void doExecute(ActionListener<PercolateResponse> listener) {
if (sourceBuilder != null) {
request.source(sourceBuilder);
}
((Client) client).percolate(request, listener);
}

Expand Down
Expand Up @@ -12,37 +12,52 @@
public class PercolateShardRequest extends BroadcastShardOperationRequest {

private String documentType;
private BytesReference documentSource;
private BytesReference source;
private BytesReference fetchedDoc;

public PercolateShardRequest() {
}

public PercolateShardRequest(String index, int shardId, PercolateRequest request) {
super(index, shardId, request);
this.documentType = request.documentType();
this.documentSource = request.documentSource();
this.source = request.source();
this.fetchedDoc = request.fetchedDoc();
}

public String documentType() {
return documentType;
}

public BytesReference documentSource() {
return documentSource;
public BytesReference source() {
return source;
}

public BytesReference fetchedDoc() {
return fetchedDoc;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
documentType = in.readString();
documentSource = in.readBytesReference();
source = in.readBytesReference();
if (in.readBoolean()) {
fetchedDoc = in.readBytesReference();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(documentType);
out.writeBytesReference(documentSource);
out.writeBytesReference(source);
if (fetchedDoc != null) {
out.writeBoolean(true);
out.writeBytesReference(fetchedDoc);
} else {
out.writeBoolean(false);
}
}

}

0 comments on commit ebad9e5

Please sign in to comment.