Skip to content

Commit

Permalink
HSEARCH-2710 Make sure we only apply works sent over JGroups to the m…
Browse files Browse the repository at this point in the history
…aster node
  • Loading branch information
yrodiere authored and Sanne committed May 9, 2017
1 parent 61540cc commit edaff0a
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 34 deletions.
Expand Up @@ -38,6 +38,7 @@ public class JGroupsMasterMessageListener implements Receiver {
private final SearchIntegrator integrator;
private final NodeSelectorService selector;
private final LuceneWorkSerializer luceneWorkSerializer;
private volatile OperationDispatcher operationDispatcher;

public JGroupsMasterMessageListener(BuildContext context, NodeSelectorService masterNodeSelector, LuceneWorkSerializer luceneWorkSerializer) {
this.integrator = context.getUninitializedSearchIntegrator();
Expand Down Expand Up @@ -67,19 +68,29 @@ private void applyLuceneWorkLocally(List<LuceneWork> queue, Message message) {
if ( queue != null && !queue.isEmpty() ) {
if ( log.isDebugEnabled() ) {
log.debugf(
"There are %d Lucene docs received from slave node %s to be processed by master",
"There are %d Lucene docs received from slave node %s to be processed if this node is the master",
(Integer) queue.size(),
message.getSrc()
);
}
OperationDispatcher operationDispatcher = integrator.getRemoteOperationDispatcher();

OperationDispatcher operationDispatcher = getOperationDispatcher();
operationDispatcher.dispatch( queue, null );
}
else {
log.receivedEmptyLuceneWorksInMessage();
}
}

private OperationDispatcher getOperationDispatcher() {
if ( operationDispatcher == null ) {
operationDispatcher = integrator.createRemoteOperationDispatcher(
indexManager -> selector.getMasterNodeSelector( indexManager.getIndexName() ).isIndexOwnerLocal()
);
}
return operationDispatcher;
}

// ------------------------------------------------------------------------------------------------------------------
// Implementations of JGroups interfaces
// ------------------------------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -72,7 +72,7 @@ public void onMessage(Message message) {
* and update their shard list.
* Thus the index name is rather useless in this case.
*/
OperationDispatcher dispatcher = integrator.getRemoteOperationDispatcher();
OperationDispatcher dispatcher = getOperationDispatcher( integrator );
dispatcher.dispatch( queue, null );
}
catch (JMSException e) {
Expand All @@ -88,6 +88,10 @@ public void onMessage(Message message) {
}
}

private OperationDispatcher getOperationDispatcher(SearchIntegrator integrator) {
return integrator.createRemoteOperationDispatcher( indexManager -> true );
}

private void logMessageDetails(ObjectMessage objectMessage, String indexName) throws JMSException {
String id = objectMessage.getStringProperty( "HSearchMsgId" );
log.debug( "Message Received for index '" + indexName + "': " + id );
Expand Down
Expand Up @@ -9,6 +9,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
Expand All @@ -29,34 +30,41 @@
*/
public class TransactionalOperationDispatcher implements OperationDispatcher {
private final Function<Class<?>, EntityIndexBinding> bindingLookup;
private final TransactionalOperationExecutorSelector executorSelector;
private final IndexManagerHolder indexManagerHolder;
private final Predicate<IndexManager> indexManagerFilter;

public TransactionalOperationDispatcher(SearchIntegrator integrator) {
this( integrator, indexManager -> true );
}

public TransactionalOperationDispatcher(SearchIntegrator integrator, Predicate<IndexManager> indexManagerFilter) {
this( integrator.unwrap( ExtendedSearchIntegrator.class ).getIndexManagerHolder(),
integrator::getIndexBinding );
integrator::getIndexBinding, indexManagerFilter );
}

public TransactionalOperationDispatcher(IndexManagerHolder indexManagerHolder,
Map<Class<?>, EntityIndexBinding> bindings) {
this( indexManagerHolder, bindings::get );
this( indexManagerHolder, bindings::get, indexManager -> true );
}

private TransactionalOperationDispatcher(IndexManagerHolder indexManagerHolder,
Function<Class<?>, EntityIndexBinding> bindingLookup) {
this.executorSelector = new TransactionalOperationExecutorSelector( indexManagerHolder );
Function<Class<?>, EntityIndexBinding> bindingLookup,
Predicate<IndexManager> indexManagerFilter) {
this.indexManagerHolder = indexManagerHolder;
this.bindingLookup = bindingLookup;
this.indexManagerFilter = indexManagerFilter;
}

@Override
public void dispatch(LuceneWork work, IndexingMonitor monitor) {
WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter();
WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter( indexManagerHolder, indexManagerFilter );
appendWork( context, work );
context.commitOperations( monitor );
}

@Override
public void dispatch(List<LuceneWork> queue, IndexingMonitor monitor) {
WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter();
WorkQueuePerIndexSplitter context = new WorkQueuePerIndexSplitter( indexManagerHolder, indexManagerFilter );
for ( LuceneWork work : queue ) {
appendWork( context, work );
}
Expand All @@ -67,7 +75,7 @@ private void appendWork(WorkQueuePerIndexSplitter context, LuceneWork work) {
final Class<?> entityType = work.getEntityClass();
EntityIndexBinding entityIndexBinding = bindingLookup.apply( entityType );
IndexShardingStrategy shardingStrategy = entityIndexBinding.getSelectionStrategy();
TransactionalOperationExecutor executor = work.acceptIndexWorkVisitor( executorSelector, null );
TransactionalOperationExecutor executor = work.acceptIndexWorkVisitor( TransactionalOperationExecutorSelector.INSTANCE, null );
executor.performOperation( work, shardingStrategy, context );
}

Expand Down
Expand Up @@ -6,6 +6,7 @@
*/
package org.hibernate.search.backend.impl;


import org.hibernate.search.backend.AddLuceneWork;
import org.hibernate.search.backend.DeleteLuceneWork;
import org.hibernate.search.backend.FlushLuceneWork;
Expand All @@ -15,7 +16,6 @@
import org.hibernate.search.backend.PurgeAllLuceneWork;
import org.hibernate.search.backend.UpdateLuceneWork;
import org.hibernate.search.backend.spi.DeleteByQueryLuceneWork;
import org.hibernate.search.indexes.impl.IndexManagerHolder;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.store.IndexShardingStrategy;

Expand All @@ -34,17 +34,17 @@
*/
public class TransactionalOperationExecutorSelector implements IndexWorkVisitor<Void, TransactionalOperationExecutor> {

public static final TransactionalOperationExecutorSelector INSTANCE = new TransactionalOperationExecutorSelector();

private final AddSelectionExecutor addExecutor = new AddSelectionExecutor();
private final DeleteSelectionExecutor deleteExecutor = new DeleteSelectionExecutor();
private final OptimizeSelectionExecutor optimizeExecutor = new OptimizeSelectionExecutor();
private final PurgeAllSelectionExecutor purgeExecutor = new PurgeAllSelectionExecutor();
private final FlushSelectionExecutor flushExecutor = new FlushSelectionExecutor();
private final DeleteByQuerySelectionExecutor deleteByQueryExecutor = new DeleteByQuerySelectionExecutor();

private final IndexManagerHolder indexManagerHolder;

public TransactionalOperationExecutorSelector(IndexManagerHolder indexManagerHolder) {
this.indexManagerHolder = indexManagerHolder;
private TransactionalOperationExecutorSelector() {
// Private, use INSTANCE instead
}

@Override
Expand Down Expand Up @@ -93,7 +93,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
work.getIdInString(),
work.getDocument()
);
context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work );
context.addToQueue( indexManager, work );
}

}
Expand All @@ -109,7 +109,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
work.getIdInString()
);
for ( IndexManager indexManager : indexManagers ) {
context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work );
context.addToQueue( indexManager, work );
}
}

