Skip to content

Commit

Permalink
HSEARCH-1891 Making sure transactions are rolled back in document pro…
Browse files Browse the repository at this point in the history
…ducer also if interrupted
  • Loading branch information
gunnarmorling authored and Sanne committed Jun 8, 2015
1 parent e41384c commit 170b85d
Showing 1 changed file with 46 additions and 45 deletions.
Expand Up @@ -160,24 +160,27 @@ private void loadAllFromQueue(Session session) throws Exception {
* @throws InterruptedException
*/
private void loadList(List<Serializable> listIds, Session session, InstanceInitializer sessionInitializer) throws Exception {
beginTransaction( session );

Criteria criteria = session
.createCriteria( type )
.setCacheMode( cacheMode )
.setLockMode( LockMode.NONE )
.setCacheable( false )
.setFlushMode( FlushMode.MANUAL )
.setFetchSize( listIds.size() )
.setResultTransformer( CriteriaSpecification.DISTINCT_ROOT_ENTITY )
.add( Restrictions.in( idName, listIds ) );
List<?> list = criteria.list();
monitor.entitiesLoaded( list.size() );
indexAllQueue( tenantIdentifier( session ), session, list, sessionInitializer );
session.clear();
try {
beginTransaction( session );

// it's read-only, so no need to commit
rollbackTransaction( session );
Criteria criteria = session
.createCriteria( type )
.setCacheMode( cacheMode )
.setLockMode( LockMode.NONE )
.setCacheable( false )
.setFlushMode( FlushMode.MANUAL )
.setFetchSize( listIds.size() )
.setResultTransformer( CriteriaSpecification.DISTINCT_ROOT_ENTITY )
.add( Restrictions.in( idName, listIds ) );
List<?> list = criteria.list();
monitor.entitiesLoaded( list.size() );
indexAllQueue( tenantIdentifier( session ), session, list, sessionInitializer );
session.clear();
}
finally {
// it's read-only, so no need to commit
rollbackTransaction( session );
}
}

private void beginTransaction(Session session) throws Exception {
Expand Down Expand Up @@ -212,42 +215,40 @@ private String tenantIdentifier(Session session) {
return tenantId;
}

private void indexAllQueue(String tenantId, Session session, List<?> entities, InstanceInitializer sessionInitializer) {
try {
ConversionContext contextualBridge = new ContextualExceptionBridgeHelper();
if ( entities == null || entities.isEmpty() ) {
return;
private void indexAllQueue(String tenantId, Session session, List<?> entities, InstanceInitializer sessionInitializer) throws InterruptedException {
ConversionContext contextualBridge = new ContextualExceptionBridgeHelper();

if ( entities == null || entities.isEmpty() ) {
return;
}
else {
log.tracef( "received a list of objects to index: %s", entities );
for ( Object object : entities ) {
try {
index( tenantId, object, session, sessionInitializer, contextualBridge );
monitor.documentsBuilt( 1 );
}
else {
log.tracef( "received a list of objects to index: %s", entities );
for ( Object object : entities ) {
try {
index( tenantId, object, session, sessionInitializer, contextualBridge );
monitor.documentsBuilt( 1 );
}
catch (InterruptedException ie) {
// rethrowing the interrupted exception
throw ie;
}
catch (RuntimeException e) {
String errorMsg = log.massIndexerUnableToIndexInstance(
object.getClass().getName(),
object.toString()
);
errorHandler.handleException( errorMsg, e );
}
}
catch (RuntimeException e) {
String errorMsg = log.massIndexerUnableToIndexInstance(
object.getClass().getName(),
object.toString()
);
errorHandler.handleException( errorMsg, e );
}
}
catch (InterruptedException e) {
// just quit
Thread.currentThread().interrupt();
}
}
}

@SuppressWarnings("unchecked")
private void index(String tenantId, Object entity, Session session, InstanceInitializer sessionInitializer, ConversionContext conversionContext)
throws InterruptedException {

// abort if the thread has been interrupted while not in wait(), I/O or similar which themselves would have
// raised the InterruptedException
if ( Thread.currentThread().isInterrupted() ) {
throw new InterruptedException();
}

Serializable id = session.getIdentifier( entity );
Class<?> clazz = HibernateHelper.getClass( entity );
EntityIndexBinding entityIndexBinding = entityIndexBindings.get( clazz );
Expand Down

0 comments on commit 170b85d

Please sign in to comment.