Skip to content

Commit

Permalink
HSEARCH-987 Expose option to set JDBC FetchSize in MassIndexer
Browse files Browse the repository at this point in the history
  • Loading branch information
Sanne authored and hferentschik committed Dec 5, 2011
1 parent 99d3047 commit f37812b
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ transaction.commit();</programlisting>
.cacheMode( CacheMode.NORMAL )
.threadsToLoadObjects( 5 )
.threadsForIndexWriter( 3 )
.idFetchSize( 150 )
.threadsForSubsequentFetching( 20 )
.progressMonitor( monitor ) //a MassIndexerProgressMonitor implementation
.startAndWait();</programlisting>
Expand Down Expand Up @@ -332,5 +333,10 @@ transaction.commit();</programlisting>
and Hibernate Search is just passing these parameters through - see <xref
linkend="lucene-indexing-performance" /> for more details.</para>

<para>The <classname>MassIndexer</classname> uses a forward only scrollable result to iterate
on the primary keys to be loaded, but MySQL's JDBC driver will load all values in memory;
to avoid this "optimisation" set <literal>idFetchSize</literal> to
<literal>Integer.MIN_VALUE</literal>.</para>

</section>
</chapter>
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public interface MassIndexer {
* As a results the index will not be consistent
* with the database: use only for testing on an (undefined) subset of database data.
* @param maximum
* @return
* @return <tt>this</tt> for method chaining
*/
MassIndexer limitIndexedObjectsTo(long maximum);

Expand All @@ -144,4 +144,14 @@ public interface MassIndexer {
*/
void startAndWait() throws InterruptedException;

/**
* Specifies the fetch size to be used when loading primary keys
* if objects to be indexed. Some databases accept special values,
* for example MySQL might benefit from using {@link Integer#MIN_VALUE}
* otherwise it will attempt to preload everything in memory.
* @param idFetchSize
* @return <tt>this</tt> for method chaining
*/
public MassIndexer idFetchSize(int idFetchSize);

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class BatchCoordinator implements Runnable {
private final MassIndexerProgressMonitor monitor;
private final long objectsLimit;
private final ErrorHandler errorHandler;
private final int idFetchSize;

public BatchCoordinator(Set<Class<?>> rootEntities,
SearchFactoryImplementor searchFactoryImplementor,
Expand All @@ -77,7 +78,9 @@ public BatchCoordinator(Set<Class<?>> rootEntities,
boolean purgeAtStart,
boolean optimizeAfterPurge,
MassIndexerProgressMonitor monitor,
Integer writerThreads) {
Integer writerThreads,
int idFetchSize) {
this.idFetchSize = idFetchSize;
this.rootEntities = rootEntities.toArray( new Class<?>[rootEntities.size()] );
this.searchFactoryImplementor = searchFactoryImplementor;
this.sessionFactory = sessionFactory;
Expand Down Expand Up @@ -131,8 +134,8 @@ private void doBatchWork(BatchBackend backend) throws InterruptedException {
new BatchIndexingWorkspace(
searchFactoryImplementor, sessionFactory, type,
objectLoadingThreads, collectionLoadingThreads,
cacheMode, objectLoadingBatchSize,
endAllSignal, monitor, backend, objectsLimit
cacheMode, objectLoadingBatchSize, endAllSignal,
monitor, backend, objectsLimit, idFetchSize
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class BatchIndexingWorkspace implements Runnable {

private final long objectsLimit;

private final int idFetchSize;

public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor,
SessionFactory sessionFactory,
Class<?> entityType,
Expand All @@ -88,9 +90,11 @@ public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor,
CountDownLatch endAllSignal,
MassIndexerProgressMonitor monitor,
BatchBackend backend,
long objectsLimit) {
long objectsLimit,
int idFetchSize) {

this.indexedType = entityType;
this.idFetchSize = idFetchSize;
this.idNameOfIndexedType = searchFactoryImplementor.getIndexBindingForEntity( entityType )
.getDocumentBuilder()
.getIdentifierName();
Expand Down Expand Up @@ -150,7 +154,7 @@ public void run() {
final IdentifierProducer producer = new IdentifierProducer(
fromIdentifierListToEntities, sessionFactory,
objectLoadingBatchSize, indexedType, monitor,
objectsLimit, errorHandler
objectsLimit, errorHandler, idFetchSize
);
execIdentifiersLoader.execute( new OptionallyWrapInJTATransaction( sessionFactory, errorHandler, producer ) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class IdentifierProducer implements StatelessSessionAwareRunnable {
private final MassIndexerProgressMonitor monitor;
private final long objectsLimit;
private final ErrorHandler errorHandler;
private final int idFetchSize;

/**
* @param fromIdentifierListToEntities the target queue where the produced identifiers are sent to
Expand All @@ -77,14 +78,15 @@ public IdentifierProducer(
SessionFactory sessionFactory,
int objectLoadingBatchSize,
Class<?> indexedType, MassIndexerProgressMonitor monitor,
long objectsLimit, ErrorHandler errorHandler) {
long objectsLimit, ErrorHandler errorHandler, int idFetchSize) {
this.destination = fromIdentifierListToEntities;
this.sessionFactory = sessionFactory;
this.batchSize = objectLoadingBatchSize;
this.indexedType = indexedType;
this.monitor = monitor;
this.objectsLimit = objectsLimit;
this.errorHandler = errorHandler;
this.idFetchSize = idFetchSize;
log.trace( "created" );
}

Expand Down Expand Up @@ -140,7 +142,7 @@ private void loadAllIdentifiers(final StatelessSession session) throws Interrupt
.createCriteria( indexedType )
.setProjection( Projections.id() )
.setCacheable( false )
.setFetchSize( 100 );
.setFetchSize( idFetchSize );

ScrollableResults results = criteria.scroll( ScrollMode.FORWARD_ONLY );
ArrayList<Serializable> destinationList = new ArrayList<Serializable>( batchSize );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MassIndexerImpl implements MassIndexer {
private boolean purgeAtStart = true;
private boolean optimizeAfterPurge = true;
private MassIndexerProgressMonitor monitor;
private int idFetchSize = 100; //reasonable default as we only load IDs

protected MassIndexerImpl(SearchFactoryImplementor searchFactory, SessionFactory sessionFactory, Class<?>... entities) {
this.searchFactoryImplementor = searchFactory;
Expand Down Expand Up @@ -212,12 +213,20 @@ protected BatchCoordinator createCoordinator() {
objectLoadingThreads, collectionLoadingThreads,
cacheMode, objectLoadingBatchSize, objectsLimit,
optimizeAtEnd, purgeAtStart, optimizeAfterPurge,
monitor, writerThreads
monitor, writerThreads, idFetchSize
);
}

public MassIndexer limitIndexedObjectsTo(long maximum) {
this.objectsLimit = maximum;
return this;
}

@Override
public MassIndexer idFetchSize(int idFetchSize) {
// don't check for positive/zero values as it's actually used by some databases
// as special values which might be useful.
this.idFetchSize = idFetchSize;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* JBoss, Home of Professional Open Source
* Copyright 2011 Red Hat Inc. and/or its affiliates and other contributors
* as indicated by the @authors tag. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*/
package org.hibernate.search.test.batchindexing;

import junit.framework.Assert;

import org.hibernate.search.Environment;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.engine.spi.SearchFactoryImplementor;
import org.hibernate.search.test.SearchTestCase;
import org.hibernate.search.test.errorhandling.MockErrorHandler;
import org.hibernate.testing.RequiresDialect;
import org.hibernate.testing.junit4.CustomRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.hibernate.search.MassIndexer;

/**
* Verifies that {@link MassIndexer#idFetchSize(int)} is applied by checking for errors thrown by
* the JDBC Dialect. We use this approach especially as we want to make sure that using
* {@link Integer#MIN_VALUE} is an acceptable option on MySQL as we suggest it on the documentation.
*
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2011 Red Hat Inc.
*/
@RunWith(CustomRunner.class)//needed to enable @RequiresDialect functionality
public class FetchSizeConfigurationTest extends SearchTestCase {

@Test
@RequiresDialect(comment = "H2 does not accept negative fetch sizes",
strictMatching = true, value = org.hibernate.dialect.H2Dialect.class)
public void testSetFetchSizeOnH2Fails() throws InterruptedException {
SearchFactoryImplementor searchFactory = getSearchFactoryImpl();
MockErrorHandler mockErrorHandler = MassIndexerErrorReportingTest.getErrorHandler( searchFactory );

FullTextSession fullTextSession = MassIndexerErrorReportingTest.prepareSomeData( this );

fullTextSession.createIndexer( Book.class ).idFetchSize( -1 ).startAndWait();

session.close();
String errorMessage = mockErrorHandler.getErrorMessage();
Assert.assertEquals( "HSEARCH000116: Unexpected error during MassIndexer operation", errorMessage );
Throwable exception = mockErrorHandler.getLastException();
Assert.assertTrue( exception instanceof org.hibernate.exception.GenericJDBCException );
}

@Test
@RequiresDialect(comment = "MySQL definitely should accept Integer.MIN_VALUE",
strictMatching = false, value = org.hibernate.dialect.MySQLDialect.class)
public void testSetFetchSizeOnMySQL() throws InterruptedException {
SearchFactoryImplementor searchFactory = getSearchFactoryImpl();
MockErrorHandler mockErrorHandler = MassIndexerErrorReportingTest.getErrorHandler( searchFactory );

FullTextSession fullTextSession = MassIndexerErrorReportingTest.prepareSomeData( this );

fullTextSession.createIndexer( Book.class ).idFetchSize( Integer.MIN_VALUE ).startAndWait();

session.close();
String errorMessage = mockErrorHandler.getErrorMessage();
Assert.assertEquals( null, errorMessage );
}

protected Class<?>[] getAnnotatedClasses() {
return new Class[] { Book.class, Nation.class };
}

protected void configure(org.hibernate.cfg.Configuration cfg) {
super.configure( cfg );
cfg.setProperty( Environment.ERROR_HANDLER, MockErrorHandler.class.getName() );
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
package org.hibernate.search.test.batchindexing;

import junit.framework.Assert;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.hibernate.search.Environment;
import org.hibernate.search.FullTextSession;
Expand All @@ -13,6 +9,10 @@
import org.hibernate.search.exception.ErrorHandler;
import org.hibernate.search.test.SearchTestCase;
import org.hibernate.search.test.errorhandling.MockErrorHandler;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(BMUnitRunner.class)
public class MassIndexerErrorReportingTest extends SearchTestCase {
Expand All @@ -25,11 +25,28 @@ public class MassIndexerErrorReportingTest extends SearchTestCase {
name = "testMassIndexerErrorsReported")
public void testMassIndexerErrorsReported() throws InterruptedException {
SearchFactoryImplementor searchFactory = getSearchFactoryImpl();
MockErrorHandler mockErrorHandler = getErrorHandler( searchFactory );

FullTextSession fullTextSession = prepareSomeData( this );

fullTextSession.createIndexer( Book.class ).startAndWait();

session.close();
String errorMessage = mockErrorHandler.getErrorMessage();
Assert.assertEquals( "HSEARCH000116: Unexpected error during MassIndexer operation", errorMessage );
Throwable exception = mockErrorHandler.getLastException();
Assert.assertTrue( exception instanceof org.jboss.byteman.rule.exception.ExecuteException );
}

static MockErrorHandler getErrorHandler(SearchFactoryImplementor searchFactory) {
ErrorHandler errorHandler = searchFactory.getErrorHandler();
Assert.assertTrue( errorHandler instanceof MockErrorHandler );
MockErrorHandler mockErrorHandler = (MockErrorHandler) errorHandler;
return mockErrorHandler;
}

FullTextSession fullTextSession = Search.getFullTextSession( openSession() );
static FullTextSession prepareSomeData(SearchTestCase testcase) {
FullTextSession fullTextSession = Search.getFullTextSession( testcase.openSession() );
fullTextSession.beginTransaction();
Nation france = new Nation( "France", "FR" );
fullTextSession.save( france );
Expand All @@ -38,14 +55,7 @@ public void testMassIndexerErrorsReported() throws InterruptedException {
ceylonBook.setFirstPublishedIn( france );
fullTextSession.save( ceylonBook );
fullTextSession.getTransaction().commit();

fullTextSession.createIndexer( Book.class ).startAndWait();

session.close();
String errorMessage = mockErrorHandler.getErrorMessage();
Assert.assertEquals( "HSEARCH000116: Unexpected error during MassIndexer operation", errorMessage );
Throwable exception = mockErrorHandler.getLastException();
Assert.assertTrue( exception instanceof org.jboss.byteman.rule.exception.ExecuteException );
return fullTextSession;
}

protected Class<?>[] getAnnotatedClasses() {
Expand Down

0 comments on commit f37812b

Please sign in to comment.