Skip to content

Commit

Permalink
HSEARCH-3872 Do not batch Elasticsearch works when executing them in …
Browse files Browse the repository at this point in the history
…parallel

No need for a batching orchestrator, as we don't ever bulk these works
and we always expect the user to wait on the works to be finished before
stopping the application.
  • Loading branch information
yrodiere committed Mar 30, 2020
1 parent 11a7019 commit f150c2e
Show file tree
Hide file tree
Showing 27 changed files with 332 additions and 804 deletions.
Expand Up @@ -23,8 +23,8 @@
import org.hibernate.search.backend.elasticsearch.logging.impl.Log;
import org.hibernate.search.backend.elasticsearch.mapping.impl.TypeNameMapping;
import org.hibernate.search.backend.elasticsearch.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSimpleWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorImplementor;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorProvider;
import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads;
import org.hibernate.search.backend.elasticsearch.types.dsl.provider.impl.ElasticsearchIndexFieldTypeFactoryProvider;
import org.hibernate.search.backend.elasticsearch.util.spi.URLEncodedString;
Expand Down Expand Up @@ -55,15 +55,14 @@ class ElasticsearchBackendImpl implements BackendImplementor,
private final BackendThreads threads;
private final ElasticsearchLinkImpl link;

private final ElasticsearchWorkOrchestratorProvider orchestratorProvider;
private final ElasticsearchSimpleWorkOrchestrator generalPurposeOrchestrator;

private final ElasticsearchIndexFieldTypeFactoryProvider typeFactoryProvider;
private final ElasticsearchAnalysisDefinitionRegistry analysisDefinitionRegistry;
private final MultiTenancyStrategy multiTenancyStrategy;
private final BeanHolder<? extends IndexLayoutStrategy> indexLayoutStrategyHolder;
private final TypeNameMapping typeNameMapping;

private final ElasticsearchWorkOrchestratorImplementor queryOrchestrator;

private final EventContext eventContext;

private final IndexManagerBackendContext indexManagerBackendContext;
Expand All @@ -83,28 +82,25 @@ class ElasticsearchBackendImpl implements BackendImplementor,
this.threads = threads;
this.link = link;

