From 5d1b7e0bea8d30d4c2be084700e667d8e8ff4888 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 29 Aug 2017 14:26:38 -0400 Subject: [PATCH] ARTEMIS-1377 Refactor the disposition handling code Avoid null checking each disposition before then checking the type, also account for not knowing the type. Rearrange the handling code to prioritize the most common case which is "Accepted" --- .../proton/ProtonServerSenderContext.java | 154 +++++++++--------- 1 file changed, 77 insertions(+), 77 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index c774b4d1f9f..50d2ef47732 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -541,94 +541,94 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException { } boolean settleImmediate = true; - if (remoteState != null) { - // If we are transactional then we need ack if the msg has been accepted - if (remoteState instanceof TransactionalState) { - - TransactionalState txState = (TransactionalState) remoteState; - ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); - - if (txState.getOutcome() != null) { - settleImmediate = false; - Outcome outcome = txState.getOutcome(); - if (outcome instanceof Accepted) { - if (!delivery.remotelySettled()) { - TransactionalState txAccepted = new TransactionalState(); - txAccepted.setOutcome(Accepted.getInstance()); - txAccepted.setTxnId(txState.getTxnId()); - connection.lock(); - try { - delivery.disposition(txAccepted); - } finally { - connection.unlock(); - } - } - // we have to individual ack as we can't guarantee we will get the delivery - // updates (including acks) in order - // from dealer, a perf hit but a must + if (remoteState instanceof Accepted) { + // this can happen in the twice ack mode, that is the receiver accepts and settles separately + // acking again would show an exception but would have no negative effect but best to handle anyway. + if (delivery.isSettled()) { + return; + } + // we have to individual ack as we can't guarantee we will get the delivery updates + // (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(null, brokerConsumer, message); + } catch (Exception e) { + log.warn(e.toString(), e); + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + } else if (remoteState instanceof TransactionalState) { + // When the message arrives with a TransactionState disposition the ack should + // enlist the message into the transaction associated with the given txn ID. + TransactionalState txState = (TransactionalState) remoteState; + ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false); + + if (txState.getOutcome() != null) { + settleImmediate = false; + Outcome outcome = txState.getOutcome(); + if (outcome instanceof Accepted) { + if (!delivery.remotelySettled()) { + TransactionalState txAccepted = new TransactionalState(); + txAccepted.setOutcome(Accepted.getInstance()); + txAccepted.setTxnId(txState.getTxnId()); + connection.lock(); try { - sessionSPI.ack(tx, brokerConsumer, message); - tx.addDelivery(delivery, this); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + delivery.disposition(txAccepted); + } finally { + connection.unlock(); } } - } - } else if (remoteState instanceof Accepted) { - //this can happen in the twice ack mode, that is the receiver accepts and settles separately - //acking again would show an exception but would have no negative effect but best to handle anyway. - if (delivery.isSettled()) { - return; - } - // we have to individual ack as we can't guarantee we will get the delivery updates - // (including acks) in order - // from dealer, a perf hit but a must - try { - sessionSPI.ack(null, brokerConsumer, message); - } catch (Exception e) { - log.warn(e.toString(), e); - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Released) { - try { - sessionSPI.cancel(brokerConsumer, message, false); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Rejected) { - try { - sessionSPI.reject(brokerConsumer, message); - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); - } - } else if (remoteState instanceof Modified) { - try { - Modified modification = (Modified) remoteState; - - if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { - message.rejectConsumer(brokerConsumer.sequentialID()); - } - - if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { - sessionSPI.cancel(brokerConsumer, message, true); - } else { - sessionSPI.cancel(brokerConsumer, message, false); + // we have to individual ack as we can't guarantee we will get the delivery + // updates (including acks) in order + // from dealer, a perf hit but a must + try { + sessionSPI.ack(tx, brokerConsumer, message); + tx.addDelivery(delivery, this); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } - } catch (Exception e) { - throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } } - - if (!preSettle) { - protonSession.replaceTag(delivery.getTag()); + } else if (remoteState instanceof Released) { + try { + sessionSPI.cancel(brokerConsumer, message, false); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } + } else if (remoteState instanceof Rejected) { + try { + sessionSPI.reject(brokerConsumer, message); + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } + } else if (remoteState instanceof Modified) { + try { + Modified modification = (Modified) remoteState; - if (settleImmediate) - settle(delivery); + if (Boolean.TRUE.equals(modification.getUndeliverableHere())) { + message.rejectConsumer(brokerConsumer.sequentialID()); + } + if (Boolean.TRUE.equals(modification.getDeliveryFailed())) { + sessionSPI.cancel(brokerConsumer, message, true); + } else { + sessionSPI.cancel(brokerConsumer, message, false); + } + } catch (Exception e) { + throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } } else { - // todo not sure if we need to do anything here + log.debug("Received null or unknown disposition for delivery update: " + remoteState); + return; } + + if (!preSettle) { + protonSession.replaceTag(delivery.getTag()); + } + + if (settleImmediate) { + settle(delivery); + } + } finally { sessionSPI.afterIO(new IOCallback() { @Override