feat(#1739): Add requestTimeout to ConnectionFactory to prevent hanging threads in syncSendPacket() #1740
jbonofre wants to merge 2 commits into apache:main
What is the use case for this? When were/are you seeing timeouts?
@cshannon this is new work on an old Jira issue (AMQ-6763). I was able to reproduce it when using XA: the broker never sent back a response to these XA commands. In my case, I reproduced the issue by simulating a network failure: the TCP connection to the broker was silently dropped, the request was sent but the response never arrived, and no TCP error was detected, so the XA thread hangs forever. I believe similar behavior can happen: …
This PR aims to set a timeout to avoid the thread hanging forever.
Usually the timeout is set on a Connection and then passed when making the request, as seen at this line. Maybe we need to pass the timeout value in more spots? I think the code just defaults to 0 and the value is not set in many places, so the timeout is not used. I'm also not a huge fan of system properties: they are global and can't easily be set for a single connection, for example, so I would avoid them unless there is no other option.
I can pass the timeout from the … The system property is more for "transition", as I think we should have a timeout (0/no timeout is not a good idea for …
Looking at it closer, it's getting stuck here in setXid() when calling syncSendPacket() on the connection. If you want to add a new configuration option for a default timeout, I think you should move it one step deeper into the ActiveMQConnection, inside the syncSendPacket() method. You can replace the 0 that is passed with a new configuration option (which will of course be 0 by default for compatibility). The option can be configured on the connection as a URL parameter or setter like any other connection property, so we can avoid the system property and also avoid having the timeout logic in different locations. There's already a closeTimeout, so this could fit in with that and be called something like …
cshannon left a comment
Requesting changes to move the timeout to ActiveMQConnection as requestTimeout instead, as described in #1740 (comment).
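To make the requested shape concrete, here is a minimal sketch of a connection-wide default that the no-timeout overload falls back to. It assumes a hypothetical `ConnectionTimeoutSketch` class with string commands and an in-memory log of the timeouts used; none of this is the actual ActiveMQConnection code, only an illustration of replacing a hard-coded 0 with a configurable value that still defaults to 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connection, used only to show where a default timeout could live.
final class ConnectionTimeoutSketch {
    // 0 means "no timeout", which keeps the existing behaviour unless configured.
    private int requestTimeout = 0;
    // Records the timeout each request ended up using, so the flow can be inspected.
    private final List<Integer> usedTimeouts = new ArrayList<>();

    void setRequestTimeout(int requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    int getRequestTimeout() {
        return requestTimeout;
    }

    // No explicit timeout: use the connection-wide default instead of a literal 0.
    String syncSendPacket(String command) {
        return syncSendPacket(command, requestTimeout);
    }

    // Explicit timeout: the caller's value wins over the default.
    String syncSendPacket(String command, int timeout) {
        usedTimeouts.add(timeout);
        return "response to " + command;
    }

    List<Integer> usedTimeouts() {
        return usedTimeouts;
    }
}
```

With this layout a caller that never sets the property sees a timeout of 0 as before, while a caller that passes a timeout explicitly is unaffected by the default.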
There's also a sendTimeout and possibly a connectTimeout in ActiveMQConnection. I recall adding sendTimeout a long time ago for similar reasons.
Ah yeah, I see the sendTimeout already defined just below it, forgot about that. It is just used to pass to the producer (which makes sense, as it is only used on message sending). It would be nice to re-use a timeout, but I think we likely need a new one that is more broad, i.e. …
Yes, you would likely add a new timeout option such as 'requestTimeout'. We have similar options in Qpid JMS which cover all cases of blocking calls: connectTimeout, closeTimeout, sendTimeout, requestTimeout and drainTimeout.
@jbonofre - One more thing: if adding a new requestTimeout property for the connection, you should add the property to the …
@cshannon @tabish121 @jeanouii I changed the approach as you suggested. I introduce a configurable … When set, … NB: it can be configured programmatically via …
The requestTimeout change looks good but I noticed a few things to fix.
1. I made a comment inline, but you need to do something else besides creating a session, like creating a consumer, to trigger the sync send. Creating a session is actually done async.
2. Inside of TransactionContext there is a spot that needs to be updated to use the request timeout value here. The line should be:
    this.connection.syncSendPacket(info, this.connection.isClosing() ?
        this.connection.getCloseTimeout() : this.connection.getRequestTimeout());
3. You should add a test that shows the timeout actually works, such as:
    @Test
    public void testSyncSendPacketFailFromTimeout() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        // Set to super short 1 millisecond so we always time out
        factory.setRequestTimeout(1);
        try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
            final Exception exception = assertThrows(JMSException.class,
                () -> {
                    connection.start();
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    session.createConsumer(session.createQueue("test"));
                });
            assertEquals(RequestTimedOutIOException.class,
                TransportConnector.getRootCause(exception).getClass());
        }
    }
4. You could also add a test to verify that the method that passes along the timeout value overrides the default as intended:
    @Test
    public void testSyncSendPacketOverrideDefaultRequestTimeout() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
            connection.start();
            ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // After session creation set the timeout default to be very short to test that
            // overriding directly works
            connection.setRequestTimeout(1);
            ConsumerInfo info = new ConsumerInfo(session.getSessionInfo(),
                session.getNextConsumerId().getValue());
            info.setDestination(new ActiveMQQueue("test"));
            // Send info packet with timeout override
            assertNotNull("Consumer should be created successfully with no timeout",
                connection.syncSendPacket(info, 5000));
        }
    }
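The timeout selection from point 2 can be isolated as a tiny rule: while the connection is closing the close timeout applies, otherwise the request timeout does. The sketch below assumes a hypothetical `TimeoutSelectionSketch` helper and illustrative millisecond values; it is not code from TransactionContext.

```java
// Hypothetical helper that mirrors the ternary suggested for TransactionContext.
final class TimeoutSelectionSketch {
    private TimeoutSelectionSketch() {
    }

    // Picks the close timeout while closing, and the request timeout otherwise.
    static int effectiveTimeout(boolean closing, int closeTimeout, int requestTimeout) {
        return closing ? closeTimeout : requestTimeout;
    }
}
```

For example, `TimeoutSelectionSketch.effectiveTimeout(false, 15000, 3000)` selects the request timeout, and passing `true` as the first argument selects the close timeout instead. Keeping the two values separate lets shutdown be bounded independently of normal request handling.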
… syncSendPacket()
When the broker becomes unreachable without the TCP connection being properly closed (network partition, half-open connection), FutureResponse.getResult() calls ArrayBlockingQueue.take() which blocks indefinitely, causing threads to hang forever.
Introduce a configurable requestTimeout (default 0, no timeout) on ActiveMQConnectionFactory and ActiveMQConnection, similar to the existing sendTimeout. When set, syncSendPacket(Command) uses poll(timeout) instead of take(), throwing RequestTimedOutIOException when the timeout expires.
Can be configured programmatically via factory.setRequestTimeout(ms) or via URL parameter jms.requestTimeout=ms.
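The difference between take() and poll(timeout) described in the commit message can be sketched with plain java.util.concurrent. The `PendingResponseSketch` class below is a hypothetical stand-in, not ActiveMQ's FutureResponse, and it throws an UncheckedIOException for brevity where the commit message names RequestTimedOutIOException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pending response; not ActiveMQ's FutureResponse.
final class PendingResponseSketch {
    private final BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);

    // Called by the (simulated) transport when the broker answers.
    void complete(String response) {
        slot.offer(response);
    }

    // timeoutMillis <= 0 blocks until a response arrives (the old behaviour);
    // a positive value gives up after that many milliseconds.
    String await(long timeoutMillis) {
        try {
            if (timeoutMillis <= 0) {
                return slot.take(); // waits forever if nothing ever arrives
            }
            String response = slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (response == null) {
                throw new UncheckedIOException(
                        new IOException("No response within " + timeoutMillis + " ms"));
            }
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a response", e);
        }
    }
}
```

A caller on a half-open connection would never see complete() invoked, so only the poll branch lets the waiting thread recover.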
@cshannon I addressed your comments.
cshannon left a comment
I left 2 minor comments inline, but otherwise I think this is about ready.
@cshannon updated according to your comments.
This closes #1739
This is a rework of #644 (new investigation).