Skip to content

Commit

Permalink
HSEARCH-1886 Introduce Backends, instantiated right when an index man…
Browse files Browse the repository at this point in the history
…ager group is created

This will allow to start background services, such as JMS/JGroups
message consumers on the master nodes, even if there is no index manager
yet (as is the case with dynamic sharding when a *slave* triggers the
creation of a shard).
  • Loading branch information
yrodiere authored and Sanne committed Apr 19, 2017
1 parent 2de925d commit 0fc2d22
Show file tree
Hide file tree
Showing 23 changed files with 766 additions and 207 deletions.
@@ -0,0 +1,42 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.jgroups.impl;

import java.util.Properties;

import org.hibernate.search.backend.spi.Backend;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;


/**
* @author Yoann Rodiere
*/
public class JGroupsBackend implements Backend {

private Properties properties;

@Override
public void initialize(Properties properties, WorkerBuildContext context) {
this.properties = properties;
}

@Override
@SuppressWarnings("deprecation")
public BackendQueueProcessor createQueueProcessor(IndexManager indexManager, WorkerBuildContext context) {
NodeSelectorStrategy nodeSelectorStrategy = createNodeSelectorStrategy( indexManager );
BackendQueueProcessor queueProcessor = new JGroupsBackendQueueProcessor( nodeSelectorStrategy );
queueProcessor.initialize( properties, context, indexManager );
return queueProcessor;
}

protected NodeSelectorStrategy createNodeSelectorStrategy(IndexManager indexManager) {
return new AutoNodeSelector( indexManager.getIndexName() );
}

}
@@ -0,0 +1,21 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.jgroups.impl;

import org.hibernate.search.indexes.spi.IndexManager;

/**
* @author Yoann Rodiere
*/
public class JGroupsMasterBackend extends JGroupsBackend {

@Override
protected NodeSelectorStrategy createNodeSelectorStrategy(IndexManager indexManager) {
return new MasterNodeSelector();
}

}
@@ -0,0 +1,21 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.jgroups.impl;

import org.hibernate.search.indexes.spi.IndexManager;

/**
* @author Yoann Rodiere
*/
public class JGroupsSlaveBackend extends JGroupsBackend {

@Override
protected NodeSelectorStrategy createNodeSelectorStrategy(IndexManager indexManager) {
return new SlaveNodeSelector();
}

}
Expand Up @@ -7,12 +7,16 @@
package org.hibernate.search.backend.jgroups.impl;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.spi.Backend;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;
import org.hibernate.search.util.logging.impl.Log;
import org.hibernate.search.util.logging.impl.LoggerFactory;

