From 603462a1a5e56418c3bc65ac221919feef598c60 Mon Sep 17 00:00:00 2001 From: Domenico Francesco Bruscino Date: Tue, 8 Mar 2022 12:02:39 +0100 Subject: [PATCH] ARTEMIS-3708 Collapse key transformer into policy --- .../ConnectionRouterConfiguration.java | 9 --- .../impl/FileConfigurationParser.java | 15 ----- .../impl/ConnectionRouterControlImpl.java | 4 +- .../core/server/routing/ConnectionRouter.java | 50 ++++++--------- .../routing/ConnectionRouterManager.java | 21 +------ .../core/server/routing/KeyResolver.java | 13 ++-- .../ConsistentHashModuloPolicy.java} | 41 +++++++----- .../policies/ConsistentHashPolicy.java | 6 +- .../core/server/routing/policies/Policy.java | 8 ++- .../policies/PolicyFactoryResolver.java | 1 + .../routing/transformer/KeyTransformer.java | 30 --------- .../transformer/TransformerFactory.java | 22 ------- .../TransformerFactoryResolver.java | 62 ------------------- .../schema/artemis-configuration.xsd | 4 +- .../config/impl/FileConfigurationTest.java | 8 +-- .../routing/ConnectionRouterManagerTest.java | 14 ++--- .../server/routing/ConnectionRouterTest.java | 15 ++--- ...ResolverTest.java => KeyResolverTest.java} | 53 +++++++++++----- .../ConsistentHashModuloPolicyTest.java} | 20 +++--- .../TransformerFactoryResolverTest.java | 43 ------------- .../ConfigurationTest-full-config.xml | 6 +- .../ConfigurationTest-xinclude-config.xml | 6 +- docs/user-manual/en/connection-routers.md | 33 +++++----- .../connection-router/evenly-redirect/pom.xml | 1 + .../resources/activemq/server0/broker.xml | 1 - .../symmetric-redirect/pom.xml | 2 + .../resources/activemq/server0/broker.xml | 1 - .../resources/activemq/server1/broker.xml | 1 - .../routing/AutoClientIDShardClusterTest.java | 14 ++--- 29 files changed, 165 insertions(+), 339 deletions(-) rename artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/{transformer/ConsistentHashModulo.java => policies/ConsistentHashModuloPolicy.java} (62%) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/KeyTransformer.java delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactory.java delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolver.java rename artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/{targets/KeyTypeResolverTest.java => KeyResolverTest.java} (73%) rename artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/{transformer/ConsistentHashModuloTest.java => policies/ConsistentHashModuloPolicyTest.java} (68%) delete mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolverTest.java diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java index 47596d91693..2b6af82ac15 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java @@ -29,7 +29,6 @@ public class ConnectionRouterConfiguration implements Serializable { private CacheConfiguration cacheConfiguration = null; private PoolConfiguration poolConfiguration = null; private NamedPropertyConfiguration policyConfiguration = null; - private NamedPropertyConfiguration transformerConfiguration = null; public String getName() { return name; @@ -93,12 +92,4 @@ public ConnectionRouterConfiguration setPoolConfiguration(PoolConfiguration pool this.poolConfiguration = poolConfiguration; return this; } - - public void setTransformerConfiguration(NamedPropertyConfiguration configuration) { - this.transformerConfiguration = configuration; - } - - public NamedPropertyConfiguration getTransformerConfiguration() { - return transformerConfiguration; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index c12fbcb1a10..26fbcbc40a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -94,7 +94,6 @@ import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryResolver; import org.apache.activemq.artemis.core.server.routing.KeyType; -import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin; @@ -2686,10 +2685,6 @@ private void parseConnectionRouterConfiguration(final Element e, final Configura poolConfiguration = new PoolConfiguration(); parsePoolConfiguration((Element) child, config, poolConfiguration); connectionRouterConfiguration.setPoolConfiguration(poolConfiguration); - } else if (child.getNodeName().equals("local-target-key-transformer")) { - policyConfiguration = new NamedPropertyConfiguration(); - parseTransformerConfiguration((Element) child, policyConfiguration); - connectionRouterConfiguration.setTransformerConfiguration(policyConfiguration); } } @@ -2704,16 +2699,6 @@ private void parseCacheConfiguration(final Element e, final CacheConfiguration c cacheConfiguration.getTimeout(), Validators.GE_ZERO)); } - private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException { - String name = e.getAttribute("name"); - - TransformerFactoryResolver.getInstance().resolve(name); - - policyConfiguration.setName(name); - - policyConfiguration.setProperties(getMapOfChildPropertyElements(e)); - } - private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException { String name = e.getAttribute("name"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ConnectionRouterControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ConnectionRouterControlImpl.java index 416ad716463..dc05f0576e6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ConnectionRouterControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ConnectionRouterControlImpl.java @@ -119,12 +119,12 @@ public String getLocalTargetFilter() { @Override public void setTargetKeyFilter(String regExp) { - connectionRouter.getTargetKeyResolver().setKeyFilter(regExp); + connectionRouter.getKeyResolver().setKeyFilter(regExp); } @Override public String getTargetKeyFilter() { - return connectionRouter.getTargetKeyResolver().getKeyFilter(); + return connectionRouter.getKeyResolver().getKeyFilter(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java index a51dd420441..fed0bc0a80e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java @@ -24,7 +24,6 @@ import org.apache.activemq.artemis.core.server.routing.pools.Pool; import org.apache.activemq.artemis.core.server.routing.targets.Target; import org.apache.activemq.artemis.core.server.routing.targets.TargetResult; -import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; @@ -51,8 +50,6 @@ public class ConnectionRouter implements ActiveMQComponent { private final Policy policy; - private final KeyTransformer transformer; - private final Cache cache; private volatile boolean started = false; @@ -61,10 +58,14 @@ public String getName() { return name; } - public KeyType getTargetKey() { + public KeyType getKey() { return keyType; } + public KeyResolver getKeyResolver() { + return keyResolver; + } + public Target getLocalTarget() { return localTarget.getTarget(); } @@ -73,6 +74,14 @@ public String getLocalTargetFilter() { return localTargetFilter != null ? localTargetFilter.pattern() : null; } + public void setLocalTargetFilter(String regExp) { + if (regExp == null || regExp.trim().isEmpty()) { + this.localTargetFilter = null; + } else { + this.localTargetFilter = Pattern.compile(regExp); + } + } + public Pool getPool() { return pool; } @@ -98,14 +107,11 @@ public ConnectionRouter(final String name, final String localTargetFilter, final Cache cache, final Pool pool, - final Policy policy, - KeyTransformer transformer) { + final Policy policy) { this.name = name; this.keyType = keyType; - this.transformer = transformer; - this.keyResolver = new KeyResolver(keyType, targetKeyFilter); this.localTarget = new TargetResult(localTarget); @@ -158,8 +164,11 @@ public TargetResult getTarget(Connection connection, String clientID, String use } public TargetResult getTarget(String key) { + if (policy != null && !KeyResolver.NULL_KEY_VALUE.equals(key)) { + key = policy.transformKey(key); + } - if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) { + if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) { if (logger.isDebugEnabled()) { logger.debug("The " + keyType + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern()); } @@ -216,27 +225,4 @@ public TargetResult getTarget(String key) { return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT; } - - public void setLocalTargetFilter(String regExp) { - if (regExp == null || regExp.trim().isEmpty()) { - this.localTargetFilter = null; - } else { - this.localTargetFilter = Pattern.compile(regExp); - } - } - - public KeyResolver getTargetKeyResolver() { - return keyResolver; - } - - private String transform(String key) { - String result = key; - if (transformer != null) { - result = transformer.transform(key); - if (logger.isDebugEnabled()) { - logger.debug("Key: " + key + ", transformed to " + result); - } - } - return result; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java index 67d6d5581b0..86aea1dc320 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java @@ -42,9 +42,6 @@ import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget; import org.apache.activemq.artemis.core.server.routing.targets.Target; import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory; -import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer; -import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactory; -import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.jboss.logging.Logger; @@ -115,14 +112,8 @@ public void deployConnectionRouter(ConnectionRouterConfiguration config) throws policy = deployPolicy(policyConfiguration, pool); } - KeyTransformer transformer = null; - NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration(); - if (transformerConfiguration != null) { - transformer = deployTransformer(transformerConfiguration); - } - ConnectionRouter connectionRouter = new ConnectionRouter(config.getName(), config.getKeyType(), - config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer); + config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy); connectionRouters.put(connectionRouter.getName(), connectionRouter); @@ -202,16 +193,6 @@ private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool) return policy; } - private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception { - TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName()); - - KeyTransformer transformer = factory.create(); - - transformer.init(configuration.getProperties()); - - return transformer; - } - public ConnectionRouter getRouter(String name) { return connectionRouters.get(name); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/KeyResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/KeyResolver.java index 6f9637ada00..4df235aa781 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/KeyResolver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/KeyResolver.java @@ -27,7 +27,7 @@ import java.util.regex.Pattern; public class KeyResolver { - public static final String DEFAULT_KEY_VALUE = "DEFAULT"; + public static final String NULL_KEY_VALUE = "NULL"; private static final Logger logger = Logger.getLogger(KeyResolver.class); @@ -120,7 +120,7 @@ public String resolve(Connection connection, String clientID, String username) { } if (keyValue == null) { - keyValue = DEFAULT_KEY_VALUE; + keyValue = NULL_KEY_VALUE; } else if (keyFilter != null) { Matcher keyMatcher = keyFilter.matcher(keyValue); @@ -128,12 +128,17 @@ public String resolve(Connection connection, String clientID, String username) { keyValue = keyMatcher.group(); if (logger.isDebugEnabled()) { - logger.debugf("keyValue with filter %s: %s", keyFilter, keyValue); + logger.debugf("keyValue for %s matches filter %s: %s", key, keyFilter, keyValue); + } + } else { + keyValue = NULL_KEY_VALUE; + + if (logger.isDebugEnabled()) { + logger.debugf("keyValue for %s doesn't matches filter %s", key, keyFilter); } } } - return keyValue; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModulo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicy.java similarity index 62% rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModulo.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicy.java index f2d96e2f39c..a9af756a2d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModulo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicy.java @@ -15,33 +15,40 @@ * limitations under the License. */ -package org.apache.activemq.artemis.core.server.routing.transformer; +package org.apache.activemq.artemis.core.server.routing.policies; -import java.util.Map; +import org.apache.activemq.artemis.core.server.routing.targets.Target; -import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy; -import org.apache.activemq.artemis.core.server.routing.KeyResolver; +import java.util.List; +import java.util.Map; -public class ConsistentHashModulo implements KeyTransformer { +public class ConsistentHashModuloPolicy extends ConsistentHashPolicy { public static final String NAME = "CONSISTENT_HASH_MODULO"; - public static final String MODULO = "modulo"; + + public static final String MODULO = "MODULO"; + int modulo = 0; - @Override - public String transform(String str) { - if (KeyResolver.DEFAULT_KEY_VALUE.equals(str)) { - // we only want to transform resolved keys - return str; - } - if (modulo == 0) { - return str; - } - int hash = ConsistentHashPolicy.getHash(str); - return String.valueOf( hash % modulo ); + public ConsistentHashModuloPolicy() { + super(NAME); } @Override public void init(Map properties) { modulo = Integer.parseInt(properties.get(MODULO)); } + + @Override + public String transformKey(String key) { + if (modulo > 0) { + return String.valueOf(getHash(key) % modulo); + } + + return key; + } + + @Override + public Target selectTarget(List targets, String key) { + return null; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashPolicy.java index d3806ff1532..cd093087fc2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashPolicy.java @@ -31,6 +31,10 @@ public ConsistentHashPolicy() { super(NAME); } + protected ConsistentHashPolicy(String name) { + super(name); + } + @Override public Target selectTarget(List targets, String key) { if (targets.size() > 1) { @@ -56,7 +60,7 @@ public Target selectTarget(List targets, String key) { return null; } - public static int getHash(String str) { + protected int getHash(String str) { final int FNV_INIT = 0x811c9dc5; final int FNV_PRIME = 0x01000193; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/Policy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/Policy.java index 2dfc3a34425..4e425f9cfe4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/Policy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/Policy.java @@ -32,5 +32,11 @@ public interface Policy { void init(Map properties); - Target selectTarget(List targets, String key); + default String transformKey(String key) { + return key; + } + + default Target selectTarget(List targets, String key) { + return null; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/PolicyFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/PolicyFactoryResolver.java index 9c8c031b641..4ebcaec379a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/PolicyFactoryResolver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/policies/PolicyFactoryResolver.java @@ -40,6 +40,7 @@ private PolicyFactoryResolver() { policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy()); policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy()); policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy()); + policyFactories.put(ConsistentHashModuloPolicy.NAME, () -> new ConsistentHashModuloPolicy()); loadPolicyFactories(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/KeyTransformer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/KeyTransformer.java deleted file mode 100644 index d9859ce2f7f..00000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/KeyTransformer.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server.routing.transformer; - -import java.util.Map; - -public interface KeyTransformer { - - default void init(Map properties) { - } - - default String transform(String key) { - return key; - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactory.java deleted file mode 100644 index e347029ddec..00000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server.routing.transformer; - -public interface TransformerFactory { - KeyTransformer create(); -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolver.java deleted file mode 100644 index 75743ce0a74..00000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolver.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.core.server.routing.transformer; - -import java.util.HashMap; -import java.util.Map; -import java.util.ServiceLoader; - -import org.apache.activemq.artemis.core.server.routing.ConnectionRouter; - -public class TransformerFactoryResolver { - private static TransformerFactoryResolver instance; - - public static TransformerFactoryResolver getInstance() { - if (instance == null) { - instance = new TransformerFactoryResolver(); - } - return instance; - } - - private final Map factories = new HashMap<>(); - - private TransformerFactoryResolver() { - factories.put(ConsistentHashModulo.NAME, () -> new ConsistentHashModulo()); - loadFactories(); // let service loader override - } - - public TransformerFactory resolve(String policyName) throws ClassNotFoundException { - TransformerFactory factory = factories.get(policyName); - if (factory == null) { - throw new ClassNotFoundException("No TransformerFactory found for " + policyName); - } - return factory; - } - - private void loadFactories() { - ServiceLoader serviceLoader = ServiceLoader.load( - TransformerFactory.class, ConnectionRouter.class.getClassLoader()); - for (TransformerFactory factory : serviceLoader) { - factories.put(keyFromClassName(factory.getClass().getName()), factory); - } - } - - String keyFromClassName(String name) { - return name.substring(0, name.indexOf("TransformerFactory")); - } -} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 987ae7f94b6..179e0806555 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2163,10 +2163,10 @@ - + - the local target key transformer configuration + the key transformer configuration diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 641f90ac6c2..b59b271b8b1 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; +import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy; import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy; import org.apache.activemq.artemis.core.server.routing.policies.LeastConnectionsPolicy; @@ -76,8 +77,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo.MODULO; - public class FileConfigurationTest extends ConfigurationImplTest { @BeforeClass @@ -280,9 +279,8 @@ public void testDefaults() { Assert.assertEquals(bc.getKeyType(), KeyType.CLIENT_ID); Assert.assertNotNull(bc.getLocalTargetFilter()); Assert.assertNotNull(bc.getKeyFilter()); - Assert.assertNull(bc.getPolicyConfiguration()); - Assert.assertNotNull(bc.getTransformerConfiguration()); - Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO)); + Assert.assertNotNull(bc.getPolicyConfiguration()); + Assert.assertNotNull(bc.getPolicyConfiguration().getProperties().get(ConsistentHashModuloPolicy.MODULO)); } else if (bc.getName().equals("simple-router")) { Assert.assertEquals(bc.getKeyType(), KeyType.USER_NAME); Assert.assertNull(bc.getLocalTargetFilter()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java index 0a4efc3ef47..22b72d4c177 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java @@ -17,15 +17,15 @@ package org.apache.activemq.artemis.core.server.routing; -import java.util.HashMap; +import java.util.Collections; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration; import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration; import org.apache.activemq.artemis.core.config.routing.PoolConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy; -import org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.junit.After; import org.junit.Before; @@ -96,12 +96,10 @@ public void deployLocalOnlyWithPolicy() throws Exception { ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration(); connectionRouterConfiguration.setName("partition-local-consistent-hash").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2)); - NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration(); - policyConfig.setName(ConsistentHashModulo.NAME); - HashMap properties = new HashMap<>(); - properties.put(ConsistentHashModulo.MODULO, String.valueOf(2)); - policyConfig.setProperties(properties); - connectionRouterConfiguration.setTransformerConfiguration(policyConfig); + NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration() + .setName(ConsistentHashModuloPolicy.NAME) + .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2))); + connectionRouterConfiguration.setPolicyConfiguration(policyConfig); underTest.deployConnectionRouter(connectionRouterConfiguration); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java index 28d27395f18..3f97859255a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java @@ -22,12 +22,11 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy; import org.apache.activemq.artemis.core.server.routing.policies.Policy; -import org.apache.activemq.artemis.core.server.routing.pools.Pool; import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget; import org.apache.activemq.artemis.core.server.routing.targets.Target; import org.apache.activemq.artemis.core.server.routing.targets.TargetResult; -import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -49,26 +48,24 @@ public void setUp() { @Test public void getTarget() { - Pool pool = null; Policy policy = null; underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}", - localTarget, "^FOO.*", null, pool, policy, null); + localTarget, "^FOO.*", null, null, policy); assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget()); assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE")); } @Test public void getLocalTargetWithTransformer() throws Exception { - Pool pool = null; - Policy policy = null; - KeyTransformer keyTransformer = new KeyTransformer() { + Policy policy = new AbstractPolicy("TEST") { @Override - public String transform(String key) { + public String transformKey(String key) { return key.substring("TRANSFORM_TO".length() + 1); } }; + underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}", - localTarget, "^FOO.*", null, pool, policy, keyTransformer); + localTarget, "^FOO.*", null, null, policy); assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget()); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/targets/KeyTypeResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/KeyResolverTest.java similarity index 73% rename from artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/targets/KeyTypeResolverTest.java rename to artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/KeyResolverTest.java index 77f294f6d6c..d6504a944a7 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/targets/KeyTypeResolverTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/KeyResolverTest.java @@ -15,15 +15,13 @@ * limitations under the License. */ -package org.apache.activemq.artemis.core.server.routing.targets; +package org.apache.activemq.artemis.core.server.routing; import javax.security.auth.Subject; import java.util.HashSet; import java.util.Set; -import org.apache.activemq.artemis.core.server.routing.KeyResolver; -import org.apache.activemq.artemis.core.server.routing.KeyType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; @@ -32,7 +30,8 @@ import org.junit.Test; import org.mockito.Mockito; -public class KeyTypeResolverTest { +public class KeyResolverTest { + private static final String UNMATCHED_FILTER = "ARTEMIS"; @Test public void testClientIDKey() { @@ -44,12 +43,17 @@ public void testClientIDKeyWithFilter() { testClientIDKey("TEST", "TEST1234", "^.{4}"); } + @Test + public void testClientIDKeyWithUnmatchedFilter() { + testClientIDKey(KeyResolver.NULL_KEY_VALUE, "TEST1234", UNMATCHED_FILTER); + } + private void testClientIDKey(String expected, String clientID, String filter) { KeyResolver keyResolver = new KeyResolver(KeyType.CLIENT_ID, filter); Assert.assertEquals(expected, keyResolver.resolve(null, clientID, null)); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); } @Test @@ -62,6 +66,11 @@ public void testSNIHostKeyWithFilter() { testSNIHostKey("TEST", "TEST1234", "^.{4}"); } + @Test + public void testSNIHostKeyWithUnmatchedFilter() { + testSNIHostKey(KeyResolver.NULL_KEY_VALUE, null, UNMATCHED_FILTER); + } + private void testSNIHostKey(String expected, String sniHost, String filter) { Connection connection = Mockito.mock(Connection.class); @@ -70,10 +79,10 @@ private void testSNIHostKey(String expected, String sniHost, String filter) { Mockito.when(connection.getSNIHostName()).thenReturn(sniHost); Assert.assertEquals(expected, keyResolver.resolve(connection, null, null)); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); Mockito.when(connection.getSNIHostName()).thenReturn(null); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); } @Test @@ -86,6 +95,11 @@ public void testSourceIPKeyWithFilter() { testSourceIPKey("10", "10.0.0.1:12345", "^[^.]+"); } + @Test + public void testSourceIPKeyWithUnmatchedFilter() { + testSourceIPKey(KeyResolver.NULL_KEY_VALUE, "10.0.0.1:12345", UNMATCHED_FILTER); + } + private void testSourceIPKey(String expected, String remoteAddress, String filter) { Connection connection = Mockito.mock(Connection.class); @@ -94,10 +108,10 @@ private void testSourceIPKey(String expected, String remoteAddress, String filte Mockito.when(connection.getRemoteAddress()).thenReturn(remoteAddress); Assert.assertEquals(expected, keyResolver.resolve(connection, null, null)); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); Mockito.when(connection.getRemoteAddress()).thenReturn(null); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); } @Test @@ -110,12 +124,17 @@ public void testUserNameKeyWithFilter() { testUserNameKey("TEST", "TEST1234", "^.{4}"); } + @Test + public void testUserNameKeyWithUnmatchedFilter() { + testUserNameKey(KeyResolver.NULL_KEY_VALUE, "TEST1234", UNMATCHED_FILTER); + } + private void testUserNameKey(String expected, String username, String filter) { KeyResolver keyResolver = new KeyResolver(KeyType.USER_NAME, filter); Assert.assertEquals(expected, keyResolver.resolve(null, null, username)); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(null, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(null, null, null)); } @Test @@ -123,23 +142,23 @@ public void testRoleNameKeyWithFilter() throws Exception { KeyResolver keyResolver = new KeyResolver(KeyType.ROLE_NAME, "B"); Connection connection = Mockito.mock(Connection.class); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class); Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection); Subject subject = Mockito.mock(Subject.class); Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); Set rolePrincipals = new HashSet<>(); Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); rolePrincipals.add(new RolePrincipal("A")); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); rolePrincipals.add(new RolePrincipal("B")); @@ -151,19 +170,19 @@ public void testRoleNameKeyWithoutFilter() throws Exception { KeyResolver keyResolver = new KeyResolver(KeyType.ROLE_NAME, null); Connection connection = Mockito.mock(Connection.class); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class); Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection); Subject subject = Mockito.mock(Subject.class); Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); Set rolePrincipals = new ListOrderedSet(); Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals); - Assert.assertEquals(KeyResolver.DEFAULT_KEY_VALUE, keyResolver.resolve(connection, null, null)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, keyResolver.resolve(connection, null, null)); final RolePrincipal roleA = new RolePrincipal("A"); rolePrincipals.add(roleA); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModuloTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicyTest.java similarity index 68% rename from artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModuloTest.java rename to artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicyTest.java index 9eb16968627..492259df75d 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/ConsistentHashModuloTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/policies/ConsistentHashModuloPolicyTest.java @@ -15,37 +15,37 @@ * limitations under the License. */ -package org.apache.activemq.artemis.core.server.routing.transformer; +package org.apache.activemq.artemis.core.server.routing.policies; import java.util.HashMap; import org.apache.activemq.artemis.core.server.routing.KeyResolver; +import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -public class ConsistentHashModuloTest { +public class ConsistentHashModuloPolicyTest { @Test - public void transform() { - ConsistentHashModulo underTest = new ConsistentHashModulo(); + public void transformKey() { + ConsistentHashModuloPolicy underTest = new ConsistentHashModuloPolicy(); - assertEquals(KeyResolver.DEFAULT_KEY_VALUE, underTest.transform(KeyResolver.DEFAULT_KEY_VALUE)); + Assert.assertEquals(KeyResolver.NULL_KEY_VALUE, underTest.transformKey(KeyResolver.NULL_KEY_VALUE)); - assertEquals("AA", underTest.transform("AA")); // default modulo 0 does nothing + Assert.assertEquals("AA", underTest.transformKey("AA")); // default modulo 0 does nothing HashMap properties = new HashMap<>(); final int modulo = 2; - properties.put(ConsistentHashModulo.MODULO, String.valueOf(modulo)); + properties.put(ConsistentHashModuloPolicy.MODULO, String.valueOf(modulo)); underTest.init(properties); - String hash1 = underTest.transform("AAA"); + String hash1 = underTest.transformKey("AAA"); int v1 = Integer.parseInt(hash1); - String hash2 = underTest.transform("BBB"); + String hash2 = underTest.transformKey("BBB"); int v2 = Integer.parseInt(hash2); assertNotEquals(hash1, hash2); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolverTest.java deleted file mode 100644 index d157ee1313b..00000000000 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/transformer/TransformerFactoryResolverTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server.routing.transformer; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -public class TransformerFactoryResolverTest { - - @Test - public void resolveOk() throws Exception { - TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); - assertNotNull(instance.resolve(ConsistentHashModulo.NAME)); - } - - @Test(expected = ClassNotFoundException.class) - public void resolveError() throws Exception { - TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); - assertNotNull(instance.resolve("NOT PRESENT")); - } - - @Test - public void keyFromName() throws Exception { - TransformerFactoryResolver instance = TransformerFactoryResolver.getInstance(); - assertEquals("New", instance.keyFromClassName("NewTransformerFactory")); - } -} \ No newline at end of file diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index b64cb12d12b..1adf01baf8c 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -164,9 +164,9 @@ CLIENT_ID ^[^.]+ DEFAULT - - - + + + USER_NAME diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml index c45a0030a82..4f242cb1d30 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml @@ -154,9 +154,9 @@ CLIENT_ID ^[^.]+ DEFAULT - - - + + + USER_NAME diff --git a/docs/user-manual/en/connection-routers.md b/docs/user-manual/en/connection-routers.md index 839f46e3275..274b7fe3d94 100644 --- a/docs/user-manual/en/connection-routers.md +++ b/docs/user-manual/en/connection-routers.md @@ -84,13 +84,15 @@ Let's take a look at a pool example from broker.xml: ``` ## Policies -The policy define how to select a broker from a pool. The included policies are: +The policy defines how to select a broker from a pool and allows [key values](#key-values) transformation. The included policies are: * `FIRST_ELEMENT` to select the first target broker from the pool which is ready. It is useful to select the ready target brokers according to the priority defined with their sequence order, ie supposing there are 2 target brokers this policy selects the second target broker only when the first target broker isn't ready. * `ROUND_ROBIN` to select a target sequentially from a pool, this policy is useful to evenly distribute; * `CONSISTENT_HASH` to select a target by a key. This policy always selects the same target broker for the same key until it is removed from the pool. * `LEAST_CONNECTIONS` to select the targets with the fewest active connections. This policy helps you maintain an equal distribution of active connections with the target brokers. +* `CONSISTENT_HASH_MODULO` to transform a key value to a number from 0 to N-1, it takes a single `modulo` property to configure the bound N. One use case is `CLIENT_ID` + sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id can map exclusively to just one of the brokers. A policy is defined by the `policy` element. Let's take a look at a policy example from broker.xml: ```xml @@ -115,19 +117,14 @@ Let's take a look at a cache example from broker.xml: ``` -## Key transformers -A `local-target-key-transformer` allows key value transformation before matching against any local-target-filter. One use case is -CLIENT_ID sharding across a cluster of N brokers. With a consistent hash % N transformation, each client id -can map exclusively to just one of the brokers. The included transformers are: -* `CONSISTENT_HASH_MODULO` that takes a single `modulo` property to configure the bound. - ## Defining connection routers A connection router is defined by the `connection-router` element, it includes the following items: * the `name` attribute defines the name of the connection router and is used to reference the router from an acceptor; * the `key-type` element defines what type of key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [Keys](#keys) for further details; -* the `key-filter` element defines a regular expression to filter the resolved keys; -* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target; -* the `local-target-key-transformer` element defines a key transformer, see [key transformers](#key-transformers); +* the `key-filter` element defines a regular expression to filter the resolved [key values](#key-values); +* the `local-target-filter` element defines a regular expression to match the [key values](#key-values) + that have to return a local target, the [key value](#key-values) could be equal to the special string `NULL` + if the value of the key is undefined or it doesn't match the `key-filter`; * the `pool` element defines the pool to group the target brokers, see [pools](#pools); * the `policy` element defines the policy used to select the target brokers from the pool, see [policies](#policies). @@ -172,15 +169,23 @@ Let's take a look at some connection router examples from broker.xml: ``` +## Key values +The key value is retrieved from the incoming client connection. +If the incoming client connection has no value for the key type used, the key value is set to the special string `NULL`. +If the incoming client connection has a value for the key type used, the key value retrieved can be sequentially manipulated using a `key-filter` and a `policy`. +If a `key-filter` is defined and the filter fails to match, the value is set to the special string `NULL`. +If a `policy` with a key transformation is defined, the key value is set to the transformed value. + + ## Connection Router Workflow The connection router workflow include the following steps: -* Retrieve the key value from the incoming connection; +* Retrieve the [key value](#key-values) from the incoming connection; * Return the local target broker if the key value matches the local filter; -* Delegate to the pool: +* Delegate to the [pool](#pools): * Return the cached target broker if it is ready; * Get ready/active target brokers from the pool; -* Select one target broker using the policy; -* Add the selected broker in the cache; +* Select one target broker using the [policy](#policies); +* Add the selected broker in the [cache](#cache); * Return the selected broker. Let's take a look at flowchart of the connection router workflow: diff --git a/examples/features/connection-router/evenly-redirect/pom.xml b/examples/features/connection-router/evenly-redirect/pom.xml index 0e877ee090d..735a9155e71 100644 --- a/examples/features/connection-router/evenly-redirect/pom.xml +++ b/examples/features/connection-router/evenly-redirect/pom.xml @@ -132,6 +132,7 @@ under the License. true ${noServer} ${basedir}/target/server0 + $.artemis.internal.router.client.test tcp://localhost:61616 run diff --git a/examples/features/connection-router/evenly-redirect/src/main/resources/activemq/server0/broker.xml b/examples/features/connection-router/evenly-redirect/src/main/resources/activemq/server0/broker.xml index 3767bb80727..dd86b1f60e1 100644 --- a/examples/features/connection-router/evenly-redirect/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/connection-router/evenly-redirect/src/main/resources/activemq/server0/broker.xml @@ -58,7 +58,6 @@ under the License. CLIENT_ID ^.{3} - DEFAULT diff --git a/examples/features/connection-router/symmetric-redirect/pom.xml b/examples/features/connection-router/symmetric-redirect/pom.xml index 86dc3ff4884..24cf9bc3410 100644 --- a/examples/features/connection-router/symmetric-redirect/pom.xml +++ b/examples/features/connection-router/symmetric-redirect/pom.xml @@ -86,6 +86,7 @@ under the License. true ${noServer} ${basedir}/target/server0 + $.artemis.internal.router.client.test tcp://localhost:61616 run @@ -102,6 +103,7 @@ under the License. ${noServer} true ${basedir}/target/server1 + $.artemis.internal.router.client.test tcp://localhost:61617 run diff --git a/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server0/broker.xml b/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server0/broker.xml index 0f39ba03653..1b7dd36346b 100644 --- a/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server0/broker.xml @@ -74,7 +74,6 @@ under the License. CLIENT_ID ^.{3} - DEFAULT guest diff --git a/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server1/broker.xml b/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server1/broker.xml index 16c70676f4c..9e4c1b13301 100644 --- a/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server1/broker.xml +++ b/examples/features/connection-router/symmetric-redirect/src/main/resources/activemq/server1/broker.xml @@ -74,7 +74,6 @@ under the License. CLIENT_ID ^.{3} - DEFAULT guest diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest.java index e8b4f304dc6..ab3417e805d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/AutoClientIDShardClusterTest.java @@ -41,8 +41,8 @@ import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManagerFactory; import org.apache.activemq.artemis.core.server.routing.KeyType; import org.apache.activemq.artemis.core.server.routing.KeyResolver; -import org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; import org.apache.activemq.artemis.utils.Wait; @@ -275,13 +275,13 @@ private void addRouterWithClientIdConsistentHashMod() { for (int node = 0; node < numberOfNodes; node++) { Configuration configuration = servers[node].getConfiguration(); ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration().setName(CONNECTION_ROUTER_NAME); - connectionRouterConfiguration.setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(KeyResolver.DEFAULT_KEY_VALUE + "|" + node); - NamedPropertyConfiguration transformerConfig = new NamedPropertyConfiguration(); - transformerConfig.setName(ConsistentHashModulo.NAME); + connectionRouterConfiguration.setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(KeyResolver.NULL_KEY_VALUE + "|" + node); + NamedPropertyConfiguration polocyConfig = new NamedPropertyConfiguration(); + polocyConfig.setName(ConsistentHashModuloPolicy.NAME); HashMap properties = new HashMap<>(); - properties.put(ConsistentHashModulo.MODULO, String.valueOf(numberOfNodes)); - transformerConfig.setProperties(properties); - connectionRouterConfiguration.setTransformerConfiguration(transformerConfig); + properties.put(ConsistentHashModuloPolicy.MODULO, String.valueOf(numberOfNodes)); + polocyConfig.setProperties(properties); + connectionRouterConfiguration.setPolicyConfiguration(polocyConfig); configuration.setConnectionRouters(Collections.singletonList(connectionRouterConfiguration));