Skip to content

Commit

Permalink
ARTEMIS-3708 Collapse key transformer into policy
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev authored and clebertsuconic committed Mar 21, 2022
1 parent 2a26e46 commit 603462a
Show file tree
Hide file tree
Showing 29 changed files with 165 additions and 339 deletions.
Expand Up @@ -29,7 +29,6 @@ public class ConnectionRouterConfiguration implements Serializable {
private CacheConfiguration cacheConfiguration = null;
private PoolConfiguration poolConfiguration = null;
private NamedPropertyConfiguration policyConfiguration = null;
private NamedPropertyConfiguration transformerConfiguration = null;

public String getName() {
return name;
Expand Down Expand Up @@ -93,12 +92,4 @@ public ConnectionRouterConfiguration setPoolConfiguration(PoolConfiguration pool
this.poolConfiguration = poolConfiguration;
return this;
}

public void setTransformerConfiguration(NamedPropertyConfiguration configuration) {
this.transformerConfiguration = configuration;
}

public NamedPropertyConfiguration getTransformerConfiguration() {
return transformerConfiguration;
}
}
Expand Up @@ -94,7 +94,6 @@
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
Expand Down Expand Up @@ -2686,10 +2685,6 @@ private void parseConnectionRouterConfiguration(final Element e, final Configura
poolConfiguration = new PoolConfiguration();
parsePoolConfiguration((Element) child, config, poolConfiguration);
connectionRouterConfiguration.setPoolConfiguration(poolConfiguration);
} else if (child.getNodeName().equals("local-target-key-transformer")) {
policyConfiguration = new NamedPropertyConfiguration();
parseTransformerConfiguration((Element) child, policyConfiguration);
connectionRouterConfiguration.setTransformerConfiguration(policyConfiguration);
}
}

Expand All @@ -2704,16 +2699,6 @@ private void parseCacheConfiguration(final Element e, final CacheConfiguration c
cacheConfiguration.getTimeout(), Validators.GE_ZERO));
}

private void parseTransformerConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");

TransformerFactoryResolver.getInstance().resolve(name);

policyConfiguration.setName(name);

policyConfiguration.setProperties(getMapOfChildPropertyElements(e));
}

