Skip to content

Commit

Permalink
HSEARCH-2830 Make sure all async works are flushed before we start Elasticsearch benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
yrodiere authored and Sanne committed Aug 2, 2017
1 parent 3b42391 commit 24cb48e
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 38 deletions.
Expand Up @@ -15,7 +15,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorCreation;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorHelper;
import org.hibernate.search.exception.AssertionFailure;
import org.hibernate.search.util.impl.CollectionHelper;
import org.openjdk.jmh.annotations.Level;
Expand Down Expand Up @@ -88,7 +88,7 @@ public void setup(NonStreamWriteEngineHolder eh, ThreadParams threadParams) {
toUpdate.set( shuffledIds.get( i ) );
}

SearchIntegratorCreation.preindexEntities(
SearchIntegratorHelper.preindexEntities(
eh.getSearchIntegrator(),
eh.getDataset(),
IntStream.concat( toDelete.stream(), toUpdate.stream() )
Expand Down
Expand Up @@ -36,6 +36,7 @@ public static void main(String... args) throws Exception {
.param( "maxResults", "10" )
.param( "worksPerChangeset", "2;4" )
.param( "changesetsPerFlush", "50" )
.param( "streamedAddsPerFlush", "300" )
.forks( 0 ) //To simplify debugging; Remember this implies JVM parameters via @Fork won't be applied.
.build();

Expand Down
Expand Up @@ -10,12 +10,9 @@
import java.net.URISyntaxException;
import java.util.stream.IntStream;

import org.hibernate.search.backend.FlushLuceneWork;
import org.hibernate.search.engineperformance.elasticsearch.datasets.Dataset;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.DatasetCreation;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorCreation;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.IndexedTypeIdentifier;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorHelper;
import org.hibernate.search.spi.SearchIntegrator;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
Expand Down Expand Up @@ -68,9 +65,9 @@ public class NonStreamWriteEngineHolder extends BaseIndexSetup {

@Setup
public void initializeState() throws IOException, URISyntaxException {
si = SearchIntegratorCreation.createIntegrator( client, getConnectionInfo(), refreshAfterWrite, workerExecution );
si = SearchIntegratorHelper.createIntegrator( client, getConnectionInfo(), refreshAfterWrite, workerExecution );
data = DatasetCreation.createDataset( dataset, pickCacheDirectory() );
SearchIntegratorCreation.preindexEntities( si, data, IntStream.range( 0, indexSize ) );
SearchIntegratorHelper.preindexEntities( si, data, IntStream.range( 0, indexSize ) );

String[] worksPerChangesetSplit = worksPerChangeset.split( ";" );
addsDeletesPerChangeset = Integer.parseInt( worksPerChangesetSplit[0] );
Expand Down Expand Up @@ -105,12 +102,6 @@ public int getQueryMaxResults() {
return maxResults;
}

/**
 * Sends a {@code FlushLuceneWork} to every index manager bound to the given type.
 * Used by the benchmark to make sure queued works are applied before measuring.
 *
 * @param typeId the indexed type whose backing index managers should be flushed
 */
public void flush(IndexedTypeIdentifier typeId) {
	for ( IndexManager manager : si.getIndexBinding( typeId ).getIndexManagerSelector().all() ) {
		// One flush work per manager; last two args mirror the stream-operation contract
		// (no monitor, forceAsync = false). NOTE(review): assumed blocking semantics — confirm.
		manager.performStreamOperation( new FlushLuceneWork( null, typeId ), null, false );
	}
}

@TearDown
public void shutdownIndexingEngine() throws IOException {
if ( si != null ) {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import org.hibernate.search.backend.spi.Worker;
import org.hibernate.search.engineperformance.elasticsearch.datasets.Dataset;
import org.hibernate.search.engineperformance.elasticsearch.model.BookEntity;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorHelper;
import org.hibernate.search.query.engine.spi.EntityInfo;
import org.hibernate.search.query.engine.spi.HSQuery;
import org.hibernate.search.spi.SearchIntegrator;
Expand Down Expand Up @@ -52,7 +53,8 @@ public class NonStreamWriteJMHBenchmarks {
@Benchmark
@Threads(20)
public void write(NonStreamWriteEngineHolder eh, ChangesetGenerator changesetGenerator, NonStreamWriteCounters counters) {
Worker worker = eh.getSearchIntegrator().getWorker();
SearchIntegrator si = eh.getSearchIntegrator();
Worker worker = si.getWorker();
Dataset dataset = eh.getDataset();

changesetGenerator.stream().forEach( changeset -> {
Expand All @@ -76,7 +78,7 @@ public void write(NonStreamWriteEngineHolder eh, ChangesetGenerator changesetGen
} );

// Ensure that we'll block until all works have been performed
eh.flush( BookEntity.TYPE_ID );
SearchIntegratorHelper.flush( si, BookEntity.TYPE_ID );
}

@Benchmark
Expand Down
Expand Up @@ -10,12 +10,9 @@
import java.net.URISyntaxException;
import java.util.stream.IntStream;

import org.hibernate.search.backend.FlushLuceneWork;
import org.hibernate.search.engineperformance.elasticsearch.datasets.Dataset;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.DatasetCreation;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorCreation;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.IndexedTypeIdentifier;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorHelper;
import org.hibernate.search.spi.SearchIntegrator;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
Expand Down Expand Up @@ -52,9 +49,9 @@ public class StreamWriteEngineHolder extends BaseIndexSetup {
*/
@Setup(Level.Iteration)
public void initializeState() throws IOException, URISyntaxException {
si = SearchIntegratorCreation.createIntegrator( client, getConnectionInfo(), refreshAfterWrite, null /* irrelevant */ );
si = SearchIntegratorHelper.createIntegrator( client, getConnectionInfo(), refreshAfterWrite, null /* irrelevant */ );
data = DatasetCreation.createDataset( dataset, pickCacheDirectory() );
SearchIntegratorCreation.preindexEntities( si, data, IntStream.range( 0, indexSize ) );
SearchIntegratorHelper.preindexEntities( si, data, IntStream.range( 0, indexSize ) );
}

@TearDown(Level.Iteration)
Expand All @@ -80,10 +77,4 @@ public int getStreamedAddsPerFlush() {
return streamedAddsPerFlush;
}

/**
 * Pushes a flush work to each index manager associated with {@code typeId},
 * so that pending asynchronous works are applied before the benchmark continues.
 *
 * @param typeId the indexed type to flush
 */
public void flush(IndexedTypeIdentifier typeId) {
	for ( IndexManager im : si.getIndexBinding( typeId ).getIndexManagerSelector().all() ) {
		// null monitor, non-async stream operation; a fresh work instance per manager
		im.performStreamOperation( new FlushLuceneWork( null, typeId ), null, false );
	}
}

}
Expand Up @@ -16,6 +16,7 @@
import org.hibernate.search.engine.spi.DocumentBuilderIndexedEntity;
import org.hibernate.search.engineperformance.elasticsearch.datasets.Dataset;
import org.hibernate.search.engineperformance.elasticsearch.model.BookEntity;
import org.hibernate.search.engineperformance.elasticsearch.setuputilities.SearchIntegratorHelper;
import org.hibernate.search.spi.InstanceInitializer;
import org.hibernate.search.spi.SearchIntegrator;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void write(StreamWriteEngineHolder eh, StreamAddIdGenerator idGenerator,
} );

// Ensure that we'll block until all works have been performed
eh.flush( BookEntity.TYPE_ID );
SearchIntegratorHelper.flush( si, BookEntity.TYPE_ID );
}

}
Expand Up @@ -6,6 +6,13 @@
*/
package org.hibernate.search.engineperformance.elasticsearch.model;

import org.apache.lucene.analysis.core.LowerCaseFilterFactory;
import org.apache.lucene.analysis.snowball.SnowballPorterFilterFactory;
import org.apache.lucene.analysis.standard.StandardTokenizerFactory;
import org.hibernate.search.annotations.Analyze;
import org.hibernate.search.annotations.AnalyzerDef;
import org.hibernate.search.annotations.DocumentId;
import org.hibernate.search.annotations.Field;
import org.hibernate.search.annotations.Indexed;
import org.hibernate.search.annotations.NumericField;
import org.hibernate.search.annotations.Parameter;
Expand All @@ -15,13 +22,6 @@
import org.hibernate.search.annotations.TokenizerDef;
import org.hibernate.search.spi.IndexedTypeIdentifier;
import org.hibernate.search.spi.impl.PojoIndexedTypeIdentifier;
import org.apache.lucene.analysis.core.LowerCaseFilterFactory;
import org.apache.lucene.analysis.snowball.SnowballPorterFilterFactory;
import org.apache.lucene.analysis.standard.StandardTokenizerFactory;
import org.hibernate.search.annotations.Analyze;
import org.hibernate.search.annotations.AnalyzerDef;
import org.hibernate.search.annotations.DocumentId;
import org.hibernate.search.annotations.Field;

@Indexed
@AnalyzerDef(name = "textAnalyzer", tokenizer = @TokenizerDef(factory = StandardTokenizerFactory.class) , filters = {
Expand Down
Expand Up @@ -9,21 +9,24 @@
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

import org.hibernate.search.backend.FlushLuceneWork;
import org.hibernate.search.backend.spi.Work;
import org.hibernate.search.backend.spi.WorkType;
import org.hibernate.search.backend.spi.Worker;
import org.hibernate.search.elasticsearch.client.impl.ElasticsearchClientFactory;
import org.hibernate.search.engineperformance.elasticsearch.datasets.Dataset;
import org.hibernate.search.engineperformance.elasticsearch.model.BookEntity;
import org.hibernate.search.engineperformance.elasticsearch.stub.BlackholeElasticsearchClientFactory;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.IndexedTypeIdentifier;
import org.hibernate.search.spi.SearchIntegrator;
import org.hibernate.search.spi.SearchIntegratorBuilder;
import org.hibernate.search.testsupport.setup.SearchConfigurationForTest;
import org.hibernate.search.testsupport.setup.TransactionContextForTest;

public class SearchIntegratorCreation {
public class SearchIntegratorHelper {

private SearchIntegratorCreation() {
private SearchIntegratorHelper() {
//do not construct
}

Expand Down Expand Up @@ -62,9 +65,16 @@ public static void preindexEntities(SearchIntegrator si, Dataset data, IntStream
Indexer indexer = new Indexer( si, data );
idStream.forEach( indexer );
indexer.flush();
flush( si, BookEntity.TYPE_ID );
println( " ... added " + indexer.count + " entities to the index." );
}

/**
 * Static variant of the per-holder flush: issues a {@code FlushLuceneWork}
 * to all index managers bound to {@code typeId} on the given integrator.
 *
 * @param si the search integrator owning the index bindings
 * @param typeId the indexed type whose index managers are flushed
 */
public static void flush(SearchIntegrator si, IndexedTypeIdentifier typeId) {
	for ( IndexManager manager : si.getIndexBinding( typeId ).getIndexManagerSelector().all() ) {
		// No progress monitor; forceAsync = false — see performStreamOperation SPI.
		manager.performStreamOperation( new FlushLuceneWork( null, typeId ), null, false );
	}
}

private static class Indexer implements IntConsumer {
private final Worker worker;
private final Dataset data;
Expand Down

0 comments on commit 24cb48e

Please sign in to comment.