this.orchestratorProvider = new ElasticsearchWorkOrchestratorProvider(
"Elasticsearch parallel work orchestrator for backend " + name,
threads, link,
failureHandler
this.generalPurposeOrchestrator = new ElasticsearchSimpleWorkOrchestrator(
"Elasticsearch general purpose orchestrator for backend " + name,
link
);
this.analysisDefinitionRegistry = analysisDefinitionRegistry;
this.multiTenancyStrategy = multiTenancyStrategy;
this.typeFactoryProvider = typeFactoryProvider;
this.indexLayoutStrategyHolder = indexLayoutStrategyHolder;
this.typeNameMapping = typeNameMapping;

this.queryOrchestrator = orchestratorProvider.createParallelOrchestrator( "Elasticsearch query orchestrator for backend " + name );

this.eventContext = EventContexts.fromBackendName( name );
this.indexManagerBackendContext = new IndexManagerBackendContext(
eventContext, link,
eventContext, threads, link,
userFacingGson,
multiTenancyStrategy,
indexLayoutStrategyHolder.get(),
typeNameMapping,
orchestratorProvider,
queryOrchestrator
failureHandler,
generalPurposeOrchestrator
);
this.indexNamesRegistry = new IndexNamesRegistry();
}
Expand All @@ -122,24 +118,18 @@ public String toString() {
public void start(BackendStartContext context) {
threads.onStart( context.getThreadPoolProvider() );
link.onStart( context.getConfigurationPropertySource() );
orchestratorProvider.start();
queryOrchestrator.start();
generalPurposeOrchestrator.start();
}

@Override
public CompletableFuture<?> preStop() {
return CompletableFuture.allOf(
queryOrchestrator.preStop(),
orchestratorProvider.preStop()
);
return generalPurposeOrchestrator.preStop();
}

@Override
public void stop() {
try ( Closer<IOException> closer = new Closer<>() ) {
closer.push( ElasticsearchWorkOrchestratorImplementor::stop, queryOrchestrator );
closer.push( ElasticsearchWorkOrchestratorProvider::stop, orchestratorProvider );
// Close the client after the orchestrators, when we're sure all works have been performed
closer.push( ElasticsearchWorkOrchestratorImplementor::stop, generalPurposeOrchestrator );
closer.push( ElasticsearchLinkImpl::onStop, link );
closer.push( BeanHolder::close, indexLayoutStrategyHolder );
closer.push( BackendThreads::onStop, threads );
Expand Down
Expand Up @@ -17,6 +17,7 @@
import org.hibernate.search.backend.elasticsearch.document.impl.ElasticsearchDocumentObjectBuilder;
import org.hibernate.search.backend.elasticsearch.document.model.impl.ElasticsearchIndexModel;
import org.hibernate.search.backend.elasticsearch.index.ElasticsearchIndexManager;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchingWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexSchemaManager;
import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexLifecycleExecutionOptions;
import org.hibernate.search.backend.elasticsearch.logging.impl.Log;
Expand Down Expand Up @@ -77,8 +78,7 @@ class ElasticsearchIndexManagerImpl implements IndexManagerImplementor,
private final ElasticsearchIndexModel model;
private final List<DocumentMetadataContributor> documentMetadataContributors;

private final ElasticsearchWorkOrchestratorImplementor serialOrchestrator;
private final ElasticsearchWorkOrchestratorImplementor parallelOrchestrator;
private final ElasticsearchBatchingWorkOrchestrator indexingOrchestrator;

private ElasticsearchIndexSchemaManager schemaManager;

Expand All @@ -88,8 +88,7 @@ class ElasticsearchIndexManagerImpl implements IndexManagerImplementor,
this.backendContext = backendContext;
this.model = model;
this.documentMetadataContributors = documentMetadataContributors;
this.parallelOrchestrator = backendContext.createParallelOrchestrator( model.getHibernateSearchIndexName() );
this.serialOrchestrator = backendContext.createSerialOrchestrator( model.getHibernateSearchIndexName() );
this.indexingOrchestrator = backendContext.createIndexingOrchestrator( model.getHibernateSearchIndexName() );
}

@Override
Expand Down Expand Up @@ -122,30 +121,24 @@ model, createLifecycleExecutionOptions( context.getConfigurationPropertySource()
}
);

serialOrchestrator.start();
parallelOrchestrator.start();
indexingOrchestrator.start();
}
catch (RuntimeException e) {
new SuppressingCloser( e )
.push( ElasticsearchWorkOrchestratorImplementor::stop, parallelOrchestrator )
.push( ElasticsearchWorkOrchestratorImplementor::stop, serialOrchestrator );
.push( ElasticsearchWorkOrchestratorImplementor::stop, indexingOrchestrator );
throw e;
}
}

@Override
public CompletableFuture<?> preStop() {
return CompletableFuture.allOf(
serialOrchestrator.preStop(),
parallelOrchestrator.preStop()
);
return indexingOrchestrator.preStop();
}

