Skip to content

Commit

Permalink
HSEARCH-2764 Remove the delay in BatchingSharedElasticsearchWorkOrche…
Browse files Browse the repository at this point in the history
…strator

There's no need for such a delay:

 * if works are submitted more slowly than they are processed, then
there's no need to try doing more bulking (especially if it means adding
an artificial delay)
 * if works are submitted faster than they are processed, then the queue
should progressively fill up, we'll start doing bulking, and we'll end
up ignoring the delay anyway.
  • Loading branch information
yrodiere authored and Sanne committed Aug 3, 2017
1 parent 8ae9cf7 commit a15d34a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 22 deletions.
Expand Up @@ -10,9 +10,8 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.hibernate.search.elasticsearch.logging.impl.Log;
Expand Down Expand Up @@ -42,12 +41,11 @@ class BatchingSharedElasticsearchWorkOrchestrator implements BarrierElasticsearc

private static final Log LOG = LoggerFactory.make( Log.class );

private final int delayMs;
private final FlushableElasticsearchWorkOrchestrator delegate;
private final ErrorHandler errorHandler;
private final int changesetsPerBatch;

private final ScheduledExecutorService scheduler;
private final ExecutorService executor;
private final BlockingQueue<Changeset> changesetQueue;
private final List<Changeset> changesetBuffer;
private final AtomicBoolean processingScheduled;
Expand All @@ -62,8 +60,6 @@ protected boolean onAdvance(int phase, int registeredParties) {

/**
* @param name The name of the orchestrator thread
* @param delayMs A delay before creating a batch when a work is submitted.
* Higher values mean bigger batch sizes, but higher latency.
* @param maxChangesetsPerBatch The maximum number of changesets to
* process in a single batch. Higher values mean lesser chance of transport
* thread starvation, but higher heap consumption.
Expand All @@ -74,16 +70,15 @@ protected boolean onAdvance(int phase, int registeredParties) {
* @param errorHandler An error handler to send orchestration errors to.
*/
public BatchingSharedElasticsearchWorkOrchestrator(
String name, int delayMs, int maxChangesetsPerBatch, boolean fair,
String name, int maxChangesetsPerBatch, boolean fair,
FlushableElasticsearchWorkOrchestrator delegate,
ErrorHandler errorHandler) {
this.delayMs = delayMs;
this.delegate = delegate;
this.errorHandler = errorHandler;
this.changesetsPerBatch = maxChangesetsPerBatch;
changesetQueue = new ArrayBlockingQueue<>( maxChangesetsPerBatch, fair );
changesetBuffer = CollectionHelper.newArrayList( maxChangesetsPerBatch );
scheduler = Executors.newScheduledThreadPool( name );
executor = Executors.newFixedThreadPool( 1, name );
processingScheduled = new AtomicBoolean( false );
}

Expand Down Expand Up @@ -119,11 +114,11 @@ private void ensureProcessingScheduled() {
try {
if ( processingScheduled.compareAndSet( false, true ) ) {
try {
scheduler.schedule( this::processBatch, delayMs, TimeUnit.MILLISECONDS );
executor.submit( this::processBatch );
}
catch (Throwable e) {
/*
* Make sure a failure to schedule the processing
* Make sure a failure to submit the processing task
* doesn't leave other threads waiting indefinitely
*/
try {
Expand All @@ -137,7 +132,7 @@ private void ensureProcessingScheduled() {
}
else {
/*
* Corner case: another thread scheduled processing
* Corner case: another thread submitted a processing task
* just after we registered the phaser.
* Cancel our own registration.
*/
Expand All @@ -146,7 +141,7 @@ private void ensureProcessingScheduled() {
}
catch (Throwable e) {
/*
* Make sure a failure to schedule the processing
* Make sure a failure to submit the processing task
* doesn't leave other threads waiting indefinitely
*/
try {
Expand All @@ -172,7 +167,7 @@ public void awaitCompletion() throws InterruptedException {
public void close() {
try ( Closer<RuntimeException> closer = new Closer<>() ) {
closer.push( () -> {
scheduler.shutdown();
executor.shutdown();
try {
awaitCompletion();
}
Expand Down
Expand Up @@ -49,9 +49,6 @@ public class ElasticsearchWorkProcessor implements AutoCloseable {
private static final int STREAM_MIN_BULK_SIZE = 1;
private static final int MAX_BULK_SIZE = 250;

private static final int SYNC_ORCHESTRATION_DELAY_MS = 0;
private static final int ASYNC_ORCHESTRATION_DELAY_MS = 100;

/*
* Setting the following constants involves a bit of guesswork.
* Basically we want the number to be large enough for the orchestrator
Expand Down Expand Up @@ -97,7 +94,6 @@ public ElasticsearchWorkProcessor(BuildContext context,
*/
this.streamOrchestrator = createBatchingSharedOrchestrator(
"Elasticsearch async stream work orchestrator",
ASYNC_ORCHESTRATION_DELAY_MS,
STREAM_MAX_CHANGESETS_PER_BATCH,
false, // Do not care about ordering when queuing changesets
createParallelOrchestrator( this::createIndexMonitorBufferingWorkExecutionContext, STREAM_MIN_BULK_SIZE, false ) );
Expand Down Expand Up @@ -192,7 +188,6 @@ public BarrierElasticsearchWorkOrchestrator createNonStreamOrchestrator(String i
if ( sync ) {
return createBatchingSharedOrchestrator(
"Elasticsearch sync non-stream work orchestrator for index " + indexName,
SYNC_ORCHESTRATION_DELAY_MS,
NON_STREAM_MAX_CHANGESETS_PER_BATCH,
true /* enqueue changesets in the order they were submitted */,
delegate
Expand All @@ -201,7 +196,6 @@ public BarrierElasticsearchWorkOrchestrator createNonStreamOrchestrator(String i
else {
return createBatchingSharedOrchestrator(
"Elasticsearch async non-stream work orchestrator for index " + indexName,
ASYNC_ORCHESTRATION_DELAY_MS,
NON_STREAM_MAX_CHANGESETS_PER_BATCH,
true /* enqueue changesets in the order they were submitted */,
delegate
Expand Down Expand Up @@ -235,9 +229,9 @@ private <T> CompletableFuture<T> start(ElasticsearchWork<T> work, ElasticsearchW
}

private BatchingSharedElasticsearchWorkOrchestrator createBatchingSharedOrchestrator(
String name, int delayMs, int maxChangesetsPerBatch, boolean fair,
String name, int maxChangesetsPerBatch, boolean fair,
FlushableElasticsearchWorkOrchestrator delegate) {
return new BatchingSharedElasticsearchWorkOrchestrator( name, delayMs, maxChangesetsPerBatch, fair,
return new BatchingSharedElasticsearchWorkOrchestrator( name, maxChangesetsPerBatch, fair,
delegate, errorHandler );
}

Expand Down

0 comments on commit a15d34a

Please sign in to comment.