Skip to content

Commit

Permalink
HSEARCH-1304 and HSEARCH-1305 Make deletage backends and timeouts con…
Browse files Browse the repository at this point in the history
…figurable
  • Loading branch information
Sanne committed Apr 18, 2013
1 parent 1414939 commit 78f340c
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 32 deletions.
Expand Up @@ -49,8 +49,12 @@ public class BackendFactory {
private static final Log log = LoggerFactory.make();

public static BackendQueueProcessor createBackend(DirectoryBasedIndexManager indexManager, WorkerBuildContext context, Properties properties) {

String backend = properties.getProperty( Environment.WORKER_BACKEND );
return createBackend( backend, indexManager, context, properties );
}

public static BackendQueueProcessor createBackend(String backend, DirectoryBasedIndexManager indexManager, WorkerBuildContext context,
Properties properties) {

final BackendQueueProcessor backendQueueProcessor;

Expand Down
Expand Up @@ -67,9 +67,10 @@ public void stop() {
dispatcher.stop();
}

public void send(final Message message, final boolean synchronous) throws Exception {
public void send(final Message message, final boolean synchronous, final long timeout) throws Exception {
final RequestOptions options = synchronous ? RequestOptions.SYNC() : RequestOptions.ASYNC();
options.setExclusionList( dispatcher.getChannel().getAddress() );
options.setTimeout( timeout );
RspList<Object> rspList = dispatcher.castMessage( null, message, options );
//JGroups won't throw these automatically as it would with a JChannel usage,
//so we provide the same semantics by throwing the JGroups specific exceptions
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.hibernate.search.backend.BackendFactory;
import org.hibernate.search.backend.IndexingMonitor;
import org.hibernate.search.backend.LuceneWork;
import org.hibernate.search.backend.impl.lucene.LuceneBackendQueueProcessor;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
import org.hibernate.search.engine.ServiceManager;
import org.hibernate.search.indexes.impl.DirectoryBasedIndexManager;
Expand All @@ -53,6 +52,12 @@
*/
public class JGroupsBackendQueueProcessor implements BackendQueueProcessor {

/**
* All configuration properties need to be prefixed with <blockquote>.jgroups
* </blockquote> to be interpreted by this backend.
*/
private static final String JGROUPS_CONFIGURATION_SPACE = "jgroups";

/**
* Configuration property specific the the backend instance. When enabled
* the invoker thread will use JGroups in synchronous mode waiting for the
Expand All @@ -67,6 +72,24 @@ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor {
*/
public static final String BLOCK_WAITING_ACK = "block_waiting_ack";

/**
* This JGroups backend is meant to delegate to a different backend on the
* master node. Generally this is expected to be the Lucene backend,
* but this property allows to specify a different implementation for the delegate.
*/
public static final String DELEGATE_BACKEND = "delegate_backend";

/**
* Specifies the timeout defined on messages sent to other nodes via the JGroups
* Channel. Value interpreted in milliseconds.
*/
public static final String MESSAGE_TIMEOUT_MS = "messages_timeout";

/**
* Default value for the {@link #MESSAGE_TIMEOUT_MS} configuration property.
*/
public static final int DEFAULT_MESSAGE_TIMEOUT = 20000;

private static final Log log = LoggerFactory.make();

private final NodeSelectorStrategy selectionStrategy;
Expand All @@ -79,36 +102,40 @@ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor {
private ServiceManager serviceManager;

private JGroupsBackendQueueTask jgroupsProcessor;
private LuceneBackendQueueProcessor luceneBackendQueueProcessor;
private BackendQueueProcessor delegatedBackend;

public JGroupsBackendQueueProcessor(NodeSelectorStrategy selectionStrategy) {
this.selectionStrategy = selectionStrategy;
}

@Override
public void initialize(Properties props, WorkerBuildContext context, DirectoryBasedIndexManager indexManager) {
this.indexManager = indexManager;
this.indexName = indexManager.getIndexName();
assertLegacyOptionsNotUsed( props, indexName );
this.indexManager = indexManager;
serviceManager = context.getServiceManager();
this.messageSender = serviceManager.requestService( JGroupsChannelProvider.class, context );
NodeSelectorStrategyHolder masterNodeSelector = serviceManager.requestService( MasterSelectorServiceProvider.class, context );
masterNodeSelector.setNodeSelectorStrategy( indexName, selectionStrategy );
selectionStrategy.viewAccepted( messageSender.getView() ); // set current view?

final boolean sync = BackendFactory.isConfiguredAsSync( props );
final boolean block = ConfigurationParseHelper.getBooleanValue( props, BLOCK_WAITING_ACK, sync );
final Properties jgroupsProperties = new MaskedProperty( props, JGROUPS_CONFIGURATION_SPACE );
final boolean block = ConfigurationParseHelper.getBooleanValue( jgroupsProperties, BLOCK_WAITING_ACK, sync );

final long messageTimeout = ConfigurationParseHelper.getLongValue( jgroupsProperties, MESSAGE_TIMEOUT_MS, DEFAULT_MESSAGE_TIMEOUT );

log.jgroupsBlockWaitingForAck( indexName, block );
jgroupsProcessor = new JGroupsBackendQueueTask( this, indexManager, masterNodeSelector, block );
luceneBackendQueueProcessor = new LuceneBackendQueueProcessor();
luceneBackendQueueProcessor.initialize( props, context, indexManager );
jgroupsProcessor = new JGroupsBackendQueueTask( this, indexManager, masterNodeSelector, block, messageTimeout );

String backend = ConfigurationParseHelper.getString( jgroupsProperties, DELEGATE_BACKEND, "lucene" );
delegatedBackend = BackendFactory.createBackend( backend, indexManager, context, props );
}

public void close() {
serviceManager.releaseService( MasterSelectorServiceProvider.class );
serviceManager.releaseService( JGroupsChannelProvider.class );
luceneBackendQueueProcessor.close();
delegatedBackend.close();
}

MessageSender getMessageSender() {
Expand All @@ -135,7 +162,7 @@ public void indexMappingChanged() {
@Override
public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
if ( selectionStrategy.isIndexOwnerLocal() ) {
luceneBackendQueueProcessor.applyWork( workList, monitor );
delegatedBackend.applyWork( workList, monitor );
}
else {
if ( workList == null ) {
Expand All @@ -148,7 +175,7 @@ public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
@Override
public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) {
if ( selectionStrategy.isIndexOwnerLocal() ) {
luceneBackendQueueProcessor.applyStreamWork( singleOperation, monitor );
delegatedBackend.applyStreamWork( singleOperation, monitor );
}
else {
//TODO optimize for single operation?
Expand All @@ -158,7 +185,7 @@ public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor)

@Override
public Lock getExclusiveWriteLock() {
return luceneBackendQueueProcessor.getExclusiveWriteLock();
return delegatedBackend.getExclusiveWriteLock();
}

private static void assertLegacyOptionsNotUsed(Properties props, String indexName) {
Expand All @@ -174,4 +201,13 @@ private static void assertLegacyOptionsNotUsed(Properties props, String indexNam
public boolean blocksForACK() {
return jgroupsProcessor.blocksForACK();
}

public BackendQueueProcessor getDelegatedBackend() {
return delegatedBackend;
}

public long getMessageTimeout() {
return jgroupsProcessor.getMessageTimeout();
}

}
Expand Up @@ -50,12 +50,14 @@ public class JGroupsBackendQueueTask {
private final IndexManager indexManager;
private final NodeSelectorStrategy masterNodeSelector;
private final boolean blockForACK; //true by default if this backend is synchronous
private final long messageTimeout;

public JGroupsBackendQueueTask(JGroupsBackendQueueProcessor factory, IndexManager indexManager,
NodeSelectorStrategyHolder masterNodeSelector, boolean blockForACK) {
NodeSelectorStrategyHolder masterNodeSelector, boolean blockForACK, long messageTimeout) {
this.factory = factory;
this.indexManager = indexManager;
this.blockForACK = blockForACK;
this.messageTimeout = messageTimeout;
this.indexName = indexManager.getIndexName();
this.masterNodeSelector = masterNodeSelector.getMasterNodeSelector( indexName );
}
Expand Down Expand Up @@ -91,7 +93,7 @@ public void sendLuceneWorkList(List<LuceneWork> queue) {

try {
Message message = masterNodeSelector.createMessage( data );
factory.getMessageSender().send( message, blockForACK );
factory.getMessageSender().send( message, blockForACK, messageTimeout );
if ( trace ) {
log.tracef( "Lucene works have been sent from slave %s to master node.", factory.getAddress() );
}
Expand All @@ -105,4 +107,8 @@ public boolean blocksForACK() {
return blockForACK;
}

public long getMessageTimeout() {
return messageTimeout;
}

}
Expand Up @@ -33,11 +33,11 @@
*
* @author <a href="mailto:ales.justin@jboss.org">Ales Justin</a>
*/
final class MessageListenerToRequestHandlerAdapter implements RequestHandler {
public final class MessageListenerToRequestHandlerAdapter implements RequestHandler {

private final MessageListener delegate;

MessageListenerToRequestHandlerAdapter(final MessageListener delegate) {
public MessageListenerToRequestHandlerAdapter(final MessageListener delegate) {
this.delegate = delegate;
}

Expand Down
Expand Up @@ -46,9 +46,10 @@ public interface MessageSender {
*
* @param message the JGroups message
* @param synchronous set to true if we need to block until an ACK is received
* @param messageTimeout in milliseconds
* @throws java.lang.Exception for any error
*/
void send(Message message, boolean synchronous) throws Exception;
void send(Message message, boolean synchronous, long messageTimeout) throws Exception;

/**
* Get sender's address.
Expand Down
Expand Up @@ -113,6 +113,27 @@ public static final int parseInt(String value, String errorMsgOnParseFailure) {
}
}

/**
* Parses a String to get an long value.
*
* @param value A string containing an long value to parse
* @param errorMsgOnParseFailure message being wrapped in a SearchException if value is null or not correct.
* @return the parsed value
* @throws SearchException both for null values and for Strings not containing a valid int.
*/
public static final long parseLong(String value, String errorMsgOnParseFailure) {
if ( value == null ) {
throw new SearchException( errorMsgOnParseFailure );
}
else {
try {
return Long.parseLong( value.trim() );
} catch (NumberFormatException nfe) {
throw new SearchException( errorMsgOnParseFailure, nfe );
}
}
}

/**
* In case value is null or an empty string the defValue is returned
* @param value
Expand All @@ -129,6 +150,23 @@ public static final int parseInt(String value, int defValue, String errorMsgOnPa
return parseInt( value, errorMsgOnParseFailure );
}
}

/**
* In case value is null or an empty string the defValue is returned
* @param value
* @param defValue
* @param errorMsgOnParseFailure
* @return the converted long.
* @throws SearchException if value can't be parsed.
*/
public static final long parseLong(String value, long defValue, String errorMsgOnParseFailure) {
if ( StringHelper.isEmpty( value ) ) {
return defValue;
}
else {
return parseLong( value, errorMsgOnParseFailure );
}
}

/**
* Looks for a numeric value in the Properties, returning
Expand All @@ -146,6 +184,22 @@ public static final int getIntValue(Properties cfg, String key, int defValue) {
return parseInt( propValue, defValue, "Unable to parse " + key + ": " + propValue );
}

/**
* Looks for a numeric value in the Properties, returning
* defValue if not found or if an empty string is found.
* When the key the value is found but not in valid format
* a standard error message is generated.
* @param cfg
* @param key
* @param defValue
* @return the converted long value.
* @throws SearchException for invalid format.
*/
public static long getLongValue(Properties cfg, String key, long defaultValue) {
String propValue = cfg.getProperty( key );
return parseLong( propValue, defaultValue, "Unable to parse " + key + ": " + propValue );
}

/**
* Parses a string to recognize exactly either "true" or "false".
*
Expand Down Expand Up @@ -196,4 +250,5 @@ public static final String getString(Properties cfg, String key, String defaultV
String propValue = cfg.getProperty( key );
return propValue == null ? defaultValue : propValue;
}

}
Expand Up @@ -28,6 +28,7 @@
import org.hibernate.search.annotations.DocumentId;
import org.hibernate.search.annotations.Field;
import org.hibernate.search.annotations.Indexed;
import org.hibernate.search.backend.impl.blackhole.BlackHoleBackendQueueProcessor;
import org.hibernate.search.backend.impl.jgroups.JGroupsBackendQueueProcessor;
import org.hibernate.search.backend.impl.jgroups.JGroupsChannelProvider;
import org.hibernate.search.backend.spi.BackendQueueProcessor;
Expand Down Expand Up @@ -63,9 +64,11 @@ public class SyncJGroupsBackendTest {
public TestingSearchFactoryHolder slaveNode = new TestingSearchFactoryHolder( Dvd.class, Book.class, Drink.class, Star.class )
.withProperty( "hibernate.search.default.worker.backend", "jgroupsSlave" )
.withProperty( "hibernate.search.dvds.worker.execution", "sync" )
.withProperty( "hibernate.search.dvds.jgroups.delegate_backend", "blackhole" )
.withProperty( "hibernate.search.dvds.jgroups.messages_timeout", "200" )
.withProperty( "hibernate.search.books.worker.execution", "async" )
.withProperty( "hibernate.search.drinks." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "true" )
.withProperty( "hibernate.search.stars." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "false" )
.withProperty( "hibernate.search.drinks.jgroups." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "true" )
.withProperty( "hibernate.search.stars.jgroups." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "false" )
.withProperty( JGroupsChannelProvider.CONFIGURATION_FILE, JGROUPS_CONFIGURATION )
;

Expand Down Expand Up @@ -133,24 +136,41 @@ public void testSynchAsConfigured() {
Assert.assertTrue( npeTriggered );
}

@Test
public void alternativeBackendConfiguration() {
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, "dvds" );
JGroupsBackendQueueProcessor jgroupsProcessor = (JGroupsBackendQueueProcessor) backendQueueProcessor;
BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegatedBackend();
Assert.assertTrue ( "dvds backend was configured with a deleage to blackhole but it's not using it", delegatedBackend instanceof BlackHoleBackendQueueProcessor );
}

@Test
public void alternativeJGroupsTimeoutConfiguration() {
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, "dvds" );
JGroupsBackendQueueProcessor jgroupsProcessor = (JGroupsBackendQueueProcessor) backendQueueProcessor;
long messageTimeout = jgroupsProcessor.getMessageTimeout();
Assert.assertEquals( "message timeout configuration property not applied", 200, messageTimeout );
}

private JGroupsReceivingMockBackend extractMockBackend(String indexName) {
IndexManager indexManager = masterNode.getSearchFactory().getAllIndexesManager().getIndexManager( indexName );
Assert.assertNotNull( indexManager );
DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager;
BackendQueueProcessor backendQueueProcessor = dbi.getBackendQueueProcessor();
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( masterNode, indexName );
Assert.assertTrue( "Backend not using the configured Mock!", backendQueueProcessor instanceof JGroupsReceivingMockBackend );
return (JGroupsReceivingMockBackend) backendQueueProcessor;
}

private JGroupsBackendQueueProcessor extractJGroupsBackend(String indexName) {
IndexManager indexManager = slaveNode.getSearchFactory().getAllIndexesManager().getIndexManager( indexName );
Assert.assertNotNull( indexManager );
DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager;
BackendQueueProcessor backendQueueProcessor = dbi.getBackendQueueProcessor();
BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, indexName );
Assert.assertTrue( "Backend not using JGroups!", backendQueueProcessor instanceof JGroupsBackendQueueProcessor );
return (JGroupsBackendQueueProcessor) backendQueueProcessor;
}

private static BackendQueueProcessor extractBackendQueue(TestingSearchFactoryHolder node, String indexName) {
IndexManager indexManager = node.getSearchFactory().getAllIndexesManager().getIndexManager( indexName );
Assert.assertNotNull( indexManager );
DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager;
return dbi.getBackendQueueProcessor();
}

private void storeBook(int id, String string) {
Book book = new Book();
book.id = id;
Expand Down

0 comments on commit 78f340c

Please sign in to comment.