Skip to content

Commit

Permalink
ARTEMIS-876 Internalise Cluster Namespace and remove JMS Prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
mtaylor committed Dec 9, 2016
1 parent 84e8a87 commit 0006627
Show file tree
Hide file tree
Showing 18 changed files with 148 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ under the License.
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<jms xmlns="urn:activemq:jms">
<queue name="DLQ"/>
<queue name="ExpiryQueue"/>${jms-list.settings}
</jms>

<core xmlns="urn:activemq:core">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">

<name>${name}</name>

Expand Down Expand Up @@ -89,14 +85,28 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
</address-setting>
</address-settings>

<addresses>
<address name="DLQ" type="anycast">
<queues>
<queue name="DLQ" />
</queues>
</address>
<address name="ExpiryQueue" type="anycast">
<queues>
<queue name="ExpiryQueue" />
</queues>
</address>
</addresses>

</core>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<connector-ref>artemis</connector-ref>
<message-load-balancing>${message-load-balancing}</message-load-balancing>
<max-hops>${max-hops}</max-hops>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ public static String getDefaultHapolicyBackupStrategy() {

public static String DEFAULT_NETWORK_CHECK_NIC = null;

public static final String DEFAULT_INTERNAL_NAMING_PREFIX = "$.artemis.internal.";

/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
Expand Down Expand Up @@ -1199,6 +1201,10 @@ public static boolean getDefaultDeleteQueueOnNoConsumers() {
return DEFAULT_DELETE_QUEUE_ON_NO_CONSUMERS;
}

public static String getInternalNamingPrefix() {
return DEFAULT_INTERNAL_NAMING_PREFIX;
}

public static String getDefaultSystemPropertyPrefix() {
return DEFAULT_SYSTEM_PROPERTY_PREFIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,19 +299,31 @@ public MessageProducer createProducer(final Destination destination) throws JMSE
if (jbd != null) {
ClientSession.AddressQuery response = session.addressQuery(jbd.getSimpleAddress());

if (!response.isExists() && response.isAutoCreateJmsQueues()) {
if (jbd.isQueue()) {
session.createAddress(jbd.getSimpleAddress(), false);
session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
} else {
session.createAddress(jbd.getSimpleAddress(), true);
if (jbd.isQueue()) {
if (!response.isExists()) {
if (response.isAutoCreateJmsQueues()) {
session.createAddress(jbd.getSimpleAddress(), false);
} else {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
}

} else if (!response.isExists() && !response.isAutoCreateJmsQueues()) {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
if (response.getQueueNames().isEmpty()) {
if (response.isAutoCreateJmsQueues()) {
session.createQueue(jbd.getSimpleAddress(), jbd.getSimpleAddress(), null, true);
} else {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
}
} else {
if (!response.isExists()) {
if (response.isAutoCreateJmsTopics()) {
session.createAddress(jbd.getSimpleAddress(), true);
} else {
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
}
}
}

connection.addKnownDestination(jbd.getSimpleAddress());
}

ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,8 @@ Configuration addDiscoveryGroupConfiguration(final String key,

Configuration setMaxDiskUsage(int maxDiskUsage);

ConfigurationImpl setInternalNamingPrefix(String internalNamingPrefix);

Configuration setDiskScanPeriod(int diskScanPeriod);

int getDiskScanPeriod();
Expand Down Expand Up @@ -1078,4 +1080,5 @@ Configuration addDiscoveryGroupConfiguration(final String key,

Configuration setNetworkCheckPing6Command(String command);

String getInternalNamingPrefix();
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ public class ConfigurationImpl implements Configuration, Serializable {

private String networkCheckPing6Command = NetworkHealthCheck.IPV6_DEFAULT_COMMAND;

private String internalNamingPrefix = ActiveMQDefaultConfiguration.getInternalNamingPrefix();

/**
* Parent folder for all data folders.
*/
Expand Down Expand Up @@ -1900,6 +1902,17 @@ public int getDiskScanPeriod() {
return diskScanPeriod;
}

@Override
public String getInternalNamingPrefix() {
return internalNamingPrefix;
}

@Override
public ConfigurationImpl setInternalNamingPrefix(String internalNamingPrefix) {
this.internalNamingPrefix = internalNamingPrefix;
return this;
}

@Override
public ConfigurationImpl setDiskScanPeriod(int diskScanPeriod) {
this.diskScanPeriod = diskScanPeriod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String DISK_SCAN_PERIOD = "disk-scan-period";

private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix";

// Attributes ----------------------------------------------------

private boolean validateAIO = false;
Expand Down Expand Up @@ -298,6 +300,8 @@ public void parseMainConfig(final Element e, final Configuration config) throws

config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO));

config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK));

// parsing cluster password
String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,11 @@ Queue createQueue(SimpleString addressName,

void removeClientConnection(String clientId);

AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception;

AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;

AddressInfo removeAddressInfo(SimpleString address) throws Exception;

String getInternalNamingPrefix();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class ClusterConnectionBridge extends BridgeImpl {

private final ServerLocatorInternal discoveryLocator;

private final String storeAndForwardPrefix;

public ClusterConnectionBridge(final ClusterConnection clusterConnection,
final ClusterManager clusterManager,
final ServerLocatorInternal targetLocator,
Expand All @@ -104,7 +106,8 @@ public ClusterConnectionBridge(final ClusterConnection clusterConnection,
final SimpleString managementAddress,
final SimpleString managementNotificationAddress,
final MessageFlowRecord flowRecord,
final TransportConfiguration connector) {
final TransportConfiguration connector,
final String storeAndForwardPrefix) {
super(targetLocator, initialConnectAttempts, reconnectAttempts, 0, // reconnectAttemptsOnSameNode means nothing on the clustering bridge since we always try the same
retryInterval, retryMultiplier, maxRetryInterval, nodeUUID, name, queue, executor, filterString, forwardingAddress, scheduledExecutor, transformer, useDuplicateDetection, user, password, storageManager);

Expand All @@ -128,6 +131,8 @@ public ClusterConnectionBridge(final ClusterConnection clusterConnection,
if (logger.isTraceEnabled()) {
logger.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace"));
}

this.storeAndForwardPrefix = storeAndForwardPrefix;
}

@Override
Expand Down Expand Up @@ -216,6 +221,8 @@ private void setupNotificationConsumer() throws Exception {

SimpleString notifQueueName = new SimpleString(qName);

String filterString = flowRecord.getAddress();

SimpleString filter = new SimpleString(ManagementHelper.HDR_BINDING_TYPE + "<>" +
BindingType.DIVERT.toInt() +
" AND " +
Expand All @@ -239,7 +246,7 @@ private void setupNotificationConsumer() throws Exception {
"<" +
flowRecord.getMaxHops() +
" AND (" +
createSelectorFromAddress(flowRecord.getAddress()) +
createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) +
")");

sessionConsumer.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter);
Expand All @@ -266,6 +273,7 @@ private void setupNotificationConsumer() throws Exception {
}
}


/**
* Takes in a string of an address filter or comma separated list and generates an appropriate JMS selector for
* filtering queues.
Expand Down Expand Up @@ -332,6 +340,16 @@ public static String buildSelectorFromArray(String[] list) {
return builder.toString();
}

private String appendIgnoresToFilter(String filterString) {
if (filterString != null && !filterString.isEmpty()) {
filterString += ",";
}
filterString += "!" + storeAndForwardPrefix;
filterString += ",!" + managementAddress;
filterString += ",!" + managementNotificationAddress;
return filterString;
}

@Override
protected void afterConnect() throws Exception {
super.afterConnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn

private static final Logger logger = Logger.getLogger(ClusterConnectionImpl.class);

private static final String SN_PREFIX = "sf.";
/**
* When getting member on node-up and down we have to remove the name from the transport config
* as the setting we build here doesn't need to consider the name, so use the same name on all
Expand Down Expand Up @@ -170,6 +171,8 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn

private final int clusterNotificationAttempts;

private final String storeAndForwardPrefix;

public ClusterConnectionImpl(final ClusterManager manager,
final TransportConfiguration[] staticTranspConfigs,
final TransportConfiguration connector,
Expand Down Expand Up @@ -277,6 +280,7 @@ public ClusterConnectionImpl(final ClusterManager manager,
}
}

this.storeAndForwardPrefix = server.getInternalNamingPrefix() + SN_PREFIX;
}

public ClusterConnectionImpl(final ClusterManager manager,
Expand Down Expand Up @@ -375,6 +379,8 @@ public ClusterConnectionImpl(final ClusterManager manager,
clusterConnector = new DiscoveryClusterConnector(dg);

this.manager = manager;

this.storeAndForwardPrefix = server.getInternalNamingPrefix() + SN_PREFIX;
}

@Override
Expand Down Expand Up @@ -702,7 +708,7 @@ public void nodeUP(final TopologyMember topologyMember, final boolean last) {

// New node - create a new flow record

final SimpleString queueName = new SimpleString("sf." + name + "." + nodeID);
final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);

Binding queueBinding = postOffice.getBinding(queueName);

Expand Down Expand Up @@ -799,7 +805,7 @@ private void createNewRecord(final long eventUID,
targetLocator.addIncomingInterceptor(new IncomingInterceptorLookingForExceptionMessage(manager, executorFactory.getExecutor()));
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, eventUID, targetNodeID, connector, queueName, queue);

ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server.getStorageManager(), managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector());
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this, manager, targetLocator, serverLocator, initialConnectAttempts, reconnectAttempts, retryInterval, retryIntervalMultiplier, maxRetryInterval, nodeManager.getUUID(), record.getEventUID(), record.getTargetNodeID(), record.getQueueName(), record.getQueue(), executorFactory.getExecutor(), null, null, scheduledExecutor, null, useDuplicateDetection, clusterUser, clusterPassword, server.getStorageManager(), managementService.getManagementAddress(), managementService.getManagementNotificationAddress(), record, record.getConnector(), storeAndForwardPrefix);

targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {

private static final Logger logger = Logger.getLogger(ActiveMQServerImpl.class);

public static final String INTERNAL_NAMING_PREFIX = "$.artemis.internal";

/**
* JMS Topics (which are outside of the scope of the core API) will require a dumb subscription
* with a dummy-filter at this current version as a way to keep its existence valid and TCK
Expand Down Expand Up @@ -719,7 +721,11 @@ public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
}
}

return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues, autoCreateJmsTopics);
if (autoCreateJmsTopics) {
putAddressInfoIfAbsent(new AddressInfo(address));
}

return new BindingQueryResult(getAddressInfo(address) != null, names, autoCreateJmsQueues, autoCreateJmsTopics);
}

@Override
Expand Down Expand Up @@ -2220,14 +2226,7 @@ private void deployAddressesFromConfiguration() throws Exception {

private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
deployQueue(SimpleString.toSimpleString(config.getAddress()),
SimpleString.toSimpleString(config.getName()),
SimpleString.toSimpleString(config.getFilterString()),
config.isDurable(),
false,
false,
config.getMaxConsumers(),
config.getDeleteOnNoConsumers());
deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false, false, config.getMaxConsumers(), config.getDeleteOnNoConsumers());
}
}

Expand Down Expand Up @@ -2330,6 +2329,18 @@ private void recoverStoredConfigs() throws Exception {
}
}

@Override
public AddressInfo putAddressInfoIfAbsent(AddressInfo addressInfo) throws Exception {
AddressInfo result = postOffice.addAddressInfo(addressInfo);

// TODO: is this the right way to do this?
long txID = storageManager.generateID();
storageManager.addAddressBinding(txID, addressInfo);
storageManager.commitBindings(txID);

return result;
}

@Override
public AddressInfo createOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception {
AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo);
Expand All @@ -2354,6 +2365,11 @@ public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
return result;
}

@Override
public String getInternalNamingPrefix() {
return configuration.getInternalNamingPrefix();
}

@Override
public AddressInfo getAddressInfo(SimpleString address) {
return postOffice.getAddressInfo(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public long scaleDownMessages(ClientSessionFactory sessionFactory,
}
}

if (address.toString().startsWith("sf.")) {
String sfPrefix = ((PostOfficeImpl) postOffice).getServer().getInternalNamingPrefix() + "sf.";
if (address.toString().startsWith(sfPrefix)) {
messageCount += scaleDownSNF(address, queues, producer);
} else {
messageCount += scaleDownRegularMessages(address, queues, session, producer);
Expand Down
Loading

0 comments on commit 0006627

Please sign in to comment.