@Override
public void stop() {
try ( Closer<IOException> closer = new Closer<>() ) {
closer.push( ElasticsearchWorkOrchestratorImplementor::stop, serialOrchestrator );
closer.push( ElasticsearchWorkOrchestratorImplementor::stop, parallelOrchestrator );
closer.push( ElasticsearchWorkOrchestratorImplementor::stop, indexingOrchestrator );
schemaManager = null;
}
catch (IOException e) {
Expand Down Expand Up @@ -197,7 +190,7 @@ public <R> IndexIndexingPlan<R> createIndexingPlan(BackendSessionContext session
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
// The commit strategy is ignored, because Elasticsearch always commits changes to its transaction log.
return backendContext.createIndexingPlan(
serialOrchestrator,
indexingOrchestrator,
this,
sessionContext,
entityReferenceFactory,
Expand All @@ -208,14 +201,14 @@ public <R> IndexIndexingPlan<R> createIndexingPlan(BackendSessionContext session
@Override
public IndexIndexer createIndexer(BackendSessionContext sessionContext) {
return backendContext.createIndexer(
serialOrchestrator, this, sessionContext
indexingOrchestrator, this, sessionContext
);
}

@Override
public IndexWorkspace createWorkspace(DetachedBackendSessionContext sessionContext) {
return backendContext.createWorkspace(
parallelOrchestrator, this, sessionContext
this, sessionContext
);
}

Expand Down
Expand Up @@ -6,6 +6,9 @@
*/
package org.hibernate.search.backend.elasticsearch.index.impl;

import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchingWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchParallelWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads;
import org.hibernate.search.backend.elasticsearch.schema.management.impl.ElasticsearchIndexLifecycleExecutionOptions;
import org.hibernate.search.backend.elasticsearch.index.layout.IndexLayoutStrategy;
import org.hibernate.search.backend.elasticsearch.document.model.impl.ElasticsearchIndexModel;
Expand All @@ -15,9 +18,7 @@
import org.hibernate.search.backend.elasticsearch.lowlevel.index.impl.IndexMetadata;
import org.hibernate.search.backend.elasticsearch.mapping.impl.TypeNameMapping;
import org.hibernate.search.backend.elasticsearch.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorImplementor;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchWorkOrchestratorProvider;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.scope.model.impl.ElasticsearchScopeModel;
import org.hibernate.search.backend.elasticsearch.search.impl.ElasticsearchSearchContext;
import org.hibernate.search.backend.elasticsearch.search.projection.impl.ElasticsearchSearchProjection;
Expand All @@ -37,6 +38,7 @@
import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexer;
import org.hibernate.search.engine.backend.work.execution.spi.IndexIndexingPlan;
import org.hibernate.search.engine.backend.work.execution.spi.IndexWorkspace;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.engine.search.loading.context.spi.LoadingContextBuilder;
import org.hibernate.search.util.common.reporting.EventContext;

Expand All @@ -45,28 +47,31 @@
public class IndexManagerBackendContext implements SearchBackendContext, WorkExecutionBackendContext {

private final EventContext eventContext;
private final BackendThreads threads;
private final ElasticsearchLink link;
private final Gson userFacingGson;
private final MultiTenancyStrategy multiTenancyStrategy;
private final IndexLayoutStrategy indexLayoutStrategy;
private final ElasticsearchWorkOrchestratorProvider orchestratorProvider;
private final ElasticsearchWorkOrchestrator queryOrchestrator;
private final FailureHandler failureHandler;
private final ElasticsearchParallelWorkOrchestrator generalPurposeOrchestrator;

private final SearchProjectionBackendContext searchProjectionBackendContext;

public IndexManagerBackendContext(EventContext eventContext, ElasticsearchLink link, Gson userFacingGson,
public IndexManagerBackendContext(EventContext eventContext,
BackendThreads threads, ElasticsearchLink link, Gson userFacingGson,
MultiTenancyStrategy multiTenancyStrategy,
IndexLayoutStrategy indexLayoutStrategy,
TypeNameMapping typeNameMapping,
ElasticsearchWorkOrchestratorProvider orchestratorProvider,
ElasticsearchWorkOrchestrator queryOrchestrator) {
FailureHandler failureHandler,
ElasticsearchParallelWorkOrchestrator generalPurposeOrchestrator) {
this.eventContext = eventContext;
this.threads = threads;
this.link = link;
this.userFacingGson = userFacingGson;
this.multiTenancyStrategy = multiTenancyStrategy;
this.indexLayoutStrategy = indexLayoutStrategy;
this.orchestratorProvider = orchestratorProvider;
this.queryOrchestrator = queryOrchestrator;
this.failureHandler = failureHandler;
this.generalPurposeOrchestrator = generalPurposeOrchestrator;

this.searchProjectionBackendContext = new SearchProjectionBackendContext(
typeNameMapping.getTypeNameExtractionHelper(),
Expand All @@ -81,7 +86,7 @@ public String toString() {

@Override
public <R> IndexIndexingPlan<R> createIndexingPlan(
ElasticsearchWorkOrchestrator orchestrator,
ElasticsearchSerialWorkOrchestrator orchestrator,
WorkExecutionIndexManagerContext indexManagerContext,
BackendSessionContext sessionContext, EntityReferenceFactory<R> entityReferenceFactory,
DocumentRefreshStrategy refreshStrategy) {
Expand All @@ -98,7 +103,7 @@ public <R> IndexIndexingPlan<R> createIndexingPlan(

@Override
public IndexIndexer createIndexer(
ElasticsearchWorkOrchestrator orchestrator,
ElasticsearchSerialWorkOrchestrator orchestrator,
WorkExecutionIndexManagerContext indexManagerContext,
BackendSessionContext sessionContext) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );
Expand All @@ -108,14 +113,13 @@ public IndexIndexer createIndexer(
}

@Override
public IndexWorkspace createWorkspace(ElasticsearchWorkOrchestrator orchestrator,
WorkExecutionIndexManagerContext indexManagerContext,
public IndexWorkspace createWorkspace(WorkExecutionIndexManagerContext indexManagerContext,
DetachedBackendSessionContext sessionContext) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new ElasticsearchIndexWorkspace(
link.getWorkBuilderFactory(), multiTenancyStrategy, orchestrator, indexManagerContext,
sessionContext
link.getWorkBuilderFactory(), multiTenancyStrategy, generalPurposeOrchestrator,
indexManagerContext, sessionContext
);
}

Expand Down Expand Up @@ -144,7 +148,7 @@ public <H> ElasticsearchSearchQueryBuilder<H> createSearchQueryBuilder(
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );
return new ElasticsearchSearchQueryBuilder<>(
link.getWorkBuilderFactory(), link.getSearchResultExtractorFactory(),
queryOrchestrator,
generalPurposeOrchestrator,
searchContext, sessionContext, loadingContextBuilder, rootProjection
);
}
Expand All @@ -162,21 +166,17 @@ ElasticsearchIndexSchemaManager createSchemaManager(ElasticsearchIndexModel mode
model.contributeLowLevelMetadata( builder );
IndexMetadata expectedMetadata = builder.build();
return new ElasticsearchIndexSchemaManager(
link.getWorkBuilderFactory(), orchestratorProvider.getRootParallelOrchestrator(),
link.getWorkBuilderFactory(), generalPurposeOrchestrator,
indexLayoutStrategy, model.getNames(), expectedMetadata,
lifecycleExecutionOptions
);
}

ElasticsearchWorkOrchestratorImplementor createSerialOrchestrator(String indexName) {
return orchestratorProvider.createSerialOrchestrator(
"Elasticsearch serial work orchestrator for index " + indexName
);
}

ElasticsearchWorkOrchestratorImplementor createParallelOrchestrator(String indexName) {
return orchestratorProvider.createParallelOrchestrator(
"Elasticsearch parallel work orchestrator for index " + indexName
ElasticsearchBatchingWorkOrchestrator createIndexingOrchestrator(String indexName) {
return new ElasticsearchBatchingWorkOrchestrator(
"Elasticsearch indexing orchestrator for index " + indexName,
threads, link,
failureHandler
);
}

Expand Down
Expand Up @@ -6,28 +6,25 @@
*/
package org.hibernate.search.backend.elasticsearch.orchestration.impl;

import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWork;
import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink;
import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.engine.backend.orchestration.spi.AbstractWorkOrchestrator;
import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork;

/**
* An abstract base for {@link ElasticsearchWorkOrchestratorImplementor} implementations.
*/
abstract class AbstractElasticsearchWorkOrchestrator
extends AbstractWorkOrchestrator<BatchedWork<ElasticsearchWorkProcessor>>
abstract class AbstractElasticsearchWorkOrchestrator<W>
extends AbstractWorkOrchestrator<W>
implements ElasticsearchWorkOrchestratorImplementor {

AbstractElasticsearchWorkOrchestrator(String name) {
protected final ElasticsearchLink link;

AbstractElasticsearchWorkOrchestrator(String name, ElasticsearchLink link) {
super( name );
this.link = link;
}

@Override
public <T> CompletableFuture<T> submit(ElasticsearchWork<T> work) {
CompletableFuture<T> future = new CompletableFuture<>();
submit( new ElasticsearchBatchedWork<>( work, future ) );
return future;
protected final ElasticsearchWorkExecutionContext createWorkExecutionContext() {
return new ElasticsearchWorkExecutionContextImpl( link.getClient(), link.getGsonProvider() );
}

}
Expand Up @@ -12,7 +12,7 @@
import org.hibernate.search.engine.backend.orchestration.spi.BatchedWork;
import org.hibernate.search.util.common.impl.Futures;

class ElasticsearchBatchedWork<T> implements BatchedWork<ElasticsearchWorkProcessor> {
class ElasticsearchBatchedWork<T> implements BatchedWork<ElasticsearchBatchedWorkProcessor> {
private final ElasticsearchWork<T> work;
private final CompletableFuture<T> future;

Expand All @@ -22,7 +22,7 @@ class ElasticsearchBatchedWork<T> implements BatchedWork<ElasticsearchWorkProces
}

@Override
public void submitTo(ElasticsearchWorkProcessor delegate) {
public void submitTo(ElasticsearchBatchedWorkProcessor delegate) {
delegate.submit( work ).whenComplete( Futures.copyHandler( future ) );
}

Expand Down

0 comments on commit f150c2e

Please sign in to comment.