HSEARCH-1260 Merge the two DocumentBuilder indexing phases in a single one
Sanne committed Jan 23, 2014
1 parent 89823ef commit 0eff03b
Showing 9 changed files with 122 additions and 246 deletions.
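The net effect of the commit is one fewer pipeline stage in the mass indexer: instead of identifier lists being turned into loaded entities by one pool of workers and then into Lucene documents by a second pool (with a second hand-off queue in between), a single pool now consumes primary keys and produces the documents directly. A simplified sketch of the two shapes, using JDK queues and stand-in element types rather than the real Hibernate Search classes:

// Sketch only: JDK types stand in for ProducerConsumerQueue and the real work items.
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PipelineShapes {

	// Before: ids -> entities -> LuceneWork; two hand-offs, two consumer pools.
	static void before() {
		BlockingQueue<List<Serializable>> fromIdentifierListToEntities = new ArrayBlockingQueue<>( 16 );
		BlockingQueue<List<?>> fromEntityToAddwork = new ArrayBlockingQueue<>( 16 );
		// entity-loading threads drain the first queue and fill the second;
		// document-building threads drain the second and feed the backend
	}

	// After: ids -> LuceneWork; one hand-off, one consumer pool that
	// loads each entity (and its lazy collections) while building its document.
	static void after() {
		BlockingQueue<List<Serializable>> primaryKeyStream = new ArrayBlockingQueue<>( 16 );
		// documentBuilderThreads drain this queue and feed the backend directly
	}
}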
orm/src/main/java/org/hibernate/search/MassIndexer.java (5 changes: 3 additions & 2 deletions)
@@ -61,11 +61,12 @@ public interface MassIndexer {
 	MassIndexer batchSizeToLoadObjects(int batchSize);
 
 	/**
-	 * Sets the number of threads used to load the lazy collections
-	 * related to the indexed entities.
+	 * Deprecated: value is ignored.
 	 * @param numberOfThreads
 	 * @return <tt>this</tt> for method chaining
+	 * @deprecated Being ignored: this method will be removed.
 	 */
+	@Deprecated
 	MassIndexer threadsForSubsequentFetching(int numberOfThreads);
 
 	/**
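For users of the public API, the change means lazy collections are now fetched by the same threads that build the documents, so only threadsToLoadObjects still matters. A hedged usage sketch, assuming the usual Hibernate Search 4.x FullTextSession entry point (the entity class and the tuning values are illustrative):

import org.hibernate.Session;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.Search;

class MassIndexerUsage {
	// indexedType stands in for any indexed entity class, e.g. Book.class.
	static void reindex(Session session, Class<?> indexedType) throws InterruptedException {
		FullTextSession fullTextSession = Search.getFullTextSession( session );
		fullTextSession.createIndexer( indexedType )
				.batchSizeToLoadObjects( 25 )
				.threadsToLoadObjects( 4 )            // sizes the merged document-builder pool
				//.threadsForSubsequentFetching( 2 )  // deprecated above: the value is ignored
				.startAndWait();
	}
}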
@@ -50,8 +50,7 @@ public class BatchCoordinator extends ErrorHandledRunnable {
 	private final Class<?>[] rootEntities; //entity types to reindex excluding all subtypes of each-other
 	private final SessionFactoryImplementor sessionFactory;
 	private final int typesToIndexInParallel;
-	private final int objectLoadingThreads;
-	private final int collectionLoadingThreads;
+	private final int documentBuilderThreads;
 	private final CacheMode cacheMode;
 	private final int objectLoadingBatchSize;
 	private final boolean optimizeAtEnd;
@@ -66,8 +65,7 @@ public BatchCoordinator(Set<Class<?>> rootEntities,
 			SearchFactoryImplementor searchFactoryImplementor,
 			SessionFactoryImplementor sessionFactory,
 			int typesToIndexInParallel,
-			int objectLoadingThreads,
-			int collectionLoadingThreads,
+			int documentBuilderThreads,
 			CacheMode cacheMode,
 			int objectLoadingBatchSize,
 			long objectsLimit,
@@ -81,8 +79,7 @@ public BatchCoordinator(Set<Class<?>> rootEntities,
 		this.rootEntities = rootEntities.toArray( new Class<?>[rootEntities.size()] );
 		this.sessionFactory = sessionFactory;
 		this.typesToIndexInParallel = typesToIndexInParallel;
-		this.objectLoadingThreads = objectLoadingThreads;
-		this.collectionLoadingThreads = collectionLoadingThreads;
+		this.documentBuilderThreads = documentBuilderThreads;
 		this.cacheMode = cacheMode;
 		this.objectLoadingBatchSize = objectLoadingBatchSize;
 		this.optimizeAtEnd = optimizeAtEnd;
@@ -94,7 +91,7 @@ public BatchCoordinator(Set<Class<?>> rootEntities,
 	}
 
 	@Override
-	public void runWithErrroHandler() {
+	public void runWithErrorHandler() {
 		final BatchBackend backend = searchFactoryImplementor.makeBatchBackend( monitor );
 		try {
 			beforeBatch( backend ); // purgeAll and pre-optimize activities
@@ -123,7 +120,7 @@ private void doBatchWork(BatchBackend backend) throws InterruptedException {
 			executor.execute(
 					new BatchIndexingWorkspace(
 							searchFactoryImplementor, sessionFactory, type,
-							objectLoadingThreads, collectionLoadingThreads,
+							documentBuilderThreads,
 							cacheMode, objectLoadingBatchSize, endAllSignal,
 							monitor, backend, objectsLimit, idFetchSize
 					)
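The coordinator's fan-out is untouched by the commit: it still runs one BatchIndexingWorkspace per root entity type on a pool of typesToIndexInParallel threads, with each workspace counting down a shared latch when its type is done. A generic sketch of that pattern (the names mirror the diff, but this is not the real class):

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CoordinatorFanOut {
	static void doBatchWork(List<Class<?>> rootEntities, int typesToIndexInParallel)
			throws InterruptedException {
		ExecutorService executor = Executors.newFixedThreadPool( typesToIndexInParallel );
		final CountDownLatch endAllSignal = new CountDownLatch( rootEntities.size() );
		for ( final Class<?> type : rootEntities ) {
			executor.execute( new Runnable() {
				public void run() {
					try {
						// index all instances of 'type' using documentBuilderThreads workers
					}
					finally {
						endAllSignal.countDown(); // one count per fully indexed type
					}
				}
			} );
		}
		executor.shutdown();
		endAllSignal.await(); // resume only once every type has been indexed
	}
}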
@@ -50,11 +50,9 @@ public class BatchIndexingWorkspace extends ErrorHandledRunnable {
 
 	private final SessionFactoryImplementor sessionFactory;
 
-	private final ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities;
-	private final ProducerConsumerQueue<List<?>> fromEntityToAddwork;
+	private final ProducerConsumerQueue<List<Serializable>> primaryKeyStream;
 
-	private final int objectLoadingThreadNum;
-	private final int luceneWorkerBuildingThreadNum;
+	private final int documentBuilderThreads;
 	private final Class<?> indexedType;
 	private final String idNameOfIndexedType;
 
@@ -79,7 +77,6 @@ public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor,
 			SessionFactoryImplementor sessionFactory,
 			Class<?> entityType,
 			int objectLoadingThreads,
-			int collectionLoadingThreads,
 			CacheMode cacheMode,
 			int objectLoadingBatchSize,
 			CountDownLatch endAllSignal,
@@ -96,38 +93,34 @@ public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor,
 		this.sessionFactory = sessionFactory;
 
 		//thread pool sizing:
-		this.objectLoadingThreadNum = objectLoadingThreads;
-		this.luceneWorkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document
+		this.documentBuilderThreads = objectLoadingThreads;
 
 		//loading options:
 		this.cacheMode = cacheMode;
 		this.objectLoadingBatchSize = objectLoadingBatchSize;
 		this.backend = backend;
 
 		//pipelining queues:
-		this.fromIdentifierListToEntities = new ProducerConsumerQueue<List<Serializable>>( 1 );
-		this.fromEntityToAddwork = new ProducerConsumerQueue<List<?>>( objectLoadingThreadNum );
+		this.primaryKeyStream = new ProducerConsumerQueue<List<Serializable>>( 1 );
 
 		//end signal shared with other instances:
 		this.endAllSignal = endAllSignal;
-		this.producerEndSignal = new CountDownLatch( luceneWorkerBuildingThreadNum );
+		this.producerEndSignal = new CountDownLatch( documentBuilderThreads );
 
 		this.monitor = monitor;
 		this.objectsLimit = objectsLimit;
 	}
 
 	@Override
-	public void runWithErrroHandler() {
+	public void runWithErrorHandler() {
 		try {
 			final ErrorHandler errorHandler = searchFactoryImplementor.getErrorHandler();
-			final BatchTransactionalContext btctx = new BatchTransactionalContext( searchFactoryImplementor, sessionFactory, errorHandler );
+			final BatchTransactionalContext transactionalContext = new BatchTransactionalContext( searchFactoryImplementor, sessionFactory, errorHandler );
 			//first start the consumers, then the producers (reverse order):
-			//from entity to LuceneWork:
-			startTransformationToLuceneWork( btctx, errorHandler );
-			//from primary key to loaded entity:
-			startTransformationToEntities( btctx, errorHandler );
+			//from primary keys to LuceneWork ADD operations:
+			startTransformationToLuceneWork( transactionalContext, errorHandler );
 			//from class definition to all primary keys:
-			startProducingPrimaryKeys( btctx, errorHandler );
+			startProducingPrimaryKeys( transactionalContext, errorHandler );
 			try {
 				producerEndSignal.await(); //await for all work being sent to the backend
 				log.debugf( "All work for type %s has been produced", indexedType.getName() );
@@ -143,10 +136,10 @@ public void runWithErrroHandler() {
 		}
 	}
 
-	private void startProducingPrimaryKeys(BatchTransactionalContext btctx, ErrorHandler errorHandler) {
-		final Runnable primaryKeyOutputter = new OptionallyWrapInJTATransaction( btctx,
+	private void startProducingPrimaryKeys(BatchTransactionalContext transactionalContext, ErrorHandler errorHandler) {
+		final Runnable primaryKeyOutputter = new OptionallyWrapInJTATransaction( transactionalContext,
 				new IdentifierProducer(
-						fromIdentifierListToEntities, sessionFactory,
+						primaryKeyStream, sessionFactory,
 						objectLoadingBatchSize, indexedType, monitor,
 						objectsLimit, errorHandler, idFetchSize
 				));
@@ -160,39 +153,23 @@ private void startProducingPrimaryKeys(BatchTransactionalContext btctx, ErrorHandler errorHandler) {
 		}
 	}
 
-	private void startTransformationToEntities(BatchTransactionalContext btctx, ErrorHandler errorHandler) {
-		final Runnable entityOutputter = new OptionallyWrapInJTATransaction( btctx,
-				new IdentifierConsumerEntityProducer(
-						fromIdentifierListToEntities, fromEntityToAddwork, monitor,
-						sessionFactory, cacheMode, indexedType, idNameOfIndexedType
+	private void startTransformationToLuceneWork(BatchTransactionalContext transactionalContext, ErrorHandler errorHandler) {
+		final Runnable documentOutputter = new OptionallyWrapInJTATransaction( transactionalContext,
+				new IdentifierConsumerDocumentProducer(
+						primaryKeyStream, monitor, sessionFactory, producerEndSignal,
+						cacheMode, indexedType, searchFactoryImplementor,
+						idNameOfIndexedType, backend, errorHandler
 				));
-		final ThreadPoolExecutor execFirstLoader = Executors.newFixedThreadPool( objectLoadingThreadNum, "entityloader" );
+		final ThreadPoolExecutor execFirstLoader = Executors.newFixedThreadPool( documentBuilderThreads, "entityloader" );
 		try {
-			for ( int i = 0; i < objectLoadingThreadNum; i++ ) {
-				execFirstLoader.execute( entityOutputter );
+			for ( int i = 0; i < documentBuilderThreads; i++ ) {
+				execFirstLoader.execute( documentOutputter );
 			}
 		}
 		finally {
 			execFirstLoader.shutdown();
 		}
 	}
 
-	private void startTransformationToLuceneWork(BatchTransactionalContext btctx, ErrorHandler errorHandler) {
-		final Runnable luceneOutputter = new OptionallyWrapInJTATransaction( btctx,
-				new EntityConsumerLuceneWorkProducer(
-						fromEntityToAddwork, monitor,
-						sessionFactory, producerEndSignal, searchFactoryImplementor,
-						cacheMode, backend, errorHandler
-				));
-		final ThreadPoolExecutor execDocBuilding = Executors.newFixedThreadPool( luceneWorkerBuildingThreadNum, "collectionsloader" );
-		try {
-			for ( int i = 0; i < luceneWorkerBuildingThreadNum; i++ ) {
-				execDocBuilding.execute( luceneOutputter );
-			}
-		}
-		finally {
-			execDocBuilding.shutdown();
-		}
-	}
-
 }
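The "first start the consumers, then the producers (reverse order)" comment reflects the standard rule for bounded hand-off queues: a producer blocks as soon as the queue is full, so the draining side must already be running for the pipeline to make progress. A minimal, generic illustration with a JDK BlockingQueue standing in for the ProducerConsumerQueue used above:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ConsumersFirst {
	public static void main(String[] args) throws InterruptedException {
		final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>( 10 ); // bounded hand-off

		Thread consumer = new Thread( new Runnable() {
			public void run() {
				try {
					while ( true ) {
						Integer id = queue.take(); // blocks until work arrives
						if ( id.intValue() < 0 ) {
							return; // negative id used here as an end-of-stream marker
						}
						// load the entity for 'id', build its document, send it to the backend
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		} );
		consumer.start(); // consumers first: someone is draining before we start filling

		for ( int id = 0; id < 100; id++ ) {
			queue.put( id ); // blocks when the queue is full, applying backpressure
		}
		queue.put( -1 ); // end-of-stream marker
		consumer.join();
	}
}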
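Note also that the pool is created with Executors.newFixedThreadPool( documentBuilderThreads, "entityloader" ) and assigned to a ThreadPoolExecutor, which suggests this Executors is Hibernate Search's internal helper that labels its threads, not java.util.concurrent.Executors (whose factory takes no name argument and returns an ExecutorService). A JDK-only equivalent, assuming that reading, would name the threads through a ThreadFactory:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedPools {
	// Assumed equivalent of the helper used in the diff: a fixed pool
	// whose threads carry a recognizable group name.
	static ExecutorService newFixedThreadPool(int threads, final String groupname) {
		final AtomicInteger counter = new AtomicInteger( 1 );
		ThreadFactory factory = new ThreadFactory() {
			public Thread newThread(Runnable r) {
				return new Thread( r, groupname + "-" + counter.getAndIncrement() );
			}
		};
		return java.util.concurrent.Executors.newFixedThreadPool( threads, factory );
	}
}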
@@ -46,7 +46,7 @@ protected ErrorHandledRunnable(SearchFactoryImplementor searchFactoryImplementor
 	public final void run() {
 		ErrorHandler errorHandler = searchFactoryImplementor.getErrorHandler();
 		try {
-			runWithErrroHandler();
+			runWithErrorHandler();
 		}
 		catch (Exception re) {
 			//being this an async thread we want to make sure everything is somehow reported
@@ -55,7 +55,7 @@ public final void run() {
 		}
 	}
 
-	protected abstract void runWithErrroHandler() throws Exception;
+	protected abstract void runWithErrorHandler() throws Exception;
 
 	protected void cleanUpOnError() {
 		//no-op unless overridden
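ErrorHandledRunnable is a small template method: run() is final, delegates to the abstract hook (renamed in this commit from the misspelled runWithErrroHandler to runWithErrorHandler), and routes any exception to the configured ErrorHandler so that failures on asynchronous batch threads are reported rather than silently swallowed. A self-contained sketch of the pattern, with a plain interface standing in for Hibernate Search's ErrorHandler:

// Sketch of the template-method pattern used above; the ErrorHandler
// interface here is a stand-in, not the real Hibernate Search type.
interface ErrorHandler {
	void handleException(String errorMsg, Throwable exception);
}

abstract class ErrorHandledRunnableSketch implements Runnable {

	private final ErrorHandler errorHandler;

	protected ErrorHandledRunnableSketch(ErrorHandler errorHandler) {
		this.errorHandler = errorHandler;
	}

	@Override
	public final void run() {
		try {
			runWithErrorHandler();
		}
		catch (Exception e) {
			// async thread: report the failure somewhere visible, then clean up
			errorHandler.handleException( "Batch indexing thread failed", e );
			cleanUpOnError();
		}
	}

	protected abstract void runWithErrorHandler() throws Exception;

	protected void cleanUpOnError() {
		// no-op unless overridden
	}
}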
