From d44fc0255a7277134409f1aca28dcdef0917026a Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 11 Aug 2011 19:58:06 +0000 Subject: [PATCH] https://issues.jboss.org/browse/HORNETQ-746 - Fixing a deadlock with Netty NIO --- .../core/server/impl/ServerConsumerImpl.java | 73 ++-- .../hornetq/utils/ConfigurationHelper.java | 8 +- .../client/JmsNettyNioStressTest.java | 332 ++++++++++++++++++ 3 files changed, 359 insertions(+), 54 deletions(-) create mode 100644 tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java diff --git a/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java b/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java index 9541e9fb394..df05dc6e84b 100644 --- a/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java +++ b/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java @@ -93,8 +93,6 @@ private static void trace(final String message) private volatile LargeMessageDeliverer largeMessageDeliverer = null; - private boolean largeMessageInDelivery; - /** * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session. */ @@ -235,7 +233,7 @@ public HandleStatus handle(final MessageReference ref) throws Exception // If there is a pendingLargeMessage we can't take another message // This has to be checked inside the lock as the set to null is done inside the lock - if (largeMessageInDelivery) + if (largeMessageDeliverer != null) { return HandleStatus.BUSY; } @@ -463,21 +461,6 @@ public void setTransferring(final boolean transferring) synchronized (lock) { this.transferring = transferring; - - if (transferring) - { - // Now we must wait for any large message delivery to finish - while (largeMessageInDelivery) - { - try - { - Thread.sleep(1); - } - catch (InterruptedException ignore) - { - } - } - } } // Outside the lock @@ -662,25 +645,27 @@ public AtomicInteger getAvailableCredits() private void promptDelivery() { - synchronized (lock) + // largeMessageDeliverer is aways set inside a lock + // if we don't acquire a lock, we will have NPE eventually + if (largeMessageDeliverer != null) { - // largeMessageDeliverer is aways set inside a lock - // if we don't acquire a lock, we will have NPE eventually - if (largeMessageDeliverer != null) - { - resumeLargeMessage(); - } - else - { - if (browseOnly) - { - messageQueue.getExecutor().execute(browserDeliverer); - } - else - { - messageQueue.forceDelivery(); - } - } + resumeLargeMessage(); + } + else + { + forceDelivery(); + } + } + + private void forceDelivery() + { + if (browseOnly) + { + messageQueue.getExecutor().execute(browserDeliverer); + } + else + { + messageQueue.deliverAsync(); } } @@ -691,8 +676,6 @@ private void resumeLargeMessage() private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception { - largeMessageInDelivery = true; - final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref); // it doesn't need lock because deliverLargeMesasge is already inside the lock() @@ -714,6 +697,7 @@ private void deliverStandardMessage(final MessageReference ref, final ServerMess } } + // Inner classes // ------------------------------------------------------------------------ @@ -727,16 +711,7 @@ public void run() { if (largeMessageDeliverer == null || largeMessageDeliverer.deliver()) { - if (browseOnly) - { - messageQueue.getExecutor().execute(browserDeliverer); - } - else - { - // prompt Delivery only if chunk was finished - - messageQueue.deliverAsync(); - } + forceDelivery(); } } catch (Exception e) @@ -901,8 +876,6 @@ public void finish() throws Exception largeMessageDeliverer = null; - largeMessageInDelivery = false; - largeMessage = null; } } diff --git a/src/main/org/hornetq/utils/ConfigurationHelper.java b/src/main/org/hornetq/utils/ConfigurationHelper.java index ba8feddc0ab..08f943bdd16 100644 --- a/src/main/org/hornetq/utils/ConfigurationHelper.java +++ b/src/main/org/hornetq/utils/ConfigurationHelper.java @@ -73,7 +73,7 @@ public static int getIntProperty(final String propName, final int def, final Map { return Integer.valueOf((String)prop); } - else if (prop instanceof Integer == false) + else if (prop instanceof Number == false) { ConfigurationHelper.log.warn("Property " + propName + " must be an Integer, it is " + @@ -83,7 +83,7 @@ else if (prop instanceof Integer == false) } else { - return (Integer)prop; + return ((Number)prop).intValue(); } } } @@ -108,7 +108,7 @@ public static long getLongProperty(final String propName, final long def, final { return Long.valueOf((String)prop); } - else if (prop instanceof Long == false) + else if (prop instanceof Number == false) { ConfigurationHelper.log.warn("Property " + propName + " must be an Long, it is " + @@ -118,7 +118,7 @@ else if (prop instanceof Long == false) } else { - return (Long)prop; + return ((Number)prop).longValue(); } } } diff --git a/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java b/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java new file mode 100644 index 00000000000..54aa538ee30 --- /dev/null +++ b/tests/src/org/hornetq/tests/integration/client/JmsNettyNioStressTest.java @@ -0,0 +1,332 @@ +/* + * Copyright 2009 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.tests.integration.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Assert; + +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.ClientSession; +import org.hornetq.api.core.client.ClientSessionFactory; +import org.hornetq.api.core.client.ServerLocator; +import org.hornetq.api.jms.HornetQJMSClient; +import org.hornetq.api.jms.JMSFactoryType; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; +import org.hornetq.core.remoting.impl.netty.TransportConstants; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.jms.client.HornetQConnectionFactory; +import org.hornetq.jms.client.HornetQDestination; +import org.hornetq.tests.util.ServiceTestBase; +import org.hornetq.tests.util.UnitTestCase; + +/** + * -- https://issues.jboss.org/browse/HORNETQ-746 + * Stress test using netty with NIO and many JMS clients concurrently, to try + * and induce a deadlock. + *

