Permalink
Browse files

add a simple bulk processor to simplify using the bulk API

  • Loading branch information...
1 parent 8c2840c commit 8fbf7e61fdf84683cecf2c19135724124f7cc01f @kimchy kimchy committed Aug 13, 2012
@@ -0,0 +1,207 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.bulk;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * A bulk processor is a thread safe bulk processing class, allowing to easily set when to "flush" a new bulk request
+ * (either based on number of actions, or based on the size), and to easily control the number of concurrent bulk
+ * requests allowed to be executed in parallel.
+ * <p/>
+ * In order to create a new bulk processor, use the {@link Builder}.
+ */
+public class BulkProcessor {
+
+ /**
+ * A builder used to create a build an instance of a bulk processor.
+ */
+ public static class Builder {
+
+ private final Client client;
+ private final ActionListener<BulkResponse> listener;
+
+ private int concurrentRequests = 1;
+ private int bulkActions = 1000;
+ private ByteSizeValue bulkSize = new ByteSizeValue(5, ByteSizeUnit.MB);
+
+ /**
+ * Creates a builder of bulk processor with the client to use and the listener that will be used
+ * to be notified on the completion of bulk requests.
+ */
+ public Builder(Client client, ActionListener<BulkResponse> listener) {
+ this.client = client;
+ this.listener = listener;
+ }
+
+ /**
+ * Sets the number of concurrent requests allowed to be executed. A value of 0 means that only a single
+ * request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed
+ * while accumulating new bulk requests. Defaults to <tt>1</tt>.
+ */
+ public Builder setConcurrentRequests(int concurrentRequests) {
+ this.concurrentRequests = concurrentRequests;
+ return this;
+ }
+
+ /**
+ * Sets when to flush a new bulk request based on the number of actions currently added. Defaults to
+ * <tt>1000</tt>. Can be set to <tt>-1</tt> to disable it.
+ */
+ public Builder setBulkActions(int bulkActions) {
+ this.bulkActions = bulkActions;
+ return this;
+ }
+
+ /**
+ * Sets when to flush a new bulk request based on the size of actions currently added. Defaults to
+ * <tt>5mb</tt>. Can be set to <tt>-1</tt> to disable it.
+ */
+ public Builder setBulkSize(ByteSizeValue bulkSize) {
+ this.bulkSize = bulkSize;
+ return this;
+ }
+
+ /**
+ * Builds a new bulk processor.
+ */
+ public BulkProcessor build() {
+ return new BulkProcessor(client, listener, concurrentRequests, bulkActions, bulkSize);
+ }
+ }
+
+ public static Builder builder(Client client, ActionListener<BulkResponse> listener) {
+ return new Builder(client, listener);
+ }
+
+ private final Client client;
+ private final ActionListener<BulkResponse> listener;
+
+ private int concurrentRequests;
+ private final int bulkActions;
+ private final int bulkSize;
+
+ private final Semaphore semaphore;
+
+ private BulkRequest bulkRequest;
+
+ BulkProcessor(Client client, ActionListener<BulkResponse> listener, int concurrentRequests, int bulkActions, ByteSizeValue bulkSize) {
+ this.client = client;
+ this.listener = listener;
+ this.concurrentRequests = concurrentRequests;
+ this.bulkActions = bulkActions;
+ this.bulkSize = bulkSize.bytesAsInt();
+
+ this.semaphore = new Semaphore(concurrentRequests);
+ this.bulkRequest = new BulkRequest();
+ }
+
+ /**
+ * Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
+ * (for example, if no id is provided, one will be generated, or usage of the create flag).
+ */
+ public BulkProcessor add(IndexRequest request) {
+ return add((ActionRequest) request);
+ }
+
+ /**
+ * Adds an {@link DeleteRequest} to the list of actions to execute.
+ */
+ public BulkProcessor add(DeleteRequest request) {
+ return add((ActionRequest) request);
+ }
+
+ public BulkProcessor add(ActionRequest request) {
+ internalAdd(request);
+ return this;
+ }
+
+ private synchronized void internalAdd(ActionRequest request) {
+ bulkRequest.add(request);
+ executeIfNeeded();
+ }
+
+ public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
+ bulkRequest.add(data, contentUnsafe, defaultIndex, defaultType);
+ executeIfNeeded();
+ return this;
+ }
+
+ private void executeIfNeeded() {
+ if (!isOverTheLimit()) {
+ return;
+ }
+ if (concurrentRequests == 0) {
+ // execute in a blocking fashion...
+ try {
+ listener.onResponse(client.bulk(bulkRequest).actionGet());
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ } else {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ listener.onFailure(e);
+ return;
+ }
+ client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
+ @Override
+ public void onResponse(BulkResponse response) {
+ try {
+ listener.onResponse(response);
+ } finally {
+ semaphore.release();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ try {
+ listener.onFailure(e);
+ } finally {
+ semaphore.release();
+ }
+ }
+ });
+ }
+ bulkRequest = new BulkRequest();
+ }
+
+ private boolean isOverTheLimit() {
+ if (bulkActions != -1 && bulkRequest.numberOfActions() > bulkActions) {
+ return true;
+ }
+ if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() > bulkSize) {
+ return true;
+ }
+ return false;
+ }
+}
@@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import com.google.common.collect.Lists;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
@@ -50,6 +51,8 @@
*/
public class BulkRequest implements ActionRequest {
+ private static final int REQUEST_OVERHEAD = 50;
+
final List<ActionRequest> requests = Lists.newArrayList();
private boolean listenerThreaded = false;
@@ -58,6 +61,40 @@
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private boolean refresh = false;
+ private long sizeInBytes = 0;
+
+ /**
+ * Adds a list of requests to be executed. Either index or delete requests.
+ */
+ public BulkRequest add(ActionRequest... requests) {
+ for (ActionRequest request : requests) {
+ if (request instanceof IndexRequest) {
+ add((IndexRequest) request);
+ } else if (request instanceof DeleteRequest) {
+ add((DeleteRequest) request);
+ } else {
+ throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Adds a list of requests to be executed. Either index or delete requests.
+ */
+ public BulkRequest add(Iterable<ActionRequest> requests) {
+ for (ActionRequest request : requests) {
+ if (request instanceof IndexRequest) {
+ add((IndexRequest) request);
+ } else if (request instanceof DeleteRequest) {
+ add((DeleteRequest) request);
+ } else {
+ throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
+ }
+ }
+ return this;
+ }
+
/**
* Adds an {@link IndexRequest} to the list of actions to execute. Follows the same behavior of {@link IndexRequest}
* (for example, if no id is provided, one will be generated, or usage of the create flag).
@@ -67,8 +104,9 @@ public BulkRequest add(IndexRequest request) {
return internalAdd(request);
}
- private BulkRequest internalAdd(IndexRequest request) {
+ BulkRequest internalAdd(IndexRequest request) {
requests.add(request);
+ sizeInBytes += request.source().length() + REQUEST_OVERHEAD;
return this;
}
@@ -77,13 +115,22 @@ private BulkRequest internalAdd(IndexRequest request) {
*/
public BulkRequest add(DeleteRequest request) {
requests.add(request);
+ sizeInBytes += REQUEST_OVERHEAD;
return this;
}
public List<ActionRequest> requests() {
return this.requests;
}
+ public int numberOfActions() {
+ return requests.size();
+ }
+
+ public long estimatedSizeInBytes() {
+ return sizeInBytes;
+ }
+
/**
* Adds a framed data in binary format
*/
@@ -263,10 +310,6 @@ private int findNextMarker(byte marker, int from, BytesReference data, int lengt
return -1;
}
- public int numberOfActions() {
- return requests.size();
- }
-
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@@ -19,6 +19,7 @@
package org.elasticsearch.common.unit;
+import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -50,6 +51,14 @@ public ByteSizeValue(long size, ByteSizeUnit sizeUnit) {
this.sizeUnit = sizeUnit;
}
+ public int bytesAsInt() throws ElasticSearchIllegalArgumentException {
+ long bytes = bytes();
+ if (bytes > Integer.MAX_VALUE) {
+ throw new ElasticSearchIllegalArgumentException("size [" + toString() + "] is bigger than max int");
+ }
+ return (int) bytes;
+ }
+
public long bytes() {
return sizeUnit.toBytes(size);
}

0 comments on commit 8fbf7e6

Please sign in to comment.