Skip to content

Commit

Permalink
ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty …
Browse files Browse the repository at this point in the history
…thread
  • Loading branch information
clebertsuconic committed Mar 5, 2024
1 parent 579b925 commit 6e2966d
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public void withinContext(Runnable run) throws Exception {
}
}

public void execute(Runnable run) {
sessionExecutor.execute(run);
}

public void afterIO(IOCallback ioCallback) {
OperationContext context = recoverContext();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public void disableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(false);
handler.setReadable(false);
}

public void enableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(true);
getHandler().setReadable(true);
flush();
}

public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,34 +82,55 @@ public Message readBytes(Delivery delivery) throws Exception {
throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
}

final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
try {
serverReceiver.connection.requireInHandler();

final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();

if (currentMessage == null) {
final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null,
sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);

sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
}
if (currentMessage == null) {
final long id = sessionSPI.getStorageManager().generateID();
currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);

sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
}

serverReceiver.getConnection().disableAutoRead();

currentMessage.addBytes(dataBuffer);
byte[] bytes = new byte[dataBuffer.remaining()];
dataBuffer.get(bytes);

final AMQPLargeMessage result;
boolean partial = delivery.isPartial();

if (!delivery.isPartial()) {
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
result = currentMessage;
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
deliveryAnnotations = result.getDeliveryAnnotations();
} else {
result = null;
sessionSPI.execute(() -> addBytes(delivery, bytes, partial));

return null;
} catch (Exception e) {
// if an exception happened we must enable it back
serverReceiver.getConnection().enableAutoRead();
throw e;
}
}

return result;
private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
try {
currentMessage.addBytes(dataBuffer);

if (!isPartial) {
final AMQPLargeMessage message = currentMessage;
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, message, message.getDeliveryAnnotations()));
}
} catch (Throwable e) {
serverReceiver.onExceptionWhileReading(e);
} finally {
serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {

private MessageReference reference;
private AMQPLargeMessage message;

private LargeBodyReader largeBodyReader;

private Delivery delivery;
private long position;
private boolean initialPacketHandled;
Expand All @@ -81,33 +84,55 @@ public boolean isWriting() {
public void close() {
if (!closed) {
try {
try {
if (largeBodyReader != null) {
largeBodyReader.close();
}
} catch (Exception e) {
// if we get an error only at this point, there's nothing else we could do other than log.warn
logger.warn("{}", e.getMessage(), e);
}
if (message != null) {
message.usageDown();
}
} finally {
reset(true);
resetClosed();
}
}
}

@Override
public AMQPLargeMessageWriter open() {
if (!closed) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
public AMQPLargeMessageWriter open(MessageReference reference) {
this.reference = reference;
this.message = (AMQPLargeMessage) reference.getMessage();
this.message.usageUp();

try {
largeBodyReader = message.getLargeBodyReader();
largeBodyReader.open();
} catch (Exception e) {
serverSender.reportDeliveryError(this, reference, e);
}

reset(false);
resetOpen();

return this;
}

private void reset(boolean closedState) {
private void resetClosed() {
message = null;
reference = null;
delivery = null;
largeBodyReader = null;
position = 0;
initialPacketHandled = false;
closed = closedState;
closed = true;
}

private void resetOpen() {
position = 0;
initialPacketHandled = false;
closed = false;
}

@Override
Expand All @@ -121,17 +146,15 @@ public void writeBytes(MessageReference messageReference) {
throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
}

this.reference = messageReference;
this.message = (AMQPLargeMessage) messageReference.getMessage();

if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
// an interceptor rejected the delivery
// since we opened the message as part of the queue executor we must close it now
close();
return;
}

this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat());

message.usageUp();

tryDelivering();
}

Expand All @@ -150,15 +173,14 @@ private void tryDelivering() {
final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer);

try (LargeBodyReader context = message.getLargeBodyReader()) {
context.open();
context.position(position);
long bodySize = context.getSize();
try {
largeBodyReader.position(position);
long bodySize = largeBodyReader.getSize();
// materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize);

if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) {
if (!deliverInitialPacket(context, frameBuffer)) {
if (!deliverInitialPacket(largeBodyReader, frameBuffer)) {
return;
}

Expand All @@ -171,7 +193,7 @@ private void tryDelivering() {
}
frameBuffer.clear();

final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize));

frameBuffer.writerIndex(readSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void close() {
}

@Override
public AMQPTunneledCoreLargeMessageWriter open() {
public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) {
if (state != State.CLOSED) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public interface MessageReader {
* and is no longer partial the readBytes method will return the decoded message
* for dispatch.
*
* Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the
* Server Receiver.
*
* @param delivery
* The delivery that has pending incoming bytes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ default void close() {
* be called on every handler by the sender context as it doesn't know which instances need
* opened.
*/
default MessageWriter open() {
default MessageWriter open(MessageReference reference) {
// Default for stateless handlers is to do nothing here.
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;

import java.lang.invoke.MethodHandles;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
Expand All @@ -26,12 +28,17 @@
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

protected final AMQPConnectionContext connection;

protected final AMQPSessionContext protonSession;
Expand Down Expand Up @@ -302,8 +309,6 @@ protected MessageReader trySelectMessageReader(Receiver receiver, Delivery deliv
public final void onMessage(Delivery delivery) throws ActiveMQAMQPException {
connection.requireInHandler();

final Receiver receiver = ((Receiver) delivery.getLink());

if (receiver.current() != delivery) {
return;
}
Expand All @@ -320,28 +325,45 @@ public final void onMessage(Delivery delivery) throws ActiveMQAMQPException {
return;
}

final Message message = messageReader.readBytes(delivery);
{
Message completeMessage;
if ((completeMessage = messageReader.readBytes(delivery)) != null) {
// notice the AMQP Large Message Reader will always return false
// and call the onMessageComplete directly
// since that happens asynchronously
onMessageComplete(delivery, completeMessage, messageReader.getDeliveryAnnotations());
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}

if (message != null) {
// Fetch this before the close of the reader as that will clear any read message
// delivery annotations.
final DeliveryAnnotations deliveryAnnotations = messageReader.getDeliveryAnnotations();
public void onMessageComplete(Delivery delivery,
Message message, DeliveryAnnotations deliveryAnnotations) {
connection.requireInHandler();

this.messageReader.close();
this.messageReader = null;
try {
final Receiver receiver = ((Receiver) delivery.getLink());

receiver.advance();
receiver.advance();

Transaction tx = null;
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
Transaction tx = null;
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
try {
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
} catch (Exception e) {
this.onExceptionWhileReading(e);
}

actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
}
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);

actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
} finally {
// reader is complete, we give it up now
this.messageReader.close();
this.messageReader = null;
}
}

Expand All @@ -351,6 +373,17 @@ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
closeCurrentReader();
}

public void onExceptionWhileReading(Throwable e) {
logger.warn(e.getMessage(), e);
connection.runNow(() -> {
// setting it enabled just in case a large message reader disabled it
connection.enableAutoRead();
ErrorCondition ec = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage());
connection.close(ec);
connection.flush();
});
}

@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public int deliverMessage(final MessageReference messageReference, final ServerC
credits--;
}

final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open();
final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(messageReference);

// Preserve for hasCredits to check for busy state and possible abort on close
this.messageWriter = messageWriter;
Expand Down
Loading

0 comments on commit 6e2966d

Please sign in to comment.