Skip to content

Commit

Permalink
ARTEMIS-4560 Fixing defaults on Broker Connections for Broker Properties
Browse files Browse the repository at this point in the history
ARTEMIS-4566 Allow management of Mirror SNF internal queue
  • Loading branch information
clebertsuconic committed Jan 16, 2024
1 parent f990c9a commit 5e7a902
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static Process startServer(String artemisInstance, String serverName, int
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);

// wait for start
if (timeout != 0) {
if (timeout > 0) {
waitForServerToStart(id, timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ private Queue installMirrorController(AMQPMirrorBrokerConnectionElement replicaC
mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true);
}

server.registerQueueOnManagement(mirrorControlQueue, true);

logger.debug("Mirror queue {}", mirrorControlQueue.getName());

mirrorControlQueue.setMirrorController(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionElement {

boolean durable;
boolean durable = true;

boolean queueCreation = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,6 +973,8 @@ Queue updateQueue(String name,
*/
void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;

void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception;

/**
* Remove an {@code AddressInfo} from the broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3944,6 +3944,12 @@ public void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throw
removeAddressInfo(address, auth);
}

/** Register a queue on the management registry */
@Override
public void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception {
managementService.registerQueue(queue, queue.getAddress(), storageManager, registerInternal);
}

@Override
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception {
if (auth != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ ActiveMQServerControlImpl registerServer(PostOffice postOffice,

void registerQueue(Queue queue, SimpleString address, StorageManager storageManager) throws Exception;

void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception;

void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception;

void registerAcceptor(Acceptor acceptor, TransportConfiguration configuration) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,15 @@ public synchronized void unregisterAddress(final SimpleString address) throws Ex
public synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager) throws Exception {
registerQueue(queue, addressInfo, storageManager, false);
}

private synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager,
boolean forceInternal) throws Exception {

if (addressInfo.isInternal() || queue.isInternalQueue()) {
if (!forceInternal && (addressInfo.isInternal() || queue.isInternalQueue())) {
logger.debug("won't register internal queue: {}", queue);
return;
}
Expand All @@ -314,6 +321,14 @@ public synchronized void registerQueue(final Queue queue,
registerQueue(queue, new AddressInfo(address), storageManager);
}

@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager,
final boolean forceInternal) throws Exception {
registerQueue(queue, new AddressInfo(address), storageManager, forceInternal);
}

@Override
public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ public void registerQueue(Queue queue, SimpleString address, StorageManager stor

}

@Override
public void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception {

}

@Override
public void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,18 @@ public HelperBase setArgs(String... args) {
return this;
}

public HelperBase addArgs(String... args) {
int initialLength = this.args == null ? 0 : this.args.length;
String[] newArgs = new String[initialLength + args.length];
for (int i = 0; i < initialLength; i++) {
newArgs[i] = this.args[i];
}
for (int i = 0; i < args.length; i++) {
newArgs[i + initialLength] = args[i];
}
this.args = newArgs;
return this;
}

String[] args = new String[0];
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
Expand Down Expand Up @@ -94,6 +95,11 @@ public void stopLogging() throws Exception {
}
}

@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}

@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
Expand Down Expand Up @@ -579,6 +585,9 @@ public void testAddressFilter() throws Exception {
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded() > 0);
}

SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:" + AMQP_PORT_2, null, null);
Wait.assertEquals(0, () -> simpleManagement.getMessageCountOnQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror-source"), 5000);

try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT).createConnection()) {
connection.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ public void testStats() throws Exception {
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));

Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null, 5000);

ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ public void setUp() throws Exception {
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);

server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 1024, -1, -1);
server2.getConfiguration().getAcceptorConfigurations().clear();
server2.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61617");
brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);

server1.start();
Expand Down

0 comments on commit 5e7a902

Please sign in to comment.