Skip to content

Commit

Permalink
ARTEMIS-3153 Add tests for address prefixes for AMQP links
Browse files Browse the repository at this point in the history
Adds some tests to validate that the destination prefixes if set and
are used properly by the client are honored over the default address
auto create routing type condiguration.
  • Loading branch information
tabish121 committed Jan 26, 2023
1 parent ac7b0e8 commit 6e10908
Showing 1 changed file with 157 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand All @@ -34,6 +35,7 @@
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
Expand Down Expand Up @@ -70,6 +72,12 @@ protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
// Don't create anything by default since we are testing auto create
}

@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("anycastPrefix", "anycast://");
params.put("multicastPrefix", "multicast://");
}

@Override
protected void configureAddressPolicy(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
Expand All @@ -83,15 +91,13 @@ protected void configureAddressPolicy(ActiveMQServer server) {
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
settings.setDefaultAddressRoutingType(routingType);
settings.setDefaultQueueRoutingType(routingType);
logger.info("server config, isauto? {}", entry.getValue().isAutoCreateQueues());
logger.info("server config, default queue routing type? {}", entry.getValue().getDefaultQueueRoutingType());
logger.info("server config, default address routing type? {}", entry.getValue().getDefaultAddressRoutingType());
}

@Test(timeout = 30_000)
public void testCreateSender() throws Exception {
final String addressName = "sender-address";
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
Expand All @@ -104,25 +110,49 @@ public void testCreateSender() throws Exception {
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpReceiver receiver = session.createReceiver(addressName);
receiver.flow(1);

final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
connection.close();
}

@Test(timeout = 30_000)
public void testCreateReceiver() throws Exception {
final String addressName = "receiver-address";
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();

AmqpReceiver receiver = session.createReceiver(addressName);
final AmqpReceiver receiver = session.createReceiver(addressName);
receiver.flow(1);

AddressQueryResult address = getProxyToAddress(addressName);

assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpSender sender = session.createSender(addressName);
final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
receiver.close();
connection.close();
}
Expand All @@ -138,7 +168,7 @@ public void testCreateSenderThatRequestsAnyCast() throws Exception {
}

private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception {
final String addressName = "sender-defined-address";
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
Expand All @@ -159,6 +189,18 @@ private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType routi
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpReceiver receiver = session.createReceiver(addressName);
receiver.flow(1);

final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
connection.close();
}
Expand All @@ -174,7 +216,7 @@ public void testCreateReceiverThatRequestsAnyCast() throws Exception {
}

private void dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception {
final String addressName = "receiver-defined-address";
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
Expand All @@ -188,13 +230,120 @@ private void dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType rou
source.setCapabilities(TOPIC_CAPABILITY);
}

AmqpReceiver receiver = session.createReceiver(source);
final AmqpReceiver receiver = session.createReceiver(source);
receiver.flow(1);

AddressQueryResult address = getProxyToAddress(addressName);

assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpSender sender = session.createSender(addressName);
final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
receiver.close();
connection.close();
}

@Test(timeout = 30_000)
public void testCreateSenderThatRequestsMultiCastViaPrefix() throws Exception {
dotestCreateSenderThatRequestsSpecificRoutingTypeViaPrefix(RoutingType.MULTICAST);
}

@Test(timeout = 30_000)
public void testCreateSenderThatRequestsAnyCastViaPrefix() throws Exception {
dotestCreateSenderThatRequestsSpecificRoutingTypeViaPrefix(RoutingType.ANYCAST);
}

private void dotestCreateSenderThatRequestsSpecificRoutingTypeViaPrefix(RoutingType routingType) throws Exception {
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();

final String prefixedName;
if (routingType == RoutingType.ANYCAST) {
prefixedName = "anycast://" + addressName;
} else {
prefixedName = "multicast://" + addressName;
}

AmqpSender sender = session.createSender(prefixedName);

AddressQueryResult address = getProxyToAddress(addressName);

assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpReceiver receiver = session.createReceiver(addressName);
receiver.flow(1);

final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
receiver.close();
connection.close();
}

@Test(timeout = 30_000)
public void testCreateReceiverThatRequestsMultiCastViaPrefix() throws Exception {
dotestCreateReceiverThatRequestsSpecificRoutingTypeViaPrefix(RoutingType.MULTICAST);
}

@Test(timeout = 30_000)
public void testCreateReceiverThatRequestsAnyCastViaPrefix() throws Exception {
dotestCreateReceiverThatRequestsSpecificRoutingTypeViaPrefix(RoutingType.ANYCAST);
}

private void dotestCreateReceiverThatRequestsSpecificRoutingTypeViaPrefix(RoutingType routingType) throws Exception {
final String addressName = getTestName();

AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();

final String prefixedName;
if (routingType == RoutingType.ANYCAST) {
prefixedName = "anycast://" + addressName;
} else {
prefixedName = "multicast://" + addressName;
}

final AmqpReceiver receiver = session.createReceiver(prefixedName);
receiver.flow(1);

AddressQueryResult address = getProxyToAddress(addressName);

assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());

final AmqpSender sender = session.createSender(addressName);
final AmqpMessage message = new AmqpMessage();

message.setMessageId("msg:1");
message.setText("Test-Message");

sender.send(message);

assertNotNull(receiver.receive(5, TimeUnit.SECONDS));

sender.close();
receiver.close();
connection.close();
}
Expand Down

0 comments on commit 6e10908

Please sign in to comment.