Skip to content

Commit

Permalink
ARTEMIS-4530 clean up SessionCallback interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Dec 21, 2023
1 parent 29a2b2a commit 159416b
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.Executor;

Expand All @@ -30,9 +31,9 @@
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.security.CheckType;
Expand Down Expand Up @@ -76,7 +77,6 @@
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

public class AMQPSessionCallback implements SessionCallback {

Expand Down Expand Up @@ -649,7 +649,7 @@ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}

@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {

ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();

Expand All @@ -667,7 +667,6 @@ public int sendMessage(MessageReference ref, Message message, ServerConsumer con

@Override
public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand Down Expand Up @@ -44,14 +43,13 @@ public boolean isWritable(ReadyListener callback, Object protocolContext) {
}

@Override
public int sendMessage(MessageReference reference,
Message message,
public int sendMessage(MessageReference ref,
ServerConsumer consumer,
int deliveryCount) {
try {
session.getMqttPublishManager().sendMessage(message.toCore(), consumer, deliveryCount);
session.getMqttPublishManager().sendMessage(ref.getMessage().toCore(), consumer, deliveryCount);
} catch (Exception e) {
MQTTLogger.LOGGER.unableToSendMessage(reference, e);
MQTTLogger.LOGGER.unableToSendMessage(ref, e);
}
return 1;
}
Expand All @@ -70,12 +68,11 @@ public int sendLargeMessageContinuation(ServerConsumer consumerID,
}

@Override
public int sendLargeMessage(MessageReference reference,
Message message,
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
return sendMessage(reference, message, consumer, deliveryCount);
return sendMessage(ref, consumer, deliveryCount);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,17 @@ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}

@Override
public int sendMessage(MessageReference reference,
org.apache.activemq.artemis.api.core.Message message,
public int sendMessage(MessageReference ref,
ServerConsumer consumer,
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
//clear up possible rolledback ids.
theConsumer.removeRolledback(reference);
return theConsumer.handleDeliver(reference, message.toCore());
theConsumer.removeRolledback(ref);
return theConsumer.handleDeliver(ref, ref.getMessage().toCore());
}

@Override
public int sendLargeMessage(MessageReference reference,
org.apache.activemq.artemis.api.core.Message message,
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumerID,
long bodySize,
int deliveryCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.stomp;

import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -53,7 +54,6 @@
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE;
import static org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory.STOMP_PROTOCOL_NAME;
Expand Down Expand Up @@ -153,7 +153,7 @@ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageRe
}

@Override
public int sendMessage(MessageReference ref, Message serverMessage, final ServerConsumer consumer, int deliveryCount) {
public int sendMessage(MessageReference ref, final ServerConsumer consumer, int deliveryCount) {
ICoreMessage message = ref.getMessage().toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
Expand Down Expand Up @@ -206,7 +206,6 @@ public int sendLargeMessageContinuation(ServerConsumer consumer,

@Override
public int sendLargeMessage(MessageReference ref,
Message msg,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
Expand Down Expand Up @@ -229,13 +228,12 @@ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}

@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
public int sendMessage(MessageReference ref, ServerConsumer consumerID, int deliveryCount) {
return 0;
}

@Override
public int sendLargeMessage(MessageReference reference,
Message message,
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumerID,
long bodySize,
int deliveryCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.core.Channel;
Expand Down Expand Up @@ -88,11 +87,10 @@ public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageRe

@Override
public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), message, bodySize, deliveryCount);
Packet packet = new SessionReceiveLargeMessage(consumer.getID(), ref.getMessage(), bodySize, deliveryCount);

channel.send(packet);

Expand All @@ -114,13 +112,13 @@ public int sendLargeMessageContinuation(ServerConsumer consumer,
}

@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {

Packet packet;
if (channel.getConnection().isVersionBeforeAddressChange()) {
packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
packet = new SessionReceiveMessage_1X(consumer.getID(), ref.getMessage().toCore(coreMessageObjectPools), deliveryCount);
} else {
packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount);
packet = new SessionReceiveMessage(consumer.getID(), ref.getMessage().toCore(coreMessageObjectPools), deliveryCount);
}

int size = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
// The deliverer will increase the usageUp, so the preAck has to be done after this is created
// otherwise we may have a removed message early on
if (message instanceof CoreLargeServerMessage && this.supportLargeMessage) {
largeMessageDeliverer = new CoreLargeMessageDeliverer((LargeServerMessage) message, ref);
largeMessageDeliverer = new CoreLargeMessageDeliverer(ref);
}

if (preAcknowledge) {
Expand All @@ -507,27 +507,25 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
@Override
public void proceedDeliver(MessageReference reference) throws Exception {
try {
Message message = reference.getMessage();

if (AuditLogger.isMessageLoggingEnabled()) {
AuditLogger.coreConsumeMessage(session.getRemotingConnection().getSubject(), session.getRemotingConnection().getRemoteAddress(), getQueueName().toString(), reference.toString());
}
if (server.hasBrokerMessagePlugins()) {
server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
}

if (message instanceof CoreLargeServerMessage && supportLargeMessage) {
if (reference.getMessage() instanceof CoreLargeServerMessage && supportLargeMessage) {
if (largeMessageDeliverer == null) {
// This can't really happen as handle had already crated the deliverer
// This can't really happen as handle had already created the deliverer
// instead of throwing an exception in weird cases there is no problem on just go ahead and create it
// again here
largeMessageDeliverer = new CoreLargeMessageDeliverer((LargeServerMessage) message, reference);
largeMessageDeliverer = new CoreLargeMessageDeliverer(reference);
}
// The deliverer was prepared during handle, as we can't have more than one pending large message
// as it would return busy if there is anything pending
largeMessageDeliverer.deliver();
} else {
deliverStandardMessage(reference, message);
deliverStandardMessage(reference);
}
} finally {
pendingDelivery.countDown();
Expand Down Expand Up @@ -681,15 +679,15 @@ public void removeItself() throws Exception {
@Override
public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50)
.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence)
.setAddress(messageQueue.getName());

MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
reference.setDeliveryCount(0);

forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());

applyPrefixForLegacyConsumer(forcedDeliveryMessage);
callback.sendMessage(reference, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
callback.sendMessage(reference, ServerConsumerImpl.this, 0);
});
}

Expand Down Expand Up @@ -1220,13 +1218,9 @@ private void resumeLargeMessage() {
messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
}

/**
* @param ref
* @param message
*/
private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
applyPrefixForLegacyConsumer(message);
int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
private void deliverStandardMessage(final MessageReference ref) {
applyPrefixForLegacyConsumer(ref.getMessage());
int packetSize = callback.sendMessage(ref, ServerConsumerImpl.this, ref.getDeliveryCount());

if (availableCredits != null) {
availableCredits.addAndGet(-packetSize);
Expand Down Expand Up @@ -1301,12 +1295,12 @@ private final class CoreLargeMessageDeliverer {

private ByteBuffer chunkBytes;

private CoreLargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref) throws Exception {
largeMessage = message;
private CoreLargeMessageDeliverer(final MessageReference ref) {
this.ref = ref;

largeMessage.toMessage().usageUp();
largeMessage = (LargeServerMessage) ref.getMessage();

this.ref = ref;
largeMessage.toMessage().usageUp();

this.chunkBytes = null;
}
Expand Down Expand Up @@ -1357,7 +1351,7 @@ public boolean deliver() throws Exception {

sentInitialPacket = true;

int packetSize = callback.sendLargeMessage(ref, currentLargeMessage.toMessage(), ServerConsumerImpl.this, context.getSize(), ref.getDeliveryCount());
int packetSize = callback.sendLargeMessage(ref, ServerConsumerImpl.this, context.getSize(), ref.getDeliveryCount());

if (availableCredits != null) {
final int credits = availableCredits.addAndGet(-packetSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.spi.core.protocol;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand Down Expand Up @@ -69,16 +68,9 @@ default boolean hasCredits(ServerConsumer consumerID, MessageReference ref) {

void sendProducerCreditsFailMessage(int credits, SimpleString address);

// Note: don't be tempted to remove the parameter message
// Even though ref will contain the message in certain cases
// such as paging the message could be a SoftReference or WeakReference
// and I wanted to avoid re-fetching paged data in case of GCs on this specific case.
//
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
int sendMessage(MessageReference ref, ServerConsumer consumerID, int deliveryCount);

int sendLargeMessage(MessageReference reference,
Message message,
int sendLargeMessage(MessageReference ref,
ServerConsumer consumerID,
long bodySize,
int deliveryCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -518,7 +517,7 @@ public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
* @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
*/
@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
public int sendMessage(MessageReference ref, ServerConsumer consumer, int deliveryCount) {
inCall.countDown();
try {
callbackSemaphore.acquire();
Expand All @@ -528,7 +527,7 @@ public int sendMessage(MessageReference ref, Message message, ServerConsumer con
}

try {
return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
return targetCallback.sendMessage(ref, consumer, deliveryCount);
} finally {
callbackSemaphore.release();
inCall.countUp();
Expand All @@ -539,12 +538,11 @@ public int sendMessage(MessageReference ref, Message message, ServerConsumer con
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
@Override
public int sendLargeMessage(MessageReference reference,
Message message,
public int sendLargeMessage(MessageReference ref,
ServerConsumer consumer,
long bodySize,
int deliveryCount) {
return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
return targetCallback.sendLargeMessage(ref, consumer, bodySize, deliveryCount);
}

/* (non-Javadoc)
Expand Down

0 comments on commit 159416b

Please sign in to comment.