Skip to content

Commit

Permalink
ARTEMIS-2139 Fix JMS reply to Address is correct for all versions
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaylor committed Oct 26, 2018
1 parent ec313ba commit 8ac9f8d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 41 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jndi.JNDIStorable;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;

Expand Down Expand Up @@ -94,24 +95,24 @@ public static Destination fromPrefixedName(final String name) {
public static Destination fromPrefixedName(final String addr, final String name) {

ActiveMQDestination destination;
String address = addr;
if (addr.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
address = addr.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
destination = createQueue(address);
} else if (addr.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
address = addr.substring(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX.length());
destination = createTopic(address);
} else if (addr.startsWith(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
address = addr.substring(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX.length());
destination = new ActiveMQTemporaryQueue(address, null);
} else if (addr.startsWith(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX)) {
String address = addr.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
address = addr.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
destination = new ActiveMQTemporaryTopic(address, null);
} else {
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}

String unprefixedName = name;

if (name.startsWith(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX)) {
unprefixedName = name.substring(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX.length());
} else if (name.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
Expand All @@ -122,6 +123,15 @@ public static Destination fromPrefixedName(final String addr, final String name)
unprefixedName = name.substring(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX.length());
}

if (unprefixedName.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
unprefixedName = unprefixedName.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
} else if (unprefixedName.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
unprefixedName = unprefixedName.substring(PacketImpl.OLD_TOPIC_PREFIX.length());
} else if (unprefixedName.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
unprefixedName = unprefixedName.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
} else if (unprefixedName.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
unprefixedName = unprefixedName.substring(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length());
}
destination.setName(unprefixedName);

return destination;
Expand Down
Expand Up @@ -46,10 +46,8 @@
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.UUID;

import static org.apache.activemq.artemis.jms.client.ActiveMQDestination.QUEUE_QUALIFIED_PREFIX;
Expand Down Expand Up @@ -370,26 +368,46 @@ public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
String name = address.toString();

// swap the old prefixes for the new ones so the proper destination type gets created
if (enable1xPrefixes) {
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_QUEUE_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_QUEUE_QUALIFED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TOPIC_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_TOPIC_QUALIFIED_PREFIX.length(), address.length()).toString();
} else if (address.startsWith(OLD_TEMP_TOPIC_QUALIFED_PREFIX)) {
name = address.subSeq(OLD_TEMP_TOPIC_QUALIFED_PREFIX.length(), address.length()).toString();
}
replyTo = ActiveMQDestination.fromPrefixedName(get1xPrefixedName(address.toString()));
} else {
replyTo = ActiveMQDestination.fromPrefixedName(address.toString());
}
replyTo = ActiveMQDestination.fromPrefixedName(address.toString(), name);
}
}
return replyTo;
}

// Is only ever called for use with older clients.
private String get1xPrefixedName(String address) {
if (address.startsWith(QUEUE_QUALIFIED_PREFIX)) {
if (!address.startsWith(QUEUE_QUALIFIED_PREFIX + PacketImpl.OLD_QUEUE_PREFIX.toString())) {
address = QUEUE_QUALIFIED_PREFIX + PacketImpl.OLD_QUEUE_PREFIX + address.substring(QUEUE_QUALIFIED_PREFIX.length());
}
} else if (address.startsWith(TOPIC_QUALIFIED_PREFIX)) {
if (!address.startsWith(TOPIC_QUALIFIED_PREFIX + PacketImpl.OLD_TOPIC_PREFIX.toString())) {
address = TOPIC_QUALIFIED_PREFIX + PacketImpl.OLD_TOPIC_PREFIX + address.substring(TOPIC_QUALIFIED_PREFIX.length());
}
} else if (address.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
if (!address.startsWith(TEMP_QUEUE_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
address = TEMP_QUEUE_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_QUEUE_PREFIX + address.substring(TEMP_QUEUE_QUALIFED_PREFIX.length());
}
} else if (address.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
if (!address.startsWith(TEMP_TOPIC_QUALIFED_PREFIX + PacketImpl.OLD_TOPIC_PREFIX.toString())) {
address = TEMP_TOPIC_QUALIFED_PREFIX + PacketImpl.OLD_TEMP_TOPIC_PREFIX + address.substring(TEMP_TOPIC_QUALIFED_PREFIX.length());
}
} else if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
address = QUEUE_QUALIFIED_PREFIX + address;
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
address = TOPIC_QUALIFIED_PREFIX + address;
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
address = TEMP_QUEUE_QUALIFED_PREFIX + address;
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
address = TEMP_TOPIC_QUALIFED_PREFIX + address;
}
return address;
}

@Override
public void setJMSReplyTo(final Destination dest) throws JMSException {

Expand Down Expand Up @@ -426,29 +444,21 @@ public Destination getJMSDestination() throws JMSException {
SimpleString name = address;

if (address != null & enable1xPrefixes) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TOPIC_PREFIX.length(), address.length());
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX)) {
name = address.subSeq(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length(), address.length());
}
}

if (address == null) {
dest = null;
} else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createTopic(address);
dest = ActiveMQDestination.fromPrefixedName(get1xPrefixedName(address.toString()));
} else {
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}
if (address == null) {
dest = null;
} else if (RoutingType.ANYCAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createQueue(address);
} else if (RoutingType.MULTICAST.equals(message.getRoutingType())) {
dest = ActiveMQDestination.createTopic(address);
} else {
dest = (ActiveMQDestination) ActiveMQDestination.fromPrefixedName(address.toString());
}

if (name != null) {
((ActiveMQDestination) dest).setName(name.toString());
if (name != null) {
((ActiveMQDestination) dest).setName(name.toString());
}
}
}

Expand Down

0 comments on commit 8ac9f8d

Please sign in to comment.