Skip to content

Commit

Permalink
Add flush method for BulkProcessor class
Browse files Browse the repository at this point in the history
There is no explicit method `flush/execute` in `BulkProcessor` class. This can be useful in certain scenarios.
Currently it requires to close and create a new BulkProcessor if one wants an immediate flush.

Closes #5575.
Closes #5570.
(cherry picked from commit dc19e06)
  • Loading branch information
kul authored and dadoonet committed Apr 2, 2014
1 parent ca1f992 commit ddcbfa1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 11 deletions.
25 changes: 21 additions & 4 deletions src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;

import java.io.Closeable;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -43,7 +44,7 @@
* <p/>
* In order to create a new bulk processor, use the {@link Builder}.
*/
public class BulkProcessor {
public class BulkProcessor implements Closeable {

/**
* A listener for the execution.
Expand Down Expand Up @@ -191,6 +192,7 @@ public static Builder builder(Client client, Listener listener) {
}
}

@Override
/**
* Closes the processor. If flushing by time is enabled, then its shutdown. Any remaining bulk actions are flushed.
*/
Expand Down Expand Up @@ -235,7 +237,14 @@ public BulkProcessor add(ActionRequest request, @Nullable Object payload) {
return this;
}

public void ensureOpen() {
if (closed) {
throw new ElasticsearchIllegalStateException("bulk process already closed");
}
}

private synchronized void internalAdd(ActionRequest request, @Nullable Object payload) {
ensureOpen();
bulkRequest.add(request, payload);
executeIfNeeded();
}
Expand All @@ -251,9 +260,7 @@ public synchronized BulkProcessor add(BytesReference data, boolean contentUnsafe
}

private void executeIfNeeded() {
if (closed) {
throw new ElasticsearchIllegalStateException("bulk process already closed");
}
ensureOpen();
if (!isOverTheLimit()) {
return;
}
Expand Down Expand Up @@ -322,6 +329,16 @@ private boolean isOverTheLimit() {
return false;
}

/**
* Flush pending delete or index requests.
*/
public synchronized void flush() {
ensureOpen();
if (bulkRequest.numberOfActions() > 0) {
execute();
}
}

class Flush implements Runnable {

@Override
Expand Down
45 changes: 38 additions & 7 deletions src/test/java/org/elasticsearch/document/BulkTests.java
Expand Up @@ -587,9 +587,8 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};

BulkProcessor processor = null;
try {
processor = BulkProcessor.builder(client(), listener).setBulkActions(5).setConcurrentRequests(1).setName("foo").build();
try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(5)
.setConcurrentRequests(1).setName("foo").build()) {
Map<String, Object> data = Maps.newHashMap();
data.put("foo", "bar");

Expand All @@ -602,10 +601,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS);
assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue()));
assertThat(response.getItems().length, is(5));
} finally {
if (processor != null) {
processor.close();
}
}
}

Expand All @@ -626,4 +621,40 @@ public void testThatInvalidIndexNamesShouldNotBreakCompleteBulkRequest() {
assertThat(bulkResponse.getItems()[i].isFailed(), is(expectedFailures[i]));
}
}

@Test
public void testBulkProcessorFlush() throws InterruptedException {
final BlockingQueue<BulkResponse> responseQueue = new SynchronousQueue();

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
responseQueue.add(response);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
};

try (BulkProcessor processor = BulkProcessor.builder(client(), listener).setBulkActions(6)
.setConcurrentRequests(1).setName("foo").build()) {
Map<String, Object> data = Maps.newHashMap();
data.put("foo", "bar");

processor.add(new IndexRequest("test", "test", "1").source(data));
processor.add(new IndexRequest("test", "test", "2").source(data));
processor.add(new IndexRequest("test", "test", "3").source(data));
processor.add(new IndexRequest("test", "test", "4").source(data));
processor.add(new IndexRequest("test", "test", "5").source(data));

processor.flush();

BulkResponse response = responseQueue.poll(1, TimeUnit.SECONDS);
assertThat("Could not get a bulk response even after an explicit flush.", response, is(notNullValue()));
assertThat(response.getItems().length, is(5));
}
}
}

0 comments on commit ddcbfa1

Please sign in to comment.