Skip to content

Commit

Permalink
Bulk Api support for global parameters (#34528)
Browse files Browse the repository at this point in the history
The Bulk Request in the High Level REST Client should be consistent with what is
possible in the REST API, and should therefore support global parameters. In the
REST API, global parameters are passed in the URL.

Some parameters are mandatory - index, type - and would fail validation
if not provided before the bulk is executed.
Other parameters are optional - routing, pipeline.

The usage of these should be consistent across sync/async execution,
bulk processor and BulkRequestBuilder

closes #26026
  • Loading branch information
pgomulka committed Oct 30, 2018
1 parent d67c88f commit 995bf0e
Show file tree
Hide file tree
Showing 21 changed files with 712 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
Params parameters = new Params(request);
parameters.withTimeout(bulkRequest.timeout());
parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());

parameters.withPipeline(bulkRequest.pipeline());
parameters.withRouting(bulkRequest.routing());
// Bulk API only supports newline delimited JSON or Smile. Before executing
// the bulk, we need to check that all requests have the same content-type
// and this content-type is supported by the Bulk API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,18 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
Expand All @@ -44,10 +48,19 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import java.util.stream.IntStream;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.fieldFromSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasId;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasIndex;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasProperty;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.hasType;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -268,23 +281,124 @@ public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception
assertMultiGetResponse(highLevelClient().mget(multiGetRequest, RequestOptions.DEFAULT), testDocs);
}

private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
/**
 * Builds a bulk processor with a global index ("tweets"), type ("_doc"), routing ("routing") and
 * pipeline ("pipeline_id"), adds one request without its own index/type and one request that names
 * "blogs"/"post_type"/"1" explicitly, then searches both indices and checks the returned hits.
 *
 * NOTE(review): the index created up front is "test", while the requests target "tweets" and "blogs" -
 * confirm whether that setup call is needed here.
 * NOTE(review): everyItem(...) is also satisfied by an empty result, so the assertions below do not by
 * themselves prove that any document was found - confirm intent.
 */
@SuppressWarnings("unchecked")
public void testGlobalParametersAndSingleRequest() throws Exception {
createIndexWithMultipleShards("test");

final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
// pipeline referenced below via setGlobalPipeline; presumably adds fieldNameXYZ=valueXYZ to each document - verify helper
createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");

// tag::bulk-processor-mix-parameters
try (BulkProcessor processor = initBulkProcessorBuilder(listener)
.setGlobalIndex("tweets")
.setGlobalType("_doc")
.setGlobalRouting("routing")
.setGlobalPipeline("pipeline_id")
.build()) {


processor.add(new IndexRequest() // <1>
.source(XContentType.JSON, "user", "some user"));
processor.add(new IndexRequest("blogs", "post_type", "1") // <2>
.source(XContentType.JSON, "title", "some title"));
}
// end::bulk-processor-mix-parameters
// the processor has been closed by the try-with-resources block; wait on the latch handed to the listener
latch.await();

// request <1> carried no index of its own, so it is looked up in the global index "tweets"
Iterable<SearchHit> hits = searchAll(new SearchRequest("tweets").routing("routing"));
assertThat(hits, everyItem(hasProperty(fieldFromSource("user"), equalTo("some user"))));
assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));


// request <2> named "blogs" explicitly; the search still uses the global routing value
Iterable<SearchHit> blogs = searchAll(new SearchRequest("blogs").routing("routing"));
assertThat(blogs, everyItem(hasProperty(fieldFromSource("title"), equalTo("some title"))));
assertThat(blogs, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
}

/**
 * Indexes a batch of documents through a bulk processor configured with a global index, type,
 * routing and pipeline, and checks that a single bulk execution indexed every document and that
 * each hit carries the pipeline-added field, the expected index/type and the expected id.
 */
