From edaff0a3885a2c6a257834d66904d64d1fced709 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yoann=20Rodi=C3=A8re?= Date: Thu, 4 May 2017 11:06:33 +0200 Subject: [PATCH] HSEARCH-2710 Make sure we only apply works sent over JGroups to the master node --- .../impl/JGroupsMasterMessageListener.java | 15 +++++++++-- .../AbstractJMSHibernateSearchController.java | 6 ++++- .../TransactionalOperationDispatcher.java | 24 +++++++++++------ ...ransactionalOperationExecutorSelector.java | 22 ++++++++-------- .../impl/WorkQueuePerIndexSplitter.java | 26 ++++++++++++++++--- .../engine/impl/ImmutableSearchFactory.java | 8 +++--- .../engine/impl/MutableSearchFactory.java | 5 ++-- .../search/spi/SearchIntegrator.java | 10 +++++-- 8 files changed, 82 insertions(+), 34 deletions(-) diff --git a/backends/jgroups/src/main/java/org/hibernate/search/backend/jgroups/impl/JGroupsMasterMessageListener.java b/backends/jgroups/src/main/java/org/hibernate/search/backend/jgroups/impl/JGroupsMasterMessageListener.java index f1c97448b1e..91a3bbdb08f 100644 --- a/backends/jgroups/src/main/java/org/hibernate/search/backend/jgroups/impl/JGroupsMasterMessageListener.java +++ b/backends/jgroups/src/main/java/org/hibernate/search/backend/jgroups/impl/JGroupsMasterMessageListener.java @@ -38,6 +38,7 @@ public class JGroupsMasterMessageListener implements Receiver { private final SearchIntegrator integrator; private final NodeSelectorService selector; private final LuceneWorkSerializer luceneWorkSerializer; + private volatile OperationDispatcher operationDispatcher; public JGroupsMasterMessageListener(BuildContext context, NodeSelectorService masterNodeSelector, LuceneWorkSerializer luceneWorkSerializer) { this.integrator = context.getUninitializedSearchIntegrator(); @@ -67,12 +68,13 @@ private void applyLuceneWorkLocally(List queue, Message message) { if ( queue != null && !queue.isEmpty() ) { if ( log.isDebugEnabled() ) { log.debugf( - "There are %d Lucene docs received from slave node %s to be processed by master", + "There are %d Lucene docs received from slave node %s to be processed if this node is the master", (Integer) queue.size(), message.getSrc() ); } - OperationDispatcher operationDispatcher = integrator.getRemoteOperationDispatcher(); + + OperationDispatcher operationDispatcher = getOperationDispatcher(); operationDispatcher.dispatch( queue, null ); } else { @@ -80,6 +82,15 @@ private void applyLuceneWorkLocally(List queue, Message message) { } } + private OperationDispatcher getOperationDispatcher() { + if ( operationDispatcher == null ) { + operationDispatcher = integrator.createRemoteOperationDispatcher( + indexManager -> selector.getMasterNodeSelector( indexManager.getIndexName() ).isIndexOwnerLocal() + ); + } + return operationDispatcher; + } + // ------------------------------------------------------------------------------------------------------------------ // Implementations of JGroups interfaces // ------------------------------------------------------------------------------------------------------------------ diff --git a/backends/jms/src/main/java/org/hibernate/search/backend/impl/jms/AbstractJMSHibernateSearchController.java b/backends/jms/src/main/java/org/hibernate/search/backend/impl/jms/AbstractJMSHibernateSearchController.java index 791f2ef49a9..8bc739958c4 100644 --- a/backends/jms/src/main/java/org/hibernate/search/backend/impl/jms/AbstractJMSHibernateSearchController.java +++ b/backends/jms/src/main/java/org/hibernate/search/backend/impl/jms/AbstractJMSHibernateSearchController.java @@ -72,7 +72,7 @@ public void onMessage(Message message) { * and update their shard list. * Thus the index name is rather useless in this case. */ - OperationDispatcher dispatcher = integrator.getRemoteOperationDispatcher(); + OperationDispatcher dispatcher = getOperationDispatcher( integrator ); dispatcher.dispatch( queue, null ); } catch (JMSException e) { @@ -88,6 +88,10 @@ public void onMessage(Message message) { } } + private OperationDispatcher getOperationDispatcher(SearchIntegrator integrator) { + return integrator.createRemoteOperationDispatcher( indexManager -> true ); + } + private void logMessageDetails(ObjectMessage objectMessage, String indexName) throws JMSException { String id = objectMessage.getStringProperty( "HSearchMsgId" ); log.debug( "Message Received for index '" + indexName + "': " + id ); diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationDispatcher.java b/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationDispatcher.java index 45a651ae201..23fd504172e 100644 --- a/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationDispatcher.java +++ b/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationDispatcher.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.Predicate; import org.hibernate.search.backend.IndexingMonitor; import org.hibernate.search.backend.LuceneWork; @@ -29,34 +30,41 @@ */ public class TransactionalOperationDispatcher implements OperationDispatcher { private final Function, EntityIndexBinding> bindingLookup; - private final TransactionalOperationExecutorSelector executorSelector; + private final IndexManagerHolder indexManagerHolder; + private final Predicate indexManagerFilter; public TransactionalOperationDispatcher(SearchIntegrator integrator) { + this( integrator, indexManager -> true ); + } + + public TransactionalOperationDispatcher(SearchIntegrator integrator, Predicate indexManagerFilter) { this( integrator.unwrap( ExtendedSearchIntegrator.class ).getIndexManagerHolder(), - integrator::getIndexBinding ); + integrator::getIndexBinding, indexManagerFilter ); } public TransactionalOperationDispatcher(IndexManagerHolder indexManagerHolder, Map, EntityIndexBinding> bindings) { - this( indexManagerHolder, bindings::get ); + this( indexManagerHolder, bindings::get, indexManager -> true ); } private TransactionalOperationDispatcher(IndexManagerHolder indexManagerHolder, - Function, EntityIndexBinding> bindingLookup) { - this.executorSelector = new TransactionalOperationExecutorSelector( indexManagerHolder ); + Function, EntityIndexBinding> bindingLookup, + Predicate indexManagerFilter) { + this.indexManagerHolder = indexManagerHolder; this.bindingLookup = bindingLookup; + this.indexManagerFilter = indexManagerFilter; } @Override public void dispatch(LuceneWork work, IndexingMonitor monitor) { - WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter(); + WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter( indexManagerHolder, indexManagerFilter ); appendWork( context, work ); context.commitOperations( monitor ); } @Override public void dispatch(List queue, IndexingMonitor monitor) { - WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter(); + WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter( indexManagerHolder, indexManagerFilter ); for ( LuceneWork work : queue ) { appendWork( context, work ); } @@ -67,7 +75,7 @@ private void appendWork(WorkQueuePerIndexSplitter context, LuceneWork work) { final Class entityType = work.getEntityClass(); EntityIndexBinding entityIndexBinding = bindingLookup.apply( entityType ); IndexShardingStrategy shardingStrategy = entityIndexBinding.getSelectionStrategy(); - TransactionalOperationExecutor executor = work.acceptIndexWorkVisitor( executorSelector, null ); + TransactionalOperationExecutor executor = work.acceptIndexWorkVisitor( TransactionalOperationExecutorSelector.INSTANCE, null ); executor.performOperation( work, shardingStrategy, context ); } diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationExecutorSelector.java b/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationExecutorSelector.java index b895c51767b..e0a93d5fc7c 100644 --- a/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationExecutorSelector.java +++ b/engine/src/main/java/org/hibernate/search/backend/impl/TransactionalOperationExecutorSelector.java @@ -6,6 +6,7 @@ */ package org.hibernate.search.backend.impl; + import org.hibernate.search.backend.AddLuceneWork; import org.hibernate.search.backend.DeleteLuceneWork; import org.hibernate.search.backend.FlushLuceneWork; @@ -15,7 +16,6 @@ import org.hibernate.search.backend.PurgeAllLuceneWork; import org.hibernate.search.backend.UpdateLuceneWork; import org.hibernate.search.backend.spi.DeleteByQueryLuceneWork; -import org.hibernate.search.indexes.impl.IndexManagerHolder; import org.hibernate.search.indexes.spi.IndexManager; import org.hibernate.search.store.IndexShardingStrategy; @@ -34,6 +34,8 @@ */ public class TransactionalOperationExecutorSelector implements IndexWorkVisitor { + public static final TransactionalOperationExecutorSelector INSTANCE = new TransactionalOperationExecutorSelector(); + private final AddSelectionExecutor addExecutor = new AddSelectionExecutor(); private final DeleteSelectionExecutor deleteExecutor = new DeleteSelectionExecutor(); private final OptimizeSelectionExecutor optimizeExecutor = new OptimizeSelectionExecutor(); @@ -41,10 +43,8 @@ public class TransactionalOperationExecutorSelector implements IndexWorkVisitor< private final FlushSelectionExecutor flushExecutor = new FlushSelectionExecutor(); private final DeleteByQuerySelectionExecutor deleteByQueryExecutor = new DeleteByQuerySelectionExecutor(); - private final IndexManagerHolder indexManagerHolder; - - public TransactionalOperationExecutorSelector(IndexManagerHolder indexManagerHolder) { - this.indexManagerHolder = indexManagerHolder; + private TransactionalOperationExecutorSelector() { + // Private, use INSTANCE instead } @Override @@ -93,7 +93,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi work.getIdInString(), work.getDocument() ); - context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work ); + context.addToQueue( indexManager, work ); } } @@ -109,7 +109,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi work.getIdInString() ); for ( IndexManager indexManager : indexManagers ) { - context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work ); + context.addToQueue( indexManager, work ); } } @@ -126,7 +126,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi work.getIdInString() ); for ( IndexManager indexManager : indexManagers ) { - context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work ); + context.addToQueue( indexManager, work ); } } @@ -143,7 +143,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi WorkQueuePerIndexSplitter context) { IndexManager[] indexManagers = shardingStrategy.getIndexManagersForAllShards(); for ( IndexManager indexManager : indexManagers ) { - indexManager.performStreamOperation( work, null, false ); + context.performStreamOperation( indexManager, work ); } } @@ -156,7 +156,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi WorkQueuePerIndexSplitter context) { IndexManager[] indexManagers = shardingStrategy.getIndexManagersForAllShards(); for ( IndexManager indexManager : indexManagers ) { - indexManager.performStreamOperation( work, null, false ); + context.performStreamOperation( indexManager, work ); } } @@ -173,7 +173,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi work.getIdInString() ); for ( IndexManager indexManager : indexManagers ) { - context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work ); + context.addToQueue( indexManager, work ); } } diff --git a/engine/src/main/java/org/hibernate/search/backend/impl/WorkQueuePerIndexSplitter.java b/engine/src/main/java/org/hibernate/search/backend/impl/WorkQueuePerIndexSplitter.java index 7ecff8a9e66..d9c6ef00aff 100644 --- a/engine/src/main/java/org/hibernate/search/backend/impl/WorkQueuePerIndexSplitter.java +++ b/engine/src/main/java/org/hibernate/search/backend/impl/WorkQueuePerIndexSplitter.java @@ -8,12 +8,13 @@ import java.util.HashMap; import java.util.LinkedList; -import java.util.List; +import java.util.function.Predicate; import org.hibernate.search.backend.IndexingMonitor; import org.hibernate.search.backend.LuceneWork; import org.hibernate.search.backend.spi.BackendQueueProcessor; import org.hibernate.search.indexes.impl.IndexManagerHolder; +import org.hibernate.search.indexes.spi.IndexManager; /** * Used by {@link org.hibernate.search.backend.impl.TransactionalOperationExecutor} to split a list of operations @@ -23,15 +24,34 @@ */ public class WorkQueuePerIndexSplitter { + private final IndexManagerHolder indexManagerHolder; + private final Predicate indexManagerFilter; private final HashMap queues = new HashMap(); - public List getIndexManagerQueue(String indexName, IndexManagerHolder indexManagerHolder) { + public WorkQueuePerIndexSplitter(IndexManagerHolder indexManagerHolder, + Predicate indexManagerFilter) { + this.indexManagerHolder = indexManagerHolder; + this.indexManagerFilter = indexManagerFilter; + } + + public void addToQueue(IndexManager indexManager, LuceneWork work) { + if ( !indexManagerFilter.test( indexManager ) ) { + return; + } + String indexName = indexManager.getIndexName(); WorkPlan plan = queues.get( indexName ); if ( plan == null ) { plan = new WorkPlan( indexManagerHolder.getBackendQueueProcessor( indexName ) ); queues.put( indexName, plan ); } - return plan.queue; + plan.queue.add( work ); + } + + public void performStreamOperation(IndexManager indexManager, LuceneWork work) { + if ( !indexManagerFilter.test( indexManager ) ) { + return; + } + indexManager.performStreamOperation( work, null, false ); } /** diff --git a/engine/src/main/java/org/hibernate/search/engine/impl/ImmutableSearchFactory.java b/engine/src/main/java/org/hibernate/search/engine/impl/ImmutableSearchFactory.java index 1a3793ef198..054e9ec5e3d 100644 --- a/engine/src/main/java/org/hibernate/search/engine/impl/ImmutableSearchFactory.java +++ b/engine/src/main/java/org/hibernate/search/engine/impl/ImmutableSearchFactory.java @@ -14,6 +14,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.search.Query; @@ -131,7 +132,6 @@ public class ImmutableSearchFactory implements ExtendedSearchIntegratorWithShare private final DatabaseRetrievalMethod defaultDatabaseRetrievalMethod; private final boolean enlistWorkerInTransaction; private final boolean indexUninvertingAllowed; - private final OperationDispatcher remoteOperationDispatcher; private volatile LuceneWorkSerializer workSerializer; public ImmutableSearchFactory(SearchFactoryState state) { @@ -195,8 +195,6 @@ public ImmutableSearchFactory(SearchFactoryState state) { this.indexUninvertingAllowed = ConfigurationParseHelper.getBooleanValue( configurationProperties, Environment.INDEX_UNINVERTING_ALLOWED, false ); - - this.remoteOperationDispatcher = new TransactionalOperationDispatcher( this ); } private ObjectLookupMethod determineDefaultObjectLookupMethod() { @@ -694,8 +692,8 @@ public HSQuery createLuceneBasedHSQuery() { } @Override - public OperationDispatcher getRemoteOperationDispatcher() { - return remoteOperationDispatcher; + public OperationDispatcher createRemoteOperationDispatcher(Predicate predicate) { + return new TransactionalOperationDispatcher( this, predicate ); } } diff --git a/engine/src/main/java/org/hibernate/search/engine/impl/MutableSearchFactory.java b/engine/src/main/java/org/hibernate/search/engine/impl/MutableSearchFactory.java index 34235672b4c..4665997f6ad 100644 --- a/engine/src/main/java/org/hibernate/search/engine/impl/MutableSearchFactory.java +++ b/engine/src/main/java/org/hibernate/search/engine/impl/MutableSearchFactory.java @@ -11,6 +11,7 @@ import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.search.Query; @@ -404,8 +405,8 @@ public HSQuery createLuceneBasedHSQuery() { } @Override - public OperationDispatcher getRemoteOperationDispatcher() { - return delegate.getRemoteOperationDispatcher(); + public OperationDispatcher createRemoteOperationDispatcher(Predicate indexManagerFilter) { + return delegate.createRemoteOperationDispatcher( indexManagerFilter ); } } diff --git a/engine/src/main/java/org/hibernate/search/spi/SearchIntegrator.java b/engine/src/main/java/org/hibernate/search/spi/SearchIntegrator.java index 7f3f23302ce..9fd814077bd 100644 --- a/engine/src/main/java/org/hibernate/search/spi/SearchIntegrator.java +++ b/engine/src/main/java/org/hibernate/search/spi/SearchIntegrator.java @@ -7,6 +7,7 @@ package org.hibernate.search.spi; import java.util.Set; +import java.util.function.Predicate; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.search.Query; @@ -237,11 +238,16 @@ public interface SearchIntegrator extends AutoCloseable { LuceneWorkSerializer getWorkSerializer(); /** + * @param indexManagerFilter A predicate allowing to exclude index managers from + * dispatching. Works will not be applied to these index managers. * @return An operation dispatcher allowing to insert works retrieved from - * remote sources (e.g. JMS or JGroups slaves). + * remote sources (e.g. JMS or JGroups slaves), but only for index managers + * verifying the given predicate. + * This allows JMS or JGroups integrations to perform checks on index managers that + * wouldn't exist before the dispatch, in the case of dynamic sharding in particular. * * @hsearch.experimental Operation dispatchers are under active development. * You should be prepared for incompatible changes in future releases. */ - OperationDispatcher getRemoteOperationDispatcher(); + OperationDispatcher createRemoteOperationDispatcher(Predicate indexManagerFilter); }