Skip to content

Commit

Permalink
HSEARCH-2764 Add a parallel orchestrator for streamed work
Browse files Browse the repository at this point in the history
This could lead to better performance with large Elasticsearch
connection pools when works affect multiple indexes.
  • Loading branch information
yrodiere authored and Sanne committed Aug 3, 2017
1 parent 0625df3 commit d5aa35f
Show file tree
Hide file tree
Showing 6 changed files with 596 additions and 64 deletions.
Expand Up @@ -7,7 +7,6 @@
package org.hibernate.search.elasticsearch.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
Expand All @@ -26,6 +25,7 @@
import org.hibernate.search.elasticsearch.cfg.IndexSchemaManagementStrategy;
import org.hibernate.search.elasticsearch.client.impl.URLEncodedString;
import org.hibernate.search.elasticsearch.logging.impl.Log;
import org.hibernate.search.elasticsearch.processor.impl.BarrierElasticsearchWorkOrchestrator;
import org.hibernate.search.elasticsearch.processor.impl.ElasticsearchWorkProcessor;
import org.hibernate.search.elasticsearch.schema.impl.ElasticsearchSchemaCreator;
import org.hibernate.search.elasticsearch.schema.impl.ElasticsearchSchemaDropper;
Expand Down Expand Up @@ -420,8 +420,10 @@ public void flushAndReleaseResources() {
ElasticsearchWork<?> flushWork = elasticsearchService.getWorkFactory().flush()
.index( actualIndexName )
.build();
workProcessor.awaitAsyncProcessingCompletion();
workProcessor.executeSyncSafe( Collections.singletonList( flushWork ) );
awaitAsyncProcessingCompletion();
workProcessor.getSyncNonStreamOrchestrator()
.submit( flushWork )
.join();
}

@Override
Expand All @@ -443,28 +445,45 @@ public void performOperations(List<LuceneWork> workList, IndexingMonitor monitor
}

if ( sync ) {
workProcessor.executeSyncSafe( elasticsearchWorks );
workProcessor.getSyncNonStreamOrchestrator()
.submit( elasticsearchWorks )
.join();
}
else {
workProcessor.executeAsync( elasticsearchWorks );
workProcessor.getAsyncNonStreamOrchestrator()
.submit( elasticsearchWorks );
}
}

@Override
public void performStreamOperation(LuceneWork singleOperation, IndexingMonitor monitor, boolean forceAsync) {
ElasticsearchWork<?> elasticsearchWork = singleOperation.acceptIndexWorkVisitor( visitor, monitor );
if ( singleOperation instanceof FlushLuceneWork ) {
workProcessor.awaitAsyncProcessingCompletion();
workProcessor.executeSyncSafe( Collections.singleton( elasticsearchWork ) );
awaitAsyncProcessingCompletion();
workProcessor.getSyncNonStreamOrchestrator()
.submit( elasticsearchWork )
.join();
}
else {
workProcessor.executeAsync( elasticsearchWork );
workProcessor.getStreamOrchestrator()
.submit( elasticsearchWork );
}
}

@Override
public void awaitAsyncProcessingCompletion() {
workProcessor.awaitAsyncProcessingCompletion();
awaitCompletion( workProcessor.getAsyncNonStreamOrchestrator() );
awaitCompletion( workProcessor.getStreamOrchestrator() );
}

/**
 * Wait until the given orchestrator has no more work to execute.
 * <p>
 * If the calling thread is interrupted while waiting, its interrupt flag is restored
 * and the exception created by the logger is thrown instead of the checked one.
 *
 * @param orchestrator The orchestrator whose completion should be awaited.
 */
private void awaitCompletion(BarrierElasticsearchWorkOrchestrator orchestrator) {
	try {
		orchestrator.awaitCompletion();
	}
	catch (InterruptedException interruption) {
		// Restore the interrupt flag so that callers further up the stack can still observe it
		Thread.currentThread().interrupt();
		throw LOG.interruptedWhileWaitingForRequestCompletion( interruption );
	}
}

@Override
Expand Down
@@ -0,0 +1,26 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.elasticsearch.processor.impl;