@SuppressWarnings("unchecked")
public void testGlobalParametersAndBulkProcessor() throws Exception {
    createIndexWithMultipleShards("test");

    final CountDownLatch bulkDone = new CountDownLatch(1);
    BulkProcessorTestListener testListener = new BulkProcessorTestListener(bulkDone);
    createFieldAddingPipleine("pipeline_id", "fieldNameXYZ", "valueXYZ");

    int docCount = randomIntBetween(10, 10);
    try (BulkProcessor bulkProcessor = initBulkProcessorBuilder(testListener)
        // the action limit equals the number of documents, so it trips once and
        // a single execution indexes all of them
        .setConcurrentRequests(randomIntBetween(0, 1))
        .setBulkActions(docCount)
        .setFlushInterval(TimeValue.timeValueHours(24))
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        .setGlobalIndex("test")
        .setGlobalType("test")
        .setGlobalRouting("routing")
        .setGlobalPipeline("pipeline_id")
        .build()) {

        // no per-request index/type: the documents rely on the global parameters
        indexDocs(bulkProcessor, docCount, null, null, "test", "test", "pipeline_id");
        bulkDone.await();

        // exactly one bulk execution, no failures, one response item per document
        assertThat(testListener.beforeCounts.get(), equalTo(1));
        assertThat(testListener.afterCounts.get(), equalTo(1));
        assertThat(testListener.bulkFailures.size(), equalTo(0));
        assertResponseItems(testListener.bulkItems, docCount);

        Iterable<SearchHit> indexedHits = searchAll(new SearchRequest("test").routing("routing"));

        assertThat(indexedHits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ"))));
        assertThat(indexedHits, everyItem(Matchers.allOf(hasIndex("test"), hasType("test"))));
        assertThat(indexedHits, containsInAnyOrder(expectedIds(docCount)));
    }
}

/**
 * Builds one {@code hasId} matcher for every document id from 1 to {@code numDocs} inclusive.
 *
 * @param numDocs highest document id; ids are the decimal strings "1" .. "numDocs"
 * @return an array with one matcher per id, in ascending id order
 */
@SuppressWarnings("unchecked")
private Matcher<SearchHit>[] expectedIds(int numDocs) {
    return IntStream.rangeClosed(1, numDocs)
        .mapToObj(docId -> hasId(Integer.toString(docId)))
        // generic array creation is not allowed, hence the raw Matcher[] constructor reference
        .<Matcher<SearchHit>>toArray(Matcher[]::new);
}

/**
 * Adds {@code numDocs} index operations (ids "1" .. "numDocs") to the given bulk processor and returns
 * a multi-get request that references the same documents.
 * <p>
 * For each document a random choice is made between adding an {@link IndexRequest} and adding a raw
 * bulk payload produced by {@code bytesBulkRequest}. Each document is added exactly once, and always
 * with the supplied {@code localIndex}/{@code localType} rather than a hard-coded index.
 *
 * @param processor      bulk processor the operations are added to
 * @param numDocs        number of documents to add
 * @param localIndex     index set on each individual operation (may be {@code null})
 * @param localType      type set on each individual operation (may be {@code null})
 * @param globalIndex    index passed alongside the raw bulk payload
 * @param globalType     type passed alongside the raw bulk payload
 * @param globalPipeline pipeline passed alongside the raw bulk payload
 * @return a multi-get request with one item per added document, keyed by {@code localIndex},
 *         {@code localType} and the document id
 */
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType,
                                         String globalIndex, String globalType, String globalPipeline) throws Exception {
    MultiGetRequest multiGetRequest = new MultiGetRequest();
    for (int i = 1; i <= numDocs; i++) {
        if (randomBoolean()) {
            processor.add(new IndexRequest(localIndex, localType, Integer.toString(i))
                .source(XContentType.JSON, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
        } else {
            BytesArray data = bytesBulkRequest(localIndex, localType, i);
            processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON);
        }
        multiGetRequest.add(localIndex, localType, Integer.toString(i));
    }
    return multiGetRequest;
}

/**
 * Builds the newline-delimited payload for a single bulk "index" operation: an action line
 * carrying {@code _index}, {@code _type} and {@code _id}, followed by a source line with one
 * random-valued "field" entry. Each line is terminated by a newline.
 *
 * @param localIndex value written to {@code _index}
 * @param localType  value written to {@code _type}
 * @param id         document id, written to {@code _id} as a decimal string
 * @return the two-line payload wrapped in a {@link BytesArray}
 */
private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException {
    // action/metadata line: { "index": { "_index": ..., "_type": ..., "_id": ... } }
    String actionLine = Strings.toString(jsonBuilder()
        .startObject()
            .startObject("index")
                .field("_index", localIndex)
                .field("_type", localType)
                .field("_id", Integer.toString(id))
            .endObject()
        .endObject());

    // document line: { "field": <random text> }
    String documentLine = Strings.toString(jsonBuilder()
        .startObject()
            .field("field", randomRealisticUnicodeOfLengthBetween(1, 30))
        .endObject());

    return new BytesArray(actionLine + "\n" + documentLine + "\n");
}

/**
 * Convenience overload: adds {@code numDocs} documents with index "test" and type "test" set on each
 * operation, passing {@code null} for the global index, type and pipeline.
 *
 * @param processor bulk processor the operations are added to
 * @param numDocs   number of documents to add
 * @return the multi-get request produced by the seven-argument overload
 */
private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception {
return indexDocs(processor, numDocs, "test", "test", null, null, null);
}

private static void assertResponseItems(List<BulkItemResponse> bulkItemResponses, int numDocs) {
assertThat(bulkItemResponses.size(), is(numDocs));
int i = 1;
Expand Down Expand Up @@ -343,4 +457,5 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}


}

0 comments on commit 995bf0e

Please sign in to comment.