Skip to content

Commit

Permalink
HSEARCH-2675 In the JGroups backend, create the delegate backend on d…
Browse files Browse the repository at this point in the history
…emand

So that we support dynamic master node re-election. Previously the
delegate backend was only created if the current node was initially a
master, preventing a former slave to take on the role of master.
  • Loading branch information
yrodiere authored and Sanne committed May 9, 2017
1 parent edaff0a commit 47f0967
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 19 deletions.
Expand Up @@ -131,17 +131,16 @@ public JGroupsBackendQueueProcessor createQueueProcessor(IndexManager indexManag
messageSender, indexManager, masterNodeSelector, luceneWorkSerializer,
block, messageTimeout );

BackendQueueProcessor delegateQueueProcessor = null;
if ( selectionStrategy.isIndexOwnerLocal() ) {
delegateQueueProcessor = BackendFactory.createBackend(
delegateBackendName, indexManager, context, properties );
}

JGroupsBackendQueueProcessor queueProcessor = new JGroupsBackendQueueProcessor(
selectionStrategy, jgroupsProcessor, delegateQueueProcessor );
selectionStrategy, jgroupsProcessor,
() -> createDelegateQueueProcessor( indexManager, context ) );
return queueProcessor;
}

private BackendQueueProcessor createDelegateQueueProcessor(IndexManager indexManager, WorkerBuildContext context) {
return BackendFactory.createBackend(delegateBackendName, indexManager, context, properties );
}

protected NodeSelectorStrategy createNodeSelectorStrategy(IndexManager indexManager) {
return new AutoNodeSelector( indexManager.getIndexName() );
}
Expand Down
Expand Up @@ -8,6 +8,7 @@

import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
Expand All @@ -26,28 +27,36 @@ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor {

private final NodeSelectorStrategy selectionStrategy;
private final JGroupsBackendQueueTask jgroupsProcessor;
private final BackendQueueProcessor delegatedBackend;
private final Supplier<BackendQueueProcessor> delegatedBackendFactory;
private volatile BackendQueueProcessor delegate;

public JGroupsBackendQueueProcessor(NodeSelectorStrategy selectionStrategy,
JGroupsBackendQueueTask jgroupsProcessor,
BackendQueueProcessor delegatedBackend) {
Supplier<BackendQueueProcessor> delegatedBackendFactory) {
this.selectionStrategy = selectionStrategy;
this.jgroupsProcessor = jgroupsProcessor;
this.delegatedBackend = delegatedBackend;
this.delegatedBackendFactory = delegatedBackendFactory;
if ( selectionStrategy.isIndexOwnerLocal() ) {
/*
* Eager initialization if we know from the start we are the master.
* This allows in particular the delegate backend to fail fast
* if there is a configuration issue.
*/
getOrCreateDelegate();
}
}

@Override
public void close() {
if ( selectionStrategy.isIndexOwnerLocal() ) {
//TODO verify all delegates have been closed when ownership was lost before [HSEARCH-2060]
delegatedBackend.close();
public synchronized void close() {
if ( delegate != null ) {
delegate.close();
}
}

@Override
public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
if ( selectionStrategy.isIndexOwnerLocal() ) {
delegatedBackend.applyWork( workList, monitor );
getOrCreateDelegate().applyWork( workList, monitor );
}
else {
if ( workList == null ) {
Expand All @@ -60,20 +69,33 @@ public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
@Override
public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) {
if ( selectionStrategy.isIndexOwnerLocal() ) {
delegatedBackend.applyStreamWork( singleOperation, monitor );
getOrCreateDelegate().applyStreamWork( singleOperation, monitor );
}
else {
//TODO optimize for single operation?
jgroupsProcessor.sendLuceneWorkList( Collections.singletonList( singleOperation ) );
}
}

private BackendQueueProcessor getOrCreateDelegate() {
if ( delegate != null ) {
return delegate;
}
synchronized ( this ) {
if ( delegate != null ) {
return delegate;
}
delegate = delegatedBackendFactory.get();
return delegate;
}
}

public boolean blocksForACK() {
return jgroupsProcessor.blocksForACK();
}

public BackendQueueProcessor getDelegatedBackend() {
return delegatedBackend;
public BackendQueueProcessor getExistingDelegate() {
return delegate;
}

public long getMessageTimeout() {
Expand Down
Expand Up @@ -125,7 +125,7 @@ public void testSynchAsConfigured() {
public void alternativeBackendConfiguration() {
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( masterNode, "dvds" );
JGroupsReceivingMockBackendQueueProcessor jgroupsProcessor = (JGroupsReceivingMockBackendQueueProcessor) backendQueueProcessor;
BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegate().getDelegatedBackend();
BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegate().getExistingDelegate();
Assert.assertTrue( "dvds backend was configured with a delegate to blackhole but it's not using it", delegatedBackend instanceof BlackHoleBackendQueueProcessor );
}

Expand Down

0 comments on commit 47f0967

Please sign in to comment.