Skip to content

Commit

Permalink
ARTEMIS-2067 Clean up some code in the AMQP protocol handling paths
Browse files Browse the repository at this point in the history
Cleans up some of the code on the proton event handler, most noteable:

1. Fix IOCallback creation on each outbound send, use single instance
as the handler only ever does a flush and has no attached state.
2. Fix redundent locking and unlocking of connection lock on the event
path that already ensures that lock is held.
3. Set presettle state on the server sender at attach as it cannot
change afterwards so checking on every message is not needed.
4. Improve buffer type checking on receive to reduce amount of work

(cherry picked from commit d193962)
  • Loading branch information
tabish121 authored and clebertsuconic committed Sep 12, 2018
1 parent 1ccb4c8 commit 3949744
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 119 deletions.
Expand Up @@ -205,23 +205,20 @@ private RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
}

/*
* called when Proton receives a message to be delivered via a Delivery.
*
* This may be called more than once per deliver so we have to cache the buffer until we have received it all.
*
* */
* called when Proton receives a message to be delivered via a Delivery.
*
* This may be called more than once per deliver so we have to cache the buffer until we have received it all.
*/
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Receiver receiver;
try {
Receiver receiver = ((Receiver) delivery.getLink());

if (!delivery.isReadable()) {
if (receiver.current() != delivery) {
return;
}

if (delivery.isAborted()) {
receiver = ((Receiver) delivery.getLink());

// Aborting implicitly remotely settles, so advance
// receiver to the next delivery and settle locally.
receiver.advance();
Expand All @@ -233,16 +230,11 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
}

return;
}

if (delivery.isPartial()) {
} else if (delivery.isPartial()) {
return;
}

receiver = ((Receiver) delivery.getLink());

Transaction tx = null;

ReadableBuffer data = receiver.recv();
receiver.advance();

Expand All @@ -267,13 +259,9 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {

condition.setDescription(e.getMessage());
rejected.setError(condition);
connection.lock();
try {
delivery.disposition(rejected);
delivery.settle();
} finally {
connection.unlock();
}

delivery.disposition(rejected);
delivery.settle();
}
}

Expand Down
Expand Up @@ -87,6 +87,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private static final Symbol SHARED = Symbol.valueOf("shared");
private static final Symbol GLOBAL = Symbol.valueOf("global");

private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();

private Consumer brokerConsumer;

protected final AMQPSessionContext protonSession;
Expand All @@ -101,6 +103,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
private boolean shared = false;
private boolean global = false;
private boolean isVolatile = false;
private boolean preSettle;
private SimpleString tempQueueName;

public ProtonServerSenderContext(AMQPConnectionContext connection,
Expand Down Expand Up @@ -417,6 +420,9 @@ public void initialise() throws Exception {
}
}

// Detect if sender is in pre-settle mode.
preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

// We need to update the source with any filters we support otherwise the client
// is free to consider the attach as having failed if we don't send back what we
// do support or if we send something we don't support the client won't know we
Expand Down Expand Up @@ -538,17 +544,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {

try {
Message message = ((MessageReference) delivery.getContext()).getMessage();

boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

DeliveryState remoteState;

connection.lock();
try {
remoteState = delivery.getRemoteState();
} finally {
connection.unlock();
}
DeliveryState remoteState = delivery.getRemoteState();

boolean settleImmediate = true;
if (remoteState instanceof Accepted) {
Expand All @@ -558,8 +554,7 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
return;
}
// we have to individual ack as we can't guarantee we will get the delivery updates
// (including acks) in order
// from dealer, a perf hit but a must
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(null, brokerConsumer, message);
} catch (Exception e) {
Expand All @@ -580,16 +575,10 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
txAccepted.setTxnId(txState.getTxnId());
connection.lock();
try {
delivery.disposition(txAccepted);
} finally {
connection.unlock();
}
delivery.disposition(txAccepted);
}
// we have to individual ack as we can't guarantee we will get the delivery
// updates (including acks) in order
// from dealer, a perf hit but a must
// (including acks) in order from dealer, a performance hit but a must
try {
sessionSPI.ack(tx, brokerConsumer, message);
tx.addDelivery(delivery, this);
Expand Down Expand Up @@ -636,23 +625,24 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
}

if (settleImmediate) {
settle(delivery);
delivery.settle();
}

} finally {
sessionSPI.afterIO(new IOCallback() {
@Override
public void done() {
connection.flush();
}
sessionSPI.afterIO(connectionFlusher);
sessionSPI.resetContext(oldContext);
}
}

@Override
public void onError(int errorCode, String errorMessage) {
connection.flush();
}
});
private final class ConnectionFlushIOCallback implements IOCallback {
@Override
public void done() {
connection.flush();
}

sessionSPI.resetContext(oldContext);
@Override
public void onError(int errorCode, String errorMessage) {
connection.flush();
}
}

Expand Down Expand Up @@ -681,16 +671,12 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount,
AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());

// presettle means we can settle the message on the dealer side before we send it, i.e.
// for browsers
boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

// we only need a tag if we are going to settle later
byte[] tag = preSettle ? new byte[0] : protonSession.getTag();

// Let the Message decide how to present the message bytes
boolean attemptRelease = true;
ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
boolean releaseRequired = sendBuffer instanceof NettyReadable;

