From 881615e6464c4779917e275c2e9635442ccd77f4 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Wed, 14 Dec 2016 18:11:09 +0000 Subject: [PATCH] ARTEMIS-891 - upgrade proton to 0.16 https://issues.apache.org/jira/browse/ARTEMIS-891 --- .../proton/ProtonServerReceiverContext.java | 10 ++++ pom.xml | 2 +- .../transport/amqp/client/AmqpSender.java | 34 +++++++++++++ .../transport/amqp/client/AmqpSession.java | 51 +++++++++++++++++++ .../amqp/client/util/UnmodifiableLink.java | 30 +++++++++++ .../client/util/UnmodifiableReceiver.java | 6 +++ .../amqp/client/util/UnmodifiableSender.java | 6 +++ .../amqp/client/util/UnmodifiableSession.java | 48 +++++++++++++++++ .../integration/amqp/AmqpSendReceiveTest.java | 30 +++++++++++ 9 files changed, 216 insertions(+), 1 deletion(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index a265836ca10..7d50503f79d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -35,6 +35,9 @@ import org.apache.qpid.proton.engine.Receiver; import org.jboss.logging.Logger; +import java.util.Arrays; +import java.util.List; + public class ProtonServerReceiverContext extends ProtonInitializable implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonServerReceiverContext.class); @@ -110,6 +113,13 @@ public void initialise() throws Exception { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } } + Symbol[] remoteDesiredCapabilities = receiver.getRemoteDesiredCapabilities(); + if (remoteDesiredCapabilities != null) { + List list = Arrays.asList(remoteDesiredCapabilities); + if (list.contains(AmqpSupport.DELAYED_DELIVERY)) { + receiver.setOfferedCapabilities(new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + } + } } flow(maxCreditAllocation, minCreditRefresh); } diff --git a/pom.xml b/pom.xml index c8833f86614..577c6eb4a53 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ 3.6.9.Final 2.4 4.1.5.Final - 0.15.0 + 0.16.0 3.0.19.Final 1.7.21 0.11.0 diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 9b2a70d689d..350a201a06b 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,6 +73,10 @@ public class AmqpSender extends AmqpAbstractResource { private final Set pending = new LinkedHashSet<>(); private byte[] encodeBuffer = new byte[1024 * 8]; + private Symbol[] desiredCapabilities; + private Symbol[] offeredCapabilities; + private Map properties; + /** * Create a new sender instance. * @@ -231,6 +236,31 @@ public void setSendTimeout(long sendTimeout) { this.sendTimeout = sendTimeout; } + + public void setDesiredCapabilities(Symbol[] desiredCapabilities) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.desiredCapabilities = desiredCapabilities; + } + + public void setOfferedCapabilities(Symbol[] offeredCapabilities) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.offeredCapabilities = offeredCapabilities; + } + + public void setProperties(Map properties) { + if (getEndpoint() != null) { + throw new IllegalStateException("Endpoint already established"); + } + + this.properties = properties; + } + //----- Private Sender implementation ------------------------------------// private void checkClosed() { @@ -265,6 +295,10 @@ protected void doOpen() { } sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); + setEndpoint(sender); super.doOpen(); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index d4b16c15cf9..e9a90c15d46 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.client; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -24,6 +25,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.engine.Connection; @@ -99,6 +101,19 @@ public AmqpSender createSender(final String address) throws Exception { return createSender(address, false); } + /** + * Create a sender instance using the given address + * + * @param address the address to which the sender will produce its messages. + * @param desiredCapabilities the capabilities that the caller wants the remote to support. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, Symbol[] desiredCapabilities) throws Exception { + return createSender(address, false, desiredCapabilities, null, null); + } + + /** * Create a sender instance using the given address * @@ -108,10 +123,28 @@ public AmqpSender createSender(final String address) throws Exception { * @throws Exception if an error occurs while creating the sender. */ public AmqpSender createSender(final String address, boolean presettle) throws Exception { + return createSender(address, presettle, null, null, null); + } + + /** + * Create a sender instance using the given address + * + * @param address the address to which the sender will produce its messages. + * @param presettle controls if the created sender produces message that have already been marked settled. + * @param desiredCapabilities the capabilities that the caller wants the remote to support. + * @param offeredCapabilities the capabilities that the caller wants the advertise support for. + * @param properties the properties to send as part of the sender open. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender(final String address, boolean presettle, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map properties) throws Exception { checkClosed(); final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId()); sender.setPresettle(presettle); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { @@ -150,9 +183,27 @@ public AmqpSender createSender(Target target) throws Exception { * @throws Exception if an error occurs while creating the receiver. */ public AmqpSender createSender(Target target, String senderId) throws Exception { + return createSender(target, senderId, null, null, null); + } + + /** + * Create a sender instance using the given Target + * + * @param target the caller created and configured Traget used to create the sender link. + * @param senderId the sender ID to assign to the newly created Sender. + * @param desiredCapabilities the capabilities that the caller wants the remote to support. + * @param offeredCapabilities the capabilities that the caller wants the advertise support for. + * @param properties the properties to send as part of the sender open. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target, String senderId, Symbol[] desiredCapabilities, Symbol[] offeredCapabilities, Map properties) throws Exception { checkClosed(); final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId); + sender.setDesiredCapabilities(desiredCapabilities); + sender.setOfferedCapabilities(offeredCapabilities); + sender.setProperties(properties); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java index ac0e83eaf09..7e4319d91b2 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableLink.java @@ -273,4 +273,34 @@ public void setProperties(Map properties) { public Map getRemoteProperties() { return link.getRemoteProperties(); } + + @Override + public Symbol[] getDesiredCapabilities() { + return link.getDesiredCapabilities(); + } + + @Override + public Symbol[] getOfferedCapabilities() { + return link.getOfferedCapabilities(); + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() { + return link.getRemoteDesiredCapabilities(); + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() { + return link.getRemoteOfferedCapabilities(); + } + + @Override + public void setDesiredCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setOfferedCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java index 92760db3372..f447d87d28e 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableReceiver.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.client.util; +import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Receiver; /** @@ -42,6 +43,11 @@ public int recv(byte[] bytes, int offset, int size) { throw new UnsupportedOperationException("Cannot alter the Link state"); } + @Override + public int recv(WritableBuffer buffer) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + @Override public void drain(int credit) { throw new UnsupportedOperationException("Cannot alter the Link state"); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java index 89742cb517e..3c67f682b11 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSender.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.client.util; +import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.engine.Sender; /** @@ -38,6 +39,11 @@ public int send(byte[] bytes, int offset, int length) { throw new UnsupportedOperationException("Cannot alter the Link state"); } + @Override + public int send(ReadableBuffer buffer) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + @Override public void abort() { throw new UnsupportedOperationException("Cannot alter the Link state"); diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java index a44028e0e5a..3fc26cbd743 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableSession.java @@ -17,7 +17,9 @@ package org.apache.activemq.transport.amqp.client.util; import java.util.EnumSet; +import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; @@ -147,4 +149,50 @@ public long getOutgoingWindow() { public void setOutgoingWindow(long outgoingWindowSize) { throw new UnsupportedOperationException("Cannot alter the Session"); } + + + @Override + public Symbol[] getDesiredCapabilities() { + return session.getDesiredCapabilities(); + } + + @Override + public Symbol[] getOfferedCapabilities() { + return session.getOfferedCapabilities(); + } + + @Override + public Map getProperties() { + return session.getProperties(); + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() { + return session.getRemoteDesiredCapabilities(); + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() { + return session.getRemoteOfferedCapabilities(); + } + + @Override + public Map getRemoteProperties() { + return session.getRemoteProperties(); + } + + @Override + public void setDesiredCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setOfferedCapabilities(Symbol[] capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } + + @Override + public void setProperties(Map capabilities) { + throw new UnsupportedOperationException("Cannot alter the Link state"); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index aae265050ef..b8178348cc9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -19,6 +19,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS; import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS; import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter; +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; import java.util.ArrayList; import java.util.LinkedList; @@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -41,6 +43,7 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -833,6 +836,33 @@ public void testTwoPresettledReceiversReceiveAllMessages() throws Exception { connection.close(); } + @Test + public void testDeliveryDelayOfferedWhenRequested() throws Exception { + AmqpClient client = createAmqpClient(); + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Sender sender) { + + Symbol[] offered = sender.getRemoteOfferedCapabilities(); + if (!contains(offered, AmqpSupport.DELAYED_DELIVERY)) { + markAsInvalid("Broker did not indicate it support delayed message delivery"); + } + } + }); + + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY}); + assertNotNull(sender); + + connection.getStateInspector().assertValid(); + + sender.close(); + connection.close(); + } + public void sendMessages(String destinationName, int count) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect());