From e42117a6505c32d87999601c0810acccaa6312ed Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 7 Sep 2016 16:24:52 -0400 Subject: [PATCH] ARTEMIS-722 Add DELAYED_DELIVERY capability to server connection open The server should indicate to clients that it supports the message annotation that allows message delivery to be delayed 'x-opt-delivery-time' --- .../proton/plug/AMQPConnectionContext.java | 10 ++ .../java/org/proton/plug/AmqpSupport.java | 1 + .../context/AbstractConnectionContext.java | 19 ++- .../server/ProtonServerConnectionContext.java | 6 + .../tests/integration/proton/ProtonTest.java | 114 ++++++++++++++++++ 5 files changed, 144 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java index 45f98042774..9123006fb3c 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AMQPConnectionContext.java @@ -16,6 +16,8 @@ */ package org.proton.plug; +import org.apache.qpid.proton.amqp.Symbol; + import io.netty.buffer.ByteBuf; public interface AMQPConnectionContext { @@ -30,6 +32,14 @@ public interface AMQPConnectionContext { SASLResult getSASLResult(); + /** + * Load and return a []Symbol that contains the connection capabilities + * offered to new connections + * + * @return the capabilities that are offered to new remote peers on connect. + */ + Symbol[] getConnectionCapabilitiesOffered(); + /** * Even though we are currently always sending packets asynchronsouly * we have a possibility to start trusting on the network flow control diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java index f57fd81267d..158085563aa 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/AmqpSupport.java @@ -46,6 +46,7 @@ public class AmqpSupport { // Symbols used to announce connection information to remote peer. public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY"); public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java index c881031746b..9ece7903db8 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractConnectionContext.java @@ -16,6 +16,9 @@ */ package org.proton.plug.context; +import static org.proton.plug.AmqpSupport.PRODUCT; +import static org.proton.plug.AmqpSupport.VERSION; + import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -76,11 +79,13 @@ public AbstractConnectionContext(AMQPConnectionCallback connectionCallback, ScheduledExecutorService scheduledPool) { this.connectionCallback = connectionCallback; this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); - connectionProperties.put(Symbol.valueOf("product"), "apache-activemq-artemis"); - connectionProperties.put(Symbol.valueOf("version"), VersionLoader.getVersion().getFullVersion()); + + connectionProperties.put(PRODUCT, "apache-activemq-artemis"); + connectionProperties.put(VERSION, VersionLoader.getVersion().getFullVersion()); + this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); - this.handler = ProtonHandler.Factory.create(dispatchExecutor); + this.handler = ProtonHandler.Factory.create(dispatchExecutor); Transport transport = handler.getTransport(); transport.setEmitFlowEventOnSend(false); if (idleTimeout > 0) { @@ -211,6 +216,7 @@ public void onRemoteOpen(Connection connection) throws Exception { connection.setContext(AbstractConnectionContext.this); connection.setContainer(containerId); connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); connection.open(); } initialise(); @@ -326,9 +332,10 @@ public void onDelivery(Delivery delivery) throws Exception { System.err.println("Handler is null, can't delivery " + delivery); } } - } - - + @Override + public Symbol[] getConnectionCapabilitiesOffered() { + return null; + } } diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java index bdb3a69bd01..4124c2f5496 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -16,6 +16,9 @@ */ package org.proton.plug.context.server; +import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; + +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; @@ -79,4 +82,7 @@ protected void remoteLinkOpened(Link link) throws Exception { } } + public Symbol[] getConnectionCapabilitiesOffered() { + return new Symbol[]{DELAYED_DELIVERY}; + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 711f6ff34e7..8da5aa2fe44 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -16,6 +16,11 @@ */ package org.apache.activemq.artemis.tests.integration.proton; +import static org.proton.plug.AmqpSupport.contains; +import static org.proton.plug.AmqpSupport.DELAYED_DELIVERY; +import static org.proton.plug.AmqpSupport.PRODUCT; +import static org.proton.plug.AmqpSupport.VERSION; + import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -63,6 +68,7 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -210,6 +216,114 @@ public void testBrokerConnectionProperties() throws Exception { } } + @Test(timeout = 60000) + public void testConnectionCarriesExpectedCapabilities() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) { + + Symbol[] offered = connection.getRemoteOfferedCapabilities(); + + if (!contains(offered, DELAYED_DELIVERY)) { + markAsInvalid("Broker did not indicate it support delayed message delivery"); + return; + } + + Map properties = connection.getRemoteProperties(); + if (!properties.containsKey(PRODUCT)) { + markAsInvalid("Broker did not send a queue product name value"); + return; + } + + if (!properties.containsKey(VERSION)) { + markAsInvalid("Broker did not send a queue version value"); + return; + } + } + }); + + AmqpConnection connection = client.connect(); + try { + assertNotNull(connection); + connection.getStateInspector().assertValid(); + } + finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeHoldsMessage() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5); + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + // Shouldn't get this since we delayed the message. + assertNull(receiver.receive(5, TimeUnit.SECONDS)); + } + finally { + connection.close(); + } + } + + @Test(timeout = 60000) + public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception { + if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + assertNotNull(client); + + AmqpConnection connection = client.connect(); + try { + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(address); + AmqpReceiver receiver = session.createReceiver(address); + + AmqpMessage message = new AmqpMessage(); + long deliveryTime = System.currentTimeMillis() + 2000; + message.setMessageAnnotation("x-opt-delivery-time", deliveryTime); + message.setText("Test-Message"); + sender.send(message); + + // Now try and get the message + receiver.flow(1); + + AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS); + assertNotNull(received); + received.accept(); + Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time"); + assertNotNull(msgDeliveryTime); + assertEquals(deliveryTime, msgDeliveryTime.longValue()); + } + finally { + connection.close(); + } + } + @Test public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol