Skip to content

Commit

Permalink
[AMQ-9455] MessageStrategy policy handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mattrpav committed Apr 30, 2024
1 parent e116b5a commit 960c8d8
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.MessageStrategy;
import org.apache.activemq.broker.region.policy.SlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
Expand Down Expand Up @@ -99,6 +100,7 @@ public abstract class BaseDestination implements Destination {
private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
protected int cursorMemoryHighWaterMark = 70;
protected int storeUsageHighWaterMark = 100;
private MessageStrategy messageStrategy;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
Expand Down Expand Up @@ -942,4 +944,12 @@ public boolean isPersistJMSRedelivered() {
public SystemUsage getSystemUsage() {
return systemUsage;
}

public MessageStrategy getMessageStrategy() {
return this.messageStrategy;
}

public void setMessageStrategy(MessageStrategy messageStrategy) {
this.messageStrategy = messageStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import jakarta.jms.InvalidSelectorException;
import jakarta.jms.JMSException;
import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageFormatRuntimeException;
import jakarta.jms.ResourceAllocationException;

import org.apache.activemq.broker.BrokerService;
Expand Down Expand Up @@ -625,6 +627,15 @@ public void send(final ProducerBrokerExchange producerExchange, final Message me
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();

if(getMessageStrategy() != null) {
try {
getMessageStrategy().process(producerExchange, message);
} catch (MessageFormatRuntimeException e) {
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
}
}

if (message.isExpired()) {
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import org.slf4j.LoggerFactory;

import jakarta.jms.JMSException;
import jakarta.jms.MessageFormatException;
import jakarta.jms.MessageFormatRuntimeException;

import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;

Expand Down Expand Up @@ -371,6 +373,14 @@ public void send(final ProducerBrokerExchange producerExchange, final Message me

message.setRegionDestination(this);

if(getMessageStrategy() != null) {
try {
getMessageStrategy().process(producerExchange, message);
} catch (MessageFormatRuntimeException e) {
throw new MessageFormatException(e.getMessage(), e.getErrorCode());
}
}

// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if (message.isExpired()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;

import java.util.Arrays;

import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;

import jakarta.jms.MessageFormatRuntimeException;

/**
* Configurable chain of MessageProcessors
*
* @org.apache.xbean.XBean
*/
public class ChainMessageStrategy implements MessageStrategy {

private MessageProcessor[] messageProcessors;

@Override
public void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException {
if(messageProcessors == null || messageProcessors.length == 0) {
return;
}

Arrays.stream(messageProcessors).forEach(m -> m.process(producerExchange, message));
}

public void setMessageProcessors(MessageProcessor[] messageProcessors) {
this.messageProcessors = messageProcessors;
}

public MessageProcessor[] getMessageProcessor(MessageProcessor[] messageProcessors) {
return this.messageProcessors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;

import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.jms.MessageFormatRuntimeException;

/**
* Configurable chain of MessageProcessors
*
* @org.apache.xbean.XBean
*/
public class HeaderMessageProcessor implements MessageProcessor {

private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageProcessor.class);

boolean forceDeliveryMode = false;

boolean persistent = true;

boolean forceExpiration = false;

/**
* variable which (when non-zero) is used to override
* the expiration date for messages that arrive with
* no expiration date set (in Milliseconds).
*/
long zeroExpirationOverride = 0;

/**
* variable which (when non-zero) is used to limit
* the expiration date (in Milliseconds).
*/
long expirationCeiling = 0;

/**
* If true, the plugin will not update timestamp to past values
* False by default
*/
boolean futureOnly = false;

/**
* if true, update timestamp even if message has passed through a network
* default false
*/
boolean processNetworkMessages = false;

/**
* setter method for zeroExpirationOverride
*/
public void setZeroExpirationOverride(long ttl)
{
this.zeroExpirationOverride = ttl;
}

/**
* setter method for expirationCeiling
*/
public void setExpirationCeiling(long expirationCeiling)
{
this.expirationCeiling = expirationCeiling;
}

public void setFutureOnly(boolean futureOnly) {
this.futureOnly = futureOnly;
}

public void setProcessNetworkMessages(Boolean processNetworkMessages) {
this.processNetworkMessages = processNetworkMessages;
}

@Override
public void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException {
if(isForceExpiration()) {
if (message.getTimestamp() > 0 && !message.getDestination().isDLQ() &&
(processNetworkMessages && !producerExchange.getConnectionContext().isNetworkConnection())) {
// timestamp not been disabled and has not passed through a network or processNetworkMessages=true

long oldExpiration = message.getExpiration();
long newTimeStamp = System.currentTimeMillis();
long timeToLive = zeroExpirationOverride;
long oldTimestamp = message.getTimestamp();
if (oldExpiration > 0) {
timeToLive = oldExpiration - oldTimestamp;
}
if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) {
timeToLive = expirationCeiling;
}
long expiration = timeToLive + newTimeStamp;
// In the scenario that the Broker is behind the clients we never want to set the
// Timestamp and Expiration in the past
if(!futureOnly || (expiration > oldExpiration)) {
if (timeToLive > 0 && expiration > 0) {
message.setExpiration(expiration);
}
message.setTimestamp(newTimeStamp);
LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp);
}
}
}

if(forceDeliveryMode) {
message.setPersistent(isPersistent());
}
}

public void setForceDeliveryMode(boolean forceDeliveryMode) {
this.forceDeliveryMode = forceDeliveryMode;
}

public boolean isForceDeliveryMode() {
return this.forceDeliveryMode;
}

public void setForceExpiration(boolean forceExpiration) {
this.forceExpiration = forceExpiration;
}

public boolean isForceExpiration() {
return this.forceExpiration;
}

public void setPersistent(boolean persistent) {
this.persistent = persistent;
}

public boolean isPersistent() {
return this.persistent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;

import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;

import jakarta.jms.MessageFormatRuntimeException;

public interface MessageProcessor {

void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;

import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;

import jakarta.jms.MessageFormatException;

public interface MessageStrategy {

void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ public class PolicyEntry extends DestinationMapEntry {
private int sendFailIfNoSpace = -1;
private long sendFailIfNoSpaceAfterTimeout = -1;

private MessageStrategy messageStrategy = null;

public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
Expand All @@ -139,6 +140,7 @@ public void configure(Broker broker,Queue queue) {
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
queue.setMessageStrategy(getMessageStrategy());
}

public void update(Queue queue) {
Expand Down Expand Up @@ -201,6 +203,7 @@ public void configure(Broker broker,Topic topic) {
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setLazyDispatch(isLazyDispatch());
topic.setMessageStrategy(getMessageStrategy());
}

public void update(Topic topic) {
Expand Down Expand Up @@ -1165,4 +1168,12 @@ public boolean isUseTopicSubscriptionInflightStats() {
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}

public void setMessageStrategy(MessageStrategy messageStrategy) {
this.messageStrategy = messageStrategy;
}

public MessageStrategy getMessageStrategy() {
return this.messageStrategy;
}
}

0 comments on commit 960c8d8

Please sign in to comment.