-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AMQ-9455] MessageStrategy policy handler
- Loading branch information
Showing
9 changed files
with
373 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
...q-broker/src/main/java/org/apache/activemq/broker/region/policy/ChainMessageStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.activemq.broker.region.policy; | ||
|
||
import java.util.Arrays; | ||
|
||
import org.apache.activemq.broker.ProducerBrokerExchange; | ||
import org.apache.activemq.command.Message; | ||
|
||
import jakarta.jms.MessageFormatRuntimeException; | ||
|
||
/** | ||
* Configurable chain of MessageProcessors | ||
* | ||
* @org.apache.xbean.XBean | ||
*/ | ||
public class ChainMessageStrategy implements MessageStrategy { | ||
|
||
private MessageProcessor[] messageProcessors; | ||
|
||
@Override | ||
public void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException { | ||
if(messageProcessors == null || messageProcessors.length == 0) { | ||
return; | ||
} | ||
|
||
Arrays.stream(messageProcessors).forEach(m -> m.process(producerExchange, message)); | ||
} | ||
|
||
public void setMessageProcessors(MessageProcessor[] messageProcessors) { | ||
this.messageProcessors = messageProcessors; | ||
} | ||
|
||
public MessageProcessor[] getMessageProcessor(MessageProcessor[] messageProcessors) { | ||
return this.messageProcessors; | ||
} | ||
} |
148 changes: 148 additions & 0 deletions
148
...broker/src/main/java/org/apache/activemq/broker/region/policy/HeaderMessageProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.activemq.broker.region.policy; | ||
|
||
import org.apache.activemq.broker.ProducerBrokerExchange; | ||
import org.apache.activemq.command.Message; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import jakarta.jms.MessageFormatRuntimeException; | ||
|
||
/** | ||
* Configurable chain of MessageProcessors | ||
* | ||
* @org.apache.xbean.XBean | ||
*/ | ||
public class HeaderMessageProcessor implements MessageProcessor { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(HeaderMessageProcessor.class); | ||
|
||
boolean forceDeliveryMode = false; | ||
|
||
boolean persistent = true; | ||
|
||
boolean forceExpiration = false; | ||
|
||
/** | ||
* variable which (when non-zero) is used to override | ||
* the expiration date for messages that arrive with | ||
* no expiration date set (in Milliseconds). | ||
*/ | ||
long zeroExpirationOverride = 0; | ||
|
||
/** | ||
* variable which (when non-zero) is used to limit | ||
* the expiration date (in Milliseconds). | ||
*/ | ||
long expirationCeiling = 0; | ||
|
||
/** | ||
* If true, the plugin will not update timestamp to past values | ||
* False by default | ||
*/ | ||
boolean futureOnly = false; | ||
|
||
/** | ||
* if true, update timestamp even if message has passed through a network | ||
* default false | ||
*/ | ||
boolean processNetworkMessages = false; | ||
|
||
/** | ||
* setter method for zeroExpirationOverride | ||
*/ | ||
public void setZeroExpirationOverride(long ttl) | ||
{ | ||
this.zeroExpirationOverride = ttl; | ||
} | ||
|
||
/** | ||
* setter method for expirationCeiling | ||
*/ | ||
public void setExpirationCeiling(long expirationCeiling) | ||
{ | ||
this.expirationCeiling = expirationCeiling; | ||
} | ||
|
||
public void setFutureOnly(boolean futureOnly) { | ||
this.futureOnly = futureOnly; | ||
} | ||
|
||
public void setProcessNetworkMessages(Boolean processNetworkMessages) { | ||
this.processNetworkMessages = processNetworkMessages; | ||
} | ||
|
||
@Override | ||
public void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException { | ||
if(isForceExpiration()) { | ||
if (message.getTimestamp() > 0 && !message.getDestination().isDLQ() && | ||
(processNetworkMessages && !producerExchange.getConnectionContext().isNetworkConnection())) { | ||
// timestamp not been disabled and has not passed through a network or processNetworkMessages=true | ||
|
||
long oldExpiration = message.getExpiration(); | ||
long newTimeStamp = System.currentTimeMillis(); | ||
long timeToLive = zeroExpirationOverride; | ||
long oldTimestamp = message.getTimestamp(); | ||
if (oldExpiration > 0) { | ||
timeToLive = oldExpiration - oldTimestamp; | ||
} | ||
if (timeToLive > 0 && expirationCeiling > 0 && timeToLive > expirationCeiling) { | ||
timeToLive = expirationCeiling; | ||
} | ||
long expiration = timeToLive + newTimeStamp; | ||
// In the scenario that the Broker is behind the clients we never want to set the | ||
// Timestamp and Expiration in the past | ||
if(!futureOnly || (expiration > oldExpiration)) { | ||
if (timeToLive > 0 && expiration > 0) { | ||
message.setExpiration(expiration); | ||
} | ||
message.setTimestamp(newTimeStamp); | ||
LOG.debug("Set message {} timestamp from {} to {}", message.getMessageId(), oldTimestamp, newTimeStamp); | ||
} | ||
} | ||
} | ||
|
||
if(forceDeliveryMode) { | ||
message.setPersistent(isPersistent()); | ||
} | ||
} | ||
|
||
public void setForceDeliveryMode(boolean forceDeliveryMode) { | ||
this.forceDeliveryMode = forceDeliveryMode; | ||
} | ||
|
||
public boolean isForceDeliveryMode() { | ||
return this.forceDeliveryMode; | ||
} | ||
|
||
public void setForceExpiration(boolean forceExpiration) { | ||
this.forceExpiration = forceExpiration; | ||
} | ||
|
||
public boolean isForceExpiration() { | ||
return this.forceExpiration; | ||
} | ||
|
||
public void setPersistent(boolean persistent) { | ||
this.persistent = persistent; | ||
} | ||
|
||
public boolean isPersistent() { | ||
return this.persistent; | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.activemq.broker.region.policy; | ||
|
||
import org.apache.activemq.broker.ProducerBrokerExchange; | ||
import org.apache.activemq.command.Message; | ||
|
||
import jakarta.jms.MessageFormatRuntimeException; | ||
|
||
public interface MessageProcessor { | ||
|
||
void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatRuntimeException; | ||
|
||
} |
28 changes: 28 additions & 0 deletions
28
activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/MessageStrategy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.activemq.broker.region.policy; | ||
|
||
import org.apache.activemq.broker.ProducerBrokerExchange; | ||
import org.apache.activemq.command.Message; | ||
|
||
import jakarta.jms.MessageFormatException; | ||
|
||
public interface MessageStrategy { | ||
|
||
void process(ProducerBrokerExchange producerExchange, Message message) throws MessageFormatException; | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.