From cb793e0e9827535fc3ce1c66b4ffbc41f513e9f5 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 11 Jun 2018 17:11:25 -0400 Subject: [PATCH] ARTEMIS-1924 Add amqpIdleTimeout --- .../amqp/broker/ProtonProtocolManager.java | 28 ++++++- .../amqp/proton/AMQPConnectionContext.java | 7 +- .../amqp/proton/handler/ProtonHandler.java | 17 +++- docs/user-manual/en/amqp.md | 29 +++++++ ... => AmqpBrokerRequestedHearbeatsTest.java} | 30 ++++++- .../integration/amqp/AmqpNoHearbeatsTest.java | 83 +++++++++++++++++++ 6 files changed, 185 insertions(+), 9 deletions(-) rename tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/{AmqpBrokerReuqestedHearbeatsTest.java => AmqpBrokerRequestedHearbeatsTest.java} (85%) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index c1a92e01029..d86dc81826b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -45,12 +45,15 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import io.netty.channel.ChannelPipeline; +import org.jboss.logging.Logger; /** * A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources */ public class ProtonProtocolManager extends AbstractProtocolManager implements NotificationListener { + private static final Logger logger = Logger.getLogger(ProtonProtocolManager.class); + private static final List websocketRegistryNames = Arrays.asList("amqp"); private final List incomingInterceptors = new ArrayList<>(); @@ -72,6 +75,9 @@ public class ProtonProtocolManager extends AbstractProtocolManager flushExecutor.execute(() -> { + this.readyListener = () -> this.flushExecutor.execute(() -> { flush(); }); this.creationTime = System.currentTimeMillis(); @@ -105,8 +105,17 @@ public ProtonHandler(Executor flushExecutor, boolean isServer) { connection.collect(collector); } - public long tick(boolean firstTick) { - lock.lock(); + public Long tick(boolean firstTick) { + if (firstTick) { + // the first tick needs to guarantee a lock here + lock.lock(); + } else { + if (!lock.tryLock()) { + log.debug("Cannot hold a lock on ProtonHandler for Tick, it will retry shortly"); + // if we can't lock the scheduler will retry in a very short period of time instead of holding the lock here + return null; + } + } try { if (!firstTick) { try { @@ -122,7 +131,7 @@ public long tick(boolean firstTick) { transport.close(); connection.setCondition(new ErrorCondition()); } - return 0; + return 0L; } return transport.tick(TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); } finally { diff --git a/docs/user-manual/en/amqp.md b/docs/user-manual/en/amqp.md index a201fbd998b..30cb2aa081f 100644 --- a/docs/user-manual/en/amqp.md +++ b/docs/user-manual/en/amqp.md @@ -127,3 +127,32 @@ message for later delivery: If both annotations are present in the same message then the broker will prefer the more specific `x-opt-delivery-time` value. + +## Configuring AMQP Idle Timeout + +It is possible to configure the AMQP Server's IDLE Timeout by setting the property amqpIdleTimeout in milliseconds on the acceptor. + +This will make the server to send an AMQP frame open to the client, with your configured timeout / 2. + +So, if you configured your AMQP Idle Timeout to be 60000, the server will tell the client to send frames every 30,000 milliseconds. + + +```xml +.... ;amqpIdleTimeout=; ..... +``` + + +### Disabling Keep alive checks + +if you set amqpIdleTimeout=0 that will tell clients to not sending keep alive packets towards the server. On this case +you will rely on TCP to determine when the socket needs to be closed. + +```xml +.... ;amqpIdleTimeout=0; ..... +``` + +This contains a real example for configuring amqpIdleTimeout: + +```xml +tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300;directDeliver=false;batchDelay=10 +``` \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java similarity index 85% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java index 8221ef6d0f4..fcc7acdd33a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerReuqestedHearbeatsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBrokerRequestedHearbeatsTest.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,18 +30,41 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.engine.Connection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test handling of heartbeats requested by the broker. */ -public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { +@RunWith(Parameterized.class) +public class AmqpBrokerRequestedHearbeatsTest extends AmqpClientTestSupport { private final int TEST_IDLE_TIMEOUT = 1000; + @Parameterized.Parameters(name = "useOverride={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + @Parameterized.Parameter(0) + public boolean useOverride; + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + if (!useOverride) { + params.put("amqpIdleTimeout", "" + TEST_IDLE_TIMEOUT); + } + } + + @Override protected void addConfiguration(ActiveMQServer server) { server.getConfiguration().setConnectionTtlCheckInterval(TEST_IDLE_TIMEOUT / 3); - server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT); + if (useOverride) { + server.getConfiguration().setConnectionTTLOverride(TEST_IDLE_TIMEOUT); + } } @Test(timeout = 60000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java new file mode 100644 index 00000000000..2d5b3cf599c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNoHearbeatsTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class AmqpNoHearbeatsTest extends AmqpClientTestSupport { + + @Parameterized.Parameters(name = "useOverride={0}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + {true}, {false} + }); + } + + @Parameterized.Parameter(0) + public boolean useOverride; + + @Override + protected void addConfiguration(ActiveMQServer server) { + if (useOverride) { + server.getConfiguration().setConnectionTTLOverride(0); + } else { + server.getConfiguration().setConnectionTtlCheckInterval(500); + } + } + + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + if (!useOverride) { + params.put("amqpIdleTimeout", "0"); + } + } + + + @Test(timeout = 60000) + public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals("idle timeout was not disabled", 0, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = addConnection(client.connect()); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + connection.close(); + } + +}