HSEARCH-2764 Use CompletableFuture for orchestration in ElasticsearchWorkProcessor

But still execute works sequentially for now.
yrodiere authored and Sanne committed Aug 3, 2017
1 parent ffa77a2 commit 0aa11d9
Showing 4 changed files with 188 additions and 66 deletions.
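
The core of the change is easiest to see in isolation. Below is a minimal sketch (not part of the commit) of the chaining pattern the commit message describes: each work is only started once the previous one has completed, so works still execute sequentially even though the orchestration is now expressed with CompletableFuture. Plain Runnables stand in for ElasticsearchWork instances.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; Runnables stand in for ElasticsearchWork instances.
public class SequentialChainingSketch {
	public static void main(String[] args) {
		List<Runnable> works = Arrays.asList(
				() -> System.out.println( "work 1" ),
				() -> System.out.println( "work 2" ),
				() -> System.out.println( "work 3" )
		);

		CompletableFuture<?> chain = CompletableFuture.completedFuture( null );
		for ( Runnable work : works ) {
			// thenCompose only starts the next work once the previous future has completed,
			// so execution stays sequential even though each step is asynchronous.
			chain = chain.thenCompose( ignored -> CompletableFuture.runAsync( work ) );
		}

		// A synchronous caller simply waits for the whole chain.
		chain.join();
	}
}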
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +36,7 @@
import org.hibernate.search.spi.BuildContext;
import org.hibernate.search.util.impl.CollectionHelper;
import org.hibernate.search.util.impl.Executors;
import org.hibernate.search.util.impl.Futures;
import org.hibernate.search.util.impl.Throwables;
import org.hibernate.search.util.logging.impl.LoggerFactory;

@@ -89,7 +91,13 @@ public void close() {
* @return The result of the given work.
*/
public <T> T executeSyncUnsafe(ElasticsearchWork<T> work) {
return doExecuteSyncUnsafe( work, parallelWorkExecutionContext );
try {
// Note: timeout is handled by the client, so this "join" will not last forever
return executeAsyncUnsafe( work ).join();
}
catch (CompletionException e) {
throw Throwables.expectRuntimeException( e.getCause() );
}
}


@@ -112,7 +120,20 @@ public void executeSyncSafe(Iterable<ElasticsearchWork<?>> works) {
}

/**
* Execute a single work asynchronously.
* Execute a single work asynchronously,
* without bulking it with other asynchronous works,
* and potentially throwing exceptions (the error handler isn't used).
*
* @param work The work to be executed.
* @return The result of the given work.
*/
public <T> CompletableFuture<T> executeAsyncUnsafe(ElasticsearchWork<T> work) {
return start( work, parallelWorkExecutionContext );
}

/**
* Execute a single work asynchronously,
* potentially bulking it with other asynchronous works.
* <p>
* If the work throws an exception, this exception will be passed
* to the error handler with an {@link ErrorContext} spanning at least this work.
@@ -124,7 +145,8 @@ public void executeAsync(ElasticsearchWork<?> work) {
}

/**
* Execute a set of works asynchronously.
* Execute a set of works asynchronously,
* potentially bulking it with other asynchronous works.
* <p>
* Works submitted in the same list will be executed in the given order.
* <p>
@@ -146,65 +168,93 @@ public void awaitAsyncProcessingCompletion() {
asyncProcessor.awaitCompletion();
}

/**
/*
* Execute a list of works, bulking them as necessary, and passing any exception to the error handler.
* <p>
*
* After an exception, the remaining works in the list are not executed,
* though some may have already been executed if they were bulked with the failing work.
*
* @param nonBulkedWorks The works to be bulked (as much as possible) and executed
* @param refreshInBulkAPICall The parameter to pass to {@link #createRequestGroups(Iterable, boolean)}.
*/
private void doExecuteSyncSafe(SequentialWorkExecutionContext context, Iterable<ElasticsearchWork<?>> nonBulkedWorks,
boolean refreshInBulkAPICall) {
private void doExecuteSyncSafe(SequentialWorkExecutionContext context,
Iterable<ElasticsearchWork<?>> nonBulkedWorks, boolean refreshInBulkAPICall) {
ErrorContextBuilder errorContextBuilder = new ErrorContextBuilder();

CompletableFuture<?> workListFuture = CompletableFuture.completedFuture( null );

for ( ElasticsearchWork<?> work : createRequestGroups( nonBulkedWorks, refreshInBulkAPICall ) ) {
try {
doExecuteSyncUnsafe( work, context );
work.getLuceneWorks().forEach( errorContextBuilder::workCompleted );
}
catch (BulkRequestFailedException brfe) {
brfe.getSuccessfulItems().keySet().stream()
.flatMap( ElasticsearchWork::getLuceneWorks )
.forEach( errorContextBuilder::workCompleted );

handleError(
errorContextBuilder,
brfe,
nonBulkedWorks,
brfe.getErroneousItems().stream()
.flatMap( ElasticsearchWork::getLuceneWorks )
);
break;
}
catch (RuntimeException e) {
handleError(
errorContextBuilder,
e,
nonBulkedWorks,
work.getLuceneWorks()
);
break;
}
workListFuture = workListFuture.thenCompose( ignored ->
start( work, context )
/*
* Note that the handler is applied to the "inner" (to be composed) work,
* so that the handler is only executed if *this* work fails,
* not if a previous one fails.
*/
.handle( Futures.handler(
(result, throwable) -> {
handleWorkCompletion( errorContextBuilder, throwable, nonBulkedWorks, work );
return result;
}
) )
);
}

/*
* Ignore SequenceAbortedExceptions: if we get such an exception,
* it means the cause was correctly reported to the handler.
*/
workListFuture.exceptionally( Futures.handler( e -> {
if ( e instanceof SequenceAbortedException ) {
return null;
}
else {
throw Throwables.expectRuntimeException( e );
}
} ) )
// Note: timeout is handled by the client, so this "join" will not last forever
.join();
}

private <T> T doExecuteSyncUnsafe(ElasticsearchWork<T> work, ElasticsearchWorkExecutionContext context) {
if ( LOG.isTraceEnabled() ) {
LOG.tracef( "Processing %s", work );
}
private <T> CompletableFuture<T> start(ElasticsearchWork<T> work, ElasticsearchWorkExecutionContext context) {
LOG.tracef( "Processing %s", work );
return work.execute( context );
}

// Note: timeout is handled by the client, so this "join" will not last forever
try {
return work.execute( context ).join();
private void handleWorkCompletion(ErrorContextBuilder errorContextBuilder, Throwable throwable,
Iterable<ElasticsearchWork<?>> nonBulkedWorks, ElasticsearchWork<?> workThatFailed) {
if ( throwable instanceof BulkRequestFailedException ) {
BulkRequestFailedException brfe = (BulkRequestFailedException) throwable;
brfe.getSuccessfulItems().keySet().stream()
.flatMap( ElasticsearchWork::getLuceneWorks )
.forEach( errorContextBuilder::workCompleted );

handleError(
errorContextBuilder,
brfe,
nonBulkedWorks,
brfe.getErroneousItems().stream()
.flatMap( ElasticsearchWork::getLuceneWorks )
);
/*
* Note that we re-throw the throwable,
* so that the following works are not executed if this work failed.
*/
throw new SequenceAbortedException( throwable );
}
catch (CompletionException e) {
throw Throwables.expectRuntimeException( e.getCause() );
else if ( throwable != null ) {
handleError(
errorContextBuilder,
throwable,
nonBulkedWorks,
workThatFailed.getLuceneWorks()
);
// Same as above
throw new SequenceAbortedException( throwable );
}
else {
workThatFailed.getLuceneWorks().forEach( errorContextBuilder::workCompleted );
}
}

private void handleError(ErrorContextBuilder errorContextBuilder, Throwable e,
private void handleError(ErrorContextBuilder errorContextBuilder, Throwable throwable,
Iterable<ElasticsearchWork<?>> allWorks, Stream<LuceneWork> worksThatFailed) {
errorContextBuilder.allWorkToBeDone(
StreamSupport.stream( allWorks.spliterator(), false )
@@ -214,7 +264,7 @@ private void handleError(ErrorContextBuilder errorContextBuilder, Throwable e,

worksThatFailed.forEach( errorContextBuilder::addWorkThatFailed );

errorContextBuilder.errorThatOccurred( e );
errorContextBuilder.errorThatOccurred( throwable );

errorHandler.handle( errorContextBuilder.createErrorContext() );
}
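
A side note on the synchronous path above: executeSyncUnsafe now delegates to executeAsyncUnsafe and joins the returned future, and join() wraps any failure in a CompletionException, which is why the cause is unwrapped with Throwables.expectRuntimeException( e.getCause() ). A minimal, hypothetical sketch of that behaviour:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative sketch only: join() wraps failures, so the synchronous path unwraps the cause.
public class JoinUnwrapSketch {
	public static void main(String[] args) {
		CompletableFuture<Void> work = CompletableFuture.runAsync( () -> {
			throw new IllegalStateException( "simulated work failure" );
		} );

		try {
			work.join();
		}
		catch (CompletionException e) {
			// Rethrowing the cause keeps synchronous callers seeing the original RuntimeException.
			System.out.println( "cause: " + e.getCause() );
		}
	}
}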
@@ -0,0 +1,21 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.elasticsearch.processor.impl;

import org.hibernate.search.exception.SearchException;


/**
* @author Yoann Rodiere
*/
public class SequenceAbortedException extends SearchException {

public SequenceAbortedException(Throwable cause) {
super( cause );
}

}
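
SequenceAbortedException exists so that a failure that was already reported to the error handler can still stop the remaining works in the chain. A minimal sketch of the mechanism (with a plain RuntimeException standing in for SequenceAbortedException):

import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: throwing from handle() completes the stage exceptionally,
// so stages composed after it never run.
public class AbortedSequenceSketch {
	public static void main(String[] args) {
		CompletableFuture<?> chain = CompletableFuture.completedFuture( null )
				.thenRun( () -> { throw new RuntimeException( "work 1 failed" ); } )
				.handle( (result, throwable) -> {
					if ( throwable != null ) {
						// the real code reports to the error handler here, then aborts the sequence
						throw new RuntimeException( "sequence aborted", throwable );
					}
					return result;
				} )
				// never executed: the previous stage completed exceptionally
				.thenRun( () -> System.out.println( "work 2" ) );

		System.out.println( "aborted: " + chain.isCompletedExceptionally() );
	}
}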
@@ -10,6 +10,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.elasticsearch.client.impl.ElasticsearchClient;
@@ -21,6 +22,7 @@
import org.hibernate.search.elasticsearch.work.impl.builder.RefreshWorkBuilder;
import org.hibernate.search.elasticsearch.work.impl.factory.ElasticsearchWorkFactory;
import org.hibernate.search.exception.ErrorHandler;
import org.hibernate.search.util.impl.Futures;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
@@ -86,25 +88,32 @@ public IndexingMonitor getBufferedIndexingMonitor(IndexingMonitor originalMonito
}

public void flush() {
CompletableFuture<?> future = CompletableFuture.completedFuture( null );

// Refresh dirty indexes
if ( !dirtyIndexes.isEmpty() ) {
refreshDirtyIndexes();
dirtyIndexes.clear();
future = future.thenCompose( ignored -> refreshDirtyIndexes() )
.thenRun( () -> dirtyIndexes.clear() );
}

// Flush the indexing monitors
for ( BufferedIndexingMonitor buffer : bufferedIndexMonitors.values() ) {
try {
buffer.flush();
}
catch (RuntimeException e) {
errorHandler.handleException( "Flushing an indexing monitor failed", e );
}
}
bufferedIndexMonitors.clear();
future = future.thenRun( () -> {
for ( BufferedIndexingMonitor buffer : bufferedIndexMonitors.values() ) {
try {
buffer.flush();
}
catch (RuntimeException e) {
errorHandler.handleException( "Flushing an indexing monitor failed", e );
}
}
bufferedIndexMonitors.clear();
} );

// Note: timeout is handled by the client, so this "join" will not last forever
future.join();
}

private void refreshDirtyIndexes() {
private CompletableFuture<?> refreshDirtyIndexes() {
if ( log.isTraceEnabled() ) {
log.tracef( "Refreshing index(es) %s", dirtyIndexes );
}
@@ -115,12 +124,15 @@ private void refreshDirtyIndexes() {
}
ElasticsearchWork<?> work = builder.build();

try {
workProcessor.executeSyncUnsafe( work );
}
catch (RuntimeException e) {
errorHandler.handleException( "Refresh failed", e );
}
return workProcessor.executeAsyncUnsafe( work )
.handle( Futures.handler(
(result, throwable) -> {
if ( throwable != null ) {
errorHandler.handleException( "Refresh failed", throwable );
}
return null;
}
) );
}

private static final class BufferedIndexingMonitor implements IndexingMonitor {
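
A rough sketch (not the actual code) of what the rewritten refreshDirtyIndexes() does with its result: the failure is routed to the error handler and the returned future completes normally either way, so a failed refresh does not break the rest of the flush() chain. A println stands in for the error handler.

import java.util.concurrent.CompletableFuture;

// Illustrative sketch only: report the failure, then complete normally so callers
// composing on this future are not interrupted.
public class ReportAndContinueSketch {
	public static void main(String[] args) {
		CompletableFuture<Void> refresh = CompletableFuture.runAsync( () -> {
			throw new RuntimeException( "Refresh failed" );
		} );

		CompletableFuture<?> safe = refresh.handle( (result, throwable) -> {
			if ( throwable != null ) {
				// a real error handler would be invoked here instead of printing;
				// note the failure arrives wrapped in a CompletionException,
				// which is what the new Futures.handler overloads unwrap.
				System.out.println( "reported: " + throwable );
			}
			return null; // the resulting future completes normally either way
		} );

		safe.join(); // does not throw
	}
}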
39 changes: 39 additions & 0 deletions engine/src/main/java/org/hibernate/search/util/impl/Futures.java
@@ -8,6 +8,8 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

@@ -55,4 +57,41 @@ public static <T> Function<Throwable, T> handler(Function<Throwable, T> delegate
};
}


/**
* Creates a future handler that will delegate to the given {@link BiFunction}
* after having unwrapped the throwable passed as input if it is a {@link CompletionException}.
* <p>
* This method is meant to be used in conjunction with {@link CompletableFuture#handle(BiFunction)}.
*
* @param delegate The handler to delegate to
* @return The new, delegating handler.
*/
public static <T, R> BiFunction<T, Throwable, R> handler(BiFunction<T, Throwable, R> delegate) {
return (result, throwable) -> {
if ( throwable instanceof CompletionException ) {
throwable = throwable.getCause();
}
return delegate.apply( result, throwable );
};
}

/**
* Creates a future handler that will delegate to the given {@link BiConsumer}
* after having unwrapped the throwable passed as input if it is a {@link CompletionException}.
* <p>
* This method is meant to be used in conjunction with {@link CompletableFuture#whenComplete(BiConsumer)}.
*
* @param delegate The handler to delegate to
* @return The new, delegating handler.
*/
public static <T> BiConsumer<T, Throwable> handler(BiConsumer<T, Throwable> delegate) {
return (result, throwable) -> {
if ( throwable instanceof CompletionException ) {
throwable = throwable.getCause();
}
delegate.accept( result, throwable );
};
}

}
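
To see the new Futures.handler(BiFunction) overload at work, here is a small sketch using a local copy of the same unwrapping logic (Futures lives in an internal package, so the helper is re-declared here purely for illustration): the delegate receives the original exception rather than the CompletionException wrapper added by the upstream stage.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

// Illustrative sketch only; "unwrapping" mirrors the Futures.handler(BiFunction) overload above.
public class HandlerUnwrapSketch {

	static <T, R> BiFunction<T, Throwable, R> unwrapping(BiFunction<T, Throwable, R> delegate) {
		return (result, throwable) -> {
			if ( throwable instanceof CompletionException ) {
				throwable = throwable.getCause();
			}
			return delegate.apply( result, throwable );
		};
	}

	public static void main(String[] args) {
		CompletableFuture.completedFuture( null )
				.thenRun( () -> { throw new IllegalArgumentException( "boom" ); } )
				.handle( unwrapping( (result, throwable) -> {
					// the delegate sees the original IllegalArgumentException, not the wrapper
					System.out.println( throwable.getClass().getSimpleName() );
					return null;
				} ) )
				.join();
	}
}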