try {
int size = sendBuffer.remaining();
Expand All @@ -713,14 +699,13 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount,
delivery.setMessageFormat((int) message.getMessageFormat());
delivery.setContext(messageReference);

if (sendBuffer instanceof NettyReadable) {
if (releaseRequired) {
sender.send(sendBuffer);
// Above send copied, so release now if needed
attemptRelease = false;
releaseRequired = false;
((NettyReadable) sendBuffer).getByteBuf().release();
} else {
// Don't have pooled content, no need to release or copy.
attemptRelease = false;
sender.sendNoCopy(sendBuffer);
}

Expand All @@ -731,14 +716,15 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount,
} else {
sender.advance();
}

connection.flush();
} finally {
connection.unlock();
}

return size;
} finally {
if (attemptRelease && sendBuffer instanceof NettyReadable) {
if (releaseRequired) {
((NettyReadable) sendBuffer).getByteBuf().release();
}
}
Expand Down
Expand Up @@ -243,24 +243,9 @@ public void inputBuffer(ByteBuf buffer) {
int capacity = transport.capacity();

if (!receivedFirstPacket) {
try {
byte auth = buffer.getByte(4);
if (auth == SASL || auth == BARE) {
if (isServer) {
dispatchAuth(auth == SASL);
} else if (auth == BARE && clientSASLMechanism == null) {
dispatchAuthSuccess();
}
/*
* there is a chance that if SASL Handshake has been carried out that the capacity may change.
* */
capacity = transport.capacity();
}
} catch (Throwable e) {
log.warn(e.getMessage(), e);
}

receivedFirstPacket = true;
handleFirstPacket(buffer);
// there is a chance that if SASL Handshake has been carried out that the capacity may change.
capacity = transport.capacity();
}

if (capacity > 0) {
Expand Down Expand Up @@ -537,4 +522,21 @@ public void createClientSASL() {
sasl.client();
sasl.setListener(this);
}

private void handleFirstPacket(ByteBuf buffer) {
try {
byte auth = buffer.getByte(4);
if (auth == SASL || auth == BARE) {
if (isServer) {
dispatchAuth(auth == SASL);
} else if (auth == BARE && clientSASLMechanism == null) {
dispatchAuthSuccess();
}
}
} catch (Throwable e) {
log.warn(e.getMessage(), e);
}

receivedFirstPacket = true;
}
}
Expand Up @@ -73,32 +73,27 @@ public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
ByteBuffer buffer;
MessageImpl msg;

connection.lock();
try {
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < amqpLowMark) {
receiver.flow(amqpCredit);
}
// Replenish coordinator receiver credit on exhaustion so sender can continue
// transaction declare and discahrge operations.
if (receiver.getCredit() < amqpLowMark) {
receiver.flow(amqpCredit);
}

// Declare is generally 7 bytes and discharge is around 48 depending on the
// encoded size of the TXN ID. Decode buffer has a bit of extra space but if
// the incoming request is to big just use a scratch buffer.
if (delivery.available() > DECODE_BUFFER.capacity()) {
buffer = ByteBuffer.allocate(delivery.available());
} else {
buffer = (ByteBuffer) DECODE_BUFFER.clear();
}
// Declare is generally 7 bytes and discharge is around 48 depending on the
// encoded size of the TXN ID. Decode buffer has a bit of extra space but if
// the incoming request is to big just use a scratch buffer.
if (delivery.available() > DECODE_BUFFER.capacity()) {
buffer = ByteBuffer.allocate(delivery.available());
} else {
buffer = (ByteBuffer) DECODE_BUFFER.clear();
}

// Update Buffer for the next incoming command.
buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
// Update Buffer for the next incoming command.
buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));

receiver.advance();
receiver.advance();

msg = decodeMessage(buffer);
} finally {
connection.unlock();
}
msg = decodeMessage(buffer);

Object action = ((AmqpValue) msg.getBody()).getValue();
if (action instanceof Declare) {
Expand Down Expand Up @@ -160,23 +155,13 @@ public void onError(int errorCode, String errorMessage) {
}
} catch (ActiveMQAMQPException amqpE) {
log.warn(amqpE.getMessage(), amqpE);
connection.lock();
try {
delivery.settle();
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
} finally {
connection.unlock();
}
delivery.settle();
delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
connection.flush();
} catch (Throwable e) {
log.warn(e.getMessage(), e);
connection.lock();
try {
delivery.settle();
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
} finally {
connection.unlock();
}
delivery.settle();
delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
connection.flush();
}
}
Expand Down
Expand Up @@ -49,17 +49,19 @@ private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws Active
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);

Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.isReadable()).thenReturn(true);
when(mockDelivery.isAborted()).thenReturn(true);
when(mockDelivery.isPartial()).thenReturn(true);
when(mockDelivery.getLink()).thenReturn(mockReceiver);

when(mockReceiver.current()).thenReturn(mockDelivery);

if (drain) {
when(mockReceiver.getDrain()).thenReturn(true);
}

rc.onMessage(mockDelivery);

verify(mockReceiver, times(1)).current();
verify(mockReceiver, times(1)).advance();
verify(mockDelivery, times(1)).settle();

Expand Down

0 comments on commit 3949744

Please sign in to comment.