Skip to content

Commit

Permalink
HSEARCH-3053 Move the Elasticsearch sequence context to a dedicated o…
Browse files Browse the repository at this point in the history
…bject to avoid further mistakes
  • Loading branch information
yrodiere committed Jan 29, 2019
1 parent 9f53162 commit 8881472
Showing 1 changed file with 114 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.hibernate.search.backend.elasticsearch.logging.impl.Log;
import org.hibernate.search.backend.elasticsearch.work.impl.BulkableElasticsearchWork;
import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork;
import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.backend.elasticsearch.work.result.impl.BulkResult;
import org.hibernate.search.backend.elasticsearch.work.result.impl.BulkResultItemExtractor;
import org.hibernate.search.util.impl.common.Futures;
Expand All @@ -33,10 +32,8 @@ class ElasticsearchDefaultWorkSequenceBuilder implements ElasticsearchWorkSequen
private final Supplier<ContextualErrorHandler> errorHandlerSupplier;
private final BulkResultExtractionStepImpl bulkResultExtractionStep = new BulkResultExtractionStepImpl();

private CompletableFuture<Void> refreshFuture;
private CompletableFuture<?> sequenceFuture;
private ElasticsearchRefreshableWorkExecutionContext executionContext;
private ContextualErrorHandler errorHandler;
private CompletableFuture<?> currentlyBuildingSequenceTail;
private SequenceContext currentlyBuildingSequenceContext;

