Skip to content

Commit

Permalink
ARTEMIS-2139 Fix setJMSReplyTo for 1.x clients with enable1xPrefixes
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and jbertram committed Dec 14, 2018
1 parent 0acd706 commit 672f536
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 18 deletions.
Expand Up @@ -24,10 +24,11 @@
import java.util.UUID;

import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;
import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.jndi.JNDIStorable;
import org.apache.activemq.artemis.api.core.ParameterisedAddress;

/**
* ActiveMQ Artemis implementation of a JMS Destination.
Expand Down Expand Up @@ -127,6 +128,40 @@ public static Destination fromPrefixedName(final String addr, final String name)
return destination;
}

public static Destination fromPrefixed1XName(final String addr, final String name) {
ActiveMQDestination destination;
if (addr.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
destination = createQueue(addr);
} else if (addr.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
destination = createTopic(addr);
} else if (addr.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
destination = new ActiveMQTemporaryQueue(addr, null);
} else if (addr.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
destination = new ActiveMQTemporaryTopic(addr, null);
} else {
destination = new ActiveMQDestination(addr, TYPE.DESTINATION, null);
}

String unprefixedName = name;

if (name.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_QUEUE_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TOPIC_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TEMP_QUEUE_PREFIX.length());
} else if (name.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
unprefixedName = name.substring(PacketImpl.OLD_TEMP_TOPIC_PREFIX.length());
}

destination.setName(unprefixedName);

return destination;
}




public static SimpleString createQueueNameForSubscription(final boolean isDurable,
final String clientID,
final String subscriptionName) {
Expand Down
Expand Up @@ -385,16 +385,7 @@ public void setJMSReplyTo(final Destination dest) throws JMSException {
throw new InvalidDestinationException("Foreign destination " + dest);
}

String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQQueue) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (dest instanceof ActiveMQTemporaryTopic) {
prefix = TEMP_TOPIC_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQTopic) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
String prefix = prefixOf(dest);
ActiveMQDestination jbd = (ActiveMQDestination) dest;

MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
Expand All @@ -403,6 +394,20 @@ public void setJMSReplyTo(final Destination dest) throws JMSException {
}
}

protected static String prefixOf(Destination dest) {
String prefix = "";
if (dest instanceof ActiveMQTemporaryQueue) {
prefix = TEMP_QUEUE_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQQueue) {
prefix = QUEUE_QUALIFIED_PREFIX;
} else if (dest instanceof ActiveMQTemporaryTopic) {
prefix = TEMP_TOPIC_QUALIFED_PREFIX;
} else if (dest instanceof ActiveMQTopic) {
prefix = TOPIC_QUALIFIED_PREFIX;
}
return prefix;
}

protected SimpleString checkPrefix(SimpleString address) {
return address;
}
Expand Down
Expand Up @@ -219,9 +219,9 @@ public StreamMessage createStreamMessage() throws JMSException {

ActiveMQStreamMessage message;
if (enable1xPrefixes) {
message = new ActiveMQStreamMessage(session);
} else {
message = new ActiveMQStreamCompatibleMessage(session);
} else {
message = new ActiveMQStreamMessage(session);
}
return message;
}
Expand Down
Expand Up @@ -33,6 +33,10 @@ protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}

@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}

@Override
public Destination getJMSReplyTo() throws JMSException {
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.activemq.artemis.jms.client.compatible1X;

import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
Expand Down Expand Up @@ -69,11 +70,39 @@ public Destination getJMSReplyTo() throws JMSException {
return replyTo;
}

public static Destination findCompatibleReplyTo(ClientMessage message) {
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = setCompatibleReplyTo(dest, message);
}

static Destination setCompatibleReplyTo(Destination dest, ClientMessage message) throws InvalidDestinationException {
if (dest == null) {
MessageUtil.setJMSReplyTo(message, (String) null);
return null;
} else {
if (dest instanceof ActiveMQDestination == false) {
throw new InvalidDestinationException("Foreign destination " + dest);
}
ActiveMQDestination jbd = (ActiveMQDestination) dest;
final String address = jbd.getAddress();
if (hasPrefix1X(address)) {
MessageUtil.setJMSReplyTo(message, jbd.getAddress());
} else {
String prefix = prefixOf(dest);
MessageUtil.setJMSReplyTo(message, prefix + jbd.getAddress());
}
return jbd;
}
}

static Destination findCompatibleReplyTo(ClientMessage message) {
SimpleString address = MessageUtil.getJMSReplyTo(message);
if (address != null) {
final SimpleString checkedAddress = checkPrefix1X(address);
if (checkedAddress != null) {
return ActiveMQDestination.fromPrefixed1XName(address.toString(), checkedAddress.toString());
}
String name = address.toString();

// swap the old prefixes for the new ones so the proper destination type gets created
if (address.startsWith(OLD_QUEUE_QUALIFIED_PREFIX)) {
name = address.subSeq(OLD_QUEUE_QUALIFIED_PREFIX.length(), address.length()).toString();
Expand All @@ -95,6 +124,22 @@ public SimpleString checkPrefix(SimpleString address) {
return checkPrefix1X(address);
}

private static boolean hasPrefix1X(String address) {
if (address != null) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
return true;
} else if (address.startsWith(PacketImpl.OLD_TEMP_TOPIC_PREFIX.toString())) {
return true;
}
}

return false;
}

protected static SimpleString checkPrefix1X(SimpleString address) {
if (address != null) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX)) {
Expand Down
Expand Up @@ -33,6 +33,11 @@ protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}

@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}

@Override
public Destination getJMSReplyTo() throws JMSException {
if (replyTo == null) {
Expand Down
Expand Up @@ -34,6 +34,10 @@ protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}

@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}

@Override
public Destination getJMSReplyTo() throws JMSException {
Expand Down
Expand Up @@ -33,6 +33,10 @@ protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}

@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}

@Override
public Destination getJMSReplyTo() throws JMSException {
Expand Down
Expand Up @@ -27,6 +27,10 @@

public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {

@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
replyTo = ActiveMQCompatibleMessage.setCompatibleReplyTo(dest, message);
}

@Override
public Destination getJMSReplyTo() throws JMSException {
Expand Down
Expand Up @@ -76,7 +76,7 @@ void check(List<Message> messages) {
checkMessage(streamMessage);

TextMessage textMessage = iterator.next();
checkMessage(objectMessage);
checkMessage(textMessage);
}


Expand Down
Expand Up @@ -113,8 +113,9 @@ public static Collection getParameters() {
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, ONE_FIVE});
combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, SNAPSHOT});

// TODO: It's not currently possible to mix reply to between 1.x and SNAPSHOT. Both sides need to be on the same version!
// combinations.addAll(combinatory(SNAPSHOT, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}, new Object[]{SNAPSHOT, ONE_FIVE}));
combinations.add(new Object[]{ONE_FIVE, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{ONE_FIVE, ONE_FIVE, SNAPSHOT});

return combinations;
}

Expand Down

0 comments on commit 672f536

Please sign in to comment.