Skip to content

Commit

Permalink
Merge pull request #12114 from jasontedor/feature/11527
Browse files Browse the repository at this point in the history
Add support for retrieving fields in bulk updates

This commit adds support to retrieve fields when using the bulk update API. This functionality was previously available for the update API
but not for the bulk update API.

Closes #11527
  • Loading branch information
jasontedor committed Jul 8, 2015
2 parents 72bf32e + b61709c commit cbcc553
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 6 deletions.
Expand Up @@ -285,7 +285,7 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
}

public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, payload, true);
bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true);
executeIfNeeded();
return this;
}
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -239,17 +240,17 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, true);
return add(data, defaultIndex, defaultType, null, null, null, true);
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex) throws Exception {
return add(data, defaultIndex, defaultType, null, null, allowExplicitIndex);
return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex);
}

public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
XContent xContent = XContentFactory.xContent(data);
int line = 0;
int from = 0;
Expand Down Expand Up @@ -283,6 +284,7 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
String id = null;
String routing = defaultRouting;
String parent = null;
String[] fields = defaultFields;
String timestamp = null;
Long ttl = null;
String opType = null;
Expand Down Expand Up @@ -329,6 +331,9 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
versionType = VersionType.fromString(parser.text());
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
} else if ("fields".equals(currentFieldName)) {
List<Object> values = parser.list();
fields = values.toArray(new String[values.size()]);
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]");
}
Expand Down Expand Up @@ -372,6 +377,9 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
.routing(routing)
.parent(parent)
.source(data.slice(from, nextMarker - from));
if (fields != null) {
updateRequest.fields(fields);
}

IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
Expand Down
Expand Up @@ -44,6 +44,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -667,6 +668,10 @@ public UpdateRequest source(BytesReference source) throws Exception {
docAsUpsert(parser.booleanValue());
} else if ("detect_noop".equals(currentFieldName)) {
detectNoop(parser.booleanValue());
} else if ("fields".equals(currentFieldName)) {
List<Object> values = parser.list();
String[] fields = values.toArray(new String[values.size()]);
fields(fields);
} else {
//here we don't have settings available, unable to throw deprecation exceptions
scriptParameterParser.token(currentFieldName, token, parser, ParseFieldMatcher.EMPTY);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -75,14 +76,16 @@ public void handleRequest(final RestRequest request, final RestChannel channel,
String defaultIndex = request.param("index");
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String fieldsParam = request.param("fields");
String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null;

String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, null, allowExplicitIndex);
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields, null, allowExplicitIndex);

client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
@Override
Expand Down Expand Up @@ -131,6 +134,11 @@ public RestResponse buildResponse(BulkResponse response, XContentBuilder builder
} else {
builder.field(Fields.STATUS, shardInfo.status().getStatus());
}
if (updateResponse.getGetResult() != null) {
builder.startObject(Fields.GET);
updateResponse.getGetResult().toXContentEmbedded(builder, request);
builder.endObject();
}
}
}
builder.endObject();
Expand All @@ -155,6 +163,7 @@ static final class Fields {
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString FOUND = new XContentBuilderString("found");
static final XContentBuilderString GET = new XContentBuilderString("get");
}

}
6 changes: 5 additions & 1 deletion docs/reference/docs/bulk.asciidoc
Expand Up @@ -180,7 +180,7 @@ times an update should be retried in the case of a version conflict.

The `update` action payload, supports the following options: `doc`
(partial document), `upsert`, `doc_as_upsert`, `script`, `params` (for
script), `lang` (for script). See update documentation for details on
script), `lang` (for script) and `fields`. See update documentation for details on
the options. Curl example with update actions:

[source,js]
Expand All @@ -191,6 +191,10 @@ the options. Curl example with update actions:
{ "script" : { "inline": "ctx._source.counter += param1", "lang" : "js", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "type1", "_index" : "index1", "fields" : ["_source"]} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "fields": ["_source"]}
--------------------------------------------------

[float]
Expand Down
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
Expand Up @@ -36,6 +36,10 @@
"type": {
"type" : "string",
"description" : "Default document type for items which don't provide one"
},
"fields": {
"type": "string",
"description" : "Default comma-separated list of fields to return in the response for updates"
}
}
},
Expand Down
@@ -0,0 +1,49 @@
---
"Fields":
- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_1
body: { "foo": "bar" }

- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_2
body: { "foo": "qux" }

- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_3
body: { "foo": "corge" }


- do:
bulk:
refresh: true
body: |
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_1", "fields": ["_source"] } }
{ "doc": { "foo": "baz" } }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_2" } }
{ "fields": ["_source"], "doc": { "foo": "quux" } }
- match: { items.0.update.get._source.foo: baz }
- match: { items.1.update.get._source.foo: quux }

- do:
bulk:
index: test_index
type: test_type
fields: _source
body: |
{ "update": { "_id": "test_id_3" } }
{ "doc": { "foo": "garply" } }
- match: { items.0.update.get._source.foo: garply }

0 comments on commit cbcc553

Please sign in to comment.