Expand All @@ -126,7 +126,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
work.getIdInString()
);
for ( IndexManager indexManager : indexManagers ) {
context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work );
context.addToQueue( indexManager, work );
}
}

Expand All @@ -143,7 +143,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
WorkQueuePerIndexSplitter context) {
IndexManager[] indexManagers = shardingStrategy.getIndexManagersForAllShards();
for ( IndexManager indexManager : indexManagers ) {
indexManager.performStreamOperation( work, null, false );
context.performStreamOperation( indexManager, work );
}
}

Expand All @@ -156,7 +156,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
WorkQueuePerIndexSplitter context) {
IndexManager[] indexManagers = shardingStrategy.getIndexManagersForAllShards();
for ( IndexManager indexManager : indexManagers ) {
indexManager.performStreamOperation( work, null, false );
context.performStreamOperation( indexManager, work );
}
}

Expand All @@ -173,7 +173,7 @@ public final void performOperation(LuceneWork work, IndexShardingStrategy shardi
work.getIdInString()
);
for ( IndexManager indexManager : indexManagers ) {
context.getIndexManagerQueue( indexManager.getIndexName(), indexManagerHolder ).add( work );
context.addToQueue( indexManager, work );
}
}

Expand Down
Expand Up @@ -8,12 +8,13 @@

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Predicate;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.impl.IndexManagerHolder;
import org.hibernate.search.indexes.spi.IndexManager;

