From 29473df4cf2d5f9dc2c20c85da06d35b497efb03 Mon Sep 17 00:00:00 2001
From: Sanne Grinovero
Date: Fri, 14 Nov 2014 00:36:14 +0000
Subject: [PATCH] HSEARCH-1735 Clearly distinguish the Async from the BatchSync
Processing strategies and make sure no unnecessary thread is started
---
.../impl/lucene/AsyncWorkProcessor.java | 50 +++++++++++++++++++
.../lucene/LuceneBackendQueueProcessor.java | 31 ++++++------
.../impl/lucene/LuceneBackendResources.java | 40 +++++++++++----
...cProcessor.java => SyncWorkProcessor.java} | 8 +--
.../backend/impl/lucene/WorkProcessor.java | 43 ++++++++++++++++
.../search/util/logging/impl/Log.java | 8 +++
6 files changed, 150 insertions(+), 30 deletions(-)
create mode 100644 engine/src/main/java/org/hibernate/search/backend/impl/lucene/AsyncWorkProcessor.java
rename engine/src/main/java/org/hibernate/search/backend/impl/lucene/{BatchSyncProcessor.java => SyncWorkProcessor.java} (93%)
create mode 100644 engine/src/main/java/org/hibernate/search/backend/impl/lucene/WorkProcessor.java
diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/AsyncWorkProcessor.java b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/AsyncWorkProcessor.java
new file mode 100644
index 00000000000..320688d2ce5
--- /dev/null
+++ b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/AsyncWorkProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Hibernate Search, full-text search for your domain model
+ *
+ * License: GNU Lesser General Public License (LGPL), version 2.1 or later
+ * See the lgpl.txt file in the root directory or .
+ */
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.List;
+
+import org.hibernate.search.backend.IndexingMonitor;
+import org.hibernate.search.backend.LuceneWork;
+
+
+/**
+ * This is the asynchronous backend logic for the LuceneBackendQueueProcessor.
+ * It merely forwards batches of indexing work to the async Executor for this indexing backend.
+ *
+ * @author Sanne Grinovero (C) 2014 Red Hat Inc.
+ * @since 5.0
+ */
+final class AsyncWorkProcessor implements WorkProcessor {
+
+ private volatile LuceneBackendResources resources;
+
+ public AsyncWorkProcessor(LuceneBackendResources resources) {
+ this.resources = resources;
+ }
+
+ @Override
+ public void shutdown() {
+ //no-op
+ }
+
+ @Override
+ public void submit(List workList, IndexingMonitor monitor) {
+ LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
+ workList,
+ resources,
+ monitor
+ );
+ resources.getAsynchIndexingExecutor().execute( luceneBackendQueueProcessor );
+ }
+
+ @Override
+ public void updateResources(LuceneBackendResources resources) {
+ this.resources = resources;
+ }
+
+}
diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
index 7b455ca6e40..8bda5d6a27d 100644
--- a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
+++ b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
@@ -37,7 +37,7 @@ public class LuceneBackendQueueProcessor implements BackendQueueProcessor {
private boolean sync;
private AbstractWorkspaceImpl workspaceOverride;
private LuceneBackendTaskStreamer streamWorker;
- private BatchSyncProcessor batchSyncProcessor;
+ private WorkProcessor workProcessor;
@Override
public void initialize(Properties props, WorkerBuildContext context, DirectoryBasedIndexManager indexManager) {
@@ -49,14 +49,23 @@ public void initialize(Properties props, WorkerBuildContext context, DirectoryBa
}
resources = new LuceneBackendResources( context, indexManager, props, workspaceOverride );
streamWorker = new LuceneBackendTaskStreamer( resources );
- batchSyncProcessor = new BatchSyncProcessor( resources, indexManager.getIndexName() );
- batchSyncProcessor.start();
+ String indexName = indexManager.getIndexName();
+ if ( sync ) {
+ final SyncWorkProcessor batchSyncProcessor = new SyncWorkProcessor( resources, indexName );
+ batchSyncProcessor.start();
+ log.luceneBackendInitializedSynchronously( indexName );
+ workProcessor = batchSyncProcessor;
+ }
+ else {
+ workProcessor = new AsyncWorkProcessor( resources );
+ log.luceneBackendInitializedAsynchronously( indexName );
+ }
}
@Override
public void close() {
resources.shutdown();
- batchSyncProcessor.shutdown();
+ workProcessor.shutdown();
}
@Override
@@ -72,17 +81,7 @@ public void applyWork(List workList, IndexingMonitor monitor) {
if ( workList == null ) {
throw new IllegalArgumentException( "workList should not be null" );
}
- if ( sync ) {
- batchSyncProcessor.submit( workList, monitor );
- }
- else {
- LuceneBackendQueueTask luceneBackendQueueProcessor = new LuceneBackendQueueTask(
- workList,
- resources,
- monitor
- );
- resources.getQueueingExecutor().execute( luceneBackendQueueProcessor );
- }
+ workProcessor.submit( workList, monitor );
}
@Override
@@ -107,7 +106,7 @@ public void setCustomWorkspace(AbstractWorkspaceImpl workspace) {
@Override
public void indexMappingChanged() {
resources = resources.onTheFlyRebuild();
- batchSyncProcessor.updateResources( resources );
+ workProcessor.updateResources( resources );
}
}
diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendResources.java b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendResources.java
index d3c999c0802..c905aa5e18c 100755
--- a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendResources.java
+++ b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendResources.java
@@ -36,20 +36,20 @@ public final class LuceneBackendResources {
private final LuceneWorkVisitor visitor;
private final AbstractWorkspaceImpl workspace;
private final ErrorHandler errorHandler;
- private final ExecutorService queueingExecutor;
private final int maxQueueLength;
private final String indexName;
private final ReadLock readLock;
private final WriteLock writeLock;
+ private volatile ExecutorService asyncIndexingExecutor;
+
LuceneBackendResources(WorkerBuildContext context, DirectoryBasedIndexManager indexManager, Properties props, AbstractWorkspaceImpl workspace) {
this.indexName = indexManager.getIndexName();
this.errorHandler = context.getErrorHandler();
this.workspace = workspace;
this.visitor = new LuceneWorkVisitor( workspace );
this.maxQueueLength = PropertiesParseHelper.extractMaxQueueSize( indexName, props );
- this.queueingExecutor = Executors.newFixedThreadPool( 1, "Index updates queue processor for index " + indexName, maxQueueLength );
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
@@ -61,13 +61,30 @@ private LuceneBackendResources(LuceneBackendResources previous) {
this.workspace = previous.workspace;
this.visitor = new LuceneWorkVisitor( workspace );
this.maxQueueLength = previous.maxQueueLength;
- this.queueingExecutor = previous.queueingExecutor;
+ this.asyncIndexingExecutor = previous.asyncIndexingExecutor;
this.readLock = previous.readLock;
this.writeLock = previous.writeLock;
}
- public ExecutorService getQueueingExecutor() {
- return queueingExecutor;
+ public ExecutorService getAsynchIndexingExecutor() {
+ ExecutorService executor = asyncIndexingExecutor;
+ if ( executor != null ) {
+ return executor;
+ }
+ else {
+ return getAsynchIndexingExecutorSynchronized();
+ }
+ }
+
+ private synchronized ExecutorService getAsynchIndexingExecutorSynchronized() {
+ ExecutorService executor = asyncIndexingExecutor;
+ if ( executor != null ) {
+ return executor;
+ }
+ else {
+ this.asyncIndexingExecutor = Executors.newFixedThreadPool( 1, "Index updates queue processor for index " + indexName, maxQueueLength );
+ return this.asyncIndexingExecutor;
+ }
}
public int getMaxQueueLength() {
@@ -89,23 +106,26 @@ public AbstractWorkspaceImpl getWorkspace() {
public void shutdown() {
//need to close them in this specific order:
try {
- flushCloseExecutor( queueingExecutor );
+ flushCloseExecutor();
}
finally {
workspace.shutDownNow();
}
}
- private void flushCloseExecutor(ExecutorService executor) {
- executor.shutdown();
+ private void flushCloseExecutor() {
+ if ( asyncIndexingExecutor == null ) {
+ return;
+ }
+ asyncIndexingExecutor.shutdown();
try {
- executor.awaitTermination( Long.MAX_VALUE, TimeUnit.SECONDS );
+ asyncIndexingExecutor.awaitTermination( Long.MAX_VALUE, TimeUnit.SECONDS );
}
catch (InterruptedException e) {
log.interruptedWhileWaitingForIndexActivity( e );
Thread.currentThread().interrupt();
}
- if ( ! executor.isTerminated() ) {
+ if ( ! asyncIndexingExecutor.isTerminated() ) {
log.unableToShutdownAsynchronousIndexingByTimeout( indexName );
}
}
diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/BatchSyncProcessor.java b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/SyncWorkProcessor.java
similarity index 93%
rename from engine/src/main/java/org/hibernate/search/backend/impl/lucene/BatchSyncProcessor.java
rename to engine/src/main/java/org/hibernate/search/backend/impl/lucene/SyncWorkProcessor.java
index 4e3946e1cb1..ed2631728d7 100644
--- a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/BatchSyncProcessor.java
+++ b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/SyncWorkProcessor.java
@@ -20,7 +20,7 @@
* Multiple threads produce one or more {@link org.hibernate.search.backend.LuceneWork}
* by calling {@link #submit(java.util.List, org.hibernate.search.backend.IndexingMonitor)},
* and get blocked until their changes are applied to the index;
- * The {@link org.hibernate.search.backend.impl.lucene.BatchSyncProcessor.Consumer} thread will
+ * The {@link org.hibernate.search.backend.impl.lucene.SyncWorkProcessor.Consumer} thread will
* coalesce changes from multiple threads and apply them in the index, releasing the waiting threads
* at the end.
*
@@ -28,7 +28,7 @@
*
* @author gustavonalle
*/
-public class BatchSyncProcessor {
+final class SyncWorkProcessor implements WorkProcessor {
private static final Log log = LoggerFactory.make();
@@ -44,7 +44,7 @@ public class BatchSyncProcessor {
* @param resources LuceneResources to obtain the workspace
* @param indexName for debugging purposes
*/
- public BatchSyncProcessor(LuceneBackendResources resources, String indexName) {
+ public SyncWorkProcessor(LuceneBackendResources resources, String indexName) {
this.resources = resources;
this.indexName = indexName;
consumerThread = new Thread( new Consumer(), "Hibernate Search sync consumer thread for index " + indexName );
@@ -99,7 +99,7 @@ public void shutdown() {
* Handle on the fly rebuilds
* @param resources new instance of {@link org.hibernate.search.backend.impl.lucene.LuceneBackendResources}
*/
- void updateResources(LuceneBackendResources resources) {
+ public void updateResources(LuceneBackendResources resources) {
this.resources = resources;
}
diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/lucene/WorkProcessor.java b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/WorkProcessor.java
new file mode 100644
index 00000000000..ec9b7f8ce5f
--- /dev/null
+++ b/engine/src/main/java/org/hibernate/search/backend/impl/lucene/WorkProcessor.java
@@ -0,0 +1,43 @@
+/*
+ * Hibernate Search, full-text search for your domain model
+ *
+ * License: GNU Lesser General Public License (LGPL), version 2.1 or later
+ * See the lgpl.txt file in the root directory or .
+ */
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.List;
+
+import org.hibernate.search.backend.IndexingMonitor;
+import org.hibernate.search.backend.LuceneWork;
+
+
+/**
+ * Defines the contract for async and synchronous processors to apply
+ * batched of indexing work to the index.
+ *
+ * @author Sanne Grinovero (C) 2014 Red Hat Inc.
+ * @since 5.0
+ */
+interface WorkProcessor {
+
+ /**
+ * Prepare for the queue to be shut down.
+ * Needs to flush pending work and shutdown any internal threads.
+ */
+ void shutdown();
+
+ /**
+ * Enqueues a new batch of indexing work to be applied.
+ * @param workList the list of work
+ * @param monitor any optional listener which needs to be notified for the work.
+ */
+ void submit(List workList, IndexingMonitor monitor);
+
+ /**
+ * Only invoked when some dynamic parameters are reconfigured
+ * @param resources the new instance to be used
+ */
+ void updateResources(LuceneBackendResources resources);
+
+}
diff --git a/engine/src/main/java/org/hibernate/search/util/logging/impl/Log.java b/engine/src/main/java/org/hibernate/search/util/logging/impl/Log.java
index b6f1ef3addd..5dac9bf0477 100644
--- a/engine/src/main/java/org/hibernate/search/util/logging/impl/Log.java
+++ b/engine/src/main/java/org/hibernate/search/util/logging/impl/Log.java
@@ -680,4 +680,12 @@ public interface Log extends BasicLogger {
@LogMessage(level = INFO)
@Message(id = 231, value = "Stopping sync consumer thread for index '%s'" )
void stoppingSyncConsumerThread(String indexName);
+
+ @LogMessage(level = Level.DEBUG)
+ @Message(id = 232, value = "Backend for index '%s' started: using a Synchronous batching backend." )
+ void luceneBackendInitializedSynchronously(String indexName);
+
+ @LogMessage(level = Level.DEBUG)
+ @Message(id = 233, value = "Backend for index '%s' started: using an Asynchronous backend with periodic commits." )
+ void luceneBackendInitializedAsynchronously(String indexName);
}