Skip to content

Commit

Permalink
HSEARCH-1735 Clearly distinguish the Async from the BatchSync Process…
Browse files Browse the repository at this point in the history
…ing strategies and make sure no unnecessary thread is started
  • Loading branch information
Sanne committed Nov 27, 2014
1 parent ddf7b12 commit 29473df
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 30 deletions.
@@ -0,0 +1,50 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.impl.lucene;

import java.util.List;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;


/**
* This is the asynchronous backend logic for the LuceneBackendQueueProcessor.
* It merely forwards batches of indexing work to the async Executor for this indexing backend.
*
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2014 Red Hat Inc.
* @since 5.0
*/
final class AsyncWorkProcessor implements WorkProcessor {

private volatile LuceneBackendResources resources;

public AsyncWorkProcessor(LuceneBackendResources resources) {
this.resources = resources;
}

@Override
public void shutdown() {
//no-op
}

@Override
public void submit(List<LuceneWork> workList, IndexingMonitor monitor) {
LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
workList,
resources,
monitor
);
resources.getAsynchIndexingExecutor().execute( luceneBackendQueueProcessor );
}

@Override
public void updateResources(LuceneBackendResources resources) {
this.resources = resources;
}

}
Expand Up @@ -37,7 +37,7 @@ public class LuceneBackendQueueProcessor implements BackendQueueProcessor {
private boolean sync;
private AbstractWorkspaceImpl workspaceOverride;
private LuceneBackendTaskStreamer streamWorker;
private BatchSyncProcessor batchSyncProcessor;
private WorkProcessor workProcessor;

@Override
public void initialize(Properties props, WorkerBuildContext context, DirectoryBasedIndexManager indexManager) {
Expand All @@ -49,14 +49,23 @@ public void initialize(Properties props, WorkerBuildContext context, DirectoryBa
}
resources = new LuceneBackendResources( context, indexManager, props, workspaceOverride );
streamWorker = new LuceneBackendTaskStreamer( resources );
batchSyncProcessor = new BatchSyncProcessor( resources, indexManager.getIndexName() );
batchSyncProcessor.start();
String indexName = indexManager.getIndexName();
if ( sync ) {
final SyncWorkProcessor batchSyncProcessor = new SyncWorkProcessor( resources, indexName );
batchSyncProcessor.start();
log.luceneBackendInitializedSynchronously( indexName );
workProcessor = batchSyncProcessor;
}
else {
workProcessor = new AsyncWorkProcessor( resources );
log.luceneBackendInitializedAsynchronously( indexName );
}
}

@Override
public void close() {
resources.shutdown();
batchSyncProcessor.shutdown();
workProcessor.shutdown();
}

@Override
Expand All @@ -72,17 +81,7 @@ public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
if ( workList == null ) {
throw new IllegalArgumentException( "workList should not be null" );
}
if ( sync ) {
batchSyncProcessor.submit( workList, monitor );
}
else {
LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
workList,
resources,
monitor
);
resources.getQueueingExecutor().execute( luceneBackendQueueProcessor );
}
workProcessor.submit( workList, monitor );
}

