Skip to content

Commit

Permalink
ARTEMIS-3640 Use client connectors for HA
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev authored and gtully committed Mar 9, 2023
1 parent 00cae02 commit bb08a57
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class TransportConfiguration implements Serializable {

private static final long serialVersionUID = -3994528421527392679L;

public static final String NAME_PARAM = "name";

public static final String EXTRA_PROPERTY_PREFIX = "$.EP.";

private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private final ClientProtocolManager clientProtocolManager;

private final TransportConfiguration connectorConfig;
private TransportConfiguration connectorConfig;

private TransportConfiguration previousConnectorConfig;

Expand Down Expand Up @@ -158,6 +158,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C

private final Object connectionReadyLock = new Object();

private final TransportConfiguration[] connectorConfigs;

public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connectorConfig,
final ServerLocatorConfig locatorConfig,
Expand All @@ -171,14 +173,28 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
}

ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
this(serverLocator, connectorConfig,
locatorConfig, reconnectAttempts, threadPool,
scheduledThreadPool, incomingInterceptors, outgoingInterceptors, null);
}

ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration, TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
final int reconnectAttempts,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
final List<Interceptor> outgoingInterceptors) {
final List<Interceptor> outgoingInterceptors,
final TransportConfiguration[] connectorConfigs) {
createTrace = new Exception();

this.serverLocator = serverLocator;
Expand Down Expand Up @@ -238,6 +254,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
if (connectorConfig.getB() != null) {
this.backupConnectorConfig = connectorConfig.getB();
}

this.connectorConfigs = connectorConfigs;
}

@Override
Expand Down Expand Up @@ -1150,6 +1168,20 @@ protected Connection createTransportConnection() {


if (backupConnectorConfig != null) {
//Try to connect with the client connector that match the backup connector name
String backupConnectorName = backupConnectorConfig.getName();
if (backupConnectorName != null && connectorConfigs != null) {
for (TransportConfiguration connectorConfig : connectorConfigs) {
if (backupConnectorName.equals(connectorConfig.getName())) {
//Try to connect with the backup connector configuration
transportConnection = createTransportConnection("backup", connectorConfig);
if (transportConnection != null) {
return transportConnection;
}
}
}
}

//Try to connect with the backup connector configuration
transportConnection = createTransportConnection("backup", backupConnectorConfig);
if (transportConnection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ public ClientSessionFactory createSessionFactory() throws ActiveMQException {
// try each factory in the list until we find one which works

try {
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
factory = new ClientSessionFactoryImpl(this, tc, config, config.reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors, initialConnectors);
try {
addToConnecting(factory);
// We always try to connect here with only one attempt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public static List<TransportConfiguration> getTransportConfigurations(URI uri,
BeanSupport.setData(uri, props, allowableProperties, query, extraProps);
List<TransportConfiguration> transportConfigurations = new ArrayList<>();

TransportConfiguration config = new TransportConfiguration(factoryName, props, name, extraProps);
String nameFromQuery = query.getOrDefault(TransportConfiguration.NAME_PARAM, name);
if (name != null && !name.equals(nameFromQuery)) {
throw new IllegalArgumentException("Name doesn't match query param");
}
TransportConfiguration config = new TransportConfiguration(factoryName, props, nameFromQuery, extraProps);

transportConfigurations.add(config);
String connectors = uri.getFragment();
Expand All @@ -77,8 +81,10 @@ public static List<TransportConfiguration> getTransportConfigurations(URI uri,
HashMap<String, Object> newProps = new HashMap<>();
extraProps = new HashMap<>();
BeanSupport.setData(extraUri, newProps, allowableProperties, query, extraProps);
BeanSupport.setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null), extraProps);
transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), extraProps));
Map<String, String> extraUriQuery = parseQuery(extraUri.getQuery(), null);
BeanSupport.setData(extraUri, newProps, allowableProperties, extraUriQuery, extraProps);
String extraUriNameFromQuery = extraUriQuery.getOrDefault(TransportConfiguration.NAME_PARAM, name + ":" + extraUri);
transportConfigurations.add(new TransportConfiguration(factoryName, newProps, extraUriNameFromQuery, extraProps));
}
}
return transportConfigurations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,26 @@ public void testParse() throws Exception {
Assert.assertEquals("3", objects.get(2).getParams().get("port"));
}

