diff --git a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java index 0f289e7ea4f..78a2b5462cf 100644 --- a/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/hornetq-core-client/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java @@ -579,6 +579,7 @@ public void run() catch (Exception e) { HornetQClientLogger.LOGGER.errorDecodingPacket(e); + throw new IllegalStateException(e); } } diff --git a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java index bdc1d7a3744..614ed5c0318 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java @@ -584,7 +584,15 @@ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer if (conn != null) { - conn.connection.bufferReceived(connectionID, buffer); + try + { + conn.connection.bufferReceived(connectionID, buffer); + } + catch (RuntimeException e) + { + HornetQServerLogger.LOGGER.warn("Failed to decode buffer, disconnect immediately.", e); + conn.connection.fail(new HornetQException(e.getMessage())); + } } else { diff --git a/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/DisconnectOnCriticalFailureTest.java b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/DisconnectOnCriticalFailureTest.java new file mode 100644 index 00000000000..58f07323f04 --- /dev/null +++ b/tests/byteman-tests/src/test/java/org/hornetq/byteman/tests/DisconnectOnCriticalFailureTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.hornetq.byteman.tests; + +import org.hornetq.tests.util.JMSTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@RunWith(BMUnitRunner.class) +public class DisconnectOnCriticalFailureTest extends JMSTestBase +{ + + private static AtomicBoolean corruptPacket = new AtomicBoolean(false); + + @Test + @BMRules + ( + rules = + { + @BMRule + ( + name = "Corrupt Decoding", + targetClass = "org.hornetq.core.protocol.core.impl.PacketDecoder", + targetMethod = "decode(byte)", + targetLocation = "ENTRY", + action = "org.hornetq.byteman.tests.DisconnectOnCriticalFailureTest.doThrow();" + ) + } + ) + public void testSendDisconnect() throws Exception + { + createQueue("queue1"); + final Connection producerConnection = nettyCf.createConnection(); + final CountDownLatch latch = new CountDownLatch(1); + + try + { + producerConnection.setExceptionListener(new ExceptionListener() + { + @Override + public void onException(JMSException e) + { + latch.countDown(); + } + }); + + corruptPacket.set(true); + producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + finally + { + corruptPacket.set(false); + + if (producerConnection != null) + { + producerConnection.close(); + } + } + } + + public static void doThrow() + { + if (corruptPacket.get()) + { + corruptPacket.set(false); + throw new IllegalArgumentException("Invalid type: -84"); + } + } +} diff --git a/tests/integration-tests/src/test/java/org/hornetq/tests/util/JMSTestBase.java b/tests/integration-tests/src/test/java/org/hornetq/tests/util/JMSTestBase.java index 3d10295dcbe..9ed595c2d19 100644 --- a/tests/integration-tests/src/test/java/org/hornetq/tests/util/JMSTestBase.java +++ b/tests/integration-tests/src/test/java/org/hornetq/tests/util/JMSTestBase.java @@ -48,6 +48,8 @@ public class JMSTestBase extends ServiceTestBase protected MBeanServer mbeanServer; protected ConnectionFactory cf; + protected ConnectionFactory nettyCf; + protected Connection conn; protected InVMContext context; @@ -124,6 +126,7 @@ public void setUp() throws Exception Configuration conf = createDefaultConfig(true); conf.setSecurityEnabled(useSecurity()); conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + conf.getConnectorConfigurations().put("netty", new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); conf.setTransactionTimeoutScanPeriod(100); server = HornetQServers.newHornetQServer(conf, mbeanServer, usePersistence()); @@ -198,10 +201,20 @@ protected void registerConnectionFactory() throws Exception List connectorConfigs = new ArrayList(); connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY)); + List connectorConfigs1 = new ArrayList(); + connectorConfigs1.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + createCF(connectorConfigs, "/cf"); + createCF("NettyCF", connectorConfigs1, "/nettyCf"); cf = (ConnectionFactory)context.lookup("/cf"); + nettyCf = (ConnectionFactory)context.lookup("/nettyCf"); + } + + protected void createCF(final List connectorConfigs, final String... jndiBindings) throws Exception + { + createCF("ManualReconnectionToSingleServerTest", connectorConfigs, jndiBindings); } /** @@ -209,14 +222,14 @@ protected void registerConnectionFactory() throws Exception * @param jndiBindings * @throws Exception */ - protected void createCF(final List connectorConfigs, final String... jndiBindings) throws Exception + protected void createCF(final String name, final List connectorConfigs, final String... jndiBindings) throws Exception { int retryInterval = 1000; double retryIntervalMultiplier = 1.0; int reconnectAttempts = -1; int callTimeout = 30000; - jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest", + jmsServer.createConnectionFactory(name, false, JMSFactoryType.CF, registerConnectors(server, connectorConfigs),