Expand All @@ -24,74 +28,106 @@
* @author Sanne Grinovero (C) 2013 Red Hat Inc.
* @since 4.3
*/
public class JGroupsReceivingMockBackend extends JGroupsBackendQueueProcessor implements BackendQueueProcessor {
public class JGroupsReceivingMockBackend implements Backend {

private static final Log log = LoggerFactory.make();

private volatile CountDownLatch threadTrap;
private volatile boolean failOnMessage = false;
private volatile boolean receivedAnything = false;

public JGroupsReceivingMockBackend() {
super( new MasterNodeSelector() );
}
private final JGroupsBackend delegate = new JGroupsMasterBackend();

@Override
public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
receivedSomething();
countDownAndJoin();
public void initialize(Properties properties, WorkerBuildContext context) {
delegate.initialize( properties, context );
}

private void receivedSomething() {
receivedAnything = true;
@Override
public void close() {
delegate.close();
}

@Override
public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) {
//Unused
receivedSomething();
countDownAndJoin();
public BackendQueueProcessor createQueueProcessor(IndexManager indexManager, WorkerBuildContext context) {
return new JGroupsReceivingMockBackendQueueProcessor( delegate.createQueueProcessor( indexManager, context ) );
}

public void resetThreadTrap() {
threadTrap = new CountDownLatch( 2 );
}
public static class JGroupsReceivingMockBackendQueueProcessor implements BackendQueueProcessor {

public boolean wasSomethingReceived() {
return receivedAnything;
}
private final JGroupsBackendQueueProcessor delegate;

private volatile CountDownLatch threadTrap;
private volatile boolean failOnMessage = false;
private volatile boolean receivedAnything = false;

public void countDownAndJoin() {
if ( failOnMessage ) {
throw new NullPointerException( "Simulated Failure" );
public JGroupsReceivingMockBackendQueueProcessor(JGroupsBackendQueueProcessor delegate) {
super();
this.delegate = delegate;
}
log.trace( "[PREJOIN] Timestamp: " + System.nanoTime() );
try {
threadTrap.countDown();
//Basically we want to wait forever until we are awoken; we
//cap the definition of "forever" to 2 minutes to abort the test
//but this should not be necessary.
//The main test thread will release us ASAP so a large timeout should not
//affect the actual test duration.
threadTrap.await( 2, TimeUnit.MINUTES );

@Override
public void close() {
delegate.close();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();

@Override
public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
receivedSomething();
countDownAndJoin();
}
log.trace( "[POSTJOIN] Timestamp: " + System.nanoTime() );
}

public int releaseBlockedThreads() {
int count = (int) threadTrap.getCount();
for ( int i = 0; i < count; i++ ) {
threadTrap.countDown();
private void receivedSomething() {
receivedAnything = true;
}

@Override
public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) {
//Unused
receivedSomething();
countDownAndJoin();
}
return count;
}

public void induceFailure() {
failOnMessage = true;
public JGroupsBackendQueueProcessor getDelegate() {
return delegate;
}

public void resetThreadTrap() {
threadTrap = new CountDownLatch( 2 );
}

public boolean wasSomethingReceived() {
return receivedAnything;
}

public void countDownAndJoin() {
if ( failOnMessage ) {
throw new NullPointerException( "Simulated Failure" );
}
log.trace( "[PREJOIN] Timestamp: " + System.nanoTime() );
try {
threadTrap.countDown();
//Basically we want to wait forever until we are awoken; we
//cap the definition of "forever" to 2 minutes to abort the test
//but this should not be necessary.
//The main test thread will release us ASAP so a large timeout should not
//affect the actual test duration.
threadTrap.await( 2, TimeUnit.MINUTES );
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
log.trace( "[POSTJOIN] Timestamp: " + System.nanoTime() );
}

public int releaseBlockedThreads() {
int count = (int) threadTrap.getCount();
for ( int i = 0; i < count; i++ ) {
threadTrap.countDown();
}
return count;
}

public void induceFailure() {
failOnMessage = true;
}
}

}
Expand Up @@ -13,6 +13,7 @@
import org.hibernate.search.annotations.Field;
import org.hibernate.search.annotations.Indexed;
import org.hibernate.search.backend.impl.blackhole.BlackHoleBackendQueueProcessor;
import org.hibernate.search.backend.jgroups.impl.JGroupsReceivingMockBackend.JGroupsReceivingMockBackendQueueProcessor;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.backend.spi.Work;
import org.hibernate.search.backend.spi.WorkType;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void testSynchAsConfigured() {
JGroupsBackendQueueProcessor starsBackend = extractJGroupsBackend( "stars" );
Assert.assertFalse( "stars index was configured with an asyncronous JGroups backend", starsBackend.blocksForACK() );

JGroupsReceivingMockBackend dvdBackendMock = extractMockBackend( "dvds" );
JGroupsReceivingMockBackendQueueProcessor dvdBackendMock = extractMockBackend( "dvds" );

dvdBackendMock.resetThreadTrap();
boolean timeoutTriggered = false;
Expand All @@ -95,7 +96,7 @@ public void testSynchAsConfigured() {
Assert.assertTrue( "The backend didn't receive any message: something wrong with the test setup of network configuration", dvdBackendMock.wasSomethingReceived() );
Assert.assertTrue( timeoutTriggered );

JGroupsReceivingMockBackend booksBackendMock = extractMockBackend( "books" );
JGroupsReceivingMockBackendQueueProcessor booksBackendMock = extractMockBackend( "books" );
booksBackendMock.resetThreadTrap();
//Books are async so they should not timeout
storeBook( 1, "Hibernate Search in Action" );
Expand Down Expand Up @@ -123,8 +124,8 @@ public void testSynchAsConfigured() {
@Test
public void alternativeBackendConfiguration() {
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( masterNode, "dvds" );
JGroupsBackendQueueProcessor jgroupsProcessor = (JGroupsBackendQueueProcessor) backendQueueProcessor;
BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegatedBackend();
JGroupsReceivingMockBackendQueueProcessor jgroupsProcessor = (JGroupsReceivingMockBackendQueueProcessor) backendQueueProcessor;
BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegate().getDelegatedBackend();
Assert.assertTrue( "dvds backend was configured with a delegate to blackhole but it's not using it", delegatedBackend instanceof BlackHoleBackendQueueProcessor );
}

Expand All @@ -136,10 +137,10 @@ public void alternativeJGroupsTimeoutConfiguration() {
Assert.assertEquals( "message timeout configuration property not applied", JGROUPS_MESSAGES_TIMEOUT, messageTimeout );
}

private JGroupsReceivingMockBackend extractMockBackend(String indexName) {
private JGroupsReceivingMockBackendQueueProcessor extractMockBackend(String indexName) {
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( masterNode, indexName );
Assert.assertTrue( "Backend not using the configured Mock!", backendQueueProcessor instanceof JGroupsReceivingMockBackend );
return (JGroupsReceivingMockBackend) backendQueueProcessor;
Assert.assertTrue( "Backend not using the configured Mock!", backendQueueProcessor instanceof JGroupsReceivingMockBackendQueueProcessor );
return (JGroupsReceivingMockBackendQueueProcessor) backendQueueProcessor;
}

private JGroupsBackendQueueProcessor extractJGroupsBackend(String indexName) {
Expand Down
@@ -0,0 +1,42 @@
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.backend.jms.impl;

import java.util.Properties;

import org.hibernate.search.backend.spi.Backend;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.indexes.spi.IndexManager;
import org.hibernate.search.spi.WorkerBuildContext;


/**
* @author Yoann Rodiere
*/
public class JndiJMSBackend implements Backend {

private Properties properties;

@Override
public void initialize(Properties properties, WorkerBuildContext context) {
this.properties = properties;
}

@Override
public boolean isTransactional() {
return true;
}

@Override
@SuppressWarnings("deprecation")
public BackendQueueProcessor createQueueProcessor(IndexManager indexManager, WorkerBuildContext context) {
BackendQueueProcessor queueProcessor = new JndiJMSBackendQueueProcessor();
queueProcessor.initialize( properties, context, indexManager );
return queueProcessor;
}

}
3 changes: 2 additions & 1 deletion documentation/src/main/asciidoc/architecture.asciidoc
Expand Up @@ -69,7 +69,8 @@ Hibernate Search offers the ability to let the batched work being processed by d
Several back ends are provided out of the box and you have the option to plugin your own. It is
important to understand that in this context back end encompasses more than just the configuration
option `hibernate.search.default.worker.backend`. This property just specifies a implementation of
the BackendQueueProcessor interface which is a part of a back end configuration. In most cases,
the `BackendQueueProcessor` interface (or the `Backend` interface, see <<table-backend-configuration, the configuration options>>)
which is a part of a back end configuration. In most cases,
however, additional configuration settings are needed to successfully configure a specific backend
setup, like for example the JMS back end.

Expand Down
12 changes: 11 additions & 1 deletion documentation/src/main/asciidoc/configuration.asciidoc
Expand Up @@ -524,7 +524,17 @@ configured differently for each index.

`blackhole`: Mainly a test/developer setting which ignores all indexing work

You can also specify the fully qualified name of a class implementing BackendQueueProcessor. This way you can implement your own communication layer. The implementation is responsible for returning a Runnable instance which on execution will process the index work.
You can also specify the fully qualified name of a class implementing `BackendQueueProcessor`.
This way you can implement your own communication layer: the queue processors will receive all index works
to be sent to index managers.

Please note that instances of `BackendQueueProcessor` will only be created as soon as index managers are created,
and those may be created lazily
(for instance when the indexes are using <<advanced-features-dynamic-sharding,dynamic sharding>>).
If you want to initialize some resources eagerly,
you can instead specify the fully qualified name of a class implementing `Backend`.
This class which will be instantiated and initialized on startup,
and will then be asked to produce `BackendQueueProcessor` instances as needed.

|===============

Expand Down

0 comments on commit 0fc2d22

Please sign in to comment.