@Test
public void testParseMultipleConnectorWithName() throws Exception {
ConnectorTransportConfigurationParser parser = new ConnectorTransportConfigurationParser(false);

URI transportURI = parser.expandURI("(tcp://live:1?name=live1,tcp://backupA:2?name=backupA2,tcp://backupB:3?name=backupB3");
System.out.println(transportURI);
List<TransportConfiguration> objects = parser.newObject(transportURI, null);
if (logger.isInfoEnabled()) {
objects.forEach(t -> logger.info("transportConfig: {}", t));
}

Assert.assertEquals(3, objects.size());
Assert.assertEquals("live1", objects.get(0).getName());
Assert.assertEquals("live", objects.get(0).getParams().get("host"));
Assert.assertEquals("1", objects.get(0).getParams().get("port"));
Assert.assertEquals("backupA2", objects.get(1).getName());
Assert.assertEquals("backupA", objects.get(1).getParams().get("host"));
Assert.assertEquals("2", objects.get(1).getParams().get("port"));
Assert.assertEquals("backupB3", objects.get(2).getName());
Assert.assertEquals("backupB", objects.get(2).getParams().get("host"));
Assert.assertEquals("3", objects.get(2).getParams().get("port"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,13 @@ protected TransportConfiguration getNettyConnectorTransportConfiguration(final b
protected static final TransportConfiguration createTransportConfiguration(boolean netty,
boolean acceptor,
Map<String, Object> params) {
return createTransportConfiguration(UUIDGenerator.getInstance().generateStringUUID(), netty, acceptor, params);
}

protected static final TransportConfiguration createTransportConfiguration(String name,
boolean netty,
boolean acceptor,
Map<String, Object> params) {
String className;
if (netty) {
if (acceptor) {
Expand All @@ -1354,7 +1361,7 @@ protected static final TransportConfiguration createTransportConfiguration(boole
}
if (params == null)
params = new HashMap<>();
return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, Object>());
return new TransportConfiguration(className, params, name, new HashMap<String, Object>());
}

protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {
Expand Down
41 changes: 41 additions & 0 deletions docs/user-manual/en/ha.md
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,47 @@ If you wish to provide *once and only once* delivery guarantees for non
transacted sessions too, enable duplicate detection, and catch unblock
exceptions as described in [Handling Blocking Calls During Failover](ha.md)

#### Use client connectors to fail over

Apache ActiveMQ Artemis clients retrieve the backup connector from the
topology updates that the cluster brokers send. If the connection options
of the clients don't match the options of the cluster brokers the clients
can define a client connector that will be used in place of the connector
in the topology. To define a client connector it must have a name that matches
the name of the connector defined in the cluster connection of the broker, i.e.
supposing to have a live broker with the cluster connector name `node-0`
and a backup broker with the cluster connector name `node-1` the client
connection url must define 2 connectors with the names `node-0` and `node-1`:

Live broker config
```xml
<connectors>
<!-- Connector used to be announced through cluster connections and notifications -->
<connector name="node-0">tcp://localhost:61616</connector>
</connectors>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>node-0</connector-ref>
...
```

Backup broker config
```xml
<connectors>
<!-- Connector used to be announced through cluster connections and notifications -->
<connector name="node-1">tcp://localhost:61617</connector>
</connectors>
<cluster-connections>
<cluster-connection name="my-cluster">
<connector-ref>node-1</connector-ref>
...
```

Client connection url
```
(tcp://localhost:61616?name=node-0,tcp://localhost:61617?name=node-1)?ha=true&reconnectAttempts=-1
```

### Getting Notified of Connection Failure

JMS provides a standard mechanism for getting notified asynchronously of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1957,13 +1957,14 @@ protected void setupClusterConnectionWithBackups(final String name,
throw new IllegalStateException("No server at node " + nodeFrom);
}

TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom);
String connectorName = "node" + nodeFrom;
TransportConfiguration connectorFrom = createTransportConfiguration(connectorName, netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorName, connectorFrom);

List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
Configuration config = serverFrom.getConfiguration();

ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);

config.getClusterConfigurations().add(clusterConf);
}
Expand Down

0 comments on commit bb08a57

Please sign in to comment.