Skip to content

Commit

Permalink
ARTEMIS-2937 Broker connection improvements
Browse files Browse the repository at this point in the history
- Adding a paragraph about addressing and distinct queue names
- Renaming match on peers, senders and receivers as "address-match"
- Changing qpid dispatch test to use a single listener
- Fixing reconnect attemps message
  • Loading branch information
clebertsuconic committed Oct 29, 2020
1 parent 5257c08 commit dc7eb5c
Show file tree
Hide file tree
Showing 12 changed files with 70 additions and 80 deletions.
Expand Up @@ -289,13 +289,13 @@ public void retryConnection() {
if (bridgeManager.isStarted() && started) {
if (brokerConnectConfiguration.getReconnectAttempts() < 0 || retryCounter < brokerConnectConfiguration.getReconnectAttempts()) {
retryCounter++;
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host, port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
if (logger.isDebugEnabled()) {
logger.debug("Reconnecting in " + brokerConnectConfiguration.getRetryInterval() + ", this is the " + retryCounter + " of " + brokerConnectConfiguration.getReconnectAttempts());
}
reconnectFuture = scheduledExecutorService.schedule(() -> connectExecutor.execute(() -> doConnect()), brokerConnectConfiguration.getRetryInterval(), TimeUnit.MILLISECONDS);
} else {
ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host, port, retryCounter, brokerConnectConfiguration.getReconnectAttempts());
ActiveMQAMQPProtocolLogger.LOGGER.retryConnectionFailed(brokerConnectConfiguration.getName(), host + ":" + port, retryCounter);
if (logger.isDebugEnabled()) {
logger.debug("no more reconnections as the retry counter reached " + retryCounter + " out of " + brokerConnectConfiguration.getReconnectAttempts());
}
Expand Down
Expand Up @@ -51,13 +51,13 @@ public interface ActiveMQAMQPProtocolLogger extends BasicLogger {

@LogMessage(level = Logger.Level.WARN)
@Message(id = 111001, value = "\n*******************************************************************************************************************************" +
"\nCould not re-establish AMQP Server Connection {0} on {1}:{2} after {3} retries with a total configured of {4}" +
"\nCould not re-establish AMQP Server Connection {0} on {1} after {2} retries" +
"\n*******************************************************************************************************************************\n", format = Message.Format.MESSAGE_FORMAT)
void retryConnectionFailed(String name, String host, int port, int currentRetry, int maxRetry);
void retryConnectionFailed(String name, String hostAndPort, int currentRetry);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 111002, value = "\n*******************************************************************************************************************************" +
"\nRetrying Server AMQP Connection {0} on {1}:{2} retry {3} of {4}" +
"\nRetrying Server AMQP Connection {0} on {1} retry {2} of {3}" +
"\n*******************************************************************************************************************************\n", format = Message.Format.MESSAGE_FORMAT)
void retryConnection(String name, String host, int port, int currentRetry, int maxRetry);
void retryConnection(String name, String hostAndPort, int currentRetry, int maxRetry);
}
Expand Up @@ -1911,7 +1911,7 @@ private void parseAMQPBrokerConnections(final Element e,
connectionElement = amqpMirrorConnectionElement;
connectionElement.setType(AMQPBrokerConnectionAddressType.MIRROR);
} else {
String match = getAttributeValue(e2, "match");
String match = getAttributeValue(e2, "address-match");
String queue = getAttributeValue(e2, "queue-name");
connectionElement = new AMQPBrokerConnectionElement();
connectionElement.setMatchAddress(SimpleString.toSimpleString(match)).setType(nodeType);
Expand Down
27 changes: 9 additions & 18 deletions artemis-server/src/main/resources/schema/artemis-configuration.xsd
Expand Up @@ -972,7 +972,7 @@
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching security roles against addresses; can use wildards
pattern for matching security roles against addresses; can use wildcards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down Expand Up @@ -2075,20 +2075,20 @@
</xsd:complexType>

<xsd:complexType name="amqp-address-match-type">
<xsd:attribute name="match" type="xsd:string" use="optional">
<xsd:attribute name="address-match" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queue-name" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
This is the exact queue name to be used.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:annotation>
<xsd:documentation>
This is the exact queue name to be used.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="amqp-mirror-type">
<xsd:annotation>
Expand All @@ -2097,15 +2097,6 @@
All events will be send towards this AMQP connection acting like a replica.
</xsd:documentation>
</xsd:annotation>
<!--
TODO: comment this out when we start supporting matching on mirror.
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
</xsd:documentation>
</xsd:annotation>
</xsd:attribute> -->

<xsd:attribute name="message-acknowledgements" type="xsd:boolean" use="optional" default="true">
<xsd:annotation>
Expand Down Expand Up @@ -3871,7 +3862,7 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>

</xsd:all>

<xsd:attribute name="match" type="xsd:string" use="required">
Expand Down
Expand Up @@ -374,9 +374,9 @@
</cluster-connections>
<broker-connections>
<amqp-connection uri="tcp://test1:111" name="test1" retry-interval="333" reconnect-attempts="33" user="testuser" password="testpassword">
<sender match="TEST-SENDER" />
<receiver match="TEST-RECEIVER" />
<peer match="TEST-PEER"/>
<sender address-match="TEST-SENDER" />
<receiver address-match="TEST-RECEIVER" />
<peer address-match="TEST-PEER"/>
<receiver queue-name="TEST-WITH-QUEUE-NAME"/>
<mirror message-acknowledgements="false" queue-creation="false" source-mirror-address="TEST-REPLICA" queue-removal="false"/>
</amqp-connection>
Expand Down
32 changes: 11 additions & 21 deletions artemis-tools/src/test/resources/artemis-configuration.xsd
Expand Up @@ -614,7 +614,7 @@
<xsd:annotation>
<xsd:documentation>
A list of connections the broker will make towards other servers.
Currently the only connection type supported is amqp-connection
Currently the only connection type supported is amqpConnection
</xsd:documentation>
</xsd:annotation>
</xsd:element>
Expand Down Expand Up @@ -972,7 +972,7 @@
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching security roles against addresses; can use wildards
pattern for matching security roles against addresses; can use wildcards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down Expand Up @@ -2028,6 +2028,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="auto-start" type="xsd:boolean" default="true">
<xsd:annotation>
<xsd:documentation>
should the broker connection be started when the server is started.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="reconnect-attempts" type="xsd:int" default="-1">
<xsd:annotation>
<xsd:documentation>
Expand Down Expand Up @@ -2068,7 +2075,7 @@
</xsd:complexType>

<xsd:complexType name="amqp-address-match-type">
<xsd:attribute name="match" type="xsd:string" use="optional">
<xsd:attribute name="address-match" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
Expand All @@ -2090,15 +2097,6 @@
All events will be send towards this AMQP connection acting like a replica.
</xsd:documentation>
</xsd:annotation>
<!--
TODO: comment this out when we start supporting matching on mirror.
<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
address expression to match addresses
</xsd:documentation>
</xsd:annotation>
</xsd:attribute> -->

<xsd:attribute name="message-acknowledgements" type="xsd:boolean" use="optional" default="true">
<xsd:annotation>
Expand Down Expand Up @@ -3865,20 +3863,12 @@
</xsd:annotation>
</xsd:element>

<xsd:element name="page-store-name" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the name of the page store to use, to allow the page store to coalesce for address hierarchies when wildcard routing is in play
</xsd:documentation>
</xsd:annotation>
</xsd:element>

</xsd:all>

<xsd:attribute name="match" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
pattern for matching settings against addresses; can use wildcards
pattern for matching settings against addresses; can use wildards
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
52 changes: 36 additions & 16 deletions docs/user-manual/en/amqp-broker-connections.md
Expand Up @@ -34,7 +34,7 @@ To define an AMQP broker connection, add an `<amqp-connection>` element within t
- `reconnect-attempts`: default is -1 meaning infinite
- `auto-start` : Should the broker connection start automatically with the broker. Default is `true`. If false you need to call a management operation to start it.

*Notice*: If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController.
*Notice:* If you disable auto-start on the broker connection, the start of the broker connection will only happen after the management method `startBrokerConnection(connectionName)` is called on the ServerController.

*Important*: The target endpoint needs permission for all operations that you configure. Therefore, If you are using a security manager, ensure that you perform the configured operations as a user with sufficient permissions.

Expand All @@ -61,7 +61,7 @@ Both elements work like a message bridge. However, there is no additional overhe

You can configure senders or receivers for specific queues. You can also match senders and receivers to specific addresses or _sets_ of addresses, using wildcard expressions. When configuring a sender or receiver, you can set the following properties:

- `match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression
- `address-match`: Match the sender or receiver to a specific address or __set__ of addresses, using a wildcard expression
- `queue-name`: Configure the sender or receiver for a specific queue


Expand All @@ -71,9 +71,9 @@ Using address expressions:
```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
<sender match="queues.#"/>
<sender address-match="queues.#"/>
<!-- notice the local queues for remotequeues.# need to be created on this broker -->
<receiver match="remotequeues.#"/>
<receiver address-match="remotequeues.#"/>
</amqp-connection>
</broker-connections>

Expand All @@ -90,7 +90,6 @@ Using address expressions:
</address>
</addresses>
```

Using queue names:
```xml
<broker-connections>
Expand All @@ -112,14 +111,13 @@ Using queue names:
</anycast>
</address>
</addresses>

```
*Important*: You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses.
*Important:* You can match a receiver only to a local queue that already exists. Therefore, if you are using receivers, make sure that you pre-create the queue locally. Otherwise, the broker cannot match the remote queues and addresses.

*Important*: Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives.
*Important:* Do not create a sender and a receiver to the same destination. This creates an infinite loop of sends and receives.


# Peers
## Peers
The broker can be configured as a peer which connects to the [Apache Qpid Dispatch Router](https://qpid.apache.org/components/dispatch-router/) and instructs it the broker it will act as a store-and-forward queue for a given AMQP waypoint address configured on the router. In this scenario, clients connect to a router to send and receive messages using a waypointed address, and the router routes these messages to or from the queue on the broker.

The peer configuration causes ActiveMQ Artemis to create a sender and receiver pair for each destination matched in the broker-connection configuration, with these carrying special configuration to let Qpid Dispatch know to collaborate with the broker. This replaces the traditional need of a router-initiated connection and auto-links.
Expand All @@ -130,7 +128,7 @@ With a peer configuration, you have the same properties that you have on a sende
```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-router">
<peer match="queues.#"/>
<peer address-match="queues.#"/>
</amqp-connection>
</broker-connections>

Expand Down Expand Up @@ -163,7 +161,29 @@ For more information refer to the "brokered messaging" documentation for [Apache

*Important:* You do not need to configure the router with a connector or auto-links to communicate with the broker. The brokers peer configuration replaces these aspects of the router waypoint usage.

# Mirror
## Address Consideration
It is highly recommended that you keep `address name` and `queue name` the same, as when you use a queue with its distinct name (as in the following example), senders and receivers will always use the `address name` when creating the remote endpoint.

```xml
<broker-connections>
<amqp-connection uri="tcp://MY_HOST:MY_PORT" name="my-broker">
<sender address-match="queues.#"/>
</amqp-connection>
</broker-connections>
<addresses>
<address name="queues.A">
<anycast>
<queue name="distinctNameQueue.A"/>
</anycast>
</address>
</addresses>
```

In the above example the `broker connection` would create an AMQP sender towards "queues.A".

*Important:* To avoid confusion it is recommended that you keep the `address name` and `queue name` the same.

## Mirror
The mirror option on the broker connection can capture events from the broker and pass them over the wire to another broker. This enables you to capture multiple asynchronous replicas. The following types of events are captured:

* Message routing
Expand Down Expand Up @@ -195,15 +215,15 @@ An example of a mirror configuration is shown below:

*Important*: A broker can mirror to multiple replicas (1 to many). However a replica broker can only have a single mirror source. Make sure you do not mirror multiple source brokers to a single replica broker.

## Pre existing messages
### Pre Existing Messages
The broker will not send pre existing messages through the mirror. So, If you add mirror to your configuration and the journal had pre existing messages these messages will not be sent.

## Broker Connection Stop and Disconnect
Once you start the broker connection with a mirror the mirror events will always be sent to the intermediate queue configured at the `source-mirror-address`.

It is possible to stop the broker connection with the operation stopBrokerConnection(connectionName) on the ServerControl, but it is only effective to disconnect the brokers, while the mirror events are always captured.

## Disaster & Recovery considerations
## Disaster & Recovery Considerations
As you use the mirror option to replicate data across datacenters, you have to take a few considerations:

* Currently we don't support quorums for activating the replica, so you have to manually control when your clients connect to the replica site.
Expand Down Expand Up @@ -241,17 +261,17 @@ On the replicaBroker, add a disabled broker connection for failing back after a
<acceptor name="amqp">tcp://0.0.0.0:6700?autoStart=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true;autoStart=false</acceptor>
</acceptors>
<broker-connections>
<amqp-connection uri="tcp://sourceBroker:6700" name="sourceBroker" auto-start="false">
<amqp-connection uri="tcp://sourceBroker:6700" name="failbackBroker" auto-start="false">
<mirror message-acknowledgements="true"/>
</amqp-connection>
</broker-connections>
```

After a failure has occurred, you can use a management operation start on the acceptor:

- AccetorControl.start();
- AcceptorControl.start();

And you can call startBrokerConnection to enable the failback towards the live site:

- ActiveMQServerControl.startBrokerConnection("sourceBroker")
- ActiveMQServerControl.startBrokerConnection("failbackBroker")

Expand Up @@ -54,7 +54,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:5672" name="receiver" retry-interval="100">
<!-- This will create one receiver for every queue matching this address expression -->
<receiver match="#"/>
<receiver address-match="#"/>
</amqp-connection>
</broker-connections>

Expand Down
Expand Up @@ -54,7 +54,7 @@ under the License.
<broker-connections>
<amqp-connection uri="tcp://localhost:5771" name="sender" retry-interval="100">
<!-- This will create one sender for every queue matching this address expression -->
<sender match="#"/>
<sender address-match="#"/>
</amqp-connection>
</broker-connections>

Expand Down
Expand Up @@ -37,7 +37,7 @@ under the License.

<broker-connections>
<amqp-connection uri="tcp://localhost:5772?sslEnabled=true;trustStorePath=activemq.example.truststore;trustStorePassword=activemqexample" name="otherSSL" retry-interval="1000">
<sender match="#"/>
<sender address-match="#"/>
</amqp-connection>
</broker-connections>

Expand Down

0 comments on commit dc7eb5c

Please sign in to comment.