Skip to content

Commit

Permalink
HSEARCH-3118 Take into account the commit and refresh strategy in Luc…
Browse files Browse the repository at this point in the history
…ene writes
  • Loading branch information
yrodiere committed May 22, 2019
1 parent e1e20e6 commit b5db2f0
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 44 deletions.
Expand Up @@ -11,6 +11,8 @@
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneBatchingWriteWorkOrchestrator;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestratorImplementor;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkProcessor;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.index.spi.IndexWorkExecutor;
import org.hibernate.search.engine.backend.index.spi.IndexDocumentWorkExecutor;
import org.hibernate.search.engine.backend.index.spi.IndexWorkPlan;
Expand Down Expand Up @@ -61,11 +63,15 @@ Directory createDirectory(String indexName) throws IOException {

IndexWorkPlan<LuceneRootDocumentBuilder> createWorkPlan(
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext) {
String indexName, SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new LuceneIndexWorkPlan( workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext );
return new LuceneIndexWorkPlan(
workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext,
commitStrategy, refreshStrategy
);
}

LuceneWriteWorkOrchestratorImplementor createOrchestrator(String indexName, IndexWriter indexWriter) {
Expand All @@ -78,11 +84,15 @@ LuceneWriteWorkOrchestratorImplementor createOrchestrator(String indexName, Inde

IndexDocumentWorkExecutor<LuceneRootDocumentBuilder> createDocumentWorkExecutor(
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext) {
String indexName, SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy) {
multiTenancyStrategy.checkTenantId( sessionContext.getTenantIdentifier(), eventContext );

return new LuceneIndexDocumentWorkExecutor( workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext );
return new LuceneIndexDocumentWorkExecutor(
workFactory, multiTenancyStrategy, orchestrator,
indexName, sessionContext,
commitStrategy
);
}

IndexWorkExecutor createWorkExecutor(LuceneWriteWorkOrchestrator orchestrator, String indexName) {
Expand Down
Expand Up @@ -13,6 +13,8 @@
import org.hibernate.search.backend.lucene.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestrator;
import org.hibernate.search.backend.lucene.work.impl.LuceneWorkFactory;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.index.spi.DocumentContributor;
import org.hibernate.search.engine.backend.index.spi.DocumentReferenceProvider;
import org.hibernate.search.engine.backend.index.spi.IndexDocumentWorkExecutor;
Expand All @@ -25,15 +27,18 @@ class LuceneIndexDocumentWorkExecutor implements IndexDocumentWorkExecutor<Lucen
private final LuceneWriteWorkOrchestrator orchestrator;
private final String indexName;
private final String tenantId;
private final DocumentCommitStrategy commitStrategy;

LuceneIndexDocumentWorkExecutor(LuceneWorkFactory factory, MultiTenancyStrategy multiTenancyStrategy,
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext) {
String indexName, SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy) {
this.factory = factory;
this.multiTenancyStrategy = multiTenancyStrategy;
this.orchestrator = orchestrator;
this.indexName = indexName;
this.tenantId = sessionContext.getTenantIdentifier();
this.commitStrategy = commitStrategy;
}

@Override
Expand All @@ -45,6 +50,10 @@ public CompletableFuture<?> add(DocumentReferenceProvider referenceProvider, Doc
documentContributor.contribute( builder );
LuceneIndexEntry indexEntry = builder.build( indexName, multiTenancyStrategy, tenantId, id );

return orchestrator.submit( factory.add( indexName, tenantId, id, routingKey, indexEntry ) );
return orchestrator.submit(
factory.add( indexName, tenantId, id, routingKey, indexEntry ),
commitStrategy,
DocumentRefreshStrategy.NONE
);
}
}
Expand Up @@ -83,15 +83,19 @@ public void start(IndexManagerStartContext context) {
@Override
public IndexWorkPlan<LuceneRootDocumentBuilder> createWorkPlan(SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
// FIXME take the commit strategy and refresh strategy into account
return indexingBackendContext.createWorkPlan( writeOrchestrator, indexName, sessionContext );
return indexingBackendContext.createWorkPlan(
writeOrchestrator, indexName, sessionContext,
commitStrategy, refreshStrategy
);
}

@Override
public IndexDocumentWorkExecutor<LuceneRootDocumentBuilder> createDocumentWorkExecutor(
SessionContextImplementor sessionContext, DocumentCommitStrategy commitStrategy) {
// FIXME take the commit strategy into account
return indexingBackendContext.createDocumentWorkExecutor( writeOrchestrator, indexName, sessionContext );
return indexingBackendContext.createDocumentWorkExecutor(
writeOrchestrator, indexName, sessionContext,
commitStrategy
);
}

@Override
Expand Down
Expand Up @@ -11,6 +11,8 @@
import org.hibernate.search.backend.lucene.multitenancy.impl.MultiTenancyStrategy;
import org.hibernate.search.backend.lucene.orchestration.impl.LuceneWriteWorkOrchestrator;
import org.hibernate.search.backend.lucene.work.impl.LuceneWorkFactory;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.index.spi.IndexWorkExecutor;
import org.hibernate.search.util.common.reporting.EventContext;

Expand All @@ -33,17 +35,29 @@ class LuceneIndexWorkExecutor implements IndexWorkExecutor {

@Override
public CompletableFuture<?> optimize() {
return orchestrator.submit( factory.optimize( indexName ) );
return orchestrator.submit(
factory.optimize( indexName ),
DocumentCommitStrategy.FORCE,
DocumentRefreshStrategy.NONE
);
}

@Override
public CompletableFuture<?> purge(String tenantId) {
multiTenancyStrategy.checkTenantId( tenantId, eventContext );
return orchestrator.submit( factory.deleteAll( indexName, tenantId ) );
return orchestrator.submit(
factory.deleteAll( indexName, tenantId ),
DocumentCommitStrategy.FORCE,
DocumentRefreshStrategy.NONE
);
}

@Override
public CompletableFuture<?> flush() {
return orchestrator.submit( factory.flush( indexName ) );
return orchestrator.submit(
factory.flush( indexName ),
DocumentCommitStrategy.FORCE,
DocumentRefreshStrategy.NONE
);
}
}
Expand Up @@ -10,6 +10,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.index.spi.IndexWorkPlan;
import org.hibernate.search.engine.backend.index.spi.DocumentContributor;
import org.hibernate.search.engine.backend.index.spi.DocumentReferenceProvider;
Expand All @@ -32,17 +34,22 @@ class LuceneIndexWorkPlan implements IndexWorkPlan<LuceneRootDocumentBuilder> {
private final LuceneWriteWorkOrchestrator orchestrator;
private final String indexName;
private final String tenantId;
private final DocumentCommitStrategy commitStrategy;
private final DocumentRefreshStrategy refreshStrategy;

private final List<LuceneWriteWork<?>> works = new ArrayList<>();

LuceneIndexWorkPlan(LuceneWorkFactory factory, MultiTenancyStrategy multiTenancyStrategy,
LuceneWriteWorkOrchestrator orchestrator,
String indexName, SessionContextImplementor sessionContext) {
String indexName, SessionContextImplementor sessionContext,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
this.factory = factory;
this.multiTenancyStrategy = multiTenancyStrategy;
this.orchestrator = orchestrator;
this.indexName = indexName;
this.tenantId = sessionContext.getTenantIdentifier();
this.commitStrategy = commitStrategy;
this.refreshStrategy = refreshStrategy;
}

@Override
Expand Down Expand Up @@ -87,7 +94,7 @@ public void prepare() {
@Override
public CompletableFuture<?> execute() {
try {
return orchestrator.submit( works );
return orchestrator.submit( works, commitStrategy, refreshStrategy );
}
finally {
works.clear();
Expand Down
Expand Up @@ -457,4 +457,8 @@ SearchException conflictingIdentifierTypesForPredicate(ToDocumentIdentifierValue
@Message(id = ID_OFFSET_2 + 77,
value = "Document with id '%2$s' does not exist in index '%1$s' and thus its match cannot be explained." )
SearchException explainUnkownDocument(String indexName, String d);

@Message(id = ID_OFFSET_2 + 78,
value = "Unable to optimize.")
SearchException unableToOptimizeIndex(@Param EventContext context, @Cause Exception e);
}
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.lucene.work.impl.LuceneWriteWork;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.orchestration.spi.AbstractWorkOrchestrator;
import org.hibernate.search.engine.backend.orchestration.spi.BatchingExecutor;

Expand All @@ -26,16 +28,18 @@ abstract class AbstractLuceneWriteWorkOrchestrator
}

@Override
public CompletableFuture<?> submit(List<LuceneWriteWork<?>> works) {
public CompletableFuture<?> submit(List<LuceneWriteWork<?>> works,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
CompletableFuture<Object> future = new CompletableFuture<>();
submit( new LuceneMultipleWorkSet( works, future ) );
submit( new LuceneMultipleWorkSet( works, future, commitStrategy, refreshStrategy ) );
return future;
}

@Override
public <T> CompletableFuture<T> submit(LuceneWriteWork<T> work) {
public <T> CompletableFuture<T> submit(LuceneWriteWork<T> work,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
CompletableFuture<T> future = new CompletableFuture<>();
submit( new LuceneSingleWorkSet<>( work, future ) );
submit( new LuceneSingleWorkSet<>( work, future, commitStrategy, refreshStrategy ) );
return future;
}

Expand All @@ -45,15 +49,20 @@ interface LuceneWorkSet extends BatchingExecutor.Task<LuceneWriteWorkProcessor>
static class LuceneMultipleWorkSet implements LuceneWorkSet {
private final List<LuceneWriteWork<?>> works;
private final CompletableFuture<Object> future;
private final DocumentCommitStrategy commitStrategy;
private final DocumentRefreshStrategy refreshStrategy;

LuceneMultipleWorkSet(List<LuceneWriteWork<?>> works, CompletableFuture<Object> future) {
LuceneMultipleWorkSet(List<LuceneWriteWork<?>> works, CompletableFuture<Object> future,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
this.works = new ArrayList<>( works );
this.future = future;
this.commitStrategy = commitStrategy;
this.refreshStrategy = refreshStrategy;
}

@Override
public void submitTo(LuceneWriteWorkProcessor processor) {
processor.beforeWorkSet();
processor.beforeWorkSet( commitStrategy, refreshStrategy );
for ( LuceneWriteWork<?> work : works ) {
processor.submit( work );
}
Expand All @@ -69,15 +78,20 @@ public void markAsFailed(Throwable t) {
static class LuceneSingleWorkSet<T> implements LuceneWorkSet {
private final LuceneWriteWork<T> work;
private final CompletableFuture<T> future;
private final DocumentCommitStrategy commitStrategy;
private final DocumentRefreshStrategy refreshStrategy;

LuceneSingleWorkSet(LuceneWriteWork<T> work, CompletableFuture<T> future) {
LuceneSingleWorkSet(LuceneWriteWork<T> work, CompletableFuture<T> future,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
this.work = work;
this.future = future;
this.commitStrategy = commitStrategy;
this.refreshStrategy = refreshStrategy;
}

@Override
public void submitTo(LuceneWriteWorkProcessor processor) {
processor.beforeWorkSet();
processor.beforeWorkSet( commitStrategy, refreshStrategy );
T result = processor.submit( work );
processor.afterWorkSet( future, result );
}
Expand Down
Expand Up @@ -10,14 +10,18 @@
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.backend.lucene.work.impl.LuceneWriteWork;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;

/**
* @author Guillaume Smet
*/
public interface LuceneWriteWorkOrchestrator {

<T> CompletableFuture<T> submit(LuceneWriteWork<T> work);
<T> CompletableFuture<T> submit(LuceneWriteWork<T> work,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy);

CompletableFuture<?> submit(List<LuceneWriteWork<?>> work);
CompletableFuture<?> submit(List<LuceneWriteWork<?>> work,
DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy);

}
Expand Up @@ -12,6 +12,8 @@

import org.hibernate.search.backend.lucene.logging.impl.Log;
import org.hibernate.search.backend.lucene.work.impl.LuceneWriteWork;
import org.hibernate.search.engine.backend.index.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.index.DocumentRefreshStrategy;
import org.hibernate.search.engine.backend.orchestration.spi.BatchingExecutor;
import org.hibernate.search.engine.common.spi.ContextualErrorHandler;
import org.hibernate.search.engine.common.spi.ErrorHandler;
Expand All @@ -33,8 +35,11 @@ public class LuceneWriteWorkProcessor implements BatchingExecutor.Processor {
private final LuceneWriteWorkExecutionContextImpl context;
private final ErrorHandler errorHandler;

private boolean hasUncommittedWorks;

private Throwable workSetFailure;
private ContextualErrorHandler workSetContextualErrorHandler;
private boolean workSetForcesCommit;

public LuceneWriteWorkProcessor(EventContext indexEventContext, IndexWriter indexWriter, ErrorHandler errorHandler) {
this.indexEventContext = indexEventContext;
Expand All @@ -49,19 +54,29 @@ public void beginBatch() {

@Override
public CompletableFuture<?> endBatch() {
// FIXME move the commit here when works do not require immediate commit? (e.g. mass indexing)
try {
commitIfNecessary();
}
catch (RuntimeException e) {
errorHandler.handleException( e.getMessage(), e );
}
// Everything was already executed, so just return a completed future.
return CompletableFuture.completedFuture( null );
}

void beforeWorkSet() {
void beforeWorkSet(DocumentCommitStrategy commitStrategy, DocumentRefreshStrategy refreshStrategy) {
workSetFailure = null;
workSetContextualErrorHandler = null;
workSetForcesCommit = DocumentCommitStrategy.FORCE.equals( commitStrategy )
// We need to commit in order to make the changes visible
// TODO HSEARCH-3117 this may not be true with the NRT implementation from Search 5
|| DocumentRefreshStrategy.FORCE.equals( refreshStrategy );
}

<T> T submit(LuceneWriteWork<T> work) {
if ( workSetFailure == null ) {
try {
hasUncommittedWorks = true;
return work.execute( context );
}
catch (RuntimeException e) {
Expand All @@ -79,10 +94,9 @@ <T> T submit(LuceneWriteWork<T> work) {
}

<T> void afterWorkSet(CompletableFuture<T> future, T resultIfSuccess) {
IndexWriter indexWriter = context.getIndexWriter();
if ( workSetFailure == null ) {
if ( workSetFailure == null && workSetForcesCommit ) {
try {
doCommit( indexWriter );
commitIfNecessary();
}
catch (RuntimeException e) {
workSetFailure = e;
Expand All @@ -101,15 +115,18 @@ <T> void afterWorkSet(CompletableFuture<T> future, T resultIfSuccess) {
}
}

private void doCommit(IndexWriter indexWriter) {
try {
// TODO HSEARCH-3117 restore the commit policy feature to allow scheduled commits?
indexWriter.commit();
}
catch (RuntimeException | IOException e) {
throw log.unableToCommitIndex( indexEventContext, e );
private void commitIfNecessary() {
if ( hasUncommittedWorks ) {
IndexWriter indexWriter = context.getIndexWriter();
try {
// TODO HSEARCH-3117 restore the commit policy feature to allow scheduled commits?
hasUncommittedWorks = false;
indexWriter.commit();
}
catch (RuntimeException | IOException e) {
throw log.unableToCommitIndex( indexEventContext, e );
}
}

}

private ContextualErrorHandler getWorkSetContextualErrorHandler() {
Expand Down

0 comments on commit b5db2f0

Please sign in to comment.