Skip to content

Commit

Permalink
Bug1428338 Invalid Type exception handling improvements
Browse files Browse the repository at this point in the history
If broker fails to decode any packets from buffer, it should
treat it as a critical bug and disconnect immediately.
Currently broker only logs an error message.
  • Loading branch information
howardgao committed Mar 14, 2017
1 parent bfee64f commit 7bde08c
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 3 deletions.
Expand Up @@ -579,6 +579,7 @@ public void run()
catch (Exception e)
{
HornetQClientLogger.LOGGER.errorDecodingPacket(e);
throw new IllegalStateException(e);
}
}

Expand Down
Expand Up @@ -584,7 +584,15 @@ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer

if (conn != null)
{
conn.connection.bufferReceived(connectionID, buffer);
try
{
conn.connection.bufferReceived(connectionID, buffer);
}
catch (RuntimeException e)
{
HornetQServerLogger.LOGGER.warn("Failed to decode buffer, disconnect immediately.", e);
conn.connection.fail(new HornetQException(e.getMessage()));
}
}
else
{
Expand Down
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hornetq.byteman.tests;

import org.hornetq.tests.util.JMSTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@RunWith(BMUnitRunner.class)
public class DisconnectOnCriticalFailureTest extends JMSTestBase
{

private static AtomicBoolean corruptPacket = new AtomicBoolean(false);

@Test
@BMRules
(
rules =
{
@BMRule
(
name = "Corrupt Decoding",
targetClass = "org.hornetq.core.protocol.core.impl.PacketDecoder",
targetMethod = "decode(byte)",
targetLocation = "ENTRY",
action = "org.hornetq.byteman.tests.DisconnectOnCriticalFailureTest.doThrow();"
)
}
)
public void testSendDisconnect() throws Exception
{
createQueue("queue1");
final Connection producerConnection = nettyCf.createConnection();
final CountDownLatch latch = new CountDownLatch(1);

try
{
producerConnection.setExceptionListener(new ExceptionListener()
{
@Override
public void onException(JMSException e)
{
latch.countDown();
}
});

corruptPacket.set(true);
producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

assertTrue(latch.await(5, TimeUnit.SECONDS));
}
finally
{
corruptPacket.set(false);

if (producerConnection != null)
{
producerConnection.close();
}
}
}

public static void doThrow()
{
if (corruptPacket.get())
{
corruptPacket.set(false);
throw new IllegalArgumentException("Invalid type: -84");
}
}
}
Expand Up @@ -48,6 +48,8 @@ public class JMSTestBase extends ServiceTestBase
protected MBeanServer mbeanServer;

protected ConnectionFactory cf;
protected ConnectionFactory nettyCf;

protected Connection conn;

protected InVMContext context;
Expand Down Expand Up @@ -124,6 +126,7 @@ public void setUp() throws Exception
Configuration conf = createDefaultConfig(true);
conf.setSecurityEnabled(useSecurity());
conf.getConnectorConfigurations().put("invm", new TransportConfiguration(INVM_CONNECTOR_FACTORY));
conf.getConnectorConfigurations().put("netty", new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
conf.setTransactionTimeoutScanPeriod(100);

server = HornetQServers.newHornetQServer(conf, mbeanServer, usePersistence());
Expand Down Expand Up @@ -198,25 +201,35 @@ protected void registerConnectionFactory() throws Exception
List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
connectorConfigs.add(new TransportConfiguration(INVM_CONNECTOR_FACTORY));

List<TransportConfiguration> connectorConfigs1 = new ArrayList<TransportConfiguration>();
connectorConfigs1.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));

createCF(connectorConfigs, "/cf");
createCF("NettyCF", connectorConfigs1, "/nettyCf");

cf = (ConnectionFactory)context.lookup("/cf");
nettyCf = (ConnectionFactory)context.lookup("/nettyCf");
}


protected void createCF(final List<TransportConfiguration> connectorConfigs, final String... jndiBindings) throws Exception
{
createCF("ManualReconnectionToSingleServerTest", connectorConfigs, jndiBindings);
}

/**
* @param connectorConfigs
* @param jndiBindings
* @throws Exception
*/
protected void createCF(final List<TransportConfiguration> connectorConfigs, final String... jndiBindings) throws Exception
protected void createCF(final String name, final List<TransportConfiguration> connectorConfigs, final String... jndiBindings) throws Exception
{
int retryInterval = 1000;
double retryIntervalMultiplier = 1.0;
int reconnectAttempts = -1;
int callTimeout = 30000;

jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
jmsServer.createConnectionFactory(name,
false,
JMSFactoryType.CF,
registerConnectors(server, connectorConfigs),
Expand Down

0 comments on commit 7bde08c

Please sign in to comment.