BZ-1193085 critical IO error on client disconnect
When a Netty OIO thread servicing a client performs work on the journal
(e.g. creating a new page file), there is a small window where, if the
connection is closed (either via administrative intervention or because
of a ping failure), the Netty thread can be interrupted. HornetQ
interprets this interrupt as a critical journal failure and shuts
itself down.

(cherry picked from commit 54165e6)
jbertram authored and clebertsuconic committed Jul 6, 2015
1 parent 89c2fb4 commit a2f77d2
Showing 3 changed files with 261 additions and 5 deletions.
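
Background on the failure mode: java.nio.channels.FileChannel is an interruptible channel, so interrupting a thread that is blocked in (or enters) a blocking file operation closes the channel and raises ClosedByInterruptException, which HornetQ's journal layer treats as a critical IO error. Below is a minimal, self-contained sketch of that JDK behavior; it is illustrative only and not part of the commit.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class InterruptedWriteDemo
{
   public static void main(String[] args) throws Exception
   {
      Path file = Files.createTempFile("journal", ".dat");
      final Thread writer = Thread.currentThread();

      // Interrupt the "journal" thread shortly after it starts writing,
      // much like a connection-failure handler interrupting a Netty OIO thread.
      new Thread(new Runnable()
      {
         public void run()
         {
            try
            {
               Thread.sleep(100);
            }
            catch (InterruptedException ignored)
            {
            }
            writer.interrupt();
         }
      }).start();

      FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE);
      try
      {
         ByteBuffer buffer = ByteBuffer.allocate(4096);
         while (true)
         {
            buffer.rewind();
            channel.write(buffer); // throws ClosedByInterruptException once interrupted
         }
      }
      catch (IOException e)
      {
         // HornetQ's journal maps an exception like this onto a critical IO
         // error and shuts the server down -- the window this commit closes.
         System.out.println("Write failed: " + e);
      }
   }
}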
@@ -296,6 +296,11 @@ public interface HornetQClientLogger extends BasicLogger
format = Message.Format.MESSAGE_FORMAT)
void compressedLargeMessageError(int length, int nReadBytes);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 212051, value = "Waited {0} milliseconds for packet to complete. Now forcibly terminating thread.",
format = Message.Format.MESSAGE_FORMAT)
void packetTimeout(long timeout);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e);
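HornetQClientLogger is a JBoss Logging typed logger: an annotation processor generates an implementation of the interface, and the message id becomes the code prefix in the output (HQ212051 here). A hedged sketch of how the new packetTimeout method would be obtained and called; the bootstrap call, import, and demo class are assumptions, not shown in this diff.

import org.hornetq.core.client.HornetQClientLogger; // package assumed from the HornetQ source tree
import org.jboss.logging.Logger;

public final class PacketTimeoutLogDemo
{
   // In HornetQ the LOGGER constant is declared on the interface itself;
   // it is shown standalone here purely for illustration.
   private static final HornetQClientLogger LOGGER =
      Logger.getMessageLogger(HornetQClientLogger.class,
                              HornetQClientLogger.class.getPackage().getName());

   public static void main(String[] args)
   {
      // Logs at WARN, roughly: "HQ212051: Waited 30000 milliseconds for
      // packet to complete. Now forcibly terminating thread."
      LOGGER.packetTimeout(30000);
   }
}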
@@ -21,12 +21,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.HornetQInterruptedException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
@@ -74,6 +78,9 @@ public class RemotingConnectionImpl implements CoreRemotingConnection

private final List<Interceptor> outgoingInterceptors;


// The current logic expects that this will only be set to true once;
// once destroyed, this object (connection) needs to be replaced by a new one
private volatile boolean destroyed;

private final boolean client;
@@ -100,6 +107,11 @@ public class RemotingConnectionImpl implements CoreRemotingConnection

private String clientID;

/**
* a read lock will be acquired around every call here
*/
private final ReadWriteLock callingLock = new ReentrantReadWriteLock();

// Constructors
// ---------------------------------------------------------------------------------

@@ -337,6 +349,35 @@ public void fail(final HornetQException me)

HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());

/* Wait for any current packets to finish processing. This is to avoid interrupting threads which are doing
* journal work as such an interrupt could trigger a critical IO error. See https://bugzilla.redhat.com/show_bug.cgi?id=1193085
* for more information.
*/
boolean acquiredLock = false;

try
{
long timeout = HornetQClient.DEFAULT_CALL_TIMEOUT;

acquiredLock = callingLock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);

if (!acquiredLock)
{
HornetQClientLogger.LOGGER.packetTimeout(timeout);
}
}
catch (InterruptedException e)
{
HornetQClientLogger.LOGGER.debug(e.getMessage(), e);
}
finally
{
if (acquiredLock)
{
callingLock.writeLock().unlock();
}
}


try
{
@@ -543,19 +584,41 @@ public void run()

private void doBufferReceived(final Packet packet)
{
if (destroyed)
{
// optimization: check once without a lock
HornetQClientLogger.LOGGER.debug("Ignoring a packet " + packet + " that has arrived after connection destroyed: " + this);
return;
}

callingLock.readLock().lock();
try
{
if (destroyed)
{
// check once again within the lock
HornetQClientLogger.LOGGER.debug("Ignoring a packet " + packet + " that has arrived after connection destroyed: " + this);
return;
}

if (ChannelImpl.invokeInterceptors(packet, incomingInterceptors, this) != null)
{
return;
}

synchronized (transferLock)
{
final Channel channel = channels.get(packet.getChannelID());

if (channel != null)
{
channel.handlePacket(packet);
}
}
}
finally
{
callingLock.readLock().unlock();
}
}

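Taken together, the RemotingConnectionImpl changes form a guarded-teardown pattern: every packet is handled under a shared read lock with a double-checked volatile destroyed flag, while fail() takes the exclusive write lock with a bounded tryLock so in-flight journal work can finish before the connection is torn down and its threads interrupted. A condensed, self-contained sketch of the same pattern follows; the class and method signatures are illustrative, not from the commit.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedConnection
{
   private final ReadWriteLock callingLock = new ReentrantReadWriteLock();

   private volatile boolean destroyed;

   /** Packet path: many threads may handle packets concurrently. */
   public void handlePacket(Runnable work)
   {
      if (destroyed)
      {
         return; // cheap check without taking the lock
      }
      callingLock.readLock().lock();
      try
      {
         if (destroyed)
         {
            return; // re-check now that we hold the lock
         }
         work.run(); // e.g. journal IO triggered by the packet
      }
      finally
      {
         callingLock.readLock().unlock();
      }
   }

   /** Failure path: wait (bounded) for in-flight packets before tearing down. */
   public void fail(long timeoutMillis) throws InterruptedException
   {
      boolean acquired = callingLock.writeLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
      try
      {
         // proceed even if the timeout expired, as the commit does after
         // logging the HQ212051 warning
         destroyed = true;
      }
      finally
      {
         if (acquired)
         {
            callingLock.writeLock().unlock();
         }
      }
      // ...now safe to close channels / interrupt threads...
   }
}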
@@ -0,0 +1,188 @@
/*
* Copyright 2005-2014 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.hornetq.byteman.tests;

import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;

import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.integration.IntegrationTestLogger;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.util.ServiceTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(BMUnitRunner.class)
public class ClosingConnectionTest extends ServiceTestBase
{
public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");

private ServerLocator locator;

private HornetQServer server;

private static MBeanServer mBeanServer;

private static boolean readyToKill = false;

protected boolean isNetty()
{
return true;
}


@Override
@Before
public void setUp() throws Exception
{
super.setUp();
mBeanServer = MBeanServerFactory.createMBeanServer();
server = newHornetQServer();
server.getConfiguration().setJournalType(JournalType.NIO);
server.getConfiguration().setJMXManagementEnabled(true);
server.start();
waitForServer(server);
locator = createFactory(isNetty());
readyToKill = false;
}

public static void killConnection() throws InterruptedException
{
if (readyToKill)
{
// We have to kill the connection in a new thread otherwise Netty won't interrupt the current thread
Thread closeConnectionThread = new Thread(new Runnable()
{
@Override
public void run()
{
try
{
HornetQServerControl serverControl = ManagementControlHelper.createHornetQServerControl(mBeanServer);
serverControl.closeConnectionsForUser("guest");
readyToKill = false;
}
catch (Exception e)
{
e.printStackTrace();
}
}
});

closeConnectionThread.start();

try
{
/* We want to simulate a long-running remoting thread here. If closing the connection in the closeConnectionThread
* interrupts this thread then it will cause sleep() to throw an InterruptedException. Therefore we catch
* the InterruptedException and re-interrupt the current thread so the interrupt will be passed properly
* back to the caller. It's a bit of a hack, but I couldn't find any other way to simulate it.
*/
Thread.sleep(1500);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}

/*
* Test for https://bugzilla.redhat.com/show_bug.cgi?id=1193085
*/
@Test
@BMRules
(
rules =
{
@BMRule
(
name = "rule to kill connection",
targetClass = "org.hornetq.core.journal.impl.NIOSequentialFile",
targetMethod = "open(int, boolean)",
targetLocation = "AT INVOKE java.nio.channels.FileChannel.size()",
action = "org.hornetq.byteman.tests.ClosingConnectionTest.killConnection();"

)
}
)
public void testKillConnection() throws Exception
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);

ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession("guest", null, false, true, true, false, 0);

session.createQueue(ADDRESS, ADDRESS, null, true);

ClientProducer producer = session.createProducer(ADDRESS);

ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);

for (int i = 0; i < 200; i++)
{
producer.send(message);
}

assertTrue(server.locateQueue(ADDRESS).getPageSubscription().getPagingStore().isPaging());

readyToKill = true;
try
{
for (int i = 0; i < 8; i++)
{
producer.send(message);
}
fail("Sending message here should result in failure.");
}
catch (Exception e)
{
IntegrationTestLogger.LOGGER.info("Caught exception: " + e.getMessage());
}

Thread.sleep(1000);

assertTrue(server.isStarted());

session.close();
}

private HornetQServer newHornetQServer() throws Exception
{
HornetQServer server = createServer(true, createDefaultConfig(isNetty()), mBeanServer);

AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(10 * 1024);
defaultSetting.setMaxSizeBytes(20 * 1024);

server.getAddressSettingsRepository().addMatch("#", defaultSetting);

return server;
}
}
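
For reference, the @BMRule above should correspond to a standalone Byteman script along these lines (a sketch of the equivalent .btm form for running the rule outside BMUnit, assuming standard Byteman rule syntax):

RULE rule to kill connection
CLASS org.hornetq.core.journal.impl.NIOSequentialFile
METHOD open(int, boolean)
AT INVOKE java.nio.channels.FileChannel.size()
IF TRUE
DO org.hornetq.byteman.tests.ClosingConnectionTest.killConnection()
ENDRULE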