/**
* Used by {@link org.hibernate.search.backend.impl.TransactionalOperationExecutor} to split a list of operations
Expand All @@ -23,15 +24,34 @@
*/
public class WorkQueuePerIndexSplitter {

private final IndexManagerHolder indexManagerHolder;
private final Predicate<IndexManager> indexManagerFilter;
private final HashMap<String,WorkPlan> queues = new HashMap<String,WorkPlan>();

public List<LuceneWork> getIndexManagerQueue(String indexName, IndexManagerHolder indexManagerHolder) {
public WorkQueuePerIndexSplitter(IndexManagerHolder indexManagerHolder,
Predicate<IndexManager> indexManagerFilter) {
this.indexManagerHolder = indexManagerHolder;
this.indexManagerFilter = indexManagerFilter;
}

public void addToQueue(IndexManager indexManager, LuceneWork work) {
if ( !indexManagerFilter.test( indexManager ) ) {
return;
}
String indexName = indexManager.getIndexName();
WorkPlan plan = queues.get( indexName );
if ( plan == null ) {
plan = new WorkPlan( indexManagerHolder.getBackendQueueProcessor( indexName ) );
queues.put( indexName, plan );
}
return plan.queue;
plan.queue.add( work );
}

public void performStreamOperation(IndexManager indexManager, LuceneWork work) {
if ( !indexManagerFilter.test( indexManager ) ) {
return;
}
indexManager.performStreamOperation( work, null, false );
}

/**
Expand Down
Expand Up @@ -14,6 +14,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -131,7 +132,6 @@ public class ImmutableSearchFactory implements ExtendedSearchIntegratorWithShare
private final DatabaseRetrievalMethod defaultDatabaseRetrievalMethod;
private final boolean enlistWorkerInTransaction;
private final boolean indexUninvertingAllowed;
private final OperationDispatcher remoteOperationDispatcher;
private volatile LuceneWorkSerializer workSerializer;

public ImmutableSearchFactory(SearchFactoryState state) {
Expand Down Expand Up @@ -195,8 +195,6 @@ public ImmutableSearchFactory(SearchFactoryState state) {
this.indexUninvertingAllowed = ConfigurationParseHelper.getBooleanValue(
configurationProperties, Environment.INDEX_UNINVERTING_ALLOWED, false
);

this.remoteOperationDispatcher = new TransactionalOperationDispatcher( this );
}

private ObjectLookupMethod determineDefaultObjectLookupMethod() {
Expand Down Expand Up @@ -694,8 +692,8 @@ public HSQuery createLuceneBasedHSQuery() {
}

@Override
public OperationDispatcher getRemoteOperationDispatcher() {
return remoteOperationDispatcher;
public OperationDispatcher createRemoteOperationDispatcher(Predicate<IndexManager> predicate) {
return new TransactionalOperationDispatcher( this, predicate );
}

}
Expand Up @@ -11,6 +11,7 @@
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -404,8 +405,8 @@ public HSQuery createLuceneBasedHSQuery() {
}

@Override
public OperationDispatcher getRemoteOperationDispatcher() {
return delegate.getRemoteOperationDispatcher();
public OperationDispatcher createRemoteOperationDispatcher(Predicate<IndexManager> indexManagerFilter) {
return delegate.createRemoteOperationDispatcher( indexManagerFilter );
}

}
Expand Up @@ -7,6 +7,7 @@
package org.hibernate.search.spi;

import java.util.Set;
import java.util.function.Predicate;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -237,11 +238,16 @@ public interface SearchIntegrator extends AutoCloseable {
LuceneWorkSerializer getWorkSerializer();

/**
* @param indexManagerFilter A predicate allowing to exclude index managers from
* dispatching. Works will not be applied to these index managers.
* @return An operation dispatcher allowing to insert works retrieved from
* remote sources (e.g. JMS or JGroups slaves).
* remote sources (e.g. JMS or JGroups slaves), but only for index managers
* verifying the given predicate.
* This allows JMS or JGroups integrations to perform checks on index managers that
* wouldn't exist before the dispatch, in the case of dynamic sharding in particular.
*
* @hsearch.experimental Operation dispatchers are under active development.
* You should be prepared for incompatible changes in future releases.
*/
OperationDispatcher getRemoteOperationDispatcher();
OperationDispatcher createRemoteOperationDispatcher(Predicate<IndexManager> indexManagerFilter);
}

0 comments on commit edaff0a

Please sign in to comment.