Skip to content

Commit

Permalink
spring-projectsGH-1029: Add option to randomize connection order
Browse files Browse the repository at this point in the history
Resolves spring-projects#1029

Add `shuffleAddresses` to the `CachingConnectionFactory`.
  • Loading branch information
garyrussell committed Jun 20, 2019
1 parent dde7a37 commit a51561c
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 19 deletions.
Expand Up @@ -40,6 +40,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {

private static final String ADDRESSES = "addresses";

private static final String SHUFFLE_ADDRESSES = "shuffle-addresses";

private static final String VIRTUAL_HOST_ATTRIBUTE = "virtual-host";

private static final String USER_ATTRIBUTE = "username";
Expand Down Expand Up @@ -98,6 +100,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
NamespaceUtils.setValueIfAttributeDefined(builder, element, VIRTUAL_HOST_ATTRIBUTE);
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, EXECUTOR_ATTRIBUTE);
NamespaceUtils.setValueIfAttributeDefined(builder, element, ADDRESSES);
NamespaceUtils.setValueIfAttributeDefined(builder, element, SHUFFLE_ADDRESSES);
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_CONFIRMS);
NamespaceUtils.setValueIfAttributeDefined(builder, element, PUBLISHER_RETURNS);
NamespaceUtils.setValueIfAttributeDefined(builder, element, REQUESTED_HEARTBEAT, "requestedHeartBeat");
Expand Down
Expand Up @@ -22,7 +22,9 @@
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -103,7 +105,9 @@ public void handleRecovery(Recoverable recoverable) {

private ExecutorService executorService;

private Address[] addresses;
private List<Address> addresses;

private boolean shuffleAddresses;

private int closeTimeout = DEFAULT_CLOSE_TIMEOUT;

Expand Down Expand Up @@ -281,7 +285,7 @@ public void setAddresses(String addresses) {
if (StringUtils.hasText(addresses)) {
Address[] addressArray = Address.parseAddresses(addresses);
if (addressArray.length > 0) {
this.addresses = addressArray;
this.addresses = Arrays.asList(addressArray);
if (this.publisherConnectionFactory != null) {
this.publisherConnectionFactory.setAddresses(addresses);
}
Expand Down Expand Up @@ -441,6 +445,18 @@ protected String getBeanName() {
return this.beanName;
}

/**
* When {@link #setAddresses(String) addresses} are provided and there is more than
* one, set to true to shuffle the list before opening a new connection so that the
* connection to the broker will be attempted in random order.
* @param shuffleAddresses true to shuffle the list.
* @since 2.1.8
* @see Collections#shuffle(List)
*/
public void setShuffleAddresses(boolean shuffleAddresses) {
this.shuffleAddresses = shuffleAddresses;
}

public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}
Expand All @@ -456,12 +472,17 @@ protected final Connection createBareConnection() {

com.rabbitmq.client.Connection rabbitConnection;
if (this.addresses != null) {
List<Address> addressesToConnect = this.addresses;
if (this.shuffleAddresses && addressesToConnect.size() > 1) {
List<Address> list = new ArrayList<>(addressesToConnect);
Collections.shuffle(list);
addressesToConnect = list;
}
if (this.logger.isInfoEnabled()) {
this.logger.info("Attempting to connect to: " + Arrays.toString(this.addresses));
this.logger.info("Attempting to connect to: " + addressesToConnect);
}
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, this.addresses,
rabbitConnection = this.rabbitConnectionFactory.newConnection(this.executorService, addressesToConnect,
connectionName);

}
else {
if (this.logger.isInfoEnabled()) {
Expand Down
Expand Up @@ -1384,6 +1384,14 @@
<xsd:annotation>
<xsd:documentation><![CDATA[
List of addresses; e.g. host1,host2:4567,host3 - overrides host/port if supplied.
Connection will be attempted in order unless 'shuffle-addresses' is 'true'.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="shuffle-addresses" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation><![CDATA[
Set to true when 'addresses' has more than one address to shuffle the list into a random order.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Expand Up @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.junit.Before;
Expand Down Expand Up @@ -110,14 +111,16 @@ public void testMultiHost() throws Exception {
assertThat(connectionFactory).isNotNull();
assertThat(connectionFactory.getChannelCacheSize()).isEqualTo(10);
DirectFieldAccessor dfa = new DirectFieldAccessor(connectionFactory);
Address[] addresses = (Address[]) dfa.getPropertyValue("addresses");
assertThat(addresses.length).isEqualTo(3);
assertThat(addresses[0].getHost()).isEqualTo("host1");
assertThat(addresses[0].getPort()).isEqualTo(1234);
assertThat(addresses[1].getHost()).isEqualTo("host2");
assertThat(addresses[1].getPort()).isEqualTo(-1);
assertThat(addresses[2].getHost()).isEqualTo("host3");
assertThat(addresses[2].getPort()).isEqualTo(4567);
@SuppressWarnings("unchecked")
List<Address> addresses = (List<Address>) dfa.getPropertyValue("addresses");
assertThat(addresses).hasSize(3);
assertThat(addresses.get(0).getHost()).isEqualTo("host1");
assertThat(addresses.get(0).getPort()).isEqualTo(1234);
assertThat(addresses.get(1).getHost()).isEqualTo("host2");
assertThat(addresses.get(1).getPort()).isEqualTo(-1);
assertThat(addresses.get(2).getHost()).isEqualTo("host3");
assertThat(addresses.get(2).getPort()).isEqualTo(4567);
assertThat(dfa.getPropertyValue("shuffleAddresses")).isEqualTo(Boolean.TRUE);
assertThat(TestUtils.getPropertyValue(connectionFactory,
"rabbitConnectionFactory.threadFactory")).isSameAs(beanFactory.getBean("tf"));
}
Expand Down
Expand Up @@ -19,12 +19,12 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
Expand Down Expand Up @@ -61,10 +61,13 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.logging.Log;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;

import org.springframework.amqp.AmqpConnectException;
Expand Down Expand Up @@ -1639,7 +1642,7 @@ public void setAddressesOneHost() throws Exception {
ccf.createConnection();
verify(mock).isAutomaticRecoveryEnabled();
verify(mock)
.newConnection(isNull(), aryEq(new Address[] { new Address("mq1") }), anyString());
.newConnection(isNull(), eq(Collections.singletonList(new Address("mq1"))), anyString());
verifyNoMoreInteractions(mock);
}

Expand All @@ -1653,7 +1656,7 @@ public void setAddressesTwoHosts() throws Exception {
verify(mock).isAutomaticRecoveryEnabled();
verify(mock).setAutomaticRecoveryEnabled(false);
verify(mock).newConnection(isNull(),
aryEq(new Address[] { new Address("mq1"), new Address("mq2") }), anyString());
eq(Arrays.asList(new Address("mq1"), new Address("mq2"))), anyString());
verifyNoMoreInteractions(mock);
}

Expand Down Expand Up @@ -1810,4 +1813,34 @@ public void testFirstConnectionDoesntWait() throws IOException, TimeoutException
assertThat(System.currentTimeMillis() - t1).isLessThan(30_000);
}

@SuppressWarnings("unchecked")
@Test
public void testShuffle() throws IOException, TimeoutException {
com.rabbitmq.client.ConnectionFactory mockConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
Channel mockChannel = mock(Channel.class);

given(mockConnectionFactory.newConnection((ExecutorService) isNull(), any(List.class), anyString()))
.willReturn(mockConnection);
given(mockConnection.createChannel()).willReturn(mockChannel);
given(mockChannel.isOpen()).willReturn(true);
given(mockConnection.isOpen()).willReturn(true);

CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
ccf.setCacheMode(CacheMode.CONNECTION);
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setShuffleAddresses(true);
IntStream.range(0, 100).forEach(i -> ccf.createConnection());
ccf.destroy();
ArgumentCaptor<List<Address>> captor = ArgumentCaptor.forClass(List.class);
verify(mockConnectionFactory, times(100)).newConnection(isNull(), captor.capture(), anyString());
List<String> firstAddress = captor.getAllValues()
.stream()
.map(addresses -> addresses.get(0).getHost())
.distinct()
.sorted()
.collect(Collectors.toList());
assertThat(firstAddress).containsExactly("host1", "host2", "host3");
}

}
Expand Up @@ -36,7 +36,7 @@
<bean id="execService" class="java.util.concurrent.Executors" factory-method="newSingleThreadExecutor" />

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
thread-factory="tf" shuffle-addresses="true"
channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
Expand Down
39 changes: 37 additions & 2 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -356,10 +356,12 @@ Alternatively, if running in a clustered environment, you can use the addresses
[source,xml]
----
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672"/>
id="connectionFactory" addresses="host1:5672,host2:5672" shuffle-addresses="true"/>
----
====

See <<cluster>> for information about `shuffle-addresses`.

The following example with a custom thread factory that prefixes thread names with `rabbitmq-`:

====
Expand Down Expand Up @@ -521,6 +523,39 @@ If you wish to skip this validation for some reason, set the factory bean's `ski
Starting with version 2.1, the `RabbitConnectionFactoryBean` now calls `enableHostnameVerification()` by default.
To revert to the previous behavior, set the `enableHostnameVerification` property to `false`.

[[cluster]]
===== Connecting to a Cluster

To connect to a cluster, configure the `addresses` property on the `CachingConnectionFactory`:

====
[source, java]
----
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
----
====

The underlying connection factory will attempt to connect to each host, in order, whenever a new connection is established.
Starting with version 2.1.8, the connection order can be made random by setting the `shuffleAddresses` property to true; the shuffle will be applied before creating any new connection.

====
[source, java]
----
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setShuffleAddresses(true);
return ccf;
}
----
====

[[routing-connection-factory]]
===== Routing Connection Factory

Expand Down Expand Up @@ -592,7 +627,7 @@ For example, with lookup key qualifier `thing1` and a container listening to que

When using HA queues in a cluster, for the best performance, you may want to connect to the physical broker
where the master queue resides.
While the `CachingConnectionFactory` can be configured with multiple broker addresses.
The `CachingConnectionFactory` can be configured with multiple broker addresses.
This is to fail over and the client attempts to connect in order.
The `LocalizedQueueConnectionFactory` uses the REST API provided by the admin plugin to determine on which node the queue is mastered.
It then creates (or retrieves from a cache) a `CachingConnectionFactory` that connects to just that node.
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Expand Up @@ -41,3 +41,9 @@ See <<logging>> for more information.

The `MessageListenerAdapter` provides now a new `buildListenerArguments(Object, Channel, Message)` method to build an array of arguments to be passed into target listener and an old one is deprecated.
See <<message-listener-adapter>> for more information.

===== Connection Factory Changes

The `CachingConnectionFactory` has a new property `shuffleAddresses`.
When providing a list of broker node addresses, the list will be shuffled before creating a connection so that the order in which the connections are attempted is random.
See <<cluster>> for more information.

0 comments on commit a51561c

Please sign in to comment.