From b549bb243c2b4536f2ca9d84e777cca9bff019b9 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 26 Jul 2016 11:11:33 +0100 Subject: [PATCH] Add default AMQP flow behaviour and fix proton test --- .../ProtonSessionIntegrationCallback.java | 1 + .../AbstractProtonReceiverContext.java | 11 +- .../tests/integration/proton/ProtonTest.java | 115 +++++++++++++----- 3 files changed, 92 insertions(+), 35 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index ab57fe170c1..b2d029f2647 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -405,6 +405,7 @@ public void offerProducerCredit(final String address, final int credits, final i public void run() { if (receiver.getRemoteCredit() < threshold) { receiver.flow(credits); + connection.flush(); } } }); diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java index 5a430293cdd..c21095062b6 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java @@ -58,10 +58,17 @@ public void close(ErrorCondition condition) throws ActiveMQAMQPException { } public void flow(int credits, int threshold) { - synchronized (connection.getLock()) { + // Use the SessionSPI to allocate producer credits, or default, always allocate credit. + if (sessionSPI != null) { sessionSPI.offerProducerCredit(address, credits, threshold, receiver); } - connection.flush(); + else { + synchronized (connection.getLock()) { + receiver.flow(credits); + connection.flush(); + } + } + } public void drain(int credits) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java index 887427149a6..2c68dde43d6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java @@ -45,7 +45,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -136,9 +138,9 @@ public void setUp() throws Exception { server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + // Default Page AddressSettings addressSettings = new AddressSettings(); - addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); - addressSettings.setMaxSizeBytes(1 * 1024 * 1024); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); server.getConfiguration().getAddressesSettings().put("#", addressSettings); server.start(); @@ -230,20 +232,25 @@ public void testTemporaryQueue() throws Throwable { @Test public void testResourceLimitExceptionOnAddressFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); + fillAddress(address + 1); } @Test public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); + String destinationAddress = address + 1; + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination d = session.createQueue(destinationAddress); + MessageProducer p = session.createProducer(d); + fillAddress(destinationAddress); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Exception e = null; try { - Destination d = session.createQueue(destinationAddress); - MessageProducer p = session.createProducer(d); p.send(session.createBytesMessage()); } catch (ResourceAllocationException rae) { @@ -256,6 +263,7 @@ public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { @Test public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); // Only allow 1 credit to be submitted at a time. Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation"); @@ -269,9 +277,13 @@ public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(destinationAddress); - sender.setSendTimeout(1000); - sendUntilFull(sender); - assertTrue(sender.getSender().getCredit() <= 0); + + // Use blocking send to ensure buffered messages do not interfere with credit. + sender.setSendTimeout(-1); + sendUntilFull(sender, destinationAddress); + + // This should be -1. A single message is buffered in the client, and 0 credit has been allocated. + assertTrue(sender.getSender().getCredit() == -1); } finally { amqpConnection.close(); @@ -282,13 +294,14 @@ public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { @Test public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); String destinationAddress = address + 1; int messagesSent = fillAddress(destinationAddress); - AmqpConnection amqpConnection = null; + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = amqpConnection = client.connect(); try { - amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri)); AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(destinationAddress); @@ -308,7 +321,8 @@ public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { // Wait for address to unblock and flow frame to arrive Thread.sleep(500); - assertTrue(sender.getSender().getCredit() > 0); + + assertTrue(sender.getSender().getCredit() == 0); assertNotNull(receiver.receive()); } finally { @@ -319,11 +333,12 @@ public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { @Test public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception { if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol + setAddressFullBlockPolicy(); fillAddress(address + 1); - AmqpConnection amqpConnection = null; + AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); + AmqpConnection amqpConnection = amqpConnection = client.connect(); try { - amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri)); AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address + 1); // Wait for a potential flow frame. @@ -344,38 +359,62 @@ public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws private int fillAddress(String address) throws Exception { AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password); AmqpConnection amqpConnection = client.connect(); + int messagesSent = 0; + Exception exception = null; try { AmqpSession session = amqpConnection.createSession(); AmqpSender sender = session.createSender(address); - return sendUntilFull(sender); + messagesSent = sendUntilFull(sender, null); + } + catch (Exception e) { + exception = e; } finally { amqpConnection.close(); } + + // Should receive a rejected error + assertNotNull(exception); + assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded")); + + return messagesSent; } - private int sendUntilFull(AmqpSender sender) throws IOException { - AmqpMessage message = new AmqpMessage(); + private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception { + final AmqpMessage message = new AmqpMessage(); byte[] payload = new byte[50 * 1024]; + message.setBytes(payload); - int sentMessages = 0; - int maxMessages = 50; + final int maxMessages = 50; + final AtomicInteger sentMessages = new AtomicInteger(0); + final Exception[] errors = new Exception[1]; + final CountDownLatch timeout = new CountDownLatch(1); - Exception e = null; - try { - for (int i = 0; i < maxMessages; i++) { - message.setBytes(payload); - sender.send(message); - sentMessages++; + Runnable sendMessages = new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < maxMessages; i++) { + sender.send(message); + sentMessages.getAndIncrement(); + } + timeout.countDown(); + } + catch (IOException e) { + errors[0] = e; + } } - } - catch (IOException ioe) { - e = ioe; - } + }; + + Thread t = new Thread(sendMessages); + t.start(); + + timeout.await(5, TimeUnit.SECONDS); - assertNotNull(e); - assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded")); - return sentMessages; + if (errors[0] != null) { + throw errors[0]; + } + return sentMessages.get(); } @Test @@ -398,7 +437,6 @@ public void testReplyTo() throws Throwable { Destination jmsReplyTo = message.getJMSReplyTo(); Assert.assertNotNull(jmsReplyTo); Assert.assertNotNull(message); - } @Test @@ -729,10 +767,13 @@ public void testClientAckMessages() throws Exception { consumer.close(); connection.close(); + + // Wait for Acks to be processed and message removed from queue. + Thread.sleep(500); + Assert.assertEquals(0, getMessageCount(q)); long taken = (System.currentTimeMillis() - time) / 1000; System.out.println("taken = " + taken); - } @Test @@ -1140,6 +1181,14 @@ public void onException(JMSException exception) { return connection; } + private void setAddressFullBlockPolicy() { + // For BLOCK tests + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#"); + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + addressSettings.setMaxSizeBytes(1 * 1024 * 1024); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + } + public static class AnythingSerializable implements Serializable { private int count;