From 291a4719b6b114b1452a272fd13393262f736a05 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 17 Mar 2017 11:14:18 -0400 Subject: [PATCH 1/2] ARTEMIS-1045 Performance improvements on AMQP --- .../protocol/amqp/broker/AMQPMessage.java | 68 +++++++++++-------- .../proton/ProtonServerSenderContext.java | 13 ---- 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 60aae4cfb03..653ee5f77a9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage { private DeliveryAnnotations _deliveryAnnotations; private MessageAnnotations _messageAnnotations; private Properties _properties; + private int appLocation = -1; private ApplicationProperties applicationProperties; private long scheduledTime = -1; private String connectionID; @@ -93,7 +94,7 @@ public AMQPMessage(long messageFormat) { public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; - this.protonMessage = (MessageImpl)message; + this.protonMessage = (MessageImpl) message; } @@ -124,7 +125,7 @@ private void initalizeObjects() { _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>()); _properties = new Properties(); this.applicationProperties = new ApplicationProperties(new HashMap<>()); - this.protonMessage = (MessageImpl)Message.Factory.create(); + this.protonMessage = (MessageImpl) Message.Factory.create(); this.protonMessage.setApplicationProperties(applicationProperties); this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations); } @@ -148,6 +149,20 @@ private Map getApplicationPropertiesMap() { private ApplicationProperties getApplicationProperties() { parseHeaders(); + + if (applicationProperties == null && appLocation >= 0) { + ByteBuffer buffer = getBuffer().nioBuffer(); + buffer.position(appLocation); + TLSEncode.getDecoder().setByteBuffer(buffer); + Object section = TLSEncode.getDecoder().readObject(); + if (section instanceof ApplicationProperties) { + this.applicationProperties = (ApplicationProperties) section; + } + this.appLocation = -1; + TLSEncode.getDecoder().setByteBuffer(null); + + } + return applicationProperties; } @@ -161,6 +176,7 @@ private void parseHeaders() { parsedHeaders = true; } } + @Override public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) { this.connectionID = connectionID; @@ -172,7 +188,6 @@ public String getConnectionID() { return connectionID; } - public MessageAnnotations getMessageAnnotations() { parseHeaders(); return _messageAnnotations; @@ -202,7 +217,6 @@ private Object getSymbol(Symbol symbol) { return null; } - private void setSymbol(String symbol, Object value) { setSymbol(Symbol.getSymbol(symbol), value); } @@ -231,11 +245,9 @@ public RoutingType getRouteType() { return null; } */ - return null; } - @Override public SimpleString getGroupID() { parseHeaders(); @@ -247,7 +259,6 @@ public SimpleString getGroupID() { } } - @Override public Long getScheduledDeliveryTime() { @@ -339,15 +350,19 @@ private synchronized void partialDecode(ByteBuffer buffer) { this.expiration = _properties.getAbsoluteExpiryTime().getTime(); } - if (buffer.hasRemaining()) { - section = (Section) decoder.readObject(); - } else { - section = null; - } + // We don't read the next section on purpose, as we will parse ApplicationProperties + // lazily + section = null; } if (section instanceof ApplicationProperties) { applicationProperties = (ApplicationProperties) section; + } else { + if (buffer.hasRemaining()) { + this.appLocation = buffer.position(); + } else { + this.appLocation = -1; + } } } finally { decoder.setByteBuffer(null); @@ -446,13 +461,11 @@ public boolean isDurable() { } } - @Override public Object getDuplicateProperty() { return null; } - @Override public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) { return null; @@ -463,7 +476,7 @@ public String getAddress() { if (address == null) { Properties properties = getProtonMessage().getProperties(); if (properties != null) { - return properties.getTo(); + return properties.getTo(); } else { return null; } @@ -539,7 +552,7 @@ public void sendBuffer(ByteBuf buffer, int deliveryCount) { header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 1)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer)); TLSEncode.getEncoder().writeObject(header); - TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null); + TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); } } buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom); @@ -676,27 +689,27 @@ public boolean containsProperty(String key) { @Override public Boolean getBooleanProperty(String key) throws ActiveMQPropertyConversionException { - return (Boolean)getApplicationPropertiesMap().get(key); + return (Boolean) getApplicationPropertiesMap().get(key); } @Override public Byte getByteProperty(String key) throws ActiveMQPropertyConversionException { - return (Byte)getApplicationPropertiesMap().get(key); + return (Byte) getApplicationPropertiesMap().get(key); } @Override public Double getDoubleProperty(String key) throws ActiveMQPropertyConversionException { - return (Double)getApplicationPropertiesMap().get(key); + return (Double) getApplicationPropertiesMap().get(key); } @Override public Integer getIntProperty(String key) throws ActiveMQPropertyConversionException { - return (Integer)getApplicationPropertiesMap().get(key); + return (Integer) getApplicationPropertiesMap().get(key); } @Override public Long getLongProperty(String key) throws ActiveMQPropertyConversionException { - return (Long)getApplicationPropertiesMap().get(key); + return (Long) getApplicationPropertiesMap().get(key); } @Override @@ -712,12 +725,12 @@ public Object getObjectProperty(String key) { @Override public Short getShortProperty(String key) throws ActiveMQPropertyConversionException { - return (Short)getApplicationPropertiesMap().get(key); + return (Short) getApplicationPropertiesMap().get(key); } @Override public Float getFloatProperty(String key) throws ActiveMQPropertyConversionException { - return (Float)getApplicationPropertiesMap().get(key); + return (Float) getApplicationPropertiesMap().get(key); } @Override @@ -727,7 +740,7 @@ public String getStringProperty(String key) throws ActiveMQPropertyConversionExc } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) { return getConnectionID(); } else { - return (String)getApplicationPropertiesMap().get(key); + return (String) getApplicationPropertiesMap().get(key); } } @@ -747,7 +760,7 @@ public Object getDeliveryAnnotationProperty(SimpleString key) { @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { - return SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key)); + return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); } @Override @@ -842,8 +855,7 @@ public Set getPropertyNames() { @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { - memoryEstimate = memoryOffset + - (data != null ? data.capacity() : 0); + memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0); } return memoryEstimate; @@ -858,7 +870,6 @@ public ICoreMessage toCore() { } } - @Override public SimpleString getReplyTo() { if (getProperties() != null) { @@ -877,7 +888,6 @@ public AMQPMessage setReplyTo(SimpleString address) { return this; } - @Override public int getPersistSize() { checkBuffer(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 962110e2552..0e0447fbe2a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -42,7 +42,6 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; -import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; @@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr private boolean multicast; //todo get this from somewhere private RoutingType defaultRoutingType = RoutingType.ANYCAST; - protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); private RoutingType routingTypeToUse = defaultRoutingType; private boolean shared = false; private boolean global = false; @@ -110,7 +108,6 @@ public Object getBrokerConsumer() { @Override public void onFlow(int currentCredits, boolean drain) { - this.creditsSemaphore.setCredits(currentCredits); sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain); } @@ -590,16 +587,6 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount) return 0; } - if (!creditsSemaphore.tryAcquire()) { - try { - creditsSemaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - // nothing to be done here.. we just keep going - throw new IllegalStateException(e.getMessage(), e); - } - } - // presettle means we can settle the message on the dealer side before we send it, i.e. // for browsers boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; From 1ef4dcf7d921379618363d035ea8a09794c3637d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 17 Mar 2017 15:59:34 -0400 Subject: [PATCH 2/2] ARTEMIS-1046 Fixing TX eventually stalling with AMQP I have also reviewed the model in which we used transactions --- .../amqp/broker/AMQPConnectionCallback.java | 25 ++++-- .../amqp/broker/AMQPSessionCallback.java | 30 +++---- .../amqp/proton/AMQPSessionContext.java | 2 +- .../proton/ProtonServerReceiverContext.java | 4 +- .../proton/ProtonServerSenderContext.java | 2 +- .../transaction/ProtonTransactionHandler.java | 78 +++++++++------- .../protocol/amqp/util/DeliveryUtil.java | 18 +--- .../integration/amqp/AmqpTransactionTest.java | 90 ++++++++++++++++++- 8 files changed, 165 insertions(+), 84 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 7e7dc60c268..4265f28c5b4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class); - private ConcurrentMap transactions = new ConcurrentHashMap<>(); + private ConcurrentMap transactions = new ConcurrentHashMap<>(); private final ProtonProtocolManager manager; @@ -224,25 +224,32 @@ public void connectionFailed(ActiveMQException exception, boolean failedOver, St public Binary newTransaction() { XidImpl xid = newXID(); + Binary binary = new Binary(xid.getGlobalTransactionId()); Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1); - transactions.put(xid, transaction); - return new Binary(xid.getGlobalTransactionId()); + transactions.put(binary, transaction); + return binary; } - public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { - XidImpl xid = newXID(txid.getArray()); - Transaction tx = transactions.get(xid); + public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException { + Transaction tx; + + if (remove) { + tx = transactions.remove(txid); + } else { + tx = transactions.get(txid); + } if (tx == null) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString()); + logger.warn("Couldn't find txid = " + txid); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString()); } return tx; } - public void removeTransaction(Binary txid) { + public Transaction removeTransaction(Binary txid) { XidImpl xid = newXID(txid.getArray()); - transactions.remove(xid); + return transactions.remove(xid); } protected XidImpl newXID() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 7f7e22b730b..3592dbc77c2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext; -import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl; import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; @@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback { private final AtomicBoolean draining = new AtomicBoolean(false); + public Object getProtonLock() { + return connection.getLock(); + } + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -382,8 +385,10 @@ private void rejectMessage(Delivery delivery, Symbol errorCondition, String erro condition.setDescription(errorMessage); Rejected rejected = new Rejected(); rejected.setError(condition); - delivery.disposition(rejected); - delivery.settle(); + synchronized (connection.getLock()) { + delivery.disposition(rejected); + delivery.settle(); + } connection.flush(); } @@ -536,29 +541,14 @@ public boolean hasCredits(ServerConsumer consumer) { } } - public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException { - return protonSPI.getTransaction(txid); + public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException { + return protonSPI.getTransaction(txid, remove); } public Binary newTransaction() { return protonSPI.newTransaction(); } - public void commitTX(Binary txid) throws Exception { - Transaction tx = protonSPI.getTransaction(txid); - tx.commit(true); - protonSPI.removeTransaction(txid); - } - - public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception { - Transaction tx = protonSPI.getTransaction(txid); - tx.rollback(); - protonSPI.removeTransaction(txid); - } - - public void dischargeTx(Binary txid) throws ActiveMQAMQPException { - ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge(); - } public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception { return serverSession.getMatchingQueue(address, routingType); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index d1fc0e15245..ccc4a6ce275 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -142,7 +142,7 @@ public void removeReceiver(Receiver receiver) { } public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { - ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); + ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection); coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index f08c1fc4e2a..54467cfd6ce 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -155,7 +155,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { if (delivery.getRemoteState() instanceof TransactionalState) { TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId()); + tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); } sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data); @@ -201,8 +201,8 @@ public void flow(int credits, int threshold) { } else { synchronized (connection.getLock()) { receiver.flow(credits); - connection.flush(); } + connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 0e0447fbe2a..5a97c02da34 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -493,7 +493,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { if (remoteState instanceof TransactionalState) { TransactionalState txState = (TransactionalState) remoteState; - ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId()); + ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); if (txState.getOutcome() != null) { settleImmediate = false; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index 721bd33dfd7..12498b0890d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -18,10 +18,9 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; -import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -36,9 +35,6 @@ import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** * handles an amqp Coordinator to deal with transaction boundaries etc */ @@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class); public static final int DEFAULT_COORDINATOR_CREDIT = 100; + public static final int CREDIT_LOW_WATERMARK = 30; final AMQPSessionCallback sessionSPI; + final AMQPConnectionContext connection; - public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) { + public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; + this.connection = connection; } @Override public void onMessage(Delivery delivery) throws ActiveMQAMQPException { - ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); - final Receiver receiver; try { receiver = ((Receiver) delivery.getLink()); @@ -66,9 +63,21 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { return; } - receiver.recv(new NettyWritable(buffer)); + byte[] buffer; + + synchronized (connection.getLock()) { + // Replenish coordinator receiver credit on exhaustion so sender can continue + // transaction declare and discahrge operations. + if (receiver.getCredit() < CREDIT_LOW_WATERMARK) { + receiver.flow(DEFAULT_COORDINATOR_CREDIT); + } + + buffer = new byte[delivery.available()]; + receiver.recv(buffer, 0, buffer.length); + receiver.advance(); + } + - receiver.advance(); MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); @@ -78,44 +87,47 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { Binary txID = sessionSPI.newTransaction(); Declared declared = new Declared(); declared.setTxnId(txID); - delivery.disposition(declared); + synchronized (connection.getLock()) { + delivery.disposition(declared); + } } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; Binary txID = discharge.getTxnId(); - sessionSPI.dischargeTx(txID); + ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true); + tx.discharge(); + if (discharge.getFail()) { - try { - sessionSPI.rollbackTX(txID, true); + tx.rollback(); + synchronized (connection.getLock()) { delivery.disposition(new Accepted()); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()); } + connection.flush(); } else { - try { - sessionSPI.commitTX(txID); + tx.commit(); + synchronized (connection.getLock()) { delivery.disposition(new Accepted()); - } catch (ActiveMQAMQPException amqpE) { - throw amqpE; - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); } - } - - // Replenish coordinator receiver credit on exhaustion so sender can continue - // transaction declare and discahrge operations. - if (receiver.getCredit() == 0) { - receiver.flow(DEFAULT_COORDINATOR_CREDIT); + connection.flush(); } } } catch (ActiveMQAMQPException amqpE) { - delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); - } catch (Exception e) { + log.warn(amqpE.getMessage(), amqpE); + synchronized (connection.getLock()) { + delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); + } + connection.flush(); + } catch (Throwable e) { log.warn(e.getMessage(), e); - delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + synchronized (connection.getLock()) { + delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); + } + connection.flush(); } finally { - delivery.settle(); - buffer.release(); + synchronized (connection.getLock()) { + delivery.settle(); + } + connection.flush(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java index 9257c6bfaa9..4267b85b283 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java @@ -16,28 +16,14 @@ */ package org.apache.activemq.artemis.protocol.amqp.util; -import io.netty.buffer.ByteBuf; -import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; public class DeliveryUtil { - public static int readDelivery(Receiver receiver, ByteBuf buffer) { - int initial = buffer.writerIndex(); - // optimization by norman - int count; - while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) { - // Increment the writer index by the number of bytes written into it while calling recv. - buffer.writerIndex(buffer.writerIndex() + count); - buffer.ensureWritable(count); - } - return buffer.writerIndex() - initial; - } - - public static MessageImpl decodeMessageImpl(ByteBuf buffer) { + public static MessageImpl decodeMessageImpl(byte[] data) { MessageImpl message = (MessageImpl) Message.Factory.create(); - message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes()); + message.decode(data, 0, data.length); return message; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java index c00cc1c244f..41bc5e782c5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,9 +17,20 @@ package org.apache.activemq.artemis.tests.integration.amqp; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; @@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; import org.junit.Test; /** @@ -788,4 +801,77 @@ public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() th connection.close(); } + + @Test(timeout = 120000) + public void testSendPersistentTX() throws Exception { + int MESSAGE_COUNT = 100000; + AtomicInteger errors = new AtomicInteger(0); + server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true); + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616"); + Connection sendConnection = factory.createConnection(); + Connection consumerConnection = factory.createConnection(); + try { + + Thread receiverThread = new Thread() { + @Override + public void run() { + try { + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); + javax.jms.Queue q1 = consumerSession.createQueue("q1"); + + MessageConsumer consumer = consumerSession.createConsumer(q1); + + for (int i = 1; i <= MESSAGE_COUNT; i++) { + Message message = consumer.receive(5000); + if (message == null) { + throw new IOException("No message read in time."); + } + + if (i % 100 == 0) { + if (i % 1000 == 0) System.out.println("Read message " + i); + consumerSession.commit(); + } + } + + // Assure that all messages are consumed + consumerSession.commit(); + } catch (Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + + } + }; + + receiverThread.start(); + + Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED); + + javax.jms.Queue q1 = sendingSession.createQueue("q1"); + MessageProducer producer = sendingSession.createProducer(q1); + producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT); + for (int i = 0; i < MESSAGE_COUNT; i++) { + producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + if (i % 100 == 0) { + if (i % 1000 == 0) System.out.println("Sending " + i); + sendingSession.commit(); + } + } + + sendingSession.commit(); + + receiverThread.join(50000); + Assert.assertFalse(receiverThread.isAlive()); + + Assert.assertEquals(0, errors.get()); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + sendConnection.close(); + consumerConnection.close(); + } + + } }