Skip to content

Commit

Permalink
ARTEMIS-2142 Support JMSXGroupSeq -1 to close/reset group.
Browse files Browse the repository at this point in the history
Add test cases
Add GroupSequence to Message Interface
Implement Support closing/reset group in queue impl
Update Documentation (copy from activemq5)

Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html
  • Loading branch information
michaelandrepearce authored and clebertsuconic committed Oct 30, 2018
1 parent 8bf549b commit f30ca44
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 49 deletions.
Expand Up @@ -113,6 +113,8 @@ public interface Message {
*/
SimpleString HDR_GROUP_ID = new SimpleString("_AMQ_GROUP_ID");

SimpleString HDR_GROUP_SEQUENCE = new SimpleString("_AMQ_GROUP_SEQUENCE");

/**
* to determine if the Large Message was compressed.
*/
Expand Down Expand Up @@ -248,6 +250,10 @@ default SimpleString getGroupID() {
return null;
}

default int getGroupSequence() {
return 0;
}

SimpleString getReplyTo();

Message setReplyTo(SimpleString address);
Expand Down
Expand Up @@ -289,6 +289,12 @@ public SimpleString getGroupID() {
return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
}

@Override
public int getGroupSequence() {
final Integer integer = this.getIntProperty(Message.HDR_GROUP_SEQUENCE);
return integer == null ? 0 : integer;
}

/**
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
Expand Down Expand Up @@ -49,6 +48,8 @@ public class MessageUtil {

public static final String JMSXGROUPID = "JMSXGroupID";

public static final String JMSXGROUPSEQ = "JMSXGroupSeq";

public static final String JMSXUSERID = "JMSXUserID";

public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
Expand Down Expand Up @@ -154,6 +155,8 @@ public static Set<String> getPropertyNames(Message message) {
for (SimpleString propName : message.getPropertyNames()) {
if (propName.equals(Message.HDR_GROUP_ID)) {
set.add(MessageUtil.JMSXGROUPID);
} else if (propName.equals(Message.HDR_GROUP_SEQUENCE)) {
set.add(MessageUtil.JMSXGROUPSEQ);
} else if (propName.equals(Message.HDR_VALIDATED_USER)) {
set.add(MessageUtil.JMSXUSERID);
} else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
Expand All @@ -169,6 +172,7 @@ public static Set<String> getPropertyNames(Message message) {
public static boolean propertyExists(Message message, String name) {
return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
(MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) ||
(MessageUtil.JMSXGROUPSEQ.equals(name) && message.containsProperty(Message.HDR_GROUP_SEQUENCE)) ||
(MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
}
}
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.jms.client;

import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
Expand All @@ -30,11 +36,6 @@
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -561,25 +562,29 @@ public short getShortProperty(final String name) throws JMSException {

@Override
public int getIntProperty(final String name) throws JMSException {
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
}

try {
return message.getIntProperty(name);
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getIntProperty(name);
}
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
}

@Override
public long getLongProperty(final String name) throws JMSException {
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
}

try {
return message.getLongProperty(name);
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return message.getGroupSequence();
} else {
return message.getLongProperty(name);
}
} catch (ActiveMQPropertyConversionException e) {
throw new MessageFormatException(e.getMessage());
}
Expand Down Expand Up @@ -611,7 +616,9 @@ public String getStringProperty(final String name) throws JMSException {

try {
if (MessageUtil.JMSXGROUPID.equals(name)) {
return message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID);
return Objects.toString(message.getGroupID(), null);
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
return Integer.toString(message.getGroupSequence());
} else if (MessageUtil.JMSXUSERID.equals(name)) {
return message.getValidatedUserID();
} else {
Expand All @@ -624,13 +631,20 @@ public String getStringProperty(final String name) throws JMSException {

@Override
public Object getObjectProperty(final String name) throws JMSException {
final Object val;
if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
return String.valueOf(message.getDeliveryCount());
val = message.getDeliveryCount();
} else if (MessageUtil.JMSXGROUPID.equals(name)) {
val = message.getGroupID();
} else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
val = message.getGroupSequence();
} else if (MessageUtil.JMSXUSERID.equals(name)) {
val = message.getValidatedUserID();
} else {
val = message.getObjectProperty(name);
}

Object val = message.getObjectProperty(name);
if (val instanceof SimpleString) {
val = val.toString();
return val.toString();
}
return val;
}
Expand Down Expand Up @@ -662,12 +676,18 @@ public void setShortProperty(final String name, final short value) throws JMSExc
@Override
public void setIntProperty(final String name, final int value) throws JMSException {
checkProperty(name);
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
message.putIntProperty(name, value);
}

@Override
public void setLongProperty(final String name, final long value) throws JMSException {
checkProperty(name);
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
message.putLongProperty(name, value);
}

Expand All @@ -687,9 +707,11 @@ public void setDoubleProperty(final String name, final double value) throws JMSE
public void setStringProperty(final String name, final String value) throws JMSException {
checkProperty(name);

if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
return;
} else if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
} else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
} else if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return;
} else {
message.putStringProperty(name, value);
Expand All @@ -698,11 +720,13 @@ public void setStringProperty(final String name, final String value) throws JMSE

@Override
public void setObjectProperty(final String name, final Object value) throws JMSException {
if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
return;
}

if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
return;
}
if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
return;
}

Expand All @@ -716,14 +740,14 @@ public void setObjectProperty(final String name, final Object value) throws JMSE
return;
}

checkProperty(name);

if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) {
setInputStream((InputStream) value);

return;
}

checkProperty(name);

try {
message.putObjectProperty(name, value);
} catch (ActiveMQPropertyConversionException e) {
Expand Down Expand Up @@ -979,10 +1003,47 @@ private void checkPriority(final int priority) throws JMSException {
}
}

private boolean handleCoreProperty(final String name,
final Object value,
String jmsPropertyName,
SimpleString corePropertyName) {
private boolean handleCoreIntegerProperty(final String name,
final Object value,
String jmsPropertyName,
SimpleString corePropertyName) {
if (jmsPropertyName.equals(name)) {
return handleCoreIntegerProperty(name, getInteger(value), jmsPropertyName, corePropertyName);
}
return false;
}

private boolean handleCoreIntegerProperty(final String name,
final int value,
String jmsPropertyName,
SimpleString corePropertyName) {
boolean result = false;

if (jmsPropertyName.equals(name)) {
message.putIntProperty(corePropertyName, value);
result = true;
}

return result;
}

private static int getInteger(final Object value) {
Objects.requireNonNull(value);
final int integer;
if (value instanceof Integer) {
integer = (Integer) value;
} else if (value instanceof Number) {
integer = ((Number) value).intValue();
} else {
integer = Integer.parseInt(value.toString());
}
return integer;
}

private boolean handleCoreStringProperty(final String name,
final Object value,
String jmsPropertyName,
SimpleString corePropertyName) {
boolean result = false;

if (jmsPropertyName.equals(name)) {
Expand Down
Expand Up @@ -1099,6 +1099,17 @@ public SimpleString getGroupID() {
}
}

@Override
public int getGroupSequence() {
ensureMessageDataScanned();

if (properties != null && properties.getGroupSequence() != null) {
return properties.getGroupSequence().intValue();
} else {
return 0;
}
}

@Override
public Long getScheduledDeliveryTime() {
if (scheduledTime < 0) {
Expand Down
Expand Up @@ -83,7 +83,7 @@ public final class OpenWireMessageConverter {
private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE;
private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
Expand Down Expand Up @@ -616,7 +616,7 @@ private static ActiveMQMessage toAMQMessage(MessageReference reference,

Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
if (groupSequence == null) {
groupSequence = -1;
groupSequence = 0;
}
amqMsg.setGroupSequence(groupSequence);

Expand Down
Expand Up @@ -1613,6 +1613,10 @@ void slowConsumerDetected(String sessionID,
format = Message.Format.MESSAGE_FORMAT)
void problemAddingConfigReloadCallback(String propertyName, @Cause Exception e);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 222278, value = "Unable to extract GroupSequence from message", format = Message.Format.MESSAGE_FORMAT)
void unableToExtractGroupSequence(@Cause Throwable e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
Expand Down
Expand Up @@ -2531,8 +2531,8 @@ private void deliver() {

removeMessageReference(holder, ref);

if (groupID != null && groupConsumer == null && redistributor == null) {
groups.put(groupID, consumer);
if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}

handled++;
Expand Down Expand Up @@ -2635,6 +2635,20 @@ private SimpleString extractGroupID(MessageReference ref) {
}
}

private int extractGroupSequence(MessageReference ref) {
if (internalQueue) {
return 0;
} else {
try {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
return ref.getMessage().getGroupSequence();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e);
return 0;
}
}
}

protected void refRemoved(MessageReference ref) {
queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
pendingMetrics.decrementMetrics(ref);
Expand Down Expand Up @@ -3110,8 +3124,9 @@ private boolean deliverDirect(final MessageReference ref) {
HandleStatus status = handle(ref, consumer);

if (status == HandleStatus.HANDLED) {
if (groupID != null && groupConsumer == null && redistributor == null) {
groups.put(groupID, consumer);

if (redistributor == null) {
handleMessageGroup(ref, consumer, groupConsumer, groupID);
}

messagesAdded.incrementAndGet();
Expand All @@ -3130,6 +3145,17 @@ private boolean deliverDirect(final MessageReference ref) {
}
}

private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
if (groupID != null) {
if (extractGroupSequence(ref) == -1) {
groups.remove(groupID);
}
if (groupConsumer == null) {
groups.put(groupID, consumer);
}
}
}

private void proceedDeliver(Consumer consumer, MessageReference reference) {
try {
consumer.proceedDeliver(reference);
Expand Down

0 comments on commit f30ca44

Please sign in to comment.