From 23ebf67bf66ce6bacd5744583178a04d44d70e6e Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 14 Mar 2017 16:07:58 -0400 Subject: [PATCH] ARTEMIS-1039 Transaction Coordinator credit refill The coordinator needs to refill credit on the receiver once it has been exhausted, otherwise the remote cannot send additional declare or discharge commands to the broker. --- .../protocol/amqp/proton/AMQPSessionContext.java | 2 +- .../transaction/ProtonTransactionHandler.java | 13 +++++++++++-- .../integration/amqp/AmqpTransactionTest.java | 16 ++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 89b6ed33067..d1fc0e15245 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -148,7 +148,7 @@ public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { receiver.setContext(transactionHandler); receiver.open(); - receiver.flow(100); + receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT); } public void addSender(Sender sender) throws Exception { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 2cdb0729ab2..721bd33dfd7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -16,8 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; @@ -38,6 +36,9 @@ import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + /** * handles an amqp Coordinator to deal with transaction boundaries etc */ @@ -45,6 +46,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); + public static final int DEFAULT_COORDINATOR_CREDIT = 100; + final AMQPSessionCallback sessionSPI; public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) { @@ -98,6 +101,12 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); } } + + // Replenish coordinator receiver credit on exhaustion so sender can continue + // transaction declare and discahrge operations. + if (receiver.getCredit() == 0) { + receiver.flow(DEFAULT_COORDINATOR_CREDIT); + } } } catch (ActiveMQAMQPException amqpE) { delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index 1b2a1b0df54..c00cc1c244f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -48,6 +48,22 @@ public void testBeginAndCommitTransaction() throws Exception { connection.close(); } + @Test(timeout = 30000) + public void testCoordinatorReplenishesCredit() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + for (int i = 0; i < 1000; ++i) { + session.begin(); + assertTrue(session.isInTransaction()); + session.commit(); + } + + connection.close(); + } + @Test(timeout = 30000) public void testBeginAndRollbackTransaction() throws Exception { AmqpClient client = createAmqpClient();