diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/impl/ElasticsearchBackendImpl.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/impl/ElasticsearchBackendImpl.java index 7901be54355..6194c32ecd6 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/impl/ElasticsearchBackendImpl.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/impl/ElasticsearchBackendImpl.java @@ -23,8 +23,8 @@ import org.hibernate.search.backend.elasticsearch.logging.impl.Log; import org.hibernate.search.backend.elasticsearch.mapping.impl.TypeNameMapping; import org.hibernate.search.backend.elasticsearch.multitenancy.impl.MultiTenancyStrategy; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSimpleWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorImplementor; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorProvider; import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads; import org.hibernate.search.backend.elasticsearch.types.dsl.provider.impl.ElasticsearchIndexFieldTypeFactoryProvider; import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString; @@ -55,15 +55,14 @@ class ElasticsearchBackendImpl implements BackendImplementor, private final BackendThreads threads; private final ElasticsearchLinkImpl link; - private final ElasticsearchWorkOrchestratorProvider orchestratorProvider; + private final ElasticsearchSimpleWorkOrchestrator generalPurposeOrchestrator; + private final ElasticsearchIndexFieldTypeFactoryProvider typeFactoryProvider; private final ElasticsearchAnalysisDefinitionRegistry analysisDefinitionRegistry; private final MultiTenancyStrategy multiTenancyStrategy; private final BeanHolder indexLayoutStrategyHolder; private final TypeNameMapping typeNameMapping; - private final ElasticsearchWorkOrchestratorImplementor queryOrchestrator; - private final EventContext eventContext; private final IndexManagerBackendContext indexManagerBackendContext; @@ -83,10 +82,9 @@ class ElasticsearchBackendImpl implements BackendImplementor, this.threads = threads; this.link = link; - this.orchestratorProvider = new ElasticsearchWorkOrchestratorProvider( - "Elasticsearch parallel work orchestrator for backend " + name, - threads, link, - failureHandler + this.generalPurposeOrchestrator = new ElasticsearchSimpleWorkOrchestrator( + "Elasticsearch general purpose orchestrator for backend " + name, + link ); this.analysisDefinitionRegistry = analysisDefinitionRegistry; this.multiTenancyStrategy = multiTenancyStrategy; @@ -94,17 +92,15 @@ class ElasticsearchBackendImpl implements BackendImplementor, this.indexLayoutStrategyHolder = indexLayoutStrategyHolder; this.typeNameMapping = typeNameMapping; - this.queryOrchestrator = orchestratorProvider.createParallelOrchestrator( "Elasticsearch query orchestrator for backend " + name ); - this.eventContext = EventContexts.fromBackendName( name ); this.indexManagerBackendContext = new IndexManagerBackendContext( - eventContext, link, + eventContext, threads, link, userFacingGson, multiTenancyStrategy, indexLayoutStrategyHolder.get(), typeNameMapping, - orchestratorProvider, - queryOrchestrator + failureHandler, + generalPurposeOrchestrator ); this.indexNamesRegistry = new IndexNamesRegistry(); } @@ -122,24 +118,18 @@ public String toString() { public void start(BackendStartContext context) { threads.onStart( context.getThreadPoolProvider() ); link.onStart( context.getConfigurationPropertySource() ); - orchestratorProvider.start(); - queryOrchestrator.start(); + generalPurposeOrchestrator.start(); } @Override public CompletableFuture preStop() { - return CompletableFuture.allOf( - queryOrchestrator.preStop(), - orchestratorProvider.preStop() - ); + return generalPurposeOrchestrator.preStop(); } @Override public void stop() { try ( Closer closer = new Closer<>() ) { - closer.push( ElasticsearchWorkOrchestratorImplementor::stop, queryOrchestrator ); - closer.push( ElasticsearchWorkOrchestratorProvider::stop, orchestratorProvider ); - // Close the client after the orchestrators, when we're sure all works have been performed + closer.push( ElasticsearchWorkOrchestratorImplementor::stop, generalPurposeOrchestrator ); closer.push( ElasticsearchLinkImpl::onStop, link ); closer.push( BeanHolder::close, indexLayoutStrategyHolder ); closer.push( BackendThreads::onStop, threads ); diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/ElasticsearchIndexManagerImpl.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/ElasticsearchIndexManagerImpl.java index a25685e2f8f..b3473e53626 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/ElasticsearchIndexManagerImpl.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/ElasticsearchIndexManagerImpl.java @@ -17,6 +17,7 @@ import org.hibernate.search.backend.elasticsearch.document.impl.ElasticsearchDocumentObjectBuilder; import org.hibernate.search.backend.elasticsearch.document.model.impl.ElasticsearchIndexModel; import org.hibernate.search.backend.elasticsearch.index.ElasticsearchIndexManager; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchingWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexSchemaManager; import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexLifecycleExecutionOptions; import org.hibernate.search.backend.elasticsearch.logging.impl.Log; @@ -77,8 +78,7 @@ class ElasticsearchIndexManagerImpl implements IndexManagerImplementor, private final ElasticsearchIndexModel model; private final List documentMetadataContributors; - private final ElasticsearchWorkOrchestratorImplementor serialOrchestrator; - private final ElasticsearchWorkOrchestratorImplementor parallelOrchestrator; + private final ElasticsearchBatchingWorkOrchestrator indexingOrchestrator; private ElasticsearchIndexSchemaManager schemaManager; @@ -88,8 +88,7 @@ class ElasticsearchIndexManagerImpl implements IndexManagerImplementor, this.backendContext = backendContext; this.model = model; this.documentMetadataContributors = documentMetadataContributors; - this.parallelOrchestrator = backendContext.createParallelOrchestrator( model.getHibernateSearchIndexName() ); - this.serialOrchestrator = backendContext.createSerialOrchestrator( model.getHibernateSearchIndexName() ); + this.indexingOrchestrator = backendContext.createIndexingOrchestrator( model.getHibernateSearchIndexName() ); } @Override @@ -122,30 +121,24 @@ model, createLifecycleExecutionOptions( context.getConfigurationPropertySource() } ); - serialOrchestrator.start(); - parallelOrchestrator.start(); + indexingOrchestrator.start(); } catch (RuntimeException e) { new SuppressingCloser( e ) - .push( ElasticsearchWorkOrchestratorImplementor::stop, parallelOrchestrator ) - .push( ElasticsearchWorkOrchestratorImplementor::stop, serialOrchestrator ); + .push( ElasticsearchWorkOrchestratorImplementor::stop, indexingOrchestrator ); throw e; } } @Override public CompletableFuture preStop() { - return CompletableFuture.allOf( - serialOrchestrator.preStop(), - parallelOrchestrator.preStop() - ); + return indexingOrchestrator.preStop(); } @Override public void stop() { try ( Closer closer = new Closer<>() ) { - closer.push( ElasticsearchWorkOrchestratorImplementor::stop, serialOrchestrator ); - closer.push( ElasticsearchWorkOrchestratorImplementor::stop, parallelOrchestrator ); + closer.push( ElasticsearchWorkOrchestratorImplementor::stop, indexingOrchestrator ); schemaManager = null; } catch (IOException e) { @@ -197,7 +190,7 @@ public IndexIndexingPlan createIndexingPlan(BackendSessionContext session DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) { // The commit strategy is ignored, because Elasticsearch always commits changes to its transaction log. return backendContext.createIndexingPlan( - serialOrchestrator, + indexingOrchestrator, this, sessionContext, entityReferenceFactory, @@ -208,14 +201,14 @@ public IndexIndexingPlan createIndexingPlan(BackendSessionContext session @Override public IndexIndexer createIndexer(BackendSessionContext sessionContext) { return backendContext.createIndexer( - serialOrchestrator, this, sessionContext + indexingOrchestrator, this, sessionContext ); } @Override public IndexWorkspace createWorkspace(DetachedBackendSessionContext sessionContext) { return backendContext.createWorkspace( - parallelOrchestrator, this, sessionContext + this, sessionContext ); } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/IndexManagerBackendContext.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/IndexManagerBackendContext.java index a240e261c8d..e1d58522219 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/IndexManagerBackendContext.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/index/impl/IndexManagerBackendContext.java @@ -6,6 +6,9 @@ */ package org.hibernate.search.backend.elasticsearch.index.impl; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchingWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads; import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexLifecycleExecutionOptions; import org.hibernate.search.backend.elasticsearch.index.layout.IndexLayoutStrategy; import org.hibernate.search.backend.elasticsearch.document.model.impl.ElasticsearchIndexModel; @@ -15,9 +18,7 @@ import org.hibernate.search.backend.elasticsearch.lowlevel.index.impl.IndexMetadata; import org.hibernate.search.backend.elasticsearch.mapping.impl.TypeNameMapping; import org.hibernate.search.backend.elasticsearch.multitenancy.impl.MultiTenancyStrategy; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorImplementor; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorProvider; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.scope.model.impl.ElasticsearchScopeModel; import org.hibernate.search.backend.elasticsearch.search.impl.ElasticsearchSearchContext; import org.hibernate.search.backend.elasticsearch.search.projection.impl.ElasticsearchSearchProjection; @@ -37,6 +38,7 @@ import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexer; import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexingPlan; import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkspace; +import org.hibernate.search.engine.reporting.FailureHandler; import org.hibernate.search.engine.search.loading.context.spi.LoadingContextBuilder; import org.hibernate.search.util.common.reporting.EventContext; @@ -45,28 +47,31 @@ public class IndexManagerBackendContext implements SearchBackendContext, WorkExecutionBackendContext { private final EventContext eventContext; + private final BackendThreads threads; private final ElasticsearchLink link; private final Gson userFacingGson; private final MultiTenancyStrategy multiTenancyStrategy; private final IndexLayoutStrategy indexLayoutStrategy; - private final ElasticsearchWorkOrchestratorProvider orchestratorProvider; - private final ElasticsearchWorkOrchestrator queryOrchestrator; + private final FailureHandler failureHandler; + private final ElasticsearchParallelWorkOrchestrator generalPurposeOrchestrator; private final SearchProjectionBackendContext searchProjectionBackendContext; - public IndexManagerBackendContext(EventContext eventContext, ElasticsearchLink link, Gson userFacingGson, + public IndexManagerBackendContext(EventContext eventContext, + BackendThreads threads, ElasticsearchLink link, Gson userFacingGson, MultiTenancyStrategy multiTenancyStrategy, IndexLayoutStrategy indexLayoutStrategy, TypeNameMapping typeNameMapping, - ElasticsearchWorkOrchestratorProvider orchestratorProvider, - ElasticsearchWorkOrchestrator queryOrchestrator) { + FailureHandler failureHandler, + ElasticsearchParallelWorkOrchestrator generalPurposeOrchestrator) { this.eventContext = eventContext; + this.threads = threads; this.link = link; this.userFacingGson = userFacingGson; this.multiTenancyStrategy = multiTenancyStrategy; this.indexLayoutStrategy = indexLayoutStrategy; - this.orchestratorProvider = orchestratorProvider; - this.queryOrchestrator = queryOrchestrator; + this.failureHandler = failureHandler; + this.generalPurposeOrchestrator = generalPurposeOrchestrator; this.searchProjectionBackendContext = new SearchProjectionBackendContext( typeNameMapping.getTypeNameExtractionHelper(), @@ -81,7 +86,7 @@ public String toString() { @Override public IndexIndexingPlan createIndexingPlan( - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext, EntityReferenceFactory entityReferenceFactory, DocumentRefreshStrategy refreshStrategy) { @@ -98,7 +103,7 @@ public IndexIndexingPlan createIndexingPlan( @Override public IndexIndexer createIndexer( - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext) { multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext ); @@ -108,14 +113,13 @@ public IndexIndexer createIndexer( } @Override - public IndexWorkspace createWorkspace(ElasticsearchWorkOrchestrator orchestrator, - WorkExecutionIndexManagerContext indexManagerContext, + public IndexWorkspace createWorkspace(WorkExecutionIndexManagerContext indexManagerContext, DetachedBackendSessionContext sessionContext) { multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext ); return new ElasticsearchIndexWorkspace( - link.getWorkBuilderFactory(), multiTenancyStrategy, orchestrator, indexManagerContext, - sessionContext + link.getWorkBuilderFactory(), multiTenancyStrategy, generalPurposeOrchestrator, + indexManagerContext, sessionContext ); } @@ -144,7 +148,7 @@ public ElasticsearchSearchQueryBuilder createSearchQueryBuilder( multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext ); return new ElasticsearchSearchQueryBuilder<>( link.getWorkBuilderFactory(), link.getSearchResultExtractorFactory(), - queryOrchestrator, + generalPurposeOrchestrator, searchContext, sessionContext, loadingContextBuilder, rootProjection ); } @@ -162,21 +166,17 @@ ElasticsearchIndexSchemaManager createSchemaManager(ElasticsearchIndexModel mode model.contributeLowLevelMetadata( builder ); IndexMetadata expectedMetadata = builder.build(); return new ElasticsearchIndexSchemaManager( - link.getWorkBuilderFactory(), orchestratorProvider.getRootParallelOrchestrator(), + link.getWorkBuilderFactory(), generalPurposeOrchestrator, indexLayoutStrategy, model.getNames(), expectedMetadata, lifecycleExecutionOptions ); } - ElasticsearchWorkOrchestratorImplementor createSerialOrchestrator(String indexName) { - return orchestratorProvider.createSerialOrchestrator( - "Elasticsearch serial work orchestrator for index " + indexName - ); - } - - ElasticsearchWorkOrchestratorImplementor createParallelOrchestrator(String indexName) { - return orchestratorProvider.createParallelOrchestrator( - "Elasticsearch parallel work orchestrator for index " + indexName + ElasticsearchBatchingWorkOrchestrator createIndexingOrchestrator(String indexName) { + return new ElasticsearchBatchingWorkOrchestrator( + "Elasticsearch indexing orchestrator for index " + indexName, + threads, link, + failureHandler ); } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/AbstractElasticsearchWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/AbstractElasticsearchWorkOrchestrator.java index 5418f7e230e..58e21f0f96e 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/AbstractElasticsearchWorkOrchestrator.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/AbstractElasticsearchWorkOrchestrator.java @@ -6,28 +6,25 @@ */ package org.hibernate.search.backend.elasticsearch.orchestration.impl; -import java.util.concurrent.CompletableFuture; - -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink; +import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext; import org.hibernate.search.engine.backend.orchestration.spi.AbstractWorkOrchestrator; -import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork; /** * An abstract base for {@link ElasticsearchWorkOrchestratorImplementor} implementations. */ -abstract class AbstractElasticsearchWorkOrchestrator - extends AbstractWorkOrchestrator> +abstract class AbstractElasticsearchWorkOrchestrator + extends AbstractWorkOrchestrator implements ElasticsearchWorkOrchestratorImplementor { - AbstractElasticsearchWorkOrchestrator(String name) { + protected final ElasticsearchLink link; + + AbstractElasticsearchWorkOrchestrator(String name, ElasticsearchLink link) { super( name ); + this.link = link; } - @Override - public CompletableFuture submit(ElasticsearchWork work) { - CompletableFuture future = new CompletableFuture<>(); - submit( new ElasticsearchBatchedWork<>( work, future ) ); - return future; + protected final ElasticsearchWorkExecutionContext createWorkExecutionContext() { + return new ElasticsearchWorkExecutionContextImpl( link.getClient(), link.getGsonProvider() ); } - } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWork.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWork.java index b95e91259a0..1c601111c5f 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWork.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWork.java @@ -12,7 +12,7 @@ import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork; import org.hibernate.search.util.common.impl.Futures; -class ElasticsearchBatchedWork implements BatchedWork { +class ElasticsearchBatchedWork implements BatchedWork { private final ElasticsearchWork work; private final CompletableFuture future; @@ -22,7 +22,7 @@ class ElasticsearchBatchedWork implements BatchedWork + * Works are added by submitting as many works as necessary through {@link #submit(ElasticsearchWork)}. + * Execution starts as soon as possible, + * which may be as late as when {@link #endBatch()} is called. *

* Two works submitted to this orchestrator in the same batch will always be executed * one after the other, never in parallel. *

* This class is mutable and not thread-safe. */ -class ElasticsearchSerialWorkProcessor implements ElasticsearchWorkProcessor { +class ElasticsearchBatchedWorkProcessor implements BatchedWorkProcessor { private final BulkAndSequenceAggregator aggregator; - public ElasticsearchSerialWorkProcessor(ElasticsearchWorkSequenceBuilder sequenceBuilder, + public ElasticsearchBatchedWorkProcessor(ElasticsearchWorkSequenceBuilder sequenceBuilder, ElasticsearchWorkBulker bulker) { this.aggregator = new BulkAndSequenceAggregator( sequenceBuilder, bulker ); } @@ -36,7 +41,6 @@ public void beginBatch() { aggregator.reset(); } - @Override public CompletableFuture submit(ElasticsearchWork work) { return work.aggregate( aggregator ); } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.java index 7dfc115b0fa..0be70780ec5 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchingWorkOrchestrator.java @@ -8,65 +8,66 @@ import java.util.concurrent.CompletableFuture; +import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink; import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads; +import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork; import org.hibernate.search.engine.backend.orchestration.spi.BatchingExecutor; import org.hibernate.search.engine.reporting.FailureHandler; /** - * An orchestrator sharing context across multiple threads, - * allowing works from different threads to be executed in a single thread, - * thus producing bigger bulk works. - *

- * More precisely, the submitted works are sent to a queue which is processed periodically + * An orchestrator sending works to a queue which is processed periodically * in a separate thread. - * This allows processing more works when orchestrating, which allows using bulk works - * more extensively. + *

+ * Works are processed in the order they are submitted. + *

+ * Processing works in a single thread means more works can be processed at a time, + * which is a good thing when using bulk works. */ -class ElasticsearchBatchingWorkOrchestrator extends AbstractElasticsearchWorkOrchestrator - implements ElasticsearchWorkOrchestratorImplementor { +public class ElasticsearchBatchingWorkOrchestrator extends AbstractElasticsearchWorkOrchestrator> + implements ElasticsearchSerialWorkOrchestrator, ElasticsearchWorkOrchestratorImplementor { + + // TODO HSEARCH-3575 make this configurable + private static final int MAX_BULK_SIZE = 250; + /* + * Setting the following constant involves a bit of guesswork. + * Basically we want the number to be large enough for the orchestrator + * to create bulks of the maximum size defined above most of the time, + * and to avoid cases where the queue is full as much as possible, + * because threads submitting works will block when that happens. + * But we also want to keep the number as low as possible to avoid + * consuming too much memory with pending works. + */ + // TODO HSEARCH-3575 make this configurable + private static final int QUEUE_SIZE = 10 * MAX_BULK_SIZE; private final BackendThreads threads; - private final BatchingExecutor executor; + private final BatchingExecutor executor; /** * @param name The name of the orchestrator thread (and of this orchestrator when reporting errors) - * @param processor A work processor to use in the background thread. * @param threads The threads for this backend. - * @param maxWorksPerBatch The maximum number of works to - * process in a single batch. Higher values mean lesser chance of transport - * thread starvation, but higher heap consumption. - * @param fair if {@code true} works are always submitted to the - * delegate in FIFO order, if {@code false} works submitted - * when the internal queue is full may be submitted out of order. + * @param link The Elasticsearch link for this backend. * @param failureHandler A failure handler to report failures of the background thread. */ - ElasticsearchBatchingWorkOrchestrator( - String name, ElasticsearchWorkProcessor processor, BackendThreads threads, - int maxWorksPerBatch, boolean fair, + public ElasticsearchBatchingWorkOrchestrator( + String name, BackendThreads threads, ElasticsearchLink link, FailureHandler failureHandler) { - super( name ); + super( name, link ); this.threads = threads; + ElasticsearchBatchedWorkProcessor processor = createProcessor(); this.executor = new BatchingExecutor<>( - name, processor, maxWorksPerBatch, fair, + name, processor, QUEUE_SIZE, + true, /* enqueue works in the exact order they were submitted */ failureHandler ); } - /** - * Create a child orchestrator. - *

- * The child orchestrator will use the same resources and its works - * will be executed by the same background thread. - *

- * Closing the child will not close the parent, - * but will make the current thread wait for the completion of previously submitted works, - * and will prevent any more work to be submitted through the child. - * - * @param name The name of the child orchestrator when reporting errors - */ - public ElasticsearchWorkOrchestratorImplementor createChild(String name) { - return new ElasticsearchChildBatchingWorkOrchestrator( name ); + @Override + public CompletableFuture submit(ElasticsearchWork work) { + CompletableFuture future = new CompletableFuture<>(); + submit( new ElasticsearchBatchedWork<>( work, future ) ); + return future; } @Override @@ -75,7 +76,7 @@ protected void doStart() { } @Override - protected void doSubmit(BatchedWork work) throws InterruptedException { + protected void doSubmit(BatchedWork work) throws InterruptedException { executor.submit( work ); } @@ -89,36 +90,16 @@ protected void doStop() { executor.stop(); } - private class ElasticsearchChildBatchingWorkOrchestrator extends AbstractElasticsearchWorkOrchestrator - implements ElasticsearchWorkOrchestratorImplementor { - - protected ElasticsearchChildBatchingWorkOrchestrator(String name) { - super( name ); - } - - @Override - protected void doStart() { - // uses the resources of the parent orchestrator - } - - @Override - protected void doSubmit(BatchedWork work) { - ElasticsearchBatchingWorkOrchestrator.this.submit( work ); - } - - @Override - protected CompletableFuture getCompletion() { - /* - * TODO HSEARCH-3576 this will wait for *all* tasks to finish, including tasks from other children. - * We should do better. - */ - return ElasticsearchBatchingWorkOrchestrator.this.getCompletion(); - } - - @Override - protected void doStop() { - // uses the resources of the parent orchestrator - } + private ElasticsearchBatchedWorkProcessor createProcessor() { + ElasticsearchWorkSequenceBuilder sequenceBuilder = + new ElasticsearchDefaultWorkSequenceBuilder( this::createWorkExecutionContext ); + ElasticsearchWorkBulker bulker = new ElasticsearchDefaultWorkBulker( + sequenceBuilder, + (worksToBulk, refreshStrategy) -> + link.getWorkBuilderFactory().bulk( worksToBulk ).refresh( refreshStrategy ).build(), + MAX_BULK_SIZE + ); + return new ElasticsearchBatchedWorkProcessor( sequenceBuilder, bulker ); } } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkOrchestrator.java new file mode 100644 index 00000000000..85dff61440f --- /dev/null +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkOrchestrator.java @@ -0,0 +1,32 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.backend.elasticsearch.orchestration.impl; + +import java.util.concurrent.CompletableFuture; + +import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; + +/** + * A thread-safe component planning the execution of works + * in parallel, without any consideration for the order they were submitted in. + *

+ * Parallel orchestrators are suitable when the client takes the responsibility + * of submitting works as needed to implement ordering: if work #2 must be executed after work #1, + * the client will take care of waiting until #1 is done before he submits #2. + *

+ * With a parallel orchestrator: + *

    + *
  • Works are executed in unpredictable order, irrespective of the order they were submitted in. + *
  • Two submitted works may be sent together in a single bulk request. + *
  • The application will not wait for already-submitted works to finish when shutting down. + *
+ */ +public interface ElasticsearchParallelWorkOrchestrator { + + CompletableFuture submit(NonBulkableWork work); + +} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessor.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessor.java deleted file mode 100644 index e84a038ba16..00000000000 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessor.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Hibernate Search, full-text search for your domain model - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later - * See the lgpl.txt file in the root directory or . - */ -package org.hibernate.search.backend.elasticsearch.orchestration.impl; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkAggregator; -import org.hibernate.search.backend.elasticsearch.work.impl.BulkableWork; -import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; - - -/** - * Executes works in parallel, - * without any consideration for the order they were submitted in. - *

- * Two works submitted to this processor in the same batch will be executed - * in the same bulk if possible, or in parallel if they can't be bulked. - *

- * This class is mutable and not thread-safe. - */ -class ElasticsearchParallelWorkProcessor implements ElasticsearchWorkProcessor { - - private final BulkAndSequenceAggregator aggregator; - private final List> sequenceFutures = new ArrayList<>(); - - ElasticsearchParallelWorkProcessor(ElasticsearchWorkSequenceBuilder sequenceBuilder, - ElasticsearchWorkBulker bulker) { - this.aggregator = new BulkAndSequenceAggregator( sequenceBuilder, bulker ); - } - - @Override - public void beginBatch() { - aggregator.reset(); - sequenceFutures.clear(); - } - - @Override - public CompletableFuture submit(ElasticsearchWork work) { - aggregator.initSequence(); - CompletableFuture future = work.aggregate( aggregator ); - sequenceFutures.add( aggregator.buildSequence() ); - return future; - } - - @Override - public CompletableFuture endBatch() { - CompletableFuture future = - CompletableFuture.allOf( sequenceFutures.toArray( new CompletableFuture[0] ) ); - sequenceFutures.clear(); - aggregator.startSequences(); - // Sequence futures are not expected to fail even if one work fails, - // so we can safely return this future directly. - return future; - } - - @Override - public void complete() { - // Nothing to do: if all individual works have completed, we're done. - } - - private static class BulkAndSequenceAggregator implements ElasticsearchWorkAggregator { - - private final ElasticsearchWorkSequenceBuilder sequenceBuilder; - private final ElasticsearchWorkBulker bulker; - - public BulkAndSequenceAggregator(ElasticsearchWorkSequenceBuilder sequenceBuilder, - ElasticsearchWorkBulker bulker) { - this.sequenceBuilder = sequenceBuilder; - this.bulker = bulker; - } - - public void reset() { - bulker.reset(); - } - - public void initSequence() { - sequenceBuilder.init( CompletableFuture.completedFuture( null ) ); - } - - @Override - public CompletableFuture addBulkable(BulkableWork work) { - return bulker.add( work ); - } - - @Override - public CompletableFuture addNonBulkable(NonBulkableWork work) { - return sequenceBuilder.addNonBulkExecution( work ); - } - - public CompletableFuture buildSequence() { - return sequenceBuilder.build(); - } - - public void startSequences() { - bulker.finalizeBulkWork(); - } - - } -} \ No newline at end of file diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkOrchestrator.java new file mode 100644 index 00000000000..838e32056bf --- /dev/null +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkOrchestrator.java @@ -0,0 +1,52 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.backend.elasticsearch.orchestration.impl; + +import java.util.concurrent.CompletableFuture; + +import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork; + +/** + * A thread-safe component ordering and planning the execution of works + * serially, in the order they were submitted. + *

+ * Serial orchestrators are best used as index-scoped orchestrators, + * when many works are submitted from different threads: + * they allow easy implementation of a reasonably safe (though imperfect) concurrency control by expecting + * the most recent work to hold the most recent data to be indexed. + *

+ * With a serial orchestrator: + *

    + *
  • Works are executed in the order they were submitted in. + *
  • Two submitted works from the may be sent together in a single bulk request, + * but only if all the works between them are bulked too. + *
  • The application will wait for already-submitted works to finish when shutting down. + *
+ *

+ * Note that while serial orchestrators preserve ordering as best they can, + * they lead to a lesser throughput and can only guarantee ordering within a single JVM. + * When multiple JVMs with multiple instances of Hibernate Search target the same index, + * the relative order of indexing works might end up being different + * from the relative order of the database changes that triggered indexing, + * eventually leading to out-of-sync indexes. + */ +public interface ElasticsearchSerialWorkOrchestrator { + + default CompletableFuture submit(ElasticsearchWork work) { + CompletableFuture future = new CompletableFuture<>(); + submit( new ElasticsearchBatchedWork<>( work, future ) ); + return future; + } + + default void submit(CompletableFuture future, ElasticsearchWork work) { + submit( new ElasticsearchBatchedWork<>( work, future ) ); + } + + void submit(BatchedWork work); + +} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSimpleWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSimpleWorkOrchestrator.java new file mode 100644 index 00000000000..e55187b0484 --- /dev/null +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSimpleWorkOrchestrator.java @@ -0,0 +1,73 @@ +/* + * Hibernate Search, full-text search for your domain model + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.search.backend.elasticsearch.orchestration.impl; + +import java.util.concurrent.CompletableFuture; + +import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink; +import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext; +import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; + +public class ElasticsearchSimpleWorkOrchestrator + extends AbstractElasticsearchWorkOrchestrator> + implements ElasticsearchParallelWorkOrchestrator, ElasticsearchWorkOrchestratorImplementor { + + private ElasticsearchWorkExecutionContext executionContext; + + public ElasticsearchSimpleWorkOrchestrator(String name, ElasticsearchLink link) { + super( name, link ); + } + + @Override + public CompletableFuture submit(NonBulkableWork work) { + WorkExecution workExecution = new WorkExecution<>( work ); + submit( workExecution ); + return workExecution.getResult(); + } + + @Override + protected void doStart() { + executionContext = createWorkExecutionContext(); + } + + @Override + protected void doSubmit(WorkExecution work) { + work.execute( executionContext ); + } + + @Override + protected CompletableFuture getCompletion() { + // We do not wait for these works to finish; + // callers were provided with a future and are responsible for waiting + // before they close the application. + return CompletableFuture.completedFuture( null ); + } + + @Override + protected void doStop() { + executionContext = null; + } + + static class WorkExecution { + private final NonBulkableWork work; + + private CompletableFuture result; + + WorkExecution(NonBulkableWork work) { + this.work = work; + } + + public void execute(ElasticsearchWorkExecutionContext executionContext) { + result = work.execute( executionContext ); + } + + public CompletableFuture getResult() { + return result; + } + } + +} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestrator.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestrator.java deleted file mode 100644 index 8b8575e60dd..00000000000 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestrator.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Hibernate Search, full-text search for your domain model - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later - * See the lgpl.txt file in the root directory or . - */ -package org.hibernate.search.backend.elasticsearch.orchestration.impl; - -import java.util.concurrent.CompletableFuture; - -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; -import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork; - -/** - * A thread-safe component responsible for ordering and planning the execution of works. - */ -public interface ElasticsearchWorkOrchestrator { - - default CompletableFuture submit(ElasticsearchWork work) { - CompletableFuture future = new CompletableFuture<>(); - submit( new ElasticsearchBatchedWork<>( work, future ) ); - return future; - } - - default void submit(CompletableFuture future, ElasticsearchWork work) { - submit( new ElasticsearchBatchedWork<>( work, future ) ); - } - - void submit(BatchedWork work); - -} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorImplementor.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorImplementor.java index 02ccdf415a8..7f4677386ac 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorImplementor.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorImplementor.java @@ -9,9 +9,9 @@ import java.util.concurrent.CompletableFuture; /** - * An extension of {@link ElasticsearchWorkOrchestrator} exposing methods to control its lifecycle. + * An orchestrator implementor, exposing methods to control its lifecycle. */ -public interface ElasticsearchWorkOrchestratorImplementor extends ElasticsearchWorkOrchestrator { +public interface ElasticsearchWorkOrchestratorImplementor { /** * Start any resource necessary to operate the orchestrator at runtime. diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorProvider.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorProvider.java deleted file mode 100644 index b671b85faf9..00000000000 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkOrchestratorProvider.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Hibernate Search, full-text search for your domain model - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later - * See the lgpl.txt file in the root directory or . - */ -package org.hibernate.search.backend.elasticsearch.orchestration.impl; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; - -import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink; -import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads; -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext; -import org.hibernate.search.engine.reporting.FailureHandler; - -/** - * Provides access to various orchestrators. - * - *

Orchestrator types

- * - * Each orchestrator has its own characteristics and is suitable for different use cases. - * We distinguish in particular between parallel orchestrators and serial orchestrators. - * - *

Parallel orchestrators

- * - * Parallel orchestrators execute works in no particular order. - *

- * They are suitable when the client takes the responsibility - * of submitting works as needed to implement ordering: if work #2 must be executed after work #1, - * the client will take care of waiting until #1 is done before he submits #2. - *

- * With a parallel orchestrator: - *

    - *
  • Works are executed in unpredictable order, irrespective of the order they were submitted in. - *
  • Two submitted works may be sent together in a single bulk request. - *
- *

- * Parallel orchestrators from a single {@link ElasticsearchWorkOrchestratorProvider} (i.e. from a single backend) - * rely on the same resources (same queue and consumer thread). - * - *

Serial orchestrators

- * - * Serial orchestrators execute works in the order they were submitted. - *

- * They are best used as index-scoped orchestrators, when many works are submitted from different threads: - * they allow easy implementation of a reasonably safe (though imperfect) concurrency control by expecting - * the most recent work to hold the most recent data to be indexed. - *

- * With a serial orchestrator: - *

    - *
  • Works are executed in the order they were submitted in. - *
  • Two submitted works from the may be sent together in a single bulk request, - * but only if all the works between them are bulked too. - *
- *

- * Serial orchestrators from a single {@link ElasticsearchWorkOrchestratorProvider} (i.e. from a single backend) - * rely on the separate resources (each has a dedicated queue and consumer thread). - *

- * Note that while serial orchestrators preserve ordering as best they can, - * they lead to a lesser throughput and can only guarantee ordering within a single JVM. - * When multiple JVMs with multiple instances of Hibernate Search target the same index - */ -public class ElasticsearchWorkOrchestratorProvider { - - private static final int MAX_BULK_SIZE = 250; - - /* - * Setting the following constants involves a bit of guesswork. - * Basically we want the number to be large enough for the orchestrator - * to create bulks of the maximum size defined above most of the time, - * and to avoid cases where the queue is full as much as possible, - * because threads submitting works will block when that happens. - * But we also want to keep the number as low as possible to avoid - * consuming too much memory with pending works. - * Here we set the number for parallel orchestrators higher than the number - * for serial orchestrators, because the parallel orchestrators rely on a single - * consumer thread shared between all index managers. - */ - private static final int SERIAL_MAX_WORKS_PER_BATCH = 10 * MAX_BULK_SIZE; - private static final int PARALLEL_MAX_WORKS_PER_BATCH = 20 * MAX_BULK_SIZE; - - private final BackendThreads threads; - private final ElasticsearchLink link; - private final FailureHandler failureHandler; - - private final ElasticsearchBatchingWorkOrchestrator rootParallelOrchestrator; - - public ElasticsearchWorkOrchestratorProvider(String rootParallelOrchestratorName, - BackendThreads threads, ElasticsearchLink link, - FailureHandler failureHandler) { - this.threads = threads; - this.link = link; - this.failureHandler = failureHandler; - - /* - * The following orchestrator doesn't require a strict execution ordering - * (because it's mainly used by the mass indexer, which already takes care of - * ordering works properly and waiting for pending works when necessary). - * Thus we use a parallel orchestrator to maximize throughput. - */ - this.rootParallelOrchestrator = createBatchingSharedOrchestrator( - rootParallelOrchestratorName, - createParallelWorkProcessor(), - PARALLEL_MAX_WORKS_PER_BATCH, - false // Do not care about ordering when queuing works - ); - } - - public void start() { - rootParallelOrchestrator.start(); - } - - public CompletableFuture preStop() { - return rootParallelOrchestrator.preStop(); - } - - public void stop() { - rootParallelOrchestrator.stop(); - } - - /** - * @return The root parallel orchestrator. Useful to execute operations after an index manager was closed, - * such as index dropping. - */ - public ElasticsearchBatchingWorkOrchestrator getRootParallelOrchestrator() { - return rootParallelOrchestrator; - } - - /** - * @param name The name of the orchestrator to create. - * @return A serial orchestrator. - */ - public ElasticsearchWorkOrchestratorImplementor createSerialOrchestrator(String name) { - ElasticsearchWorkProcessor processor = createSerialWorkProcessor(); - - return createBatchingSharedOrchestrator( - name, - processor, - SERIAL_MAX_WORKS_PER_BATCH, - true /* enqueue works in the exact order they were submitted */ - ); - } - - /** - * @param name The name of the orchestrator to create. - * @return A parallel orchestrator. - */ - public ElasticsearchWorkOrchestratorImplementor createParallelOrchestrator(String name) { - return rootParallelOrchestrator.createChild( name ); - } - - private ElasticsearchBatchingWorkOrchestrator createBatchingSharedOrchestrator( - String name, ElasticsearchWorkProcessor processor, - int maxWorksPerBatch, boolean fair) { - return new ElasticsearchBatchingWorkOrchestrator( - name, processor, threads, - maxWorksPerBatch, fair, - failureHandler - ); - } - - private ElasticsearchWorkProcessor createSerialWorkProcessor() { - ElasticsearchWorkSequenceBuilder sequenceBuilder = createSequenceBuilder( this::createWorkExecutionContext ); - ElasticsearchWorkBulker bulker = createBulker( sequenceBuilder ); - return new ElasticsearchSerialWorkProcessor( sequenceBuilder, bulker ); - } - - private ElasticsearchWorkProcessor createParallelWorkProcessor() { - ElasticsearchWorkSequenceBuilder sequenceBuilder = createSequenceBuilder( this::createWorkExecutionContext ); - ElasticsearchWorkBulker bulker = createBulker( sequenceBuilder ); - return new ElasticsearchParallelWorkProcessor( sequenceBuilder, bulker ); - } - - private ElasticsearchWorkSequenceBuilder createSequenceBuilder(Supplier contextSupplier) { - return new ElasticsearchDefaultWorkSequenceBuilder( contextSupplier ); - } - - private ElasticsearchWorkBulker createBulker(ElasticsearchWorkSequenceBuilder sequenceBuilder) { - return new ElasticsearchDefaultWorkBulker( - sequenceBuilder, - (worksToBulk, refreshStrategy) -> - link.getWorkBuilderFactory().bulk( worksToBulk ).refresh( refreshStrategy ).build(), - MAX_BULK_SIZE - ); - } - - private ElasticsearchWorkExecutionContext createWorkExecutionContext() { - return new ElasticsearchWorkExecutionContextImpl( link.getClient(), link.getGsonProvider() ); - } - -} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkProcessor.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkProcessor.java deleted file mode 100644 index 1be9e0d148a..00000000000 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchWorkProcessor.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Hibernate Search, full-text search for your domain model - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later - * See the lgpl.txt file in the root directory or . - */ -package org.hibernate.search.backend.elasticsearch.orchestration.impl; - -import java.util.concurrent.CompletableFuture; - -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; -import org.hibernate.search.engine.backend.orchestration.spi.BatchedWorkProcessor; - -/** - * An thread-unsafe component responsible for accumulating works to be executed, - * then executing them according to an implementation-specific orchestration scheme. - *

- * Works are added by submitting as many works as necessary through {@link #submit(ElasticsearchWork)}. - * Execution starts as soon as possible, - * which may be as late as when {@link #endBatch()} is called. - *

- * Depending on the implementation, works may be executed serially, or in parallel. - */ -public interface ElasticsearchWorkProcessor extends BatchedWorkProcessor { - - CompletableFuture submit(ElasticsearchWork work); - -} diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchIndexSchemaManager.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchIndexSchemaManager.java index 8b894e5d9f0..3ef97c94f79 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchIndexSchemaManager.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchIndexSchemaManager.java @@ -11,7 +11,7 @@ import org.hibernate.search.backend.elasticsearch.index.layout.IndexLayoutStrategy; import org.hibernate.search.backend.elasticsearch.index.layout.impl.IndexNames; import org.hibernate.search.backend.elasticsearch.lowlevel.index.impl.IndexMetadata; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; import org.hibernate.search.engine.backend.schema.management.spi.IndexSchemaManager; @@ -30,7 +30,7 @@ public class ElasticsearchIndexSchemaManager implements IndexSchemaManager { private final ElasticsearchIndexLifecycleExecutionOptions executionOptions; public ElasticsearchIndexSchemaManager(ElasticsearchWorkBuilderFactory workBuilderFactory, - ElasticsearchWorkOrchestrator workOrchestrator, + ElasticsearchParallelWorkOrchestrator workOrchestrator, IndexLayoutStrategy indexLayoutStrategy, IndexNames indexNames, IndexMetadata expectedMetadata, ElasticsearchIndexLifecycleExecutionOptions executionOptions) { diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchSchemaAccessor.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchSchemaAccessor.java index a53766bc2d4..e894a5ddfd2 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchSchemaAccessor.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/schema/management/impl/ElasticsearchSchemaAccessor.java @@ -18,10 +18,10 @@ import org.hibernate.search.backend.elasticsearch.lowlevel.index.mapping.impl.RootTypeMapping; import org.hibernate.search.backend.elasticsearch.lowlevel.index.settings.impl.IndexSettings; import org.hibernate.search.backend.elasticsearch.lowlevel.index.aliases.impl.IndexAliasDefinition; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; import org.hibernate.search.backend.elasticsearch.work.result.impl.CreateIndexResult; import org.hibernate.search.backend.elasticsearch.work.result.impl.ExistingIndexMetadata; import org.hibernate.search.util.common.impl.Futures; @@ -38,17 +38,17 @@ public class ElasticsearchSchemaAccessor { private final ElasticsearchWorkBuilderFactory workBuilderFactory; - private final ElasticsearchWorkOrchestrator orchestrator; + private final ElasticsearchParallelWorkOrchestrator orchestrator; public ElasticsearchSchemaAccessor(ElasticsearchWorkBuilderFactory workBuilderFactory, - ElasticsearchWorkOrchestrator orchestrator) { + ElasticsearchParallelWorkOrchestrator orchestrator) { this.workBuilderFactory = workBuilderFactory; this.orchestrator = orchestrator; } public CompletableFuture createIndexAssumeNonExisting(URLEncodedString primaryIndexName, Map aliases, IndexSettings settings, RootTypeMapping mapping) { - ElasticsearchWork work = getWorkFactory().createIndex( primaryIndexName ) + NonBulkableWork work = getWorkFactory().createIndex( primaryIndexName ) .aliases( aliases ) .settings( settings ) .mapping( mapping ) @@ -65,7 +65,7 @@ public CompletableFuture createIndexAssumeNonExisting(URLEncodedString primar */ public CompletableFuture createIndexIgnoreExisting(URLEncodedString primaryIndexName, Map aliases, IndexSettings settings, RootTypeMapping mapping) { - ElasticsearchWork work = getWorkFactory().createIndex( primaryIndexName ) + NonBulkableWork work = getWorkFactory().createIndex( primaryIndexName ) .aliases( aliases ) .settings( settings ) .mapping( mapping ) @@ -83,7 +83,7 @@ public CompletableFuture getCurrentIndexMetadataOrNull(In } private CompletableFuture getCurrentIndexMetadata(IndexNames indexNames, boolean allowNull) { - ElasticsearchWork> work = getWorkFactory().getIndexMetadata() + NonBulkableWork> work = getWorkFactory().getIndexMetadata() .index( indexNames.getWrite() ) .index( indexNames.getRead() ) .build(); @@ -113,7 +113,7 @@ private CompletableFuture getCurrentIndexMetadata(IndexNa } public CompletableFuture putAliases(URLEncodedString indexName, Map aliases) { - ElasticsearchWork work = getWorkFactory().putIndexAliases( indexName, aliases ).build(); + NonBulkableWork work = getWorkFactory().putIndexAliases( indexName, aliases ).build(); return execute( work ) .exceptionally( Futures.handler( e -> { throw log.elasticsearchSettingsUpdateFailed( @@ -123,7 +123,7 @@ public CompletableFuture putAliases(URLEncodedString indexName, Map updateSettings(URLEncodedString indexName, IndexSettings settings) { - ElasticsearchWork work = getWorkFactory().putIndexSettings( indexName, settings ).build(); + NonBulkableWork work = getWorkFactory().putIndexSettings( indexName, settings ).build(); return execute( work ) .exceptionally( Futures.handler( e -> { throw log.elasticsearchSettingsUpdateFailed( @@ -133,7 +133,7 @@ public CompletableFuture updateSettings(URLEncodedString indexName, IndexSett } public CompletableFuture putMapping(URLEncodedString indexName, RootTypeMapping mapping) { - ElasticsearchWork work = getWorkFactory().putIndexTypeMapping( indexName, mapping ).build(); + NonBulkableWork work = getWorkFactory().putIndexTypeMapping( indexName, mapping ).build(); return execute( work ) .exceptionally( Futures.handler( e -> { throw log.elasticsearchMappingCreationFailed( @@ -148,7 +148,7 @@ public CompletableFuture waitForIndexStatus(IndexNames indexNames, Elasticsea URLEncodedString alias = indexNames.getWrite(); - ElasticsearchWork work = + NonBulkableWork work = getWorkFactory().waitForIndexStatusWork( alias, requiredIndexStatus, timeoutAndUnit ) .build(); return execute( work ) @@ -161,18 +161,18 @@ public CompletableFuture waitForIndexStatus(IndexNames indexNames, Elasticsea } public CompletableFuture dropIndexIfExisting(URLEncodedString indexName) { - ElasticsearchWork work = getWorkFactory().dropIndex( indexName ).ignoreIndexNotFound().build(); + NonBulkableWork work = getWorkFactory().dropIndex( indexName ).ignoreIndexNotFound().build(); return execute( work ); } public CompletableFuture closeIndex(URLEncodedString indexName) { - ElasticsearchWork work = getWorkFactory().closeIndex( indexName ).build(); + NonBulkableWork work = getWorkFactory().closeIndex( indexName ).build(); return execute( work ) .thenRun( () -> log.closedIndex( indexName ) ); } public CompletableFuture openIndex(URLEncodedString indexName) { - ElasticsearchWork work = getWorkFactory().openIndex( indexName ).build(); + NonBulkableWork work = getWorkFactory().openIndex( indexName ).build(); return execute( work ) .thenRun( () -> log.openedIndex( indexName ) ); } @@ -181,7 +181,7 @@ private ElasticsearchWorkBuilderFactory getWorkFactory() { return workBuilderFactory; } - private CompletableFuture execute(ElasticsearchWork work) { + private CompletableFuture execute(NonBulkableWork work) { return orchestrator.submit( work ); } } diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryBuilder.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryBuilder.java index 0c0f21af758..e28275ac8a4 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryBuilder.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryBuilder.java @@ -16,9 +16,9 @@ import org.hibernate.search.backend.elasticsearch.gson.impl.JsonAccessor; import org.hibernate.search.backend.elasticsearch.lowlevel.query.impl.Queries; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.search.query.ElasticsearchSearchRequestTransformer; import org.hibernate.search.backend.elasticsearch.logging.impl.Log; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.search.aggregation.impl.ElasticsearchSearchAggregation; import org.hibernate.search.backend.elasticsearch.search.impl.ElasticsearchSearchContext; import org.hibernate.search.backend.elasticsearch.search.impl.ElasticsearchSearchQueryElementCollector; @@ -53,7 +53,7 @@ public class ElasticsearchSearchQueryBuilder private final ElasticsearchWorkBuilderFactory workFactory; private final ElasticsearchSearchResultExtractorFactory searchResultExtractorFactory; - private final ElasticsearchWorkOrchestrator queryOrchestrator; + private final ElasticsearchParallelWorkOrchestrator queryOrchestrator; private final ElasticsearchSearchContext searchContext; private final BackendSessionContext sessionContext; @@ -75,7 +75,7 @@ public class ElasticsearchSearchQueryBuilder public ElasticsearchSearchQueryBuilder( ElasticsearchWorkBuilderFactory workFactory, ElasticsearchSearchResultExtractorFactory searchResultExtractorFactory, - ElasticsearchWorkOrchestrator queryOrchestrator, + ElasticsearchParallelWorkOrchestrator queryOrchestrator, ElasticsearchSearchContext searchContext, BackendSessionContext sessionContext, LoadingContextBuilder loadingContextBuilder, diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryImpl.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryImpl.java index d8b2083b393..384ca8dc43c 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryImpl.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/search/query/impl/ElasticsearchSearchQueryImpl.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.search.query.ElasticsearchSearchRequestTransformer; import org.hibernate.search.backend.elasticsearch.gson.impl.JsonAccessor; import org.hibernate.search.backend.elasticsearch.logging.impl.Log; @@ -19,10 +20,9 @@ import org.hibernate.search.backend.elasticsearch.search.query.ElasticsearchSearchQuery; import org.hibernate.search.backend.elasticsearch.search.query.ElasticsearchSearchResult; import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchSearchResultExtractor; -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; +import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; import org.hibernate.search.backend.elasticsearch.work.result.impl.ExplainResult; import org.hibernate.search.engine.common.dsl.spi.DslExtensionState; import org.hibernate.search.engine.backend.session.spi.BackendSessionContext; @@ -49,7 +49,7 @@ public class ElasticsearchSearchQueryImpl extends AbstractSearchQuery loadingContext; @@ -63,7 +63,7 @@ public class ElasticsearchSearchQueryImpl extends AbstractSearchQuery loadingContext, @@ -106,7 +106,7 @@ public Q extension(SearchQueryExtension extension) { @Override public ElasticsearchSearchResult fetch(Integer offset, Integer limit) { // TODO restore scrolling support. See HSEARCH-3323 - ElasticsearchWork> work = workFactory.search( payload, searchResultExtractor ) + NonBulkableWork> work = workFactory.search( payload, searchResultExtractor ) .indexes( searchContext.getHibernateSearchIndexNamesToIndexReadNames().values() ) .paging( defaultedLimit( limit, offset ), offset ) .routingKeys( routingKeys ) @@ -135,7 +135,7 @@ public long fetchTotalHitCount() { filteredPayload.add( "query", querySubTree.get() ); } - ElasticsearchWork work = workFactory.count( searchContext.getHibernateSearchIndexNamesToIndexReadNames().values() ) + NonBulkableWork work = workFactory.count( searchContext.getHibernateSearchIndexNamesToIndexReadNames().values() ) .query( filteredPayload ) .routingKeys( routingKeys ) .timeout( timeoutValue, timeoutUnit, exceptionOnTimeout ) @@ -201,7 +201,7 @@ private JsonObject doExplain(URLEncodedString encodedIndexName, String id) { queryOnlyPayload.add( "query", query ); } - ElasticsearchWork work = workFactory.explain( encodedIndexName, elasticsearchId, queryOnlyPayload ) + NonBulkableWork work = workFactory.explain( encodedIndexName, elasticsearchId, queryOnlyPayload ) .routingKeys( routingKeys ) .requestTransformer( ElasticsearchSearchRequestTransformerContextImpl.createTransformerFunction( requestTransformer ) diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexer.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexer.java index 11d76ea717f..520a078f748 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexer.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexer.java @@ -8,7 +8,7 @@ import java.util.concurrent.CompletableFuture; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; import org.hibernate.search.engine.backend.work.execution.spi.DocumentContributor; @@ -21,12 +21,12 @@ public class ElasticsearchIndexIndexer implements IndexIndexer { private final ElasticsearchWorkBuilderFactory factory; - private final ElasticsearchWorkOrchestrator orchestrator; + private final ElasticsearchSerialWorkOrchestrator orchestrator; private final WorkExecutionIndexManagerContext indexManagerContext; private final String tenantId; public ElasticsearchIndexIndexer(ElasticsearchWorkBuilderFactory factory, - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext) { this.factory = factory; diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlan.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlan.java index 0eb05bba295..5de76def041 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlan.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlan.java @@ -10,7 +10,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; import org.hibernate.search.backend.elasticsearch.work.impl.SingleDocumentWork; import org.hibernate.search.engine.backend.common.spi.EntityReferenceFactory; @@ -28,7 +28,7 @@ public class ElasticsearchIndexIndexingPlan implements IndexIndexingPlan { private final ElasticsearchWorkBuilderFactory builderFactory; - private final ElasticsearchWorkOrchestrator orchestrator; + private final ElasticsearchSerialWorkOrchestrator orchestrator; private final WorkExecutionIndexManagerContext indexManagerContext; private final String tenantId; private final EntityReferenceFactory entityReferenceFactory; @@ -37,7 +37,7 @@ public class ElasticsearchIndexIndexingPlan implements IndexIndexingPlan { private final List works = new ArrayList<>(); public ElasticsearchIndexIndexingPlan(ElasticsearchWorkBuilderFactory builderFactory, - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext, EntityReferenceFactory entityReferenceFactory, diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecution.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecution.java index 5b006ca2e33..f428f8655da 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecution.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecution.java @@ -9,7 +9,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.work.impl.SingleDocumentWork; import org.hibernate.search.engine.backend.common.spi.EntityReferenceFactory; import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexingPlanExecutionReport; @@ -22,13 +22,13 @@ */ class ElasticsearchIndexIndexingPlanExecution { - private final ElasticsearchWorkOrchestrator orchestrator; + private final ElasticsearchSerialWorkOrchestrator orchestrator; private final EntityReferenceFactory entityReferenceFactory; private final List works; private final CompletableFuture[] futures; - ElasticsearchIndexIndexingPlanExecution(ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchIndexIndexingPlanExecution(ElasticsearchSerialWorkOrchestrator orchestrator, EntityReferenceFactory entityReferenceFactory, List works) { this.orchestrator = orchestrator; diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexWorkspace.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexWorkspace.java index ca7b560f09e..764d534ae5e 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexWorkspace.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexWorkspace.java @@ -11,7 +11,7 @@ import org.hibernate.search.backend.elasticsearch.lowlevel.query.impl.Queries; import org.hibernate.search.backend.elasticsearch.multitenancy.impl.MultiTenancyStrategy; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString; import org.hibernate.search.backend.elasticsearch.work.builder.factory.impl.ElasticsearchWorkBuilderFactory; import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkspace; @@ -24,12 +24,12 @@ public class ElasticsearchIndexWorkspace implements IndexWorkspace { private final ElasticsearchWorkBuilderFactory builderFactory; private final MultiTenancyStrategy multiTenancyStrategy; - private final ElasticsearchWorkOrchestrator orchestrator; + private final ElasticsearchParallelWorkOrchestrator orchestrator; private final URLEncodedString indexName; private final DetachedBackendSessionContext sessionContext; public ElasticsearchIndexWorkspace(ElasticsearchWorkBuilderFactory builderFactory, - MultiTenancyStrategy multiTenancyStrategy, ElasticsearchWorkOrchestrator orchestrator, + MultiTenancyStrategy multiTenancyStrategy, ElasticsearchParallelWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, DetachedBackendSessionContext sessionContext) { this.builderFactory = builderFactory; diff --git a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/WorkExecutionBackendContext.java b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/WorkExecutionBackendContext.java index 25d6c269d92..d333fb3caa6 100644 --- a/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/WorkExecutionBackendContext.java +++ b/backend/elasticsearch/src/main/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/WorkExecutionBackendContext.java @@ -6,7 +6,7 @@ */ package org.hibernate.search.backend.elasticsearch.work.execution.impl; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.engine.backend.common.spi.EntityReferenceFactory; import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy; import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexer; @@ -29,18 +29,17 @@ public interface WorkExecutionBackendContext { IndexIndexingPlan createIndexingPlan( - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext, EntityReferenceFactory entityReferenceFactory, DocumentRefreshStrategy refreshStrategy); IndexIndexer createIndexer( - ElasticsearchWorkOrchestrator orchestrator, + ElasticsearchSerialWorkOrchestrator orchestrator, WorkExecutionIndexManagerContext indexManagerContext, BackendSessionContext sessionContext); - IndexWorkspace createWorkspace(ElasticsearchWorkOrchestrator orchestrator, - WorkExecutionIndexManagerContext indexManagerContext, + IndexWorkspace createWorkspace(WorkExecutionIndexManagerContext indexManagerContext, DetachedBackendSessionContext sessionContext); } diff --git a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkProcessorTest.java b/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWorkProcessorTest.java similarity index 92% rename from backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkProcessorTest.java rename to backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWorkProcessorTest.java index 57e3c8408cb..a2e287ef832 100644 --- a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchSerialWorkProcessorTest.java +++ b/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchBatchedWorkProcessorTest.java @@ -24,7 +24,7 @@ import org.easymock.IAnswer; -public class ElasticsearchSerialWorkProcessorTest extends EasyMockSupport { +public class ElasticsearchBatchedWorkProcessorTest extends EasyMockSupport { /** * @return A value that should not matter, because it should not be used. @@ -49,8 +49,8 @@ public void simple_singleWork() { CompletableFuture sequenceFuture = new CompletableFuture<>(); replayAll(); - ElasticsearchSerialWorkProcessor processor = - new ElasticsearchSerialWorkProcessor( sequenceBuilderMock, bulkerMock ); + ElasticsearchBatchedWorkProcessor processor = + new ElasticsearchBatchedWorkProcessor( sequenceBuilderMock, bulkerMock ); verifyAll(); resetAll(); @@ -95,8 +95,8 @@ public void simple_multipleWorks() { CompletableFuture sequenceFuture = new CompletableFuture<>(); replayAll(); - ElasticsearchSerialWorkProcessor processor = - new ElasticsearchSerialWorkProcessor( sequenceBuilderMock, bulkerMock ); + ElasticsearchBatchedWorkProcessor processor = + new ElasticsearchBatchedWorkProcessor( sequenceBuilderMock, bulkerMock ); verifyAll(); resetAll(); @@ -147,8 +147,8 @@ public void newSequenceBetweenBatches() { CompletableFuture sequence2Future = new CompletableFuture<>(); replayAll(); - ElasticsearchSerialWorkProcessor processor = - new ElasticsearchSerialWorkProcessor( sequenceBuilderMock, bulkerMock ); + ElasticsearchBatchedWorkProcessor processor = + new ElasticsearchBatchedWorkProcessor( sequenceBuilderMock, bulkerMock ); verifyAll(); resetAll(); @@ -218,8 +218,8 @@ public void newBulkIfNonBulkable() { CompletableFuture sequenceFuture = new CompletableFuture<>(); replayAll(); - ElasticsearchSerialWorkProcessor processor = - new ElasticsearchSerialWorkProcessor( sequenceBuilderMock, bulkerMock ); + ElasticsearchBatchedWorkProcessor processor = + new ElasticsearchBatchedWorkProcessor( sequenceBuilderMock, bulkerMock ); verifyAll(); resetAll(); @@ -259,7 +259,7 @@ public void newBulkIfNonBulkable() { checkComplete( processor ); } - private void checkComplete(ElasticsearchSerialWorkProcessor processor) { + private void checkComplete(ElasticsearchBatchedWorkProcessor processor) { resetAll(); replayAll(); processor.complete(); diff --git a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessorTest.java b/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessorTest.java deleted file mode 100644 index a7826767e17..00000000000 --- a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/orchestration/impl/ElasticsearchParallelWorkProcessorTest.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Hibernate Search, full-text search for your domain model - * - * License: GNU Lesser General Public License (LGPL), version 2.1 or later - * See the lgpl.txt file in the root directory or . - */ -package org.hibernate.search.backend.elasticsearch.orchestration.impl; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.getCurrentArguments; -import static org.hibernate.search.util.impl.test.FutureAssert.assertThat; - -import java.util.concurrent.CompletableFuture; - -import org.hibernate.search.backend.elasticsearch.work.impl.BulkableWork; -import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork; -import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkAggregator; - -import org.junit.Before; -import org.junit.Test; - -import org.easymock.EasyMockSupport; -import org.easymock.IAnswer; - - -public class ElasticsearchParallelWorkProcessorTest extends EasyMockSupport { - - /** - * @return A value that should not matter, because it should not be used. - */ - private static T unusedReturnValue() { - return null; - } - - private ElasticsearchWorkSequenceBuilder sequenceBuilderMock; - private ElasticsearchWorkBulker bulkerMock; - - @Before - public void initMocks() { - sequenceBuilderMock = createStrictMock( ElasticsearchWorkSequenceBuilder.class ); - bulkerMock = createStrictMock( ElasticsearchWorkBulker.class ); - } - - @Test - public void simple_singleWork() { - NonBulkableWork work = work( 1 ); - - CompletableFuture sequenceFuture = new CompletableFuture<>(); - - replayAll(); - ElasticsearchParallelWorkProcessor processor = - new ElasticsearchParallelWorkProcessor( sequenceBuilderMock, bulkerMock ); - verifyAll(); - - resetAll(); - bulkerMock.reset(); - replayAll(); - processor.beginBatch(); - verifyAll(); - - CompletableFuture workFuture = new CompletableFuture<>(); - resetAll(); - sequenceBuilderMock.init( anyObject() ); - expect( work.aggregate( anyObject() ) ).andAnswer( nonBulkableAggregateAnswer( work ) ); - expect( sequenceBuilderMock.addNonBulkExecution( work ) ).andReturn( workFuture ); - expect( sequenceBuilderMock.build() ).andReturn( sequenceFuture ); - replayAll(); - CompletableFuture returnedWorkFuture = processor.submit( work ); - verifyAll(); - assertThat( returnedWorkFuture ).isSameAs( workFuture ); - - resetAll(); - bulkerMock.finalizeBulkWork(); - replayAll(); - CompletableFuture batchFuture = processor.endBatch(); - verifyAll(); - assertThat( batchFuture ).isPending(); - - resetAll(); - replayAll(); - sequenceFuture.complete( null ); - verifyAll(); - assertThat( batchFuture ).isSuccessful(); - - checkComplete( processor ); - } - - @Test - public void simple_multipleWorks() { - NonBulkableWork work1 = work( 1 ); - BulkableWork work2 = bulkableWork( 2 ); - - CompletableFuture sequence1Future = new CompletableFuture<>(); - CompletableFuture sequence2Future = new CompletableFuture<>(); - - replayAll(); - ElasticsearchParallelWorkProcessor processor = - new ElasticsearchParallelWorkProcessor( sequenceBuilderMock, bulkerMock ); - verifyAll(); - - resetAll(); - bulkerMock.reset(); - replayAll(); - processor.beginBatch(); - verifyAll(); - - CompletableFuture work1Future = new CompletableFuture<>(); - CompletableFuture work2Future = new CompletableFuture<>(); - resetAll(); - sequenceBuilderMock.init( anyObject() ); - expect( work1.aggregate( anyObject() ) ).andAnswer( nonBulkableAggregateAnswer( work1 ) ); - expect( sequenceBuilderMock.addNonBulkExecution( work1 ) ).andReturn( work1Future ); - expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); - sequenceBuilderMock.init( anyObject() ); - expect( work2.aggregate( anyObject() ) ).andAnswer( bulkableAggregateAnswer( work2 ) ); - expect( bulkerMock.add( work2 ) ).andReturn( work2Future ); - expect( sequenceBuilderMock.build() ).andReturn( sequence2Future ); - replayAll(); - CompletableFuture returnedWork1Future = processor.submit( work1 ); - CompletableFuture returnedWork2Future = processor.submit( work2 ); - verifyAll(); - assertThat( returnedWork1Future ).isSameAs( work1Future ); - assertThat( returnedWork2Future ).isSameAs( work2Future ); - - resetAll(); - bulkerMock.finalizeBulkWork(); - replayAll(); - CompletableFuture batchFuture = processor.endBatch(); - verifyAll(); - assertThat( batchFuture ).isPending(); - - resetAll(); - replayAll(); - sequence2Future.complete( null ); - verifyAll(); - assertThat( batchFuture ).isPending(); - - resetAll(); - replayAll(); - sequence1Future.complete( null ); - verifyAll(); - assertThat( batchFuture ).isSuccessful(); - - checkComplete( processor ); - } - - @Test - public void reuseBulkAcrossSequences() { - BulkableWork work1 = bulkableWork( 1 ); - - BulkableWork work2 = bulkableWork( 2 ); - - CompletableFuture sequence1Future = new CompletableFuture<>(); - CompletableFuture sequence2Future = new CompletableFuture<>(); - - replayAll(); - ElasticsearchParallelWorkProcessor processor = - new ElasticsearchParallelWorkProcessor( sequenceBuilderMock, bulkerMock ); - verifyAll(); - - resetAll(); - bulkerMock.reset(); - replayAll(); - processor.beginBatch(); - verifyAll(); - - resetAll(); - sequenceBuilderMock.init( anyObject() ); - expect( work1.aggregate( anyObject() ) ).andAnswer( bulkableAggregateAnswer( work1 ) ); - expect( bulkerMock.add( work1 ) ).andReturn( unusedReturnValue() ); - expect( sequenceBuilderMock.build() ).andReturn( sequence1Future ); - replayAll(); - processor.submit( work1 ); - verifyAll(); - - resetAll(); - sequenceBuilderMock.init( anyObject() ); - expect( work2.aggregate( anyObject() ) ).andAnswer( bulkableAggregateAnswer( work2 ) ); - expect( bulkerMock.add( work2 ) ).andReturn( unusedReturnValue() ); - expect( sequenceBuilderMock.build() ).andReturn( sequence2Future ); - replayAll(); - processor.submit( work2 ); - verifyAll(); - - resetAll(); - bulkerMock.finalizeBulkWork(); - replayAll(); - CompletableFuture batchFuture = processor.endBatch(); - verifyAll(); - assertThat( batchFuture ).isPending(); - - resetAll(); - replayAll(); - sequence2Future.complete( null ); - verifyAll(); - assertThat( batchFuture ).isPending(); - - resetAll(); - replayAll(); - sequence1Future.complete( null ); - verifyAll(); - assertThat( batchFuture ).isSuccessful(); - - checkComplete( processor ); - } - - private void checkComplete(ElasticsearchParallelWorkProcessor processor) { - resetAll(); - replayAll(); - processor.complete(); - verifyAll(); - } - - private NonBulkableWork work(int index) { - return createStrictMock( "work" + index, NonBulkableWork.class ); - } - - private BulkableWork bulkableWork(int index) { - return createStrictMock( "bulkableWork" + index, BulkableWork.class ); - } - - private IAnswer> nonBulkableAggregateAnswer(NonBulkableWork mock) { - return () -> { - ElasticsearchWorkAggregator aggregator = (ElasticsearchWorkAggregator) getCurrentArguments()[0]; - return aggregator.addNonBulkable( mock ); - }; - } - - private IAnswer> bulkableAggregateAnswer(BulkableWork mock) { - return () -> { - ElasticsearchWorkAggregator aggregator = (ElasticsearchWorkAggregator) getCurrentArguments()[0]; - return aggregator.addBulkable( mock ); - }; - } -} diff --git a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecutionTest.java b/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecutionTest.java index 1bd60c823ca..1cbb3a38eb2 100644 --- a/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecutionTest.java +++ b/backend/elasticsearch/src/test/java/org/hibernate/search/backend/elasticsearch/work/execution/impl/ElasticsearchIndexIndexingPlanExecutionTest.java @@ -15,7 +15,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; -import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator; +import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator; import org.hibernate.search.backend.elasticsearch.work.impl.SingleDocumentWork; import org.hibernate.search.engine.backend.common.spi.EntityReferenceFactory; import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexingPlanExecutionReport; @@ -33,7 +33,7 @@ public class ElasticsearchIndexIndexingPlanExecutionTest extends EasyMockSupport private static final String TYPE_NAME = "SomeTypeName"; - private final ElasticsearchWorkOrchestrator orchestratorMock = createStrictMock( ElasticsearchWorkOrchestrator.class ); + private final ElasticsearchSerialWorkOrchestrator orchestratorMock = createStrictMock( ElasticsearchSerialWorkOrchestrator.class ); private final EntityReferenceFactory entityReferenceFactoryMock = createStrictMock( EntityReferenceFactory.class );