public ElasticsearchDefaultWorkSequenceBuilder(Supplier<ElasticsearchRefreshableWorkExecutionContext> contextSupplier,
Supplier<ContextualErrorHandler> errorHandlerSupplier) {
Expand All @@ -46,11 +43,11 @@ public ElasticsearchDefaultWorkSequenceBuilder(Supplier<ElasticsearchRefreshable

@Override
public void init(CompletableFuture<?> previous) {
CompletableFuture<Void> rootSequenceFuture = previous.handle( (ignoredResult, ignoredThrowable) -> null );
this.refreshFuture = new CompletableFuture<>();
this.sequenceFuture = rootSequenceFuture;
this.executionContext = contextSupplier.get();
this.errorHandler = errorHandlerSupplier.get();
// We only use the previous stage to delay the execution of the sequence, but we ignore its result
this.currentlyBuildingSequenceTail = previous.handle( (ignoredResult, ignoredThrowable) -> null );
this.currentlyBuildingSequenceContext = new SequenceContext(
contextSupplier.get(), errorHandlerSupplier.get()
);
}

/**
Expand All @@ -64,10 +61,8 @@ public void init(CompletableFuture<?> previous) {
*/
@Override
public <T> CompletableFuture<T> addNonBulkExecution(ElasticsearchWork<T> work) {
// Use local variables to make sure the lambdas won't be affected by a reset()
final ElasticsearchRefreshableWorkExecutionContext context = this.executionContext;
final ContextualErrorHandler errorHandler = this.errorHandler;
final CompletableFuture<Void> refreshFuture = this.refreshFuture;
// Use a local variable to make sure lambdas (if any) won't be affected by a reset()
final SequenceContext sequenceContext = this.currentlyBuildingSequenceContext;

/*
* Use a different future for the caller than the one used in the sequence,
Expand All @@ -77,16 +72,17 @@ public <T> CompletableFuture<T> addNonBulkExecution(ElasticsearchWork<T> work) {
CompletableFuture<T> workFutureForCaller = new CompletableFuture<>();

// If the previous work failed, then skip the new work and notify the caller and error handler as necessary.
CompletableFuture<T> handledWorkExecutionFuture = this.sequenceFuture.whenComplete( Futures.handler( (ignoredResult, throwable) -> {
if ( throwable != null ) {
notifySkipping( work, throwable, errorHandler, workFutureForCaller );
}
} ) )
CompletableFuture<T> handledWorkExecutionFuture = currentlyBuildingSequenceTail
.whenComplete( Futures.handler( (ignoredResult, throwable) -> {
if ( throwable != null ) {
sequenceContext.notifyWorkSkipped( work, throwable, workFutureForCaller );
}
} ) )
// If the previous work completed normally, then execute the new work
.thenCompose( Futures.safeComposer(
ignoredPreviousResult -> {
CompletableFuture<T> workExecutionFuture = work.execute( context );
return addPostExecutionHandlers( work, workExecutionFuture, errorHandler, refreshFuture, workFutureForCaller );
CompletableFuture<T> workExecutionFuture = work.execute( sequenceContext.executionContext );
return addPostExecutionHandlers( work, workExecutionFuture, workFutureForCaller, sequenceContext );
}
) );

Expand All @@ -95,7 +91,7 @@ public <T> CompletableFuture<T> addNonBulkExecution(ElasticsearchWork<T> work) {
* after both the work and *all* the handlers are executed,
* because otherwise errorHandler.handle() could be called before all failed/skipped works are reported.
*/
this.sequenceFuture = handledWorkExecutionFuture;
currentlyBuildingSequenceTail = handledWorkExecutionFuture;

return workFutureForCaller;
}
Expand All @@ -110,70 +106,45 @@ public <T> CompletableFuture<T> addNonBulkExecution(ElasticsearchWork<T> work) {
*/
@Override
public CompletableFuture<BulkResult> addBulkExecution(CompletableFuture<? extends ElasticsearchWork<BulkResult>> workFuture) {
// Use local variables to make sure the lambdas won't be affected by a reset()
final ElasticsearchRefreshableWorkExecutionContext context = this.executionContext;
// Use a local variable to make sure lambdas (if any) won't be affected by a reset()
final SequenceContext currentSequenceAttributes = this.currentlyBuildingSequenceContext;

CompletableFuture<BulkResult> bulkWorkResultFuture =
// When the previous work completes successfully *and* the bulk work is available...
sequenceFuture.thenCombine( workFuture, (ignored, work) -> work )
currentlyBuildingSequenceTail.thenCombine( workFuture, (ignored, work) -> work )
// ... execute the bulk work
.thenCompose( work -> work.execute( context ) );
this.sequenceFuture = bulkWorkResultFuture;
.thenCompose( work -> work.execute( currentSequenceAttributes.executionContext ) );
currentlyBuildingSequenceTail = bulkWorkResultFuture;
return bulkWorkResultFuture;
}

@Override
public BulkResultExtractionStep addBulkResultExtraction(CompletableFuture<BulkResult> bulkResultFuture) {
// Use local variables to make sure the lambdas won't be affected by a reset()
final ElasticsearchWorkExecutionContext context = this.executionContext;
// Use a local variable to make sure lambdas (if any) won't be affected by a reset()
final SequenceContext currentSequenceAttributes = this.currentlyBuildingSequenceContext;

CompletableFuture<BulkResultItemExtractor> extractorFuture =
bulkResultFuture.thenApply( bulkResult -> bulkResult.withContext( context ) );
bulkResultFuture.thenApply( bulkResult -> bulkResult.withContext( currentSequenceAttributes.executionContext ) );
bulkResultExtractionStep.init( extractorFuture );
return bulkResultExtractionStep;
}

@Override
public CompletableFuture<Void> build() {
// Use local variables to make sure the lambdas won't be affected by a reset()
final ElasticsearchRefreshableWorkExecutionContext context = this.executionContext;
final ContextualErrorHandler errorHandler = this.errorHandler;
final CompletableFuture<Void> refreshFuture = this.refreshFuture;

CompletableFuture<Void> futureWithRefresh = Futures.whenCompleteExecute(
sequenceFuture,
() -> context.executePendingRefreshes().whenComplete( Futures.copyHandler( refreshFuture ) )
)
.exceptionally( Futures.handler( throwable -> {
if ( !( throwable instanceof PreviousWorkException) ) {
// Something else than a work failed, mention it
errorHandler.addThrowable( throwable );
}
errorHandler.handle();
return null;
} ) );

return futureWithRefresh;
}

private <R> void notifySkipping(ElasticsearchWork<R> work, Throwable throwable,
ContextualErrorHandler errorHandler, CompletableFuture<R> workFutureForCaller) {
Throwable skippingCause =
throwable instanceof PreviousWorkException ? throwable.getCause() : throwable;
workFutureForCaller.completeExceptionally(
log.elasticsearchSkippedBecauseOfPreviousWork( skippingCause )
);
errorHandler.markAsSkipped( work );
}

private <R> void notifyFailure(ElasticsearchWork<R> work, Throwable throwable,
ContextualErrorHandler errorHandler, CompletableFuture<R> workFutureForCaller) {
workFutureForCaller.completeExceptionally( throwable );
errorHandler.markAsFailed( work, throwable );
// Use a local variable to make sure lambdas (if any) won't be affected by a reset()
final SequenceContext sequenceContext = currentlyBuildingSequenceContext;

return Futures.whenCompleteExecute(
currentlyBuildingSequenceTail,
() -> sequenceContext.executionContext.executePendingRefreshes()
.whenComplete( Futures.copyHandler( sequenceContext.refreshFuture ) )
)
.exceptionally( Futures.handler( sequenceContext::notifySequenceFailed ) );
}

private <T> CompletableFuture<T> addPostExecutionHandlers(ElasticsearchWork<T> work,
CompletableFuture<T> workExecutionFuture,
ContextualErrorHandler errorHandler, CompletableFuture<Void> refreshFuture,
CompletableFuture<T> workFutureForCaller) {
<T> CompletableFuture<T> addPostExecutionHandlers(ElasticsearchWork<T> work,
CompletableFuture<T> workExecutionFuture, CompletableFuture<T> workFutureForCaller,
SequenceContext sequenceContext) {
/*
* In case of success, wait for the refresh and propagate the result to the client.
* We ABSOLUTELY DO NOT WANT the resulting future to be included in the sequence,
Expand All @@ -182,7 +153,7 @@ private <T> CompletableFuture<T> addPostExecutionHandlers(ElasticsearchWork<T> w
* which will only happen when the sequence ends,
* which will only happen after A completes...
*/
workExecutionFuture.thenCombine( refreshFuture, (workResult, refreshResult) -> workResult )
workExecutionFuture.thenCombine( sequenceContext.refreshFuture, (workResult, refreshResult) -> workResult )
.whenComplete( Futures.copyHandler( workFutureForCaller ) );
/*
* In case of error, propagate the exception immediately to both the error handler and the client.
Expand All @@ -195,32 +166,23 @@ private <T> CompletableFuture<T> addPostExecutionHandlers(ElasticsearchWork<T> w
* meaning errorHandler.markAsFailed() is guaranteed to be called before errorHandler.handle().
*/
return workExecutionFuture.exceptionally( Futures.handler( throwable -> {
notifyFailure( work, throwable, errorHandler, workFutureForCaller );
sequenceContext.notifyWorkFailed( work, throwable, workFutureForCaller );
throw new PreviousWorkException( throwable );
} ) );
}

private static final class PreviousWorkException extends RuntimeException {

public PreviousWorkException(Throwable cause) {
super( cause );
}

}

private final class BulkResultExtractionStepImpl implements BulkResultExtractionStep {

private CompletableFuture<BulkResultItemExtractor> extractorFuture;

public void init(CompletableFuture<BulkResultItemExtractor> extractorFuture) {
void init(CompletableFuture<BulkResultItemExtractor> extractorFuture) {
this.extractorFuture = extractorFuture;
}

@Override
public <T> CompletableFuture<T> add(BulkableElasticsearchWork<T> bulkedWork, int index) {
// Use local variables to make sure the lambdas won't be affected by a reset()
final ContextualErrorHandler errorHandler = ElasticsearchDefaultWorkSequenceBuilder.this.errorHandler;
final CompletableFuture<Void> refreshFuture = ElasticsearchDefaultWorkSequenceBuilder.this.refreshFuture;
final SequenceContext sequenceContext = ElasticsearchDefaultWorkSequenceBuilder.this.currentlyBuildingSequenceContext;

/*
* Use a different future for the caller than the one used in the sequence,
Expand All @@ -230,35 +192,36 @@ public <T> CompletableFuture<T> add(BulkableElasticsearchWork<T> bulkedWork, int
CompletableFuture<T> workFutureForCaller = new CompletableFuture<>();

// If the bulk work fails, make sure to notify the caller and error handler as necessary.
CompletableFuture<T> handledWorkExecutionFuture = extractorFuture.whenComplete( Futures.handler( (result, throwable) -> {
if ( throwable == null ) {
return;
}
else if ( throwable instanceof PreviousWorkException ) {
// The bulk work itself was skipped; mark the bulked work as skipped too
notifySkipping( bulkedWork, throwable, errorHandler, workFutureForCaller );
}
else {
// The bulk work failed; mark the bulked work as failed too
notifyFailure( bulkedWork, throwable, errorHandler, workFutureForCaller );
}
} ) )
CompletableFuture<T> handledWorkExecutionFuture = extractorFuture
.whenComplete( Futures.handler( (result, throwable) -> {
if ( throwable == null ) {
return;
}
else if ( throwable instanceof PreviousWorkException ) {
// The bulk work itself was skipped; mark the bulked work as skipped too
sequenceContext.notifyWorkSkipped( bulkedWork, throwable, workFutureForCaller );
}
else {
// The bulk work failed; mark the bulked work as failed too
sequenceContext.notifyWorkFailed( bulkedWork, throwable, workFutureForCaller );
}
} ) )
// If the bulk work succeeds, then extract the bulked work result and notify as necessary
.thenCompose( extractor -> {
// Use Futures.create to catch any exception thrown by extractor.extract
CompletableFuture<T> workExecutionFuture = Futures.create(
() -> extractor.extract( bulkedWork, index )
);
return addPostExecutionHandlers( bulkedWork, workExecutionFuture, errorHandler, refreshFuture, workFutureForCaller );
return addPostExecutionHandlers( bulkedWork, workExecutionFuture, workFutureForCaller, sequenceContext );
} );

/*
* Make sure that the sequence will only advance to the next work
* after both the work and *all* the handlers are executed,
* because otherwise errorHandler.handle() could be called before all failed/skipped works are reported.
*/
ElasticsearchDefaultWorkSequenceBuilder.this.sequenceFuture = CompletableFuture.allOf(
ElasticsearchDefaultWorkSequenceBuilder.this.sequenceFuture,
currentlyBuildingSequenceTail = CompletableFuture.allOf(
currentlyBuildingSequenceTail,
handledWorkExecutionFuture
);

Expand All @@ -267,4 +230,57 @@ else if ( throwable instanceof PreviousWorkException ) {

}

private static final class PreviousWorkException extends RuntimeException {

public PreviousWorkException(Throwable cause) {
super( cause );
}

}

/**
* Regroups all objects that may be shared among multiple steps in the same sequence.
* <p>
* This was introduced to make references to data from a previous sequence less likely;
* see
* org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchDefaultWorkSequenceBuilderTest#intertwinedSequenceExecution()
* for an example of what can go wrong if we don't take care to avoid that.
*/
private static final class SequenceContext {
private final ElasticsearchRefreshableWorkExecutionContext executionContext;
private final ContextualErrorHandler errorHandler;
private final CompletableFuture<Void> refreshFuture;

SequenceContext(
ElasticsearchRefreshableWorkExecutionContext executionContext,
ContextualErrorHandler errorHandler) {
this.executionContext = executionContext;
this.errorHandler = errorHandler;
this.refreshFuture = new CompletableFuture<>();
}

<R> void notifyWorkSkipped(ElasticsearchWork<R> work, Throwable throwable,
CompletableFuture<R> workFutureForCaller) {
Throwable skippingCause = throwable instanceof PreviousWorkException ? throwable.getCause() : throwable;
workFutureForCaller.completeExceptionally(
log.elasticsearchSkippedBecauseOfPreviousWork( skippingCause )
);
errorHandler.markAsSkipped( work );
}

<R> void notifyWorkFailed(ElasticsearchWork<R> work, Throwable throwable,
CompletableFuture<R> workFutureForCaller) {
workFutureForCaller.completeExceptionally( throwable );
errorHandler.markAsFailed( work, throwable );
}

<T> T notifySequenceFailed(Throwable throwable) {
if ( !( throwable instanceof PreviousWorkException) ) {
// Something else than a work failed, mention it
errorHandler.addThrowable( throwable );
}
errorHandler.handle();
return null;
}
}
}

0 comments on commit 8881472

Please sign in to comment.