@Override
Expand All @@ -107,7 +106,7 @@ public void setCustomWorkspace(AbstractWorkspaceImpl workspace) {
@Override
public void indexMappingChanged() {
resources = resources.onTheFlyRebuild();
batchSyncProcessor.updateResources( resources );
workProcessor.updateResources( resources );
}

}
Expand Up @@ -36,20 +36,20 @@ public final class LuceneBackendResources {
private final LuceneWorkVisitor visitor;
private final AbstractWorkspaceImpl workspace;
private final ErrorHandler errorHandler;
private final ExecutorService queueingExecutor;
private final int maxQueueLength;
private final String indexName;

private final ReadLock readLock;
private final WriteLock writeLock;

private volatile ExecutorService asyncIndexingExecutor;

LuceneBackendResources(WorkerBuildContext context, DirectoryBasedIndexManager indexManager, Properties props, AbstractWorkspaceImpl workspace) {
this.indexName = indexManager.getIndexName();
this.errorHandler = context.getErrorHandler();
this.workspace = workspace;
this.visitor = new LuceneWorkVisitor( workspace );
this.maxQueueLength = PropertiesParseHelper.extractMaxQueueSize( indexName, props );
this.queueingExecutor = Executors.newFixedThreadPool( 1, "Index updates queue processor for index " + indexName, maxQueueLength );
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
Expand All @@ -61,13 +61,30 @@ private LuceneBackendResources(LuceneBackendResources previous) {
this.workspace = previous.workspace;
this.visitor = new LuceneWorkVisitor( workspace );
this.maxQueueLength = previous.maxQueueLength;
this.queueingExecutor = previous.queueingExecutor;
this.asyncIndexingExecutor = previous.asyncIndexingExecutor;
this.readLock = previous.readLock;
this.writeLock = previous.writeLock;
}

public ExecutorService getQueueingExecutor() {
return queueingExecutor;
public ExecutorService getAsynchIndexingExecutor() {
ExecutorService executor = asyncIndexingExecutor;
if ( executor != null ) {
return executor;
}
else {
return getAsynchIndexingExecutorSynchronized();
}
}

private synchronized ExecutorService getAsynchIndexingExecutorSynchronized() {
ExecutorService executor = asyncIndexingExecutor;
if ( executor != null ) {
return executor;
}
else {
this.asyncIndexingExecutor = Executors.newFixedThreadPool( 1, "Index updates queue processor for index " + indexName, maxQueueLength );
return this.asyncIndexingExecutor;
}
}

public int getMaxQueueLength() {
Expand All @@ -89,23 +106,26 @@ public AbstractWorkspaceImpl getWorkspace() {
public void shutdown() {
//need to close them in this specific order:
try {
flushCloseExecutor( queueingExecutor );
flushCloseExecutor();
}
finally {
workspace.shutDownNow();
}
}

private void flushCloseExecutor(ExecutorService executor) {
executor.shutdown();
private void flushCloseExecutor() {
if ( asyncIndexingExecutor == null ) {
return;
}
asyncIndexingExecutor.shutdown();
try {
executor.awaitTermination( Long.MAX_VALUE, TimeUnit.SECONDS );
asyncIndexingExecutor.awaitTermination( Long.MAX_VALUE, TimeUnit.SECONDS );
}
catch (InterruptedException e) {
log.interruptedWhileWaitingForIndexActivity( e );
Thread.currentThread().interrupt();
}
if ( ! executor.isTerminated() ) {
if ( ! asyncIndexingExecutor.isTerminated() ) {
log.unableToShutdownAsynchronousIndexingByTimeout( indexName );
}
}
Expand Down
Expand Up @@ -20,15 +20,15 @@
* Multiple threads produce one or more {@link org.hibernate.search.backend.LuceneWork}
* by calling {@link #submit(java.util.List, org.hibernate.search.backend.IndexingMonitor)},
* and get blocked until their changes are applied to the index;</p>
* The {@link org.hibernate.search.backend.impl.lucene.BatchSyncProcessor.Consumer} thread will
* The {@link org.hibernate.search.backend.impl.lucene.SyncWorkProcessor.Consumer} thread will
* coalesce changes from multiple threads and apply them in the index, releasing the waiting threads
* at the end.
* <p>
* In the absence of work to be applied, the Consumer thread is parked to avoid busy waiting.</p>
*
* @author gustavonalle
*/
public class BatchSyncProcessor {
final class SyncWorkProcessor implements WorkProcessor {

private static final Log log = LoggerFactory.make();

Expand All @@ -44,7 +44,7 @@ public class BatchSyncProcessor {
* @param resources LuceneResources to obtain the workspace
* @param indexName for debugging purposes
*/
public BatchSyncProcessor(LuceneBackendResources resources, String indexName) {
public SyncWorkProcessor(LuceneBackendResources resources, String indexName) {
this.resources = resources;
this.indexName = indexName;
consumerThread = new Thread( new Consumer(), "Hibernate Search sync consumer thread for index " + indexName );
Expand Down Expand Up @@ -99,7 +99,7 @@ public void shutdown() {
* Handle on the fly rebuilds
* @param resources new instance of {@link org.hibernate.search.backend.impl.lucene.LuceneBackendResources}
*/
void updateResources(LuceneBackendResources resources) {
public void updateResources(LuceneBackendResources resources) {
this.resources = resources;
}

Expand Down
@@ -0,0 +1,43 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.impl.lucene;

import java.util.List;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;


/**
* Defines the contract for async and synchronous processors to apply
* batched of indexing work to the index.
*
* @author Sanne Grinovero <sanne@hibernate.org> (C) 2014 Red Hat Inc.
* @since 5.0
*/
interface WorkProcessor {

/**
* Prepare for the queue to be shut down.
* Needs to flush pending work and shutdown any internal threads.
*/
void shutdown();

/**
* Enqueues a new batch of indexing work to be applied.
* @param workList the list of work
* @param monitor any optional listener which needs to be notified for the work.
*/
void submit(List<LuceneWork> workList, IndexingMonitor monitor);

/**
* Only invoked when some dynamic parameters are reconfigured
* @param resources the new instance to be used
*/
void updateResources(LuceneBackendResources resources);

}
Expand Up @@ -680,4 +680,12 @@ public interface Log extends BasicLogger {
@LogMessage(level = INFO)
@Message(id = 231, value = "Stopping sync consumer thread for index '%s'" )
void stoppingSyncConsumerThread(String indexName);

@LogMessage(level = Level.DEBUG)
@Message(id = 232, value = "Backend for index '%s' started: using a Synchronous batching backend." )
void luceneBackendInitializedSynchronously(String indexName);

@LogMessage(level = Level.DEBUG)
@Message(id = 233, value = "Backend for index '%s' started: using an Asynchronous backend with periodic commits." )
void luceneBackendInitializedAsynchronously(String indexName);
}

0 comments on commit 29473df

Please sign in to comment.