+ * A large number of JMS clients are started concurrently. Some produce to queue + * 1 over one connection, others consume from queue 1 and produce to queue 2 + * over a second connection, and others consume from queue 2 over a third + * connection. + *

+ * Each operation is done in a JMS transaction, sending/consuming one message + * per transaction. + *

+ * The server is set up with netty, with only one NIO worker and 1 hornetq + * server worker. This increases the chance for the deadlock to occur. + *

+ * If the deadlock occurs, all threads will block/die. A simple transaction + * counting strategy is used to verify that the count has reached the expected + * value. + * @author Carl Heymann + */ +public class JmsNettyNioStressTest extends ServiceTestBase +{ + + // Constants ----------------------------------------------------- + + // Attributes ---------------------------------------------------- + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + // Public -------------------------------------------------------- + + // Remove this method to re-enable those tests + public void testStressSendNetty() throws Exception + { + doTestStressSend(true); + } + + public void doTestStressSend(final boolean netty) throws Exception + { + // first set up the server + Map params = new HashMap(); + params.put(TransportConstants.PORT_PROP_NAME, 5445); + params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + params.put(TransportConstants.USE_NIO_PROP_NAME, true); + // minimize threads to maximize possibility for deadlock + params.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 1); + params.put(TransportConstants.BATCH_DELAY, 50); + Configuration config = UnitTestCase.createDefaultConfig(params, ServiceTestBase.NETTY_ACCEPTOR_FACTORY); + HornetQServer server = createServer(true, config); + server.getConfiguration().setThreadPoolMaxSize(2); + server.start(); + + // now the client side + Map connectionParams = new HashMap(); + connectionParams.put(TransportConstants.PORT_PROP_NAME, 5445); + connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost"); + connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, true); + connectionParams.put(TransportConstants.BATCH_DELAY, 50); + connectionParams.put(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, 6); + final TransportConfiguration transpConf = new TransportConfiguration(NettyConnectorFactory.class.getName(), + connectionParams); + final ServerLocator locator = createNonHALocator(netty); + + // each thread will do this number of transactions + final int numberOfMessages = 100; + + // these must all be the same + final int numProducers = 30; + final int numConsumerProducers = 30; + final int numConsumers = 30; + + // each produce, consume+produce and consume increments this counter + final AtomicInteger totalCount = new AtomicInteger(0); + + // the total we expect if all producers, consumer-producers and + // consumers complete normally + int totalExpectedCount = (numProducers + numConsumerProducers + numConsumerProducers) * numberOfMessages; + + // each group gets a separate connection + final Connection connectionProducer; + final Connection connectionConsumerProducer; + final Connection connectionConsumer; + + // create the 2 queues used in the test + ClientSessionFactory sf = locator.createSessionFactory(transpConf); + ClientSession session = sf.createTransactedSession(); + session.createQueue("jms.queue.queue", "jms.queue.queue"); + session.createQueue("jms.queue.queue2", "jms.queue.queue2"); + session.commit(); + sf.close(); + session.close(); + locator.close(); + + // create and start JMS connections + HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transpConf); + connectionProducer = cf.createConnection(); + connectionProducer.start(); + + connectionConsumerProducer = cf.createConnection(); + connectionConsumerProducer.start(); + + connectionConsumer = cf.createConnection(); + connectionConsumer.start(); + + // these threads produce messages on the the first queue + for (int i = 0; i < numProducers; i++) + { + new Thread() + { + @Override + public void run() + { + + Session session = null; + try + { + session = connectionProducer.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue")); + messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i = 0; i < numberOfMessages; i++) + { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[3000]); + message.setStringProperty("Service", "LoadShedService"); + message.setStringProperty("Action", "testAction"); + + messageProducer.send(message); + session.commit(); + + totalCount.incrementAndGet(); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + if (session != null) + { + try + { + session.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + } + }.start(); + } + + // these threads just consume from the one and produce on a second queue + for (int i = 0; i < numConsumerProducers; i++) + { + new Thread() + { + @Override + public void run() + { + Session session = null; + try + { + session = connectionConsumerProducer.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue")); + MessageProducer messageProducer = session.createProducer(HornetQDestination.createQueue("queue2")); + messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < numberOfMessages; i++) + { + BytesMessage message = (BytesMessage)consumer.receive(5000); + if (message == null) + { + return; + } + message = session.createBytesMessage(); + message.writeBytes(new byte[3000]); + message.setStringProperty("Service", "LoadShedService"); + message.setStringProperty("Action", "testAction"); + messageProducer.send(message); + session.commit(); + + totalCount.incrementAndGet(); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + if (session != null) + { + try + { + session.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + } + }.start(); + } + + // these threads consume from the second queue + for (int i = 0; i < numConsumers; i++) + { + new Thread() + { + @Override + public void run() + { + Session session = null; + try + { + session = connectionConsumer.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(HornetQDestination.createQueue("queue2")); + for (int i = 0; i < numberOfMessages; i++) + { + BytesMessage message = (BytesMessage)consumer.receive(5000); + if (message == null) + { + return; + } + session.commit(); + + totalCount.incrementAndGet(); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + if (session != null) + { + try + { + session.close(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + } + }.start(); + } + + // check that the overall transaction count reaches the expected number, + // which would indicate that the system didn't stall + int timeoutCounter = 0; + int maxSecondsToWait = 60; + while (timeoutCounter < maxSecondsToWait && totalCount.get() < totalExpectedCount) + { + timeoutCounter++; + Thread.sleep(1000); + System.out.println("Not done yet.. " + (maxSecondsToWait - timeoutCounter) + "; " + totalCount.get()); + } + System.out.println("Done.." + totalCount.get() + ", expected " + totalExpectedCount); + Assert.assertEquals("Possible deadlock", totalExpectedCount, totalCount.get()); + System.out.println("After assert"); + + // attempt cleaning up (this is not in a finally, still needs some work) + connectionProducer.close(); + connectionConsumerProducer.close(); + connectionConsumer.close(); + + server.stop(); + } + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + + // Inner classes ------------------------------------------------- + +} \ No newline at end of file