Skip to content

Commit

Permalink
HSEARCH-2491 Flush per entity support for ElasticsearchIndexManager
Browse files Browse the repository at this point in the history
  • Loading branch information
gustavonalle authored and Sanne committed Dec 14, 2016
1 parent 26170ef commit b9ddf54
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
Expand Up @@ -8,13 +8,12 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.similarities.Similarity;
import org.hibernate.search.backend.FlushLuceneWork;
import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
Expand Down Expand Up @@ -47,6 +46,9 @@
import org.hibernate.search.util.configuration.impl.ConfigurationParseHelper;
import org.hibernate.search.util.logging.impl.LoggerFactory;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.similarities.Similarity;

/**
* An {@link IndexManager} applying indexing work to an Elasticsearch server.
*
Expand Down Expand Up @@ -387,14 +389,23 @@ public void performOperations(List<LuceneWork> workList, IndexingMonitor monitor

@Override
public void performStreamOperation(LuceneWork singleOperation, IndexingMonitor monitor, boolean forceAsync) {
if ( singleOperation == FlushLuceneWork.INSTANCE ) {
BackendRequest<?> request = singleOperation.acceptIndexWorkVisitor( visitor, monitor );
if ( singleOperation instanceof FlushLuceneWork ) {
requestProcessor.awaitAsyncProcessingCompletion();
executeBackendRequest( request, true );
}
else {
BackendRequest<?> request = singleOperation.acceptIndexWorkVisitor( visitor, monitor );
executeBackendRequest( request, false );
}
}

if ( request != null ) {
requestProcessor.executeAsync( request );
private void executeBackendRequest(BackendRequest<?> backendRequest, boolean sync) {
if ( backendRequest != null ) {
if ( sync ) {
requestProcessor.executeSync( Collections.<BackendRequest<?>>singletonList( backendRequest ) );
}
else {
requestProcessor.executeAsync( backendRequest );
}
}
}
Expand Down
Expand Up @@ -10,6 +10,7 @@
import java.util.Locale;
import java.util.Set;

import io.searchbox.indices.Flush;
import org.apache.lucene.document.Document;
import org.apache.lucene.facet.FacetsConfig;
import org.apache.lucene.index.DocValuesType;
Expand Down Expand Up @@ -138,8 +139,18 @@ public BackendRequest<?> visitUpdateWork(UpdateLuceneWork work, IndexingMonitor

@Override
public BackendRequest<?> visitFlushWork(FlushLuceneWork work, IndexingMonitor monitor) {
// Nothing to do
return null;
Flush flush = new Flush.Builder().setParameter( "wait_if_ongoing", "true" )
.addIndex( indexName )
.refresh( true )
.build();
return new BackendRequest<>(
flush,
work,
indexName,
monitor,
NoopBackendRequestSuccessHandler.INSTANCE,
false
);
}

@Override
Expand Down

0 comments on commit b9ddf54

Please sign in to comment.