/**
* An {@link ElasticsearchWorkOrchestrator} that additionally provides a synchronization barrier
* through its {@link #awaitCompletion()} method,
* allowing callers to wait until all the work submitted so far has been executed.
*
* @author Yoann Rodiere
*/
public interface BarrierElasticsearchWorkOrchestrator extends ElasticsearchWorkOrchestrator {

/**
* Block the calling thread until there is no more work to execute.
* <p>
* N.B. if more works are submitted while a thread is waiting, the wait may last longer,
* since the newly submitted works have to be executed too.
*
* @throws InterruptedException if the calling thread is interrupted while waiting
*/
void awaitCompletion() throws InterruptedException;

}
Expand Up @@ -35,7 +35,7 @@
*
* @author Yoann Rodiere
*/
class BatchingSharedElasticsearchWorkOrchestrator implements ElasticsearchWorkOrchestrator, AutoCloseable {
class BatchingSharedElasticsearchWorkOrchestrator implements BarrierElasticsearchWorkOrchestrator, AutoCloseable {

private static final Log LOG = LoggerFactory.make( Log.class );

Expand Down Expand Up @@ -131,6 +131,7 @@ private void ensureProcessingScheduled() {
}
}

@Override
public void awaitCompletion() throws InterruptedException {
int phaseBeforeUnarrivedPartiesCheck = phaser.getPhase();
if ( phaser.getUnarrivedParties() > 0 ) {
Expand Down
Expand Up @@ -6,8 +6,6 @@
*/
package org.hibernate.search.elasticsearch.processor.impl;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -45,8 +43,9 @@ public class ElasticsearchWorkProcessor implements AutoCloseable {
private final ElasticsearchWorkFactory workFactory;

private final ElasticsearchWorkExecutionContext parallelWorkExecutionContext;
private final ElasticsearchWorkOrchestrator syncOrchestrator;
private final BatchingSharedElasticsearchWorkOrchestrator asyncOrchestrator;
private final ElasticsearchWorkOrchestrator syncNonStreamOrchestrator;
private final BatchingSharedElasticsearchWorkOrchestrator asyncNonStreamOrchestrator;
private final BatchingSharedElasticsearchWorkOrchestrator streamOrchestrator;

public ElasticsearchWorkProcessor(BuildContext context,
ElasticsearchClient client, GsonProvider gsonProvider, ElasticsearchWorkFactory workFactory) {
Expand All @@ -57,14 +56,29 @@ public ElasticsearchWorkProcessor(BuildContext context,

this.parallelWorkExecutionContext =
new ParallelWorkExecutionContext( client, gsonProvider );
this.syncOrchestrator = createIsolatedSharedOrchestrator( () -> this.createSerialOrchestrator() );
this.asyncOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async work orchestrator", createSerialOrchestrator() );
this.syncNonStreamOrchestrator = createIsolatedSharedOrchestrator( () -> this.createSerialOrchestrator() );
this.asyncNonStreamOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async non-stream work orchestrator", createSerialOrchestrator() );
this.streamOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async stream work orchestrator", createParallelOrchestrator() );
}

@Override
public void close() {
awaitProcessingCompletion( asyncOrchestrator );
asyncOrchestrator.close();
try {
asyncNonStreamOrchestrator.awaitCompletion();
streamOrchestrator.awaitCompletion();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw LOG.interruptedWhileWaitingForRequestCompletion( e );
}
finally {
try {
asyncNonStreamOrchestrator.close();
}
finally {
streamOrchestrator.close();
}
}
}

/**
Expand All @@ -84,24 +98,6 @@ public <T> T executeSyncUnsafe(ElasticsearchWork<T> work) {
}
}


/**
* Execute a set of works synchronously.
* <p>
* Works submitted in the same list will be executed in the given order.
* <p>
* If any work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning at least the given works,
* and the remaining works will not be executed.
*
* @param works The works to be executed.
*/
public void executeSyncSafe(Iterable<ElasticsearchWork<?>> works) {
syncOrchestrator.submit( works )
// Note: timeout is handled by the client, so this "join" will not last forever
.join();
}

/**
* Execute a single work asynchronously,
* without bulking it with other asynchronous works,
Expand All @@ -115,50 +111,61 @@ public <T> CompletableFuture<T> executeAsyncUnsafe(ElasticsearchWork<T> work) {
}

/**
* Execute a single work asynchronously,
* potentially bulking it with other asynchronous works.
* Return the orchestrator for synchronous, non-stream background works.
* <p>
* Works submitted in the same changeset will be executed in the given order.
* Relative execution order between changesets is undefined.
* <p>
* If the work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning at least this work.
* Works submitted to this orchestrator
* will only be bulked with subsequent works of the same changeset.
* <p>
* If any work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning exactly the given works,
* and the remaining works will not be executed.
*
* @param work The work to be executed.
* @return the orchestrator for synchronous, non-stream background works.
*/
public void executeAsync(ElasticsearchWork<?> work) {
asyncOrchestrator.submit( Collections.singleton( work ) );
public ElasticsearchWorkOrchestrator getSyncNonStreamOrchestrator() {
return syncNonStreamOrchestrator;
}

/**
* Execute a set of works asynchronously,
* potentially bulking it with other asynchronous works.
* Return the orchestrator for asynchronous, non-stream background works.
* <p>
* Works submitted in the same changeset will be executed in the given order.
* Changesets will be executed in the order they are submitted.
* <p>
* Works submitted in the same list will be executed in the given order.
* Works submitted to this orchestrator
* will only be bulked with subsequent works (possibly of a different changeset).
* <p>
* If any work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning at least the given works,
* to the error handler with an {@link ErrorContext} spanning exactly the given works,
* and the remaining works will not be executed.
*
* @param works The works to be executed.
* @return the orchestrator for asynchronous, non-stream background works.
*/
public void executeAsync(List<ElasticsearchWork<?>> works) {
asyncOrchestrator.submit( works );
public BarrierElasticsearchWorkOrchestrator getAsyncNonStreamOrchestrator() {
return asyncNonStreamOrchestrator;
}

/**
* Blocks until the queue of requests scheduled for asynchronous processing has been fully processed.
* N.B. if more work is added to the queue in the meantime, this might delay the wait.
* Return the orchestrator for asynchronous, stream background works.
* <p>
* Works submitted in the same changeset will be executed in the given order.
* Relative execution order between changesets is undefined.
* <p>
* Works submitted to this orchestrator
* will only be bulked with subsequent works from the same changeset
* or with works from a different changeset.
* <p>
* If any work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning exactly the given works,
* and the remaining works will not be executed.
*
* @return the orchestrator for stream background works.
*/
public void awaitAsyncProcessingCompletion() {
awaitProcessingCompletion( asyncOrchestrator );
}

private void awaitProcessingCompletion(BatchingSharedElasticsearchWorkOrchestrator orchestrator) {
try {
asyncOrchestrator.awaitCompletion();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw LOG.interruptedWhileWaitingForRequestCompletion( e );
}
public BarrierElasticsearchWorkOrchestrator getStreamOrchestrator() {
return streamOrchestrator;
}

private <T> CompletableFuture<T> start(ElasticsearchWork<T> work, ElasticsearchWorkExecutionContext context) {
Expand All @@ -180,6 +187,12 @@ private FlushableElasticsearchWorkOrchestrator createSerialOrchestrator() {
return new SerialChangesetsElasticsearchWorkOrchestrator( sequenceBuilder, bulker );
}

/**
 * Create a new {@link ParallelChangesetsElasticsearchWorkOrchestrator}
 * relying on its own, dedicated sequence builder and bulker.
 *
 * @return The newly created orchestrator.
 */
private FlushableElasticsearchWorkOrchestrator createParallelOrchestrator() {
	// The same sequence builder instance must be shared by the bulker and the orchestrator
	ElasticsearchWorkSequenceBuilder sharedSequenceBuilder = createSequenceBuilder();
	ElasticsearchWorkBulker workBulker = createBulker( sharedSequenceBuilder, false );
	return new ParallelChangesetsElasticsearchWorkOrchestrator( sharedSequenceBuilder, workBulker );
}

private ElasticsearchWorkSequenceBuilder createSequenceBuilder() {
return new DefaultElasticsearchWorkSequenceBuilder(
this::start,
Expand Down

0 comments on commit d5aa35f

Please sign in to comment.