From 78f340ccb6499a360ac2f5a0651491f50e0f1084 Mon Sep 17 00:00:00 2001 From: Sanne Grinovero Date: Wed, 17 Apr 2013 14:20:40 +0100 Subject: [PATCH] HSEARCH-1304 and HSEARCH-1305 Make deletage backends and timeouts configurable --- .../search/backend/BackendFactory.java | 6 +- .../impl/jgroups/DispatcherMessageSender.java | 3 +- .../jgroups/JGroupsBackendQueueProcessor.java | 58 +++++++++++++++---- .../impl/jgroups/JGroupsBackendQueueTask.java | 10 +++- ...essageListenerToRequestHandlerAdapter.java | 4 +- .../backend/impl/jgroups/MessageSender.java | 3 +- .../impl/ConfigurationParseHelper.java | 55 ++++++++++++++++++ .../jgroups/SyncJGroupsBackendTest.java | 40 +++++++++---- .../test/jgroups/slave/JGroupsSlaveTest.java | 12 ++-- 9 files changed, 159 insertions(+), 32 deletions(-) diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/BackendFactory.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/BackendFactory.java index cc401a78f4f..0738a4d12cc 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/BackendFactory.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/BackendFactory.java @@ -49,8 +49,12 @@ public class BackendFactory { private static final Log log = LoggerFactory.make(); public static BackendQueueProcessor createBackend(DirectoryBasedIndexManager indexManager, WorkerBuildContext context, Properties properties) { - String backend = properties.getProperty( Environment.WORKER_BACKEND ); + return createBackend( backend, indexManager, context, properties ); + } + + public static BackendQueueProcessor createBackend(String backend, DirectoryBasedIndexManager indexManager, WorkerBuildContext context, + Properties properties) { final BackendQueueProcessor backendQueueProcessor; diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/DispatcherMessageSender.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/DispatcherMessageSender.java index 9de0fdbba9e..6277da1f3bb 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/DispatcherMessageSender.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/DispatcherMessageSender.java @@ -67,9 +67,10 @@ public void stop() { dispatcher.stop(); } - public void send(final Message message, final boolean synchronous) throws Exception { + public void send(final Message message, final boolean synchronous, final long timeout) throws Exception { final RequestOptions options = synchronous ? RequestOptions.SYNC() : RequestOptions.ASYNC(); options.setExclusionList( dispatcher.getChannel().getAddress() ); + options.setTimeout( timeout ); RspList rspList = dispatcher.castMessage( null, message, options ); //JGroups won't throw these automatically as it would with a JChannel usage, //so we provide the same semantics by throwing the JGroups specific exceptions diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueProcessor.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueProcessor.java index c4500029adf..8799e614f89 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueProcessor.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueProcessor.java @@ -31,7 +31,6 @@ import org.hibernate.search.backend.BackendFactory; import org.hibernate.search.backend.IndexingMonitor; import org.hibernate.search.backend.LuceneWork; -import org.hibernate.search.backend.impl.lucene.LuceneBackendQueueProcessor; import org.hibernate.search.backend.spi.BackendQueueProcessor; import org.hibernate.search.engine.ServiceManager; import org.hibernate.search.indexes.impl.DirectoryBasedIndexManager; @@ -53,6 +52,12 @@ */ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor { + /** + * All configuration properties need to be prefixed with
.jgroups + *
to be interpreted by this backend. + */ + private static final String JGROUPS_CONFIGURATION_SPACE = "jgroups"; + /** * Configuration property specific the the backend instance. When enabled * the invoker thread will use JGroups in synchronous mode waiting for the @@ -67,6 +72,24 @@ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor { */ public static final String BLOCK_WAITING_ACK = "block_waiting_ack"; + /** + * This JGroups backend is meant to delegate to a different backend on the + * master node. Generally this is expected to be the Lucene backend, + * but this property allows to specify a different implementation for the delegate. + */ + public static final String DELEGATE_BACKEND = "delegate_backend"; + + /** + * Specifies the timeout defined on messages sent to other nodes via the JGroups + * Channel. Value interpreted in milliseconds. + */ + public static final String MESSAGE_TIMEOUT_MS = "messages_timeout"; + + /** + * Default value for the {@link #MESSAGE_TIMEOUT_MS} configuration property. + */ + public static final int DEFAULT_MESSAGE_TIMEOUT = 20000; + private static final Log log = LoggerFactory.make(); private final NodeSelectorStrategy selectionStrategy; @@ -79,7 +102,7 @@ public class JGroupsBackendQueueProcessor implements BackendQueueProcessor { private ServiceManager serviceManager; private JGroupsBackendQueueTask jgroupsProcessor; - private LuceneBackendQueueProcessor luceneBackendQueueProcessor; + private BackendQueueProcessor delegatedBackend; public JGroupsBackendQueueProcessor(NodeSelectorStrategy selectionStrategy) { this.selectionStrategy = selectionStrategy; @@ -87,9 +110,9 @@ public JGroupsBackendQueueProcessor(NodeSelectorStrategy selectionStrategy) { @Override public void initialize(Properties props, WorkerBuildContext context, DirectoryBasedIndexManager indexManager) { + this.indexManager = indexManager; this.indexName = indexManager.getIndexName(); assertLegacyOptionsNotUsed( props, indexName ); - this.indexManager = indexManager; serviceManager = context.getServiceManager(); this.messageSender = serviceManager.requestService( JGroupsChannelProvider.class, context ); NodeSelectorStrategyHolder masterNodeSelector = serviceManager.requestService( MasterSelectorServiceProvider.class, context ); @@ -97,18 +120,22 @@ public void initialize(Properties props, WorkerBuildContext context, DirectoryBa selectionStrategy.viewAccepted( messageSender.getView() ); // set current view? final boolean sync = BackendFactory.isConfiguredAsSync( props ); - final boolean block = ConfigurationParseHelper.getBooleanValue( props, BLOCK_WAITING_ACK, sync ); + final Properties jgroupsProperties = new MaskedProperty( props, JGROUPS_CONFIGURATION_SPACE ); + final boolean block = ConfigurationParseHelper.getBooleanValue( jgroupsProperties, BLOCK_WAITING_ACK, sync ); + + final long messageTimeout = ConfigurationParseHelper.getLongValue( jgroupsProperties, MESSAGE_TIMEOUT_MS, DEFAULT_MESSAGE_TIMEOUT ); log.jgroupsBlockWaitingForAck( indexName, block ); - jgroupsProcessor = new JGroupsBackendQueueTask( this, indexManager, masterNodeSelector, block ); - luceneBackendQueueProcessor = new LuceneBackendQueueProcessor(); - luceneBackendQueueProcessor.initialize( props, context, indexManager ); + jgroupsProcessor = new JGroupsBackendQueueTask( this, indexManager, masterNodeSelector, block, messageTimeout ); + + String backend = ConfigurationParseHelper.getString( jgroupsProperties, DELEGATE_BACKEND, "lucene" ); + delegatedBackend = BackendFactory.createBackend( backend, indexManager, context, props ); } public void close() { serviceManager.releaseService( MasterSelectorServiceProvider.class ); serviceManager.releaseService( JGroupsChannelProvider.class ); - luceneBackendQueueProcessor.close(); + delegatedBackend.close(); } MessageSender getMessageSender() { @@ -135,7 +162,7 @@ public void indexMappingChanged() { @Override public void applyWork(List workList, IndexingMonitor monitor) { if ( selectionStrategy.isIndexOwnerLocal() ) { - luceneBackendQueueProcessor.applyWork( workList, monitor ); + delegatedBackend.applyWork( workList, monitor ); } else { if ( workList == null ) { @@ -148,7 +175,7 @@ public void applyWork(List workList, IndexingMonitor monitor) { @Override public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) { if ( selectionStrategy.isIndexOwnerLocal() ) { - luceneBackendQueueProcessor.applyStreamWork( singleOperation, monitor ); + delegatedBackend.applyStreamWork( singleOperation, monitor ); } else { //TODO optimize for single operation? @@ -158,7 +185,7 @@ public void applyStreamWork(LuceneWork singleOperation, IndexingMonitor monitor) @Override public Lock getExclusiveWriteLock() { - return luceneBackendQueueProcessor.getExclusiveWriteLock(); + return delegatedBackend.getExclusiveWriteLock(); } private static void assertLegacyOptionsNotUsed(Properties props, String indexName) { @@ -174,4 +201,13 @@ private static void assertLegacyOptionsNotUsed(Properties props, String indexNam public boolean blocksForACK() { return jgroupsProcessor.blocksForACK(); } + + public BackendQueueProcessor getDelegatedBackend() { + return delegatedBackend; + } + + public long getMessageTimeout() { + return jgroupsProcessor.getMessageTimeout(); + } + } diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueTask.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueTask.java index f4d106cb4a8..4db8c9b7a5e 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueTask.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/JGroupsBackendQueueTask.java @@ -50,12 +50,14 @@ public class JGroupsBackendQueueTask { private final IndexManager indexManager; private final NodeSelectorStrategy masterNodeSelector; private final boolean blockForACK; //true by default if this backend is synchronous + private final long messageTimeout; public JGroupsBackendQueueTask(JGroupsBackendQueueProcessor factory, IndexManager indexManager, - NodeSelectorStrategyHolder masterNodeSelector, boolean blockForACK) { + NodeSelectorStrategyHolder masterNodeSelector, boolean blockForACK, long messageTimeout) { this.factory = factory; this.indexManager = indexManager; this.blockForACK = blockForACK; + this.messageTimeout = messageTimeout; this.indexName = indexManager.getIndexName(); this.masterNodeSelector = masterNodeSelector.getMasterNodeSelector( indexName ); } @@ -91,7 +93,7 @@ public void sendLuceneWorkList(List queue) { try { Message message = masterNodeSelector.createMessage( data ); - factory.getMessageSender().send( message, blockForACK ); + factory.getMessageSender().send( message, blockForACK, messageTimeout ); if ( trace ) { log.tracef( "Lucene works have been sent from slave %s to master node.", factory.getAddress() ); } @@ -105,4 +107,8 @@ public boolean blocksForACK() { return blockForACK; } + public long getMessageTimeout() { + return messageTimeout; + } + } diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageListenerToRequestHandlerAdapter.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageListenerToRequestHandlerAdapter.java index c6b69141055..b78c997abc4 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageListenerToRequestHandlerAdapter.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageListenerToRequestHandlerAdapter.java @@ -33,11 +33,11 @@ * * @author Ales Justin */ -final class MessageListenerToRequestHandlerAdapter implements RequestHandler { +public final class MessageListenerToRequestHandlerAdapter implements RequestHandler { private final MessageListener delegate; - MessageListenerToRequestHandlerAdapter(final MessageListener delegate) { + public MessageListenerToRequestHandlerAdapter(final MessageListener delegate) { this.delegate = delegate; } diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageSender.java b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageSender.java index 441f6d54be2..e415c8e5311 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageSender.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/backend/impl/jgroups/MessageSender.java @@ -46,9 +46,10 @@ public interface MessageSender { * * @param message the JGroups message * @param synchronous set to true if we need to block until an ACK is received + * @param messageTimeout in milliseconds * @throws java.lang.Exception for any error */ - void send(Message message, boolean synchronous) throws Exception; + void send(Message message, boolean synchronous, long messageTimeout) throws Exception; /** * Get sender's address. diff --git a/hibernate-search-engine/src/main/java/org/hibernate/search/util/configuration/impl/ConfigurationParseHelper.java b/hibernate-search-engine/src/main/java/org/hibernate/search/util/configuration/impl/ConfigurationParseHelper.java index af71a1f34f0..c3e4bdd1805 100644 --- a/hibernate-search-engine/src/main/java/org/hibernate/search/util/configuration/impl/ConfigurationParseHelper.java +++ b/hibernate-search-engine/src/main/java/org/hibernate/search/util/configuration/impl/ConfigurationParseHelper.java @@ -113,6 +113,27 @@ public static final int parseInt(String value, String errorMsgOnParseFailure) { } } + /** + * Parses a String to get an long value. + * + * @param value A string containing an long value to parse + * @param errorMsgOnParseFailure message being wrapped in a SearchException if value is null or not correct. + * @return the parsed value + * @throws SearchException both for null values and for Strings not containing a valid int. + */ + public static final long parseLong(String value, String errorMsgOnParseFailure) { + if ( value == null ) { + throw new SearchException( errorMsgOnParseFailure ); + } + else { + try { + return Long.parseLong( value.trim() ); + } catch (NumberFormatException nfe) { + throw new SearchException( errorMsgOnParseFailure, nfe ); + } + } + } + /** * In case value is null or an empty string the defValue is returned * @param value @@ -129,6 +150,23 @@ public static final int parseInt(String value, int defValue, String errorMsgOnPa return parseInt( value, errorMsgOnParseFailure ); } } + + /** + * In case value is null or an empty string the defValue is returned + * @param value + * @param defValue + * @param errorMsgOnParseFailure + * @return the converted long. + * @throws SearchException if value can't be parsed. + */ + public static final long parseLong(String value, long defValue, String errorMsgOnParseFailure) { + if ( StringHelper.isEmpty( value ) ) { + return defValue; + } + else { + return parseLong( value, errorMsgOnParseFailure ); + } + } /** * Looks for a numeric value in the Properties, returning @@ -146,6 +184,22 @@ public static final int getIntValue(Properties cfg, String key, int defValue) { return parseInt( propValue, defValue, "Unable to parse " + key + ": " + propValue ); } + /** + * Looks for a numeric value in the Properties, returning + * defValue if not found or if an empty string is found. + * When the key the value is found but not in valid format + * a standard error message is generated. + * @param cfg + * @param key + * @param defValue + * @return the converted long value. + * @throws SearchException for invalid format. + */ + public static long getLongValue(Properties cfg, String key, long defaultValue) { + String propValue = cfg.getProperty( key ); + return parseLong( propValue, defaultValue, "Unable to parse " + key + ": " + propValue ); + } + /** * Parses a string to recognize exactly either "true" or "false". * @@ -196,4 +250,5 @@ public static final String getString(Properties cfg, String key, String defaultV String propValue = cfg.getProperty( key ); return propValue == null ? defaultValue : propValue; } + } diff --git a/hibernate-search-engine/src/test/java/org/hibernate/search/test/backends/jgroups/SyncJGroupsBackendTest.java b/hibernate-search-engine/src/test/java/org/hibernate/search/test/backends/jgroups/SyncJGroupsBackendTest.java index 8d0a8091e35..b22227d6357 100644 --- a/hibernate-search-engine/src/test/java/org/hibernate/search/test/backends/jgroups/SyncJGroupsBackendTest.java +++ b/hibernate-search-engine/src/test/java/org/hibernate/search/test/backends/jgroups/SyncJGroupsBackendTest.java @@ -28,6 +28,7 @@ import org.hibernate.search.annotations.DocumentId; import org.hibernate.search.annotations.Field; import org.hibernate.search.annotations.Indexed; +import org.hibernate.search.backend.impl.blackhole.BlackHoleBackendQueueProcessor; import org.hibernate.search.backend.impl.jgroups.JGroupsBackendQueueProcessor; import org.hibernate.search.backend.impl.jgroups.JGroupsChannelProvider; import org.hibernate.search.backend.spi.BackendQueueProcessor; @@ -63,9 +64,11 @@ public class SyncJGroupsBackendTest { public TestingSearchFactoryHolder slaveNode = new TestingSearchFactoryHolder( Dvd.class, Book.class, Drink.class, Star.class ) .withProperty( "hibernate.search.default.worker.backend", "jgroupsSlave" ) .withProperty( "hibernate.search.dvds.worker.execution", "sync" ) + .withProperty( "hibernate.search.dvds.jgroups.delegate_backend", "blackhole" ) + .withProperty( "hibernate.search.dvds.jgroups.messages_timeout", "200" ) .withProperty( "hibernate.search.books.worker.execution", "async" ) - .withProperty( "hibernate.search.drinks." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "true" ) - .withProperty( "hibernate.search.stars." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "false" ) + .withProperty( "hibernate.search.drinks.jgroups." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "true" ) + .withProperty( "hibernate.search.stars.jgroups." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "false" ) .withProperty( JGroupsChannelProvider.CONFIGURATION_FILE, JGROUPS_CONFIGURATION ) ; @@ -133,24 +136,41 @@ public void testSynchAsConfigured() { Assert.assertTrue( npeTriggered ); } + @Test + public void alternativeBackendConfiguration() { + BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, "dvds" ); + JGroupsBackendQueueProcessor jgroupsProcessor = (JGroupsBackendQueueProcessor) backendQueueProcessor; + BackendQueueProcessor delegatedBackend = jgroupsProcessor.getDelegatedBackend(); + Assert.assertTrue ( "dvds backend was configured with a deleage to blackhole but it's not using it", delegatedBackend instanceof BlackHoleBackendQueueProcessor ); + } + + @Test + public void alternativeJGroupsTimeoutConfiguration() { + BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, "dvds" ); + JGroupsBackendQueueProcessor jgroupsProcessor = (JGroupsBackendQueueProcessor) backendQueueProcessor; + long messageTimeout = jgroupsProcessor.getMessageTimeout(); + Assert.assertEquals( "message timeout configuration property not applied", 200, messageTimeout ); + } + private JGroupsReceivingMockBackend extractMockBackend(String indexName) { - IndexManager indexManager = masterNode.getSearchFactory().getAllIndexesManager().getIndexManager( indexName ); - Assert.assertNotNull( indexManager ); - DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager; - BackendQueueProcessor backendQueueProcessor = dbi.getBackendQueueProcessor(); + BackendQueueProcessor backendQueueProcessor = extractBackendQueue( masterNode, indexName ); Assert.assertTrue( "Backend not using the configured Mock!", backendQueueProcessor instanceof JGroupsReceivingMockBackend ); return (JGroupsReceivingMockBackend) backendQueueProcessor; } private JGroupsBackendQueueProcessor extractJGroupsBackend(String indexName) { - IndexManager indexManager = slaveNode.getSearchFactory().getAllIndexesManager().getIndexManager( indexName ); - Assert.assertNotNull( indexManager ); - DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager; - BackendQueueProcessor backendQueueProcessor = dbi.getBackendQueueProcessor(); + BackendQueueProcessor backendQueueProcessor = extractBackendQueue( slaveNode, indexName ); Assert.assertTrue( "Backend not using JGroups!", backendQueueProcessor instanceof JGroupsBackendQueueProcessor ); return (JGroupsBackendQueueProcessor) backendQueueProcessor; } + private static BackendQueueProcessor extractBackendQueue(TestingSearchFactoryHolder node, String indexName) { + IndexManager indexManager = node.getSearchFactory().getAllIndexesManager().getIndexManager( indexName ); + Assert.assertNotNull( indexManager ); + DirectoryBasedIndexManager dbi = (DirectoryBasedIndexManager) indexManager; + return dbi.getBackendQueueProcessor(); + } + private void storeBook(int id, String string) { Book book = new Book(); book.id = id; diff --git a/hibernate-search-orm/src/test/java/org/hibernate/search/test/jgroups/slave/JGroupsSlaveTest.java b/hibernate-search-orm/src/test/java/org/hibernate/search/test/jgroups/slave/JGroupsSlaveTest.java index ee02245607a..419c1775175 100644 --- a/hibernate-search-orm/src/test/java/org/hibernate/search/test/jgroups/slave/JGroupsSlaveTest.java +++ b/hibernate-search-orm/src/test/java/org/hibernate/search/test/jgroups/slave/JGroupsSlaveTest.java @@ -29,12 +29,15 @@ import org.jgroups.Channel; import org.jgroups.JChannel; +import org.jgroups.blocks.MessageDispatcher; import org.hibernate.Session; import org.hibernate.Transaction; import org.hibernate.cfg.Configuration; import org.hibernate.search.Environment; +import org.hibernate.search.backend.impl.jgroups.JGroupsBackendQueueProcessor; import org.hibernate.search.backend.impl.jgroups.JGroupsChannelProvider; +import org.hibernate.search.backend.impl.jgroups.MessageListenerToRequestHandlerAdapter; import org.hibernate.search.test.SearchTestCase; import org.hibernate.search.test.jgroups.common.JGroupsCommonTest; import org.hibernate.search.util.configuration.impl.ConfigurationParseHelper; @@ -76,9 +79,6 @@ public void testMessageSend() throws Exception { s.persist( ts2 ); tx.commit(); - //need to sleep for the message consumption - Thread.sleep( JGroupsCommonTest.NETWORK_WAIT_MILLISECONDS ); - boolean failed = true; for ( int i = 0; i < JGroupsCommonTest.MAX_WAITS; i++ ) { Thread.sleep( JGroupsCommonTest.NETWORK_WAIT_MILLISECONDS ); @@ -128,7 +128,9 @@ public void testMessageSend() throws Exception { private void prepareJGroupsChannel() throws Exception { channel = new JChannel( ConfigurationParseHelper.locateConfig( "testing-flush-loopback.xml" ) ); channel.connect( CHANNEL_NAME ); - channel.setReceiver( new JGroupsReceiver(getSearchFactoryImpl()) ); + JGroupsReceiver listener = new JGroupsReceiver(getSearchFactoryImpl()); + MessageListenerToRequestHandlerAdapter adapter = new MessageListenerToRequestHandlerAdapter( listener ); + MessageDispatcher standardDispatcher = new MessageDispatcher( channel, listener, listener, adapter ); } @Override @@ -154,6 +156,8 @@ protected Class[] getAnnotatedClasses() { protected void configure(Configuration cfg) { super.configure( cfg ); cfg.setProperty( "hibernate.search.default." + Environment.WORKER_BACKEND, "jgroupsSlave" ); + cfg.setProperty( "hibernate.search.default." + JGroupsBackendQueueProcessor.BLOCK_WAITING_ACK, "false" ); + cfg.setProperty( "hibernate.search.default." + JGroupsBackendQueueProcessor.MESSAGE_TIMEOUT_MS, "100" ); cfg.setProperty( JGroupsChannelProvider.CLUSTER_NAME, CHANNEL_NAME ); cfg.setProperty( JGroupsChannelProvider.CONFIGURATION_FILE, "testing-flush-loopback.xml" ); }