Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@
import org.apache.activemq.artemis.spi.core.remoting.Connection;

import io.netty.channel.ChannelPipeline;
import org.jboss.logging.Logger;

/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/
public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor, ActiveMQProtonRemotingConnection> implements NotificationListener {

private static final Logger logger = Logger.getLogger(ProtonProtocolManager.class);

private static final List<String> websocketRegistryNames = Arrays.asList("amqp");

private final List<AmqpInterceptor> incomingInterceptors = new ArrayList<>();
Expand All @@ -72,6 +75,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,

private String saslLoginConfigScope = "amqp-sasl-gssapi";

private Long amqpIdleTimeout;


/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
* the address. This can be changed on the acceptor.
Expand Down Expand Up @@ -115,6 +121,17 @@ public boolean acceptsNoHandshake() {
return false;
}

public Long getAmqpIdleTimeout() {
return amqpIdleTimeout;
}

public ProtonProtocolManager setAmqpIdleTimeout(Long ttl) {
logger.debug("Setting up " + ttl + " as the connectionTtl");
this.amqpIdleTimeout = ttl;
return this;
}


@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) {
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(this, remotingConnection, server.getExecutorFactory().getExecutor(), server);
Expand All @@ -124,6 +141,14 @@ public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection r
ttl = server.getConfiguration().getConnectionTTLOverride();
}

if (getAmqpIdleTimeout() != null) {
ttl = getAmqpIdleTimeout().longValue();
}

if (ttl < 0) {
ttl = 0;
}

String id = server.getConfiguration().getName();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
Expand All @@ -136,7 +161,8 @@ public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection r

connectionCallback.setProtonConnectionDelegate(delegate);

ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl);
// connection entry only understands -1 otherwise we would see disconnects for no reason
ConnectionEntry entry = new ConnectionEntry(delegate, executor, System.currentTimeMillis(), ttl <= 0 ? -1 : ttl);

return entry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,11 @@ public void onRemoteOpen(Connection connection) throws Exception {
scheduledPool.schedule(new Runnable() {
@Override
public void run() {
long rescheduleAt = handler.tick(false);
if (rescheduleAt != 0) {
Long rescheduleAt = handler.tick(false);
if (rescheduleAt == null) {
// this mean tick could not acquire a lock, we will just retry in 10 milliseconds.
scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS);
} else if (rescheduleAt != 0) {
scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {

public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> {
this.readyListener = () -> this.flushExecutor.execute(() -> {
flush();
});
this.creationTime = System.currentTimeMillis();
Expand All @@ -105,8 +105,17 @@ public ProtonHandler(Executor flushExecutor, boolean isServer) {
connection.collect(collector);
}

public long tick(boolean firstTick) {
lock.lock();
public Long tick(boolean firstTick) {
if (firstTick) {
// the first tick needs to guarantee a lock here
lock.lock();
} else {
if (!lock.tryLock()) {
log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly");
// if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here
return null;
}
}
try {
if (!firstTick) {
try {
Expand All @@ -122,7 +131,7 @@ public long tick(boolean firstTick) {
transport.close();
connection.setCondition(new ErrorCondition());
}
return 0;
return 0L;
}
return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime()));
} finally {
Expand Down
29 changes: 29 additions & 0 deletions docs/user-manual/en/amqp.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,32 @@ message for later delivery:

If both annotations are present in the same message then the broker will prefer
the more specific `x-opt-delivery-time` value.

## Configuring AMQP Idle Timeout

It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout in milliseconds on the acceptor.

This will make the server to send an AMQP frame open to the client, with your configured timeout / 2.

So, if you configured your AMQP Idle Timeout to be 60000, the server will tell the client to send frames every 30,000 milliseconds.


```xml
<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>
```


### Disabling Keep alive checks

if you set amqpIdleTimeout=0 that will tell clients to not sending keep alive packets towards the server. On this case
you will rely on TCP to determine when the socket needs to be closed.

```xml
<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>
```

This contains a real example for configuring amqpIdleTimeout:

```xml
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10</acceptor>
```
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -27,18 +30,41 @@
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
* Test handling of heartbeats requested by the broker.
*/
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
@RunWith(Parameterized.class)
public class AmqpBrokerRequestedHearbeatsTest extends AmqpClientTestSupport {

private final int TEST_IDLE_TIMEOUT = 1000;

@Parameterized.Parameters(name = "useOverride={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}

@Parameterized.Parameter(0)
public boolean useOverride;

@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
if (!useOverride) {
params.put("amqpIdleTimeout", "" + TEST_IDLE_TIMEOUT);
}
}


@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setConnectionTtlCheckInterval(TEST_IDLE_TIMEOUT / 3);
server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT);
if (useOverride) {
server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT);
}
}

@Test(timeout = 60000)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class AmqpNoHearbeatsTest extends AmqpClientTestSupport {

@Parameterized.Parameters(name = "useOverride={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{true}, {false}
});
}

@Parameterized.Parameter(0)
public boolean useOverride;

@Override
protected void addConfiguration(ActiveMQServer server) {
if (useOverride) {
server.getConfiguration().setConnectionTTLOverride(0);
} else {
server.getConfiguration().setConnectionTtlCheckInterval(500);
}
}


@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
if (!useOverride) {
params.put("amqpIdleTimeout", "0");
}
}


@Test(timeout = 60000)
public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the test doesn't appear to match what is being tested here so I'd name it something that actually describes that to avoid confusing folks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch.. you're right.. and I missed it.. I will rename right away.

the joy of copy & paste

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will call it testHeartless :)

AmqpClient client = createAmqpClient();
assertNotNull(client);

client.setValidator(new AmqpValidator() {

@Override
public void inspectOpenedResource(Connection connection) {
assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout());
}
});

AmqpConnection connection = addConnection(client.connect());
assertNotNull(connection);

connection.getStateInspector().assertValid();
connection.close();
}

}