
Commit 6d67012

HSEARCH-901
Sanne authored and hferentschik committed Sep 8, 2011
1 parent d8341e0 commit 6d67012
Showing 5 changed files with 198 additions and 103 deletions.
@@ -34,37 +34,38 @@
import org.hibernate.search.backend.impl.batch.BatchBackend;
import org.hibernate.search.batchindexing.MassIndexerProgressMonitor;
import org.hibernate.search.engine.spi.SearchFactoryImplementor;
import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
* This runnable will prepare a pipeline for batch indexing
* of entities, managing the lifecycle of several ThreadPools.
*
*
* @author Sanne Grinovero
*/
public class BatchIndexingWorkspace implements Runnable {

private static final Log log = LoggerFactory.make();

private final SearchFactoryImplementor searchFactory;
private final SessionFactory sessionFactory;

//following order shows the 4 stages of an entity flowing to the index:
private final ThreadPoolExecutor execIdentifiersLoader;
private final ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities;
private final ThreadPoolExecutor execFirstLoader;
private final ProducerConsumerQueue<List<?>> fromEntityToAddwork;
private final ThreadPoolExecutor execDocBuilding;
private final ThreadPoolExecutor execIdentifiersLoader;
private final ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities;
private final ThreadPoolExecutor execFirstLoader;
private final ProducerConsumerQueue<List<?>> fromEntityToAddwork;
private final ThreadPoolExecutor execDocBuilding;

private final int objectLoadingThreadNum;
private final int luceneworkerBuildingThreadNum;
private final int luceneWorkerBuildingThreadNum;
private final Class<?> indexedType;

private final String idNameOfIndexedType;

// status control
private final CountDownLatch producerEndSignal; //released when we stop adding Documents to Index
private final CountDownLatch endAllSignal; //released when we release all locks and IndexWriter

// progress monitor
private final MassIndexerProgressMonitor monitor;

@@ -73,66 +74,69 @@ public class BatchIndexingWorkspace implements Runnable {
private final int objectLoadingBatchSize;

private final BatchBackend backend;

private final long objectsLimit;

public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor, SessionFactory sessionFactory,
Class<?> entityType,
int objectLoadingThreads, int collectionLoadingThreads,
CacheMode cacheMode, int objectLoadingBatchSize,
CountDownLatch endAllSignal,
MassIndexerProgressMonitor monitor, BatchBackend backend,
long objectsLimit) {
Class<?> entityType,
int objectLoadingThreads, int collectionLoadingThreads,
CacheMode cacheMode, int objectLoadingBatchSize,
CountDownLatch endAllSignal,
MassIndexerProgressMonitor monitor, BatchBackend backend,
long objectsLimit) {

this.indexedType = entityType;
this.idNameOfIndexedType = searchFactoryImplementor.getIndexBindingForEntity( entityType )
.getDocumentBuilder()
.getIdentifierName();
this.searchFactory = searchFactoryImplementor;
this.sessionFactory = sessionFactory;

//thread pool sizing:
this.objectLoadingThreadNum = objectLoadingThreads;
this.luceneworkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document
this.luceneWorkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document

//loading options:
this.cacheMode = cacheMode;
this.objectLoadingBatchSize = objectLoadingBatchSize;
this.backend = backend;

//executors: (quite expensive constructor)
//execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction
this.execIdentifiersLoader = Executors.newFixedThreadPool( 1, "identifierloader" );
this.execFirstLoader = Executors.newFixedThreadPool( objectLoadingThreadNum, "entityloader" );
this.execDocBuilding = Executors.newFixedThreadPool( luceneworkerBuildingThreadNum, "collectionsloader" );
this.execDocBuilding = Executors.newFixedThreadPool( luceneWorkerBuildingThreadNum, "collectionsloader" );

//pipelining queues:
this.fromIdentifierListToEntities = new ProducerConsumerQueue<List<Serializable>>( 1 );
this.fromEntityToAddwork = new ProducerConsumerQueue<List<?>>( objectLoadingThreadNum );

//end signal shared with other instances:
this.endAllSignal = endAllSignal;
this.producerEndSignal = new CountDownLatch( luceneworkerBuildingThreadNum );
this.producerEndSignal = new CountDownLatch( luceneWorkerBuildingThreadNum );

this.monitor = monitor;
this.objectsLimit = objectsLimit;
}

public void run() {
try {

//first start the consumers, then the producers (reverse order):
for ( int i=0; i < luceneworkerBuildingThreadNum; i++ ) {
//from entity to LuceneWork:
final EntityConsumerLuceneworkProducer producer = new EntityConsumerLuceneworkProducer(
for ( int i = 0; i < luceneWorkerBuildingThreadNum; i++ ) {
//from entity to LuceneWork:
final EntityConsumerLuceneWorkProducer producer = new EntityConsumerLuceneWorkProducer(
fromEntityToAddwork, monitor,
sessionFactory, producerEndSignal, searchFactory,
cacheMode, backend
);
execDocBuilding.execute( new OptionallyWrapInJTATransaction( sessionFactory, producer ) );
}
for ( int i=0; i < objectLoadingThreadNum; i++ ) {
//from primary key to loaded entity:
for ( int i = 0; i < objectLoadingThreadNum; i++ ) {
//from primary key to loaded entity:
final IdentifierConsumerEntityProducer producer = new IdentifierConsumerEntityProducer(
fromIdentifierListToEntities, fromEntityToAddwork, monitor,
sessionFactory, cacheMode, indexedType
sessionFactory, cacheMode, indexedType, idNameOfIndexedType
);
execFirstLoader.execute( new OptionallyWrapInJTATransaction( sessionFactory, producer ) );
}
@@ -143,15 +147,16 @@ public void run() {
objectsLimit
);
execIdentifiersLoader.execute( new OptionallyWrapInJTATransaction( sessionFactory, producer ) );

//shutdown all executors:
execIdentifiersLoader.shutdown();
execFirstLoader.shutdown();
execDocBuilding.shutdown();
try {
producerEndSignal.await(); //await for all work being sent to the backend
log.debugf( "All work for type %s has been produced", indexedType.getName() );
} catch (InterruptedException e) {
}
catch ( InterruptedException e ) {
//restore interruption signal:
Thread.currentThread().interrupt();
throw new SearchException( "Interrupted on batch Indexing; index will be left in unknown state!", e );
Expand All @@ -161,5 +166,4 @@ public void run() {
endAllSignal.countDown();
}
}

}
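
The class comment above describes a pipeline of thread pools: consumers are started before producers, each document-building consumer counts down producerEndSignal once it has sent all of its work to the backend, and endAllSignal is released when everything is finished. A minimal, self-contained sketch of that coordination pattern using only JDK types follows; the BlockingQueue, the String identifiers and the empty-list end marker are illustrative stand-ins, not Hibernate Search's ProducerConsumerQueue and named thread pools.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipelineCoordinationSketch {

    public static void main(String[] args) throws InterruptedException {
        final int consumerThreads = 2;
        final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>( 10 );
        final CountDownLatch producerEndSignal = new CountDownLatch( consumerThreads );
        final CountDownLatch endAllSignal = new CountDownLatch( 1 );

        // plain JDK pools here; the patched code uses Hibernate Search's named thread pools
        ExecutorService consumers = Executors.newFixedThreadPool( consumerThreads );
        ExecutorService producers = Executors.newFixedThreadPool( 1 );

        // start the consumers first, then the producers (reverse order of the data flow)
        for ( int i = 0; i < consumerThreads; i++ ) {
            consumers.execute( new Runnable() {
                public void run() {
                    try {
                        List<String> batch;
                        // an empty list is the (illustrative) end-of-stream marker
                        while ( !( batch = queue.take() ).isEmpty() ) {
                            // convert the batch into index work here
                        }
                    }
                    catch ( InterruptedException e ) {
                        Thread.currentThread().interrupt();
                    }
                    finally {
                        producerEndSignal.countDown(); // this consumer has sent all of its work
                    }
                }
            } );
        }

        producers.execute( new Runnable() {
            public void run() {
                try {
                    queue.put( Arrays.asList( "id-1", "id-2", "id-3" ) );
                    // one end marker per consumer so every consumer terminates
                    for ( int i = 0; i < consumerThreads; i++ ) {
                        queue.put( Collections.<String>emptyList() );
                    }
                }
                catch ( InterruptedException e ) {
                    Thread.currentThread().interrupt();
                }
            }
        } );

        // no new tasks are accepted, but already submitted work runs to completion
        consumers.shutdown();
        producers.shutdown();

        producerEndSignal.await(); // all work has been handed over
        endAllSignal.countDown();  // release whoever waits on the whole batch
    }
}

The real workspace additionally wraps each runnable in OptionallyWrapInJTATransaction and chains a second queue between the entity loaders and the document builders; that wiring is omitted here to keep the sketch focused on the start order and the latches.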
@@ -1,4 +1,4 @@
/*
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* Copyright (c) 2010, Red Hat, Inc. and/or its affiliates or third-party contributors as
@@ -57,7 +57,7 @@
*
* @author Sanne Grinovero
*/
public class EntityConsumerLuceneworkProducer implements SessionAwareRunnable {
public class EntityConsumerLuceneWorkProducer implements SessionAwareRunnable {

private static final Log log = LoggerFactory.make();

@@ -72,7 +72,7 @@ public class EntityConsumerLuceneworkProducer implements SessionAwareRunnable {

private final BatchBackend backend;

public EntityConsumerLuceneworkProducer(
public EntityConsumerLuceneWorkProducer(
ProducerConsumerQueue<List<?>> entitySource,
MassIndexerProgressMonitor monitor,
SessionFactory sessionFactory,
@@ -163,5 +163,4 @@ private void index( Object entity, Session session, EntityInitializer sessionIni
AddLuceneWork addWork = docBuilder.createAddWork( clazz, entity, id, idInString, sessionInitializer );
backend.enqueueAsyncWork( addWork );
}

}
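
The tail of index(...) shown above converts each loaded entity into an AddLuceneWork through docBuilder.createAddWork(...) and pushes it to the backend with backend.enqueueAsyncWork(...). A rough sketch of the surrounding drain-and-convert loop follows, under the same simplifying assumptions as the previous example: a plain BlockingQueue with an empty list as end marker stands in for ProducerConsumerQueue, and the WorkSink interface is a hypothetical placeholder for the BatchBackend call, not a Hibernate Search type.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class EntityConsumerSketch implements Runnable {

    /** Hypothetical stand-in for BatchBackend.enqueueAsyncWork( AddLuceneWork ). */
    public interface WorkSink {
        void enqueue(Object work);
    }

    private final BlockingQueue<List<?>> entitySource;
    private final CountDownLatch producerEndSignal;
    private final WorkSink backend;

    public EntityConsumerSketch(BlockingQueue<List<?>> entitySource,
            CountDownLatch producerEndSignal, WorkSink backend) {
        this.entitySource = entitySource;
        this.producerEndSignal = producerEndSignal;
        this.backend = backend;
    }

    public void run() {
        try {
            List<?> batch;
            // an empty list marks the end of the stream in this sketch
            while ( !( batch = entitySource.take() ).isEmpty() ) {
                for ( Object entity : batch ) {
                    // the real consumer builds an AddLuceneWork from the entity here,
                    // using the DocumentBuilder of the entity's index binding
                    backend.enqueue( entity );
                }
            }
        }
        catch ( InterruptedException e ) {
            Thread.currentThread().interrupt();
        }
        finally {
            // lets BatchIndexingWorkspace know this consumer has sent all of its work
            producerEndSignal.countDown();
        }
    }
}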
@@ -36,19 +36,19 @@
import org.hibernate.criterion.CriteriaSpecification;
import org.hibernate.criterion.Restrictions;
import org.hibernate.search.batchindexing.MassIndexerProgressMonitor;
import org.hibernate.search.util.logging.impl.LoggerFactory;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;

/**
* This Runnable is consuming entity identifiers and
* This {@code Runnable} is consuming entity identifiers and
* producing loaded detached entities for the next queue.
* It will finish when the queue it's consuming from will
* It will finish when the queue it is consuming from will
* signal there are no more identifiers.
*
*
* @author Sanne Grinovero
*/
public class IdentifierConsumerEntityProducer implements SessionAwareRunnable {

private static final Log log = LoggerFactory.make();

private final ProducerConsumerQueue<List<Serializable>> source;
@@ -57,20 +57,23 @@ public class IdentifierConsumerEntityProducer implements SessionAwareRunnable {
private final CacheMode cacheMode;
private final Class<?> type;
private final MassIndexerProgressMonitor monitor;
private final String idName;

public IdentifierConsumerEntityProducer(
ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities,
ProducerConsumerQueue<List<?>> fromEntityToAddwork,
ProducerConsumerQueue<List<?>> fromEntityToAddWork,
MassIndexerProgressMonitor monitor,
SessionFactory sessionFactory,
CacheMode cacheMode, Class<?> type) {
this.source = fromIdentifierListToEntities;
this.destination = fromEntityToAddwork;
this.monitor = monitor;
this.sessionFactory = sessionFactory;
this.cacheMode = cacheMode;
this.type = type;
log.trace( "created" );
CacheMode cacheMode, Class<?> type,
String idName) {
this.source = fromIdentifierListToEntities;
this.destination = fromEntityToAddWork;
this.monitor = monitor;
this.sessionFactory = sessionFactory;
this.cacheMode = cacheMode;
this.type = type;
this.idName = idName;
log.trace( "created" );
}

public void run(Session upperSession) {
@@ -88,17 +91,17 @@ public void run(Session upperSession) {
loadAllFromQueue( session );
transaction.commit();
}
catch (Throwable e) {
catch ( Throwable e ) {
log.errorDuringBatchIndexing( e );
}
finally {
if (upperSession == null) {
if ( upperSession == null ) {
session.close();
}
}
log.trace( "finished" );
}

private void loadAllFromQueue(Session session) {
try {
Object take;
@@ -113,7 +116,7 @@ private void loadAllFromQueue(Session session) {
}
while ( take != null );
}
catch (InterruptedException e) {
catch ( InterruptedException e ) {
// just quit
Thread.currentThread().interrupt();
}
@@ -125,24 +128,25 @@
/**
* Loads a list of entities of defined type using their identifiers.
* The loaded objects are then pushed to the next queue one by one.
*
* @param listIds the list of entity identifiers (of type
* @param session the session to be used
*
* @throws InterruptedException
*/
private void loadList(List<Serializable> listIds, Session session) throws InterruptedException {
//TODO investigate if I should use ObjectLoaderHelper.initializeObjects instead
Criteria criteria = session
.createCriteria( type )
.setCacheMode( cacheMode )
.setLockMode( LockMode.NONE )
.setCacheable( false )
.setFlushMode( FlushMode.MANUAL )
.setResultTransformer( CriteriaSpecification.DISTINCT_ROOT_ENTITY )
.add( Restrictions.in( "id", listIds ) );
.createCriteria( type )
.setCacheMode( cacheMode )
.setLockMode( LockMode.NONE )
.setCacheable( false )
.setFlushMode( FlushMode.MANUAL )
.setResultTransformer( CriteriaSpecification.DISTINCT_ROOT_ENTITY )
.add( Restrictions.in( idName, listIds ) );
List<?> list = criteria.list();
monitor.entitiesLoaded( list.size() );
session.clear();
destination.put( list );
}

}
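
The loadList(...) hunk above carries the main functional change visible in this diff: the Criteria restriction no longer hard-codes the property name "id" but uses the identifier property name resolved for the indexed type (idName, handed down from the DocumentBuilder), so entities whose identifier property is named differently are loaded correctly. A minimal sketch of an equivalent batch load against a plain Hibernate Session follows; resolving the name through ClassMetadata is just one way to obtain it and is not what the patch itself does, while the rest of the Criteria chain mirrors the hunk above.

import java.io.Serializable;
import java.util.List;

import org.hibernate.CacheMode;
import org.hibernate.Criteria;
import org.hibernate.FlushMode;
import org.hibernate.LockMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.criterion.CriteriaSpecification;
import org.hibernate.criterion.Restrictions;

public class LoadByIdentifierSketch {

    public static List<?> load(SessionFactory sessionFactory, Session session,
            Class<?> type, CacheMode cacheMode, List<Serializable> listIds) {
        // resolve the mapped identifier property name instead of assuming "id"
        String idName = sessionFactory.getClassMetadata( type ).getIdentifierPropertyName();
        Criteria criteria = session
                .createCriteria( type )
                .setCacheMode( cacheMode )
                .setLockMode( LockMode.NONE )
                .setCacheable( false )
                .setFlushMode( FlushMode.MANUAL )
                .setResultTransformer( CriteriaSpecification.DISTINCT_ROOT_ENTITY )
                .add( Restrictions.in( idName, listIds ) );
        return criteria.list();
    }
}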