private void parsePolicyConfiguration(final Element e, final NamedPropertyConfiguration policyConfiguration) throws ClassNotFoundException {
String name = e.getAttribute("name");

Expand Down
Expand Up @@ -119,12 +119,12 @@ public String getLocalTargetFilter() {

@Override
public void setTargetKeyFilter(String regExp) {
connectionRouter.getTargetKeyResolver().setKeyFilter(regExp);
connectionRouter.getKeyResolver().setKeyFilter(regExp);
}

@Override
public String getTargetKeyFilter() {
return connectionRouter.getTargetKeyResolver().getKeyFilter();
return connectionRouter.getKeyResolver().getKeyFilter();
}

@Override
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;

Expand All @@ -51,8 +50,6 @@ public class ConnectionRouter implements ActiveMQComponent {

private final Policy policy;

private final KeyTransformer transformer;

private final Cache cache;

private volatile boolean started = false;
Expand All @@ -61,10 +58,14 @@ public String getName() {
return name;
}

public KeyType getTargetKey() {
public KeyType getKey() {
return keyType;
}

public KeyResolver getKeyResolver() {
return keyResolver;
}

public Target getLocalTarget() {
return localTarget.getTarget();
}
Expand All @@ -73,6 +74,14 @@ public String getLocalTargetFilter() {
return localTargetFilter != null ? localTargetFilter.pattern() : null;
}

public void setLocalTargetFilter(String regExp) {
if (regExp == null || regExp.trim().isEmpty()) {
this.localTargetFilter = null;
} else {
this.localTargetFilter = Pattern.compile(regExp);
}
}

public Pool getPool() {
return pool;
}
Expand All @@ -98,14 +107,11 @@ public ConnectionRouter(final String name,
final String localTargetFilter,
final Cache cache,
final Pool pool,
final Policy policy,
KeyTransformer transformer) {
final Policy policy) {
this.name = name;

this.keyType = keyType;

this.transformer = transformer;

this.keyResolver = new KeyResolver(keyType, targetKeyFilter);

this.localTarget = new TargetResult(localTarget);
Expand Down Expand Up @@ -158,8 +164,11 @@ public TargetResult getTarget(Connection connection, String clientID, String use
}

public TargetResult getTarget(String key) {
if (policy != null && !KeyResolver.NULL_KEY_VALUE.equals(key)) {
key = policy.transformKey(key);
}

if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
if (logger.isDebugEnabled()) {
logger.debug("The " + keyType + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
}
Expand Down Expand Up @@ -216,27 +225,4 @@ public TargetResult getTarget(String key) {

return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}

public void setLocalTargetFilter(String regExp) {
if (regExp == null || regExp.trim().isEmpty()) {
this.localTargetFilter = null;
} else {
this.localTargetFilter = Pattern.compile(regExp);
}
}

public KeyResolver getTargetKeyResolver() {
return keyResolver;
}

private String transform(String key) {
String result = key;
if (transformer != null) {
result = transformer.transform(key);
if (logger.isDebugEnabled()) {
logger.debug("Key: " + key + ", transformed to " + result);
}
}
return result;
}
}
Expand Up @@ -42,9 +42,6 @@
import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactory;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -115,14 +112,8 @@ public void deployConnectionRouter(ConnectionRouterConfiguration config) throws
policy = deployPolicy(policyConfiguration, pool);
}

KeyTransformer transformer = null;
NamedPropertyConfiguration transformerConfiguration = config.getTransformerConfiguration();
if (transformerConfiguration != null) {
transformer = deployTransformer(transformerConfiguration);
}

ConnectionRouter connectionRouter = new ConnectionRouter(config.getName(), config.getKeyType(),
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy);

connectionRouters.put(connectionRouter.getName(), connectionRouter);

Expand Down Expand Up @@ -202,16 +193,6 @@ private Policy deployPolicy(NamedPropertyConfiguration policyConfig, Pool pool)
return policy;
}

private KeyTransformer deployTransformer(NamedPropertyConfiguration configuration) throws Exception {
TransformerFactory factory = TransformerFactoryResolver.getInstance().resolve(configuration.getName());

KeyTransformer transformer = factory.create();

transformer.init(configuration.getProperties());

return transformer;
}

public ConnectionRouter getRouter(String name) {
return connectionRouters.get(name);
}
Expand Down
Expand Up @@ -27,7 +27,7 @@
import java.util.regex.Pattern;

public class KeyResolver {
public static final String DEFAULT_KEY_VALUE = "DEFAULT";
public static final String NULL_KEY_VALUE = "NULL";


private static final Logger logger = Logger.getLogger(KeyResolver.class);
Expand Down Expand Up @@ -120,20 +120,25 @@ public String resolve(Connection connection, String clientID, String username) {
}

if (keyValue == null) {
keyValue = DEFAULT_KEY_VALUE;
keyValue = NULL_KEY_VALUE;
} else if (keyFilter != null) {
Matcher keyMatcher = keyFilter.matcher(keyValue);

if (keyMatcher.find()) {
keyValue = keyMatcher.group();

if (logger.isDebugEnabled()) {
logger.debugf("keyValue with filter %s: %s", keyFilter, keyValue);
logger.debugf("keyValue for %s matches filter %s: %s", key, keyFilter, keyValue);
}
} else {
keyValue = NULL_KEY_VALUE;

if (logger.isDebugEnabled()) {
logger.debugf("keyValue for %s doesn't matches filter %s", key, keyFilter);
}
}
}


return keyValue;
}

Expand Down
Expand Up @@ -15,33 +15,40 @@
* limitations under the License.
*/

package org.apache.activemq.artemis.core.server.routing.transformer;
package org.apache.activemq.artemis.core.server.routing.policies;

import java.util.Map;
import org.apache.activemq.artemis.core.server.routing.targets.Target;

import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
import java.util.List;
import java.util.Map;

public class ConsistentHashModulo implements KeyTransformer {
public class ConsistentHashModuloPolicy extends ConsistentHashPolicy {
public static final String NAME = "CONSISTENT_HASH_MODULO";
public static final String MODULO = "modulo";

public static final String MODULO = "MODULO";

int modulo = 0;

@Override
public String transform(String str) {
if (KeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
// we only want to transform resolved keys
return str;
}
if (modulo == 0) {
return str;
}
int hash = ConsistentHashPolicy.getHash(str);
return String.valueOf( hash % modulo );
public ConsistentHashModuloPolicy() {
super(NAME);
}

@Override
public void init(Map<String, String> properties) {
modulo = Integer.parseInt(properties.get(MODULO));
}

@Override
public String transformKey(String key) {
if (modulo > 0) {
return String.valueOf(getHash(key) % modulo);
}

return key;
}

@Override
public Target selectTarget(List<Target> targets, String key) {
return null;
}
}
Expand Up @@ -31,6 +31,10 @@ public ConsistentHashPolicy() {
super(NAME);
}

protected ConsistentHashPolicy(String name) {
super(name);
}

@Override
public Target selectTarget(List<Target> targets, String key) {
if (targets.size() > 1) {
Expand All @@ -56,7 +60,7 @@ public Target selectTarget(List<Target> targets, String key) {
return null;
}

public static int getHash(String str) {
protected int getHash(String str) {
final int FNV_INIT = 0x811c9dc5;
final int FNV_PRIME = 0x01000193;

Expand Down
Expand Up @@ -32,5 +32,11 @@ public interface Policy {

void init(Map<String, String> properties);

Target selectTarget(List<Target> targets, String key);
default String transformKey(String key) {
return key;
}

default Target selectTarget(List<Target> targets, String key) {
return null;
}
}
Expand Up @@ -40,6 +40,7 @@ private PolicyFactoryResolver() {
policyFactories.put(FirstElementPolicy.NAME, () -> new FirstElementPolicy());
policyFactories.put(LeastConnectionsPolicy.NAME, () -> new LeastConnectionsPolicy());
policyFactories.put(RoundRobinPolicy.NAME, () -> new RoundRobinPolicy());
policyFactories.put(ConsistentHashModuloPolicy.NAME, () -> new ConsistentHashModuloPolicy());

loadPolicyFactories();
}
Expand Down

This file was deleted.

0 comments on commit 603462a

Please sign in to comment.