From d5aa35fc12976e9e5f9e2016929b134463ea9780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yoann=20Rodi=C3=A8re?= Date: Tue, 18 Jul 2017 16:39:36 +0200 Subject: [PATCH] HSEARCH-2764 Add a parallel orchestrator for streamed work This could lead to better performance with large Elasticsearch connection pools when works affect multiple indexes. --- .../impl/ElasticsearchIndexManager.java | 37 +- .../BarrierElasticsearchWorkOrchestrator.java | 26 ++ ...ngSharedElasticsearchWorkOrchestrator.java | 3 +- .../impl/ElasticsearchWorkProcessor.java | 121 +++--- ...angesetsElasticsearchWorkOrchestrator.java | 123 ++++++ ...setsElasticsearchWorkOrchestratorTest.java | 350 ++++++++++++++++++ 6 files changed, 596 insertions(+), 64 deletions(-) create mode 100644 elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BarrierElasticsearchWorkOrchestrator.java create mode 100644 elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestrator.java create mode 100644 elasticsearch/src/test/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestratorTest.java diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexManager.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexManager.java index 535e1cf4e96..d71477dec6b 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexManager.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/impl/ElasticsearchIndexManager.java @@ -7,7 +7,6 @@ package org.hibernate.search.elasticsearch.impl; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -26,6 +25,7 @@ import org.hibernate.search.elasticsearch.cfg.IndexSchemaManagementStrategy; import org.hibernate.search.elasticsearch.client.impl.URLEncodedString; import org.hibernate.search.elasticsearch.logging.impl.Log; +import org.hibernate.search.elasticsearch.processor.impl.BarrierElasticsearchWorkOrchestrator; import org.hibernate.search.elasticsearch.processor.impl.ElasticsearchWorkProcessor; import org.hibernate.search.elasticsearch.schema.impl.ElasticsearchSchemaCreator; import org.hibernate.search.elasticsearch.schema.impl.ElasticsearchSchemaDropper; @@ -420,8 +420,10 @@ public void flushAndReleaseResources() { ElasticsearchWork flushWork = elasticsearchService.getWorkFactory().flush() .index( actualIndexName ) .build(); - workProcessor.awaitAsyncProcessingCompletion(); - workProcessor.executeSyncSafe( Collections.singletonList( flushWork ) ); + awaitAsyncProcessingCompletion(); + workProcessor.getSyncNonStreamOrchestrator() + .submit( flushWork ) + .join(); } @Override @@ -443,10 +445,13 @@ public void performOperations(List workList, IndexingMonitor monitor } if ( sync ) { - workProcessor.executeSyncSafe( elasticsearchWorks ); + workProcessor.getSyncNonStreamOrchestrator() + .submit( elasticsearchWorks ) + .join(); } else { - workProcessor.executeAsync( elasticsearchWorks ); + workProcessor.getAsyncNonStreamOrchestrator() + .submit( elasticsearchWorks ); } } @@ -454,17 +459,31 @@ public void performOperations(List workList, IndexingMonitor monitor public void performStreamOperation(LuceneWork singleOperation, IndexingMonitor monitor, boolean forceAsync) { ElasticsearchWork elasticsearchWork = singleOperation.acceptIndexWorkVisitor( visitor, monitor ); if ( singleOperation instanceof FlushLuceneWork ) { - workProcessor.awaitAsyncProcessingCompletion(); - workProcessor.executeSyncSafe( Collections.singleton( elasticsearchWork ) ); + awaitAsyncProcessingCompletion(); + workProcessor.getSyncNonStreamOrchestrator() + .submit( elasticsearchWork ) + .join(); } else { - workProcessor.executeAsync( elasticsearchWork ); + workProcessor.getStreamOrchestrator() + .submit( elasticsearchWork ); } } @Override public void awaitAsyncProcessingCompletion() { - workProcessor.awaitAsyncProcessingCompletion(); + awaitCompletion( workProcessor.getAsyncNonStreamOrchestrator() ); + awaitCompletion( workProcessor.getStreamOrchestrator() ); + } + + private void awaitCompletion(BarrierElasticsearchWorkOrchestrator orchestrator) { + try { + orchestrator.awaitCompletion(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw LOG.interruptedWhileWaitingForRequestCompletion( e ); + } } @Override diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BarrierElasticsearchWorkOrchestrator.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BarrierElasticsearchWorkOrchestrator.java new file mode 100644 index 00000000000..2d391f30b17 --- /dev/null +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BarrierElasticsearchWorkOrchestrator.java @@ -0,0 +1,26 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.elasticsearch.processor.impl; + +/** + * An {@link ElasticsearchWorkOrchestrator} providing a synchronization barrier + * through its {@link #awaitCompletion()} method. + * + * @author Yoann Rodiere + */ +public interface BarrierElasticsearchWorkOrchestrator extends ElasticsearchWorkOrchestrator { + + /** + * Block until there is no more work to execute. + *

+ * N.B. if more works are submitted in the meantime, this might delay the wait. + * + * @throws InterruptedException if thread interrupted while waiting + */ + void awaitCompletion() throws InterruptedException; + +} diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BatchingSharedElasticsearchWorkOrchestrator.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BatchingSharedElasticsearchWorkOrchestrator.java index e1391cfff85..c14488928fb 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BatchingSharedElasticsearchWorkOrchestrator.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/BatchingSharedElasticsearchWorkOrchestrator.java @@ -35,7 +35,7 @@ * * @author Yoann Rodiere */ -class BatchingSharedElasticsearchWorkOrchestrator implements ElasticsearchWorkOrchestrator, AutoCloseable { +class BatchingSharedElasticsearchWorkOrchestrator implements BarrierElasticsearchWorkOrchestrator, AutoCloseable { private static final Log LOG = LoggerFactory.make( Log.class ); @@ -131,6 +131,7 @@ private void ensureProcessingScheduled() { } } + @Override public void awaitCompletion() throws InterruptedException { int phaseBeforeUnarrivedPartiesCheck = phaser.getPhase(); if ( phaser.getUnarrivedParties() > 0 ) { diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java index ca1377b0753..0599ae82069 100644 --- a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ElasticsearchWorkProcessor.java @@ -6,8 +6,6 @@ */ package org.hibernate.search.elasticsearch.processor.impl; -import java.util.Collections; -import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.function.Supplier; @@ -45,8 +43,9 @@ public class ElasticsearchWorkProcessor implements AutoCloseable { private final ElasticsearchWorkFactory workFactory; private final ElasticsearchWorkExecutionContext parallelWorkExecutionContext; - private final ElasticsearchWorkOrchestrator syncOrchestrator; - private final BatchingSharedElasticsearchWorkOrchestrator asyncOrchestrator; + private final ElasticsearchWorkOrchestrator syncNonStreamOrchestrator; + private final BatchingSharedElasticsearchWorkOrchestrator asyncNonStreamOrchestrator; + private final BatchingSharedElasticsearchWorkOrchestrator streamOrchestrator; public ElasticsearchWorkProcessor(BuildContext context, ElasticsearchClient client, GsonProvider gsonProvider, ElasticsearchWorkFactory workFactory) { @@ -57,14 +56,29 @@ public ElasticsearchWorkProcessor(BuildContext context, this.parallelWorkExecutionContext = new ParallelWorkExecutionContext( client, gsonProvider ); - this.syncOrchestrator = createIsolatedSharedOrchestrator( () -> this.createSerialOrchestrator() ); - this.asyncOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async work orchestrator", createSerialOrchestrator() ); + this.syncNonStreamOrchestrator = createIsolatedSharedOrchestrator( () -> this.createSerialOrchestrator() ); + this.asyncNonStreamOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async non-stream work orchestrator", createSerialOrchestrator() ); + this.streamOrchestrator = createBatchingSharedOrchestrator( "Elasticsearch async stream work orchestrator", createParallelOrchestrator() ); } @Override public void close() { - awaitProcessingCompletion( asyncOrchestrator ); - asyncOrchestrator.close(); + try { + asyncNonStreamOrchestrator.awaitCompletion(); + streamOrchestrator.awaitCompletion(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw LOG.interruptedWhileWaitingForRequestCompletion( e ); + } + finally { + try { + asyncNonStreamOrchestrator.close(); + } + finally { + streamOrchestrator.close(); + } + } } /** @@ -84,24 +98,6 @@ public T executeSyncUnsafe(ElasticsearchWork work) { } } - - /** - * Execute a set of works synchronously. - *

- * Works submitted in the same list will be executed in the given order. - *

- * If any work throws an exception, this exception will be passed - * to the error handler with an {@link ErrorContext} spanning at least the given works, - * and the remaining works will not be executed. - * - * @param works The works to be executed. - */ - public void executeSyncSafe(Iterable> works) { - syncOrchestrator.submit( works ) - // Note: timeout is handled by the client, so this "join" will not last forever - .join(); - } - /** * Execute a single work asynchronously, * without bulking it with other asynchronous works, @@ -115,50 +111,61 @@ public CompletableFuture executeAsyncUnsafe(ElasticsearchWork work) { } /** - * Execute a single work asynchronously, - * potentially bulking it with other asynchronous works. + * Return the orchestrator for synchronous, non-stream background works. + *

+ * Works submitted in the same changeset will be executed in the given order. + * Relative execution order between changesets is undefined. *

- * If the work throws an exception, this exception will be passed - * to the error handler with an {@link ErrorContext} spanning at least this work. + * Works submitted to this orchestrator + * will only be bulked with subsequent works of the same changeset. + *

+ * If any work throws an exception, this exception will be passed + * to the error handler with an {@link ErrorContext} spanning exactly the given works, + * and the remaining works will not be executed. * - * @param work The work to be executed. + * @return the orchestrator for synchronous, non-stream background works. */ - public void executeAsync(ElasticsearchWork work) { - asyncOrchestrator.submit( Collections.singleton( work ) ); + public ElasticsearchWorkOrchestrator getSyncNonStreamOrchestrator() { + return syncNonStreamOrchestrator; } /** - * Execute a set of works asynchronously, - * potentially bulking it with other asynchronous works. + * Return the orchestrator for asynchronous, non-stream background works. + *

+ * Works submitted in the same changeset will be executed in the given order. + * Changesets will be executed in the order they are submitted. *

- * Works submitted in the same list will be executed in the given order. + * Works submitted to this orchestrator + * will only be bulked with subsequent works (possibly of a different changeset). *

* If any work throws an exception, this exception will be passed - * to the error handler with an {@link ErrorContext} spanning at least the given works, + * to the error handler with an {@link ErrorContext} spanning exactly the given works, * and the remaining works will not be executed. * - * @param works The works to be executed. + * @return the orchestrator for asynchronous, non-stream background works. */ - public void executeAsync(List> works) { - asyncOrchestrator.submit( works ); + public BarrierElasticsearchWorkOrchestrator getAsyncNonStreamOrchestrator() { + return asyncNonStreamOrchestrator; } /** - * Blocks until the queue of requests scheduled for asynchronous processing has been fully processed. - * N.B. if more work is added to the queue in the meantime, this might delay the wait. + * Return the orchestrator for asynchronous, non-stream background works. + *

+ * Works submitted in the same changeset will be executed in the given order. + * Relative execution order between changesets is undefined. + *

+ * Works submitted to this orchestrator + * will only be bulked with subsequent works from the same changeset + * or with works from a different changeset. + *

+ * If any work throws an exception, this exception will be passed + * to the error handler with an {@link ErrorContext} spanning exactly the given works, + * and the remaining works will not be executed. + * + * @return the orchestrator for stream background works. */ - public void awaitAsyncProcessingCompletion() { - awaitProcessingCompletion( asyncOrchestrator ); - } - - private void awaitProcessingCompletion(BatchingSharedElasticsearchWorkOrchestrator orchestrator) { - try { - asyncOrchestrator.awaitCompletion(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw LOG.interruptedWhileWaitingForRequestCompletion( e ); - } + public BarrierElasticsearchWorkOrchestrator getStreamOrchestrator() { + return streamOrchestrator; } private CompletableFuture start(ElasticsearchWork work, ElasticsearchWorkExecutionContext context) { @@ -180,6 +187,12 @@ private FlushableElasticsearchWorkOrchestrator createSerialOrchestrator() { return new SerialChangesetsElasticsearchWorkOrchestrator( sequenceBuilder, bulker ); } + private FlushableElasticsearchWorkOrchestrator createParallelOrchestrator() { + ElasticsearchWorkSequenceBuilder sequenceBuilder = createSequenceBuilder(); + ElasticsearchWorkBulker bulker = createBulker( sequenceBuilder, false ); + return new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilder, bulker ); + } + private ElasticsearchWorkSequenceBuilder createSequenceBuilder() { return new DefaultElasticsearchWorkSequenceBuilder( this::start, diff --git a/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestrator.java b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestrator.java new file mode 100644 index 00000000000..e7a7a00dcc2 --- /dev/null +++ b/elasticsearch/src/main/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestrator.java @@ -0,0 +1,123 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.elasticsearch.processor.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.hibernate.search.elasticsearch.work.impl.BulkableElasticsearchWork; +import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkAggregator; + + +/** + * Aggregates works from changesets into multiple sequences. + *

+ * Two works will be executed sequentially if they are part of the same changeset. + * Two works from different changesets will be executed in parallel. + *

+ * This class is mutable and not thread-safe. + * + * @author Yoann Rodiere + */ +class ParallelChangesetsElasticsearchWorkOrchestrator implements FlushableElasticsearchWorkOrchestrator { + + private final BulkAndSequenceAggregator aggregator; + private final List> sequenceFutures = new ArrayList<>(); + + public ParallelChangesetsElasticsearchWorkOrchestrator(ElasticsearchWorkSequenceBuilder sequenceBuilder, + ElasticsearchWorkBulker bulker) { + this.aggregator = new BulkAndSequenceAggregator( sequenceBuilder, bulker ); + } + + @Override + public CompletableFuture submit(Iterable> nonBulkedWorks) { + aggregator.initSequence(); + for ( ElasticsearchWork work : nonBulkedWorks ) { + work.aggregate( aggregator ); + } + CompletableFuture future = aggregator.flushSequence(); + sequenceFutures.add( future ); + return future; + } + + @Override + public CompletableFuture flush() { + CompletableFuture future = + CompletableFuture.allOf( sequenceFutures.toArray( new CompletableFuture[ sequenceFutures.size()] ) ); + sequenceFutures.clear(); + aggregator.startSequences(); + return future; + } + + @Override + public void reset() { + aggregator.reset(); + sequenceFutures.clear(); + } + + private static class BulkAndSequenceAggregator implements ElasticsearchWorkAggregator { + + private final ElasticsearchWorkSequenceBuilder sequenceBuilder; + private final ElasticsearchWorkBulker bulker; + + private CompletableFuture rootFuture; + private boolean currentBulkIsUsableInSameSequence = true; + + public BulkAndSequenceAggregator(ElasticsearchWorkSequenceBuilder sequenceBuilder, + ElasticsearchWorkBulker bulker) { + super(); + this.rootFuture = CompletableFuture.completedFuture( null ); + this.sequenceBuilder = sequenceBuilder; + this.bulker = bulker; + } + + public void initSequence() { + sequenceBuilder.init( rootFuture ); + } + + @Override + public void addBulkable(BulkableElasticsearchWork work) { + if ( !currentBulkIsUsableInSameSequence ) { + bulker.flushBulk(); + currentBulkIsUsableInSameSequence = true; + } + bulker.add( work ); + } + + @Override + public void addNonBulkable(ElasticsearchWork work) { + if ( bulker.flushBulked() ) { + /* + * A non-bulkable work follows bulked works, + * so we won't be able to re-use the same bulk in the same sequence + * (otherwise the relative order of works would be altered). + */ + currentBulkIsUsableInSameSequence = false; + } + sequenceBuilder.addNonBulkExecution( work ); + } + + public CompletableFuture flushSequence() { + bulker.flushBulked(); + CompletableFuture future = sequenceBuilder.build(); + currentBulkIsUsableInSameSequence = true; + return future; + } + + public void startSequences() { + bulker.flushBulk(); + } + + public void reset() { + bulker.reset(); + rootFuture = CompletableFuture.completedFuture( null ); + sequenceBuilder.init( rootFuture ); + } + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestratorTest.java b/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestratorTest.java new file mode 100644 index 00000000000..396ed735189 --- /dev/null +++ b/elasticsearch/src/test/java/org/hibernate/search/elasticsearch/processor/impl/ParallelChangesetsElasticsearchWorkOrchestratorTest.java @@ -0,0 +1,350 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.elasticsearch.processor.impl; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.getCurrentArguments; +import static org.hibernate.search.test.util.FutureAssert.assertThat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import org.easymock.EasyMock; +import org.easymock.IAnswer; +import org.hibernate.search.elasticsearch.work.impl.BulkableElasticsearchWork; +import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.elasticsearch.work.impl.ElasticsearchWorkAggregator; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Yoann Rodiere + */ +public class ParallelChangesetsElasticsearchWorkOrchestratorTest { + + private ElasticsearchWorkSequenceBuilder sequenceBuilderMock; + private ElasticsearchWorkBulker bulkerMock; + + private final List mocks = new ArrayList<>(); + + @Before + public void initMocks() { + sequenceBuilderMock = EasyMock.createStrictMock( ElasticsearchWorkSequenceBuilder.class ); + mocks.add( sequenceBuilderMock ); + bulkerMock = EasyMock.createStrictMock( ElasticsearchWorkBulker.class ); + mocks.add( bulkerMock ); + } + + @Test + public void simple() { + ElasticsearchWork work1 = work( 1 ); + BulkableElasticsearchWork work2 = bulkableWork( 2 ); + List> changeset1 = Arrays.asList( work1, work2 ); + + CompletableFuture sequenceFuture = new CompletableFuture<>(); + + replay(); + ParallelChangesetsElasticsearchWorkOrchestrator orchestrator = + new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilderMock, bulkerMock ); + verify(); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work1.aggregate( anyObject() ); + expectLastCall().andAnswer( nonBulkableAggregateAnswer( work1 ) ); + expect( bulkerMock.flushBulked() ).andReturn( false ); + sequenceBuilderMock.addNonBulkExecution( work1 ); + work2.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work2 ) ); + bulkerMock.add( work2 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequenceFuture ); + replay(); + CompletableFuture returnedSequenceFuture = orchestrator.submit( changeset1 ); + verify(); + assertThat( returnedSequenceFuture ).isSameAs( sequenceFuture ); + + reset(); + bulkerMock.flushBulk(); + replay(); + CompletableFuture futureAll = orchestrator.flush(); + verify(); + assertThat( futureAll ).isPending(); + sequenceFuture.complete( null ); + assertThat( futureAll ).isSuccessful( (Void) null ); + } + + @Test + public void parallelSequenceBetweenChangeset() { + ElasticsearchWork work1 = work( 1 ); + List> changeset1 = Arrays.asList( work1 ); + + BulkableElasticsearchWork work2 = bulkableWork( 2 ); + List> changeset2 = Arrays.asList( work2 ); + + CompletableFuture sequence1Future = new CompletableFuture<>(); + CompletableFuture sequence2Future = new CompletableFuture<>(); + + replay(); + ParallelChangesetsElasticsearchWorkOrchestrator orchestrator = + new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilderMock, bulkerMock ); + verify(); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work1.aggregate( anyObject() ); + expectLastCall().andAnswer( nonBulkableAggregateAnswer( work1 ) ); + expect( bulkerMock.flushBulked() ).andReturn( false ); + sequenceBuilderMock.addNonBulkExecution( work1 ); + expect( bulkerMock.flushBulked() ).andReturn( false ); + expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); + replay(); + CompletableFuture returnedSequence1Future = orchestrator.submit( changeset1 ); + verify(); + assertThat( returnedSequence1Future ).isSameAs( sequence1Future ); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work2.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work2 ) ); + bulkerMock.add( work2 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequence2Future ); + replay(); + CompletableFuture returnedSequence2Future = orchestrator.submit( changeset2 ); + verify(); + assertThat( returnedSequence2Future ).isSameAs( sequence2Future ); + + reset(); + bulkerMock.flushBulk(); + replay(); + CompletableFuture futureAll = orchestrator.flush(); + verify(); + assertThat( futureAll ).isPending(); + sequence2Future.complete( null ); + assertThat( futureAll ).isPending(); + sequence1Future.complete( null ); + assertThat( futureAll ).isSuccessful( (Void) null ); + } + + @Test + public void reuseBulkAccrossSequences() { + BulkableElasticsearchWork work1 = bulkableWork( 1 ); + List> changeset1 = Arrays.asList( work1 ); + + BulkableElasticsearchWork work2 = bulkableWork( 2 ); + List> changeset2 = Arrays.asList( work2 ); + + CompletableFuture sequence1Future = new CompletableFuture<>(); + CompletableFuture sequence2Future = new CompletableFuture<>(); + + replay(); + ParallelChangesetsElasticsearchWorkOrchestrator orchestrator = + new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilderMock, bulkerMock ); + verify(); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work1.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work1 ) ); + bulkerMock.add( work1 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); + replay(); + CompletableFuture returnedSequence1Future = orchestrator.submit( changeset1 ); + verify(); + assertThat( returnedSequence1Future ).isSameAs( sequence1Future ); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work2.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work2 ) ); + bulkerMock.add( work2 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequence2Future ); + replay(); + CompletableFuture returnedSequence2Future = orchestrator.submit( changeset2 ); + verify(); + assertThat( returnedSequence2Future ).isSameAs( sequence2Future ); + + reset(); + bulkerMock.flushBulk(); + replay(); + CompletableFuture futureAll = orchestrator.flush(); + verify(); + assertThat( futureAll ).isPending(); + + reset(); + replay(); + sequence2Future.complete( null ); + verify(); + assertThat( futureAll ).isPending(); + + reset(); + replay(); + sequence1Future.complete( null ); + verify(); + assertThat( futureAll ).isSuccessful( (Void) null ); + } + + @Test + public void newBulkIfNonBulkable_sameChangeset() { + BulkableElasticsearchWork work1 = bulkableWork( 1 ); + ElasticsearchWork work2 = work( 2 ); + BulkableElasticsearchWork work3 = bulkableWork( 3 ); + List> changeset1 = Arrays.asList( work1, work2, work3 ); + + CompletableFuture sequence1Future = new CompletableFuture<>(); + + replay(); + ParallelChangesetsElasticsearchWorkOrchestrator orchestrator = + new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilderMock, bulkerMock ); + verify(); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work1.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work1 ) ); + bulkerMock.add( work1 ); + work2.aggregate( anyObject() ); + expectLastCall().andAnswer( nonBulkableAggregateAnswer( work2 ) ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + sequenceBuilderMock.addNonBulkExecution( work2 ); + work3.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work3 ) ); + bulkerMock.flushBulk(); + bulkerMock.add( work3 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); + replay(); + CompletableFuture returnedSequence1Future = orchestrator.submit( changeset1 ); + verify(); + assertThat( returnedSequence1Future ).isSameAs( sequence1Future ); + + reset(); + bulkerMock.flushBulk(); + replay(); + CompletableFuture futureAll = orchestrator.flush(); + verify(); + assertThat( futureAll ).isPending(); + + reset(); + replay(); + sequence1Future.complete( null ); + verify(); + assertThat( futureAll ).isSuccessful( (Void) null ); + } + + @Test + public void newBulkIfNonBulkable_differenceChangesets() { + BulkableElasticsearchWork work1 = bulkableWork( 1 ); + List> changeset1 = Arrays.asList( work1 ); + ElasticsearchWork work2 = work( 2 ); + BulkableElasticsearchWork work3 = bulkableWork( 3 ); + List> changeset2 = Arrays.asList( work2, work3 ); + + CompletableFuture sequence1Future = new CompletableFuture<>(); + CompletableFuture sequence2Future = new CompletableFuture<>(); + + replay(); + ParallelChangesetsElasticsearchWorkOrchestrator orchestrator = + new ParallelChangesetsElasticsearchWorkOrchestrator( sequenceBuilderMock, bulkerMock ); + verify(); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work1.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work1 ) ); + bulkerMock.add( work1 ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); + replay(); + CompletableFuture returnedSequence1Future = orchestrator.submit( changeset1 ); + verify(); + assertThat( returnedSequence1Future ).isSameAs( sequence1Future ); + + reset(); + sequenceBuilderMock.init( anyObject() ); + work2.aggregate( anyObject() ); + expectLastCall().andAnswer( nonBulkableAggregateAnswer( work2 ) ); + expect( bulkerMock.flushBulked() ).andReturn( true ); + sequenceBuilderMock.addNonBulkExecution( work2 ); + work3.aggregate( anyObject() ); + expectLastCall().andAnswer( bulkableAggregateAnswer( work3 ) ); + bulkerMock.flushBulk(); + bulkerMock.add( work3 ); + expect( bulkerMock.flushBulked() ).andReturn( false ); + expect( sequenceBuilderMock.build() ).andReturn( sequence2Future ); + replay(); + CompletableFuture returnedSequence2Future = orchestrator.submit( changeset2 ); + verify(); + assertThat( returnedSequence2Future ).isSameAs( sequence2Future ); + + reset(); + bulkerMock.flushBulk(); + replay(); + CompletableFuture futureAll = orchestrator.flush(); + verify(); + assertThat( futureAll ).isPending(); + + reset(); + replay(); + sequence2Future.complete( null ); + verify(); + assertThat( futureAll ).isPending(); + + reset(); + replay(); + sequence1Future.complete( null ); + verify(); + assertThat( futureAll ).isSuccessful( (Void) null ); + } + + private void reset() { + EasyMock.reset( mocks.toArray() ); + } + + private void replay() { + EasyMock.replay( mocks.toArray() ); + } + + private void verify() { + EasyMock.verify( mocks.toArray() ); + } + + private ElasticsearchWork work(int index) { + ElasticsearchWork mock = EasyMock.createStrictMock( "work" + index, ElasticsearchWork.class ); + mocks.add( mock ); + return mock; + } + + private BulkableElasticsearchWork bulkableWork(int index) { + BulkableElasticsearchWork mock = EasyMock.createStrictMock( "bulkableWork" + index, BulkableElasticsearchWork.class ); + mocks.add( mock ); + return mock; + } + + private IAnswer nonBulkableAggregateAnswer(ElasticsearchWork mock) { + return () -> { + ElasticsearchWorkAggregator aggregator = (ElasticsearchWorkAggregator) getCurrentArguments()[0]; + aggregator.addNonBulkable( mock ); + return null; + }; + } + + private IAnswer bulkableAggregateAnswer(BulkableElasticsearchWork mock) { + return () -> { + ElasticsearchWorkAggregator aggregator = (ElasticsearchWorkAggregator) getCurrentArguments()[0]; + aggregator.addBulkable( mock ); + return null; + }; + } +}