From bb5804ea6dc2e8281d88e8a786626c363618b882 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 11 Oct 2022 11:09:51 -0400 Subject: [PATCH] GH-1419: Remove RabbitMQ http-client Usage Resolves https://github.com/spring-projects/spring-amqp/issues/1419 Use Spring WebFlux instead, while allowing the user to choose some other technology in the `LocalizedQueueConnectionFactory`. * Rename DefaultNodeLocator; add generics. * Remove unnecessary dependencies. GH-1419: Add RestTemplateNodeLocator - also remove hard dependency on `spring-webflux` from `spring-rabbit-junit`. Fix Javadoc. Use RestTemplate for aliveness test; JVM HttpClient not available in Java 8. Restore spring-rabbit-junit jackson dependency. --- build.gradle | 19 +- .../converter/SerializerMessageConverter.java | 5 +- .../amqp/utils/SerializationUtils.java | 5 +- .../SerializerMessageConverterTests.java | 6 +- .../rabbit/junit/BrokerRunningSupport.java | 50 +++- .../rabbit/junit/RabbitAvailableTests.java | 4 +- .../stream/listener/RabbitListenerTests.java | 45 +++- .../LocalizedQueueConnectionFactory.java | 195 +++++++++++---- .../rabbit/connection/RestTemplateHolder.java | 41 ++++ .../connection/RestTemplateNodeLocator.java | 87 +++++++ .../rabbit/connection/WebFluxNodeLocator.java | 73 ++++++ .../EnableRabbitIntegrationTests.java | 12 +- ...ueueConnectionFactoryIntegrationTests.java | 19 +- .../LocalizedQueueConnectionFactoryTests.java | 52 ++-- .../core/FixedReplyQueueDeadLetterTests.java | 48 +--- .../rabbit/core/NeedsManagementTests.java | 96 ++++++++ .../core/RabbitAdminIntegrationTests.java | 23 +- .../amqp/rabbit/core/RabbitAdminTests.java | 13 +- .../amqp/rabbit/core/RabbitRestApiTests.java | 223 ------------------ src/reference/asciidoc/amqp.adoc | 46 ++++ 20 files changed, 681 insertions(+), 381 deletions(-) create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java create mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java delete mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java diff --git a/build.gradle b/build.gradle index 0c997c3ebd..5985467791 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ ext { rabbitmqStreamVersion = '0.4.0' rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.13.1' rabbitmqHttpClientVersion = '3.12.1' - reactorVersion = '2020.0.20' + reactorVersion = '2020.0.24' snappyVersion = '1.1.8.4' springDataCommonsVersion = '2.6.7' springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.23' @@ -366,7 +366,10 @@ project('spring-rabbit') { api 'org.springframework:spring-context' api 'org.springframework:spring-messaging' api 'org.springframework:spring-tx' + optionalApi 'org.springframework:spring-webflux' + optionalApi "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" optionalApi 'io.projectreactor:reactor-core' + optionalApi 'io.projectreactor.netty:reactor-netty-http' optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' optionalApi "io.micrometer:micrometer-core:$micrometerVersion" @@ -380,10 +383,10 @@ project('spring-rabbit') { testApi project(':spring-rabbit-junit') testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion") testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion" - testRuntimeOnly 'org.springframework:spring-web' + testRuntimeOnly 'org.springframework:spring-webflux' testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' - testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'com.fasterxml.jackson.core:jackson-databind' testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin' testRuntimeOnly ("junit:junit:$junit4Version") { @@ -407,7 +410,6 @@ project('spring-rabbit-stream') { api project(':spring-rabbit') api "com.rabbitmq:stream-client:$rabbitmqStreamVersion" - optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion" testApi project(':spring-rabbit-junit') testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' @@ -419,8 +421,11 @@ project('spring-rabbit-stream') { testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion" testRuntimeOnly "org.lz4:lz4-java:$lz4Version" testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion" + testRuntimeOnly 'io.projectreactor.netty:reactor-netty-http' testImplementation "org.testcontainers:rabbitmq:1.15.3" testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion" + testImplementation 'org.springframework:spring-webflux' + testImplementation 'io.projectreactor.netty:reactor-netty-http' } } @@ -436,14 +441,14 @@ project('spring-rabbit-junit') { exclude group: 'org.hamcrest', module: 'hamcrest-core' } api "com.rabbitmq:amqp-client:$rabbitmqVersion" - api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") { - exclude group: 'org.springframework', module: 'spring-web' - } api 'org.springframework:spring-web' api 'org.junit.jupiter:junit-jupiter-api' api "org.assertj:assertj-core:$assertjVersion" + api "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" + api 'com.fasterxml.jackson.core:jackson-databind' optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' + optionalApi "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" compileOnly 'org.apiguardian:apiguardian-api:1.0.0' } diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/SerializerMessageConverter.java b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/SerializerMessageConverter.java index 5b12f288b6..753c56fc5d 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/support/converter/SerializerMessageConverter.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/support/converter/SerializerMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,7 +27,6 @@ import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.ConfigurableObjectInputStream; -import org.springframework.core.NestedIOException; import org.springframework.core.serializer.DefaultDeserializer; import org.springframework.core.serializer.DefaultSerializer; import org.springframework.core.serializer.Deserializer; @@ -182,7 +181,7 @@ protected Class resolveClass(ObjectStreamClass classDesc) return objectInputStream.readObject(); } catch (ClassNotFoundException ex) { - throw new NestedIOException("Failed to deserialize object type", ex); + throw new IOException("Failed to deserialize object type", ex); } } diff --git a/spring-amqp/src/main/java/org/springframework/amqp/utils/SerializationUtils.java b/spring-amqp/src/main/java/org/springframework/amqp/utils/SerializationUtils.java index ccdb9ce6b3..3a36c7448a 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/utils/SerializationUtils.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/utils/SerializationUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2006-2019 the original author or authors. + * Copyright 2006-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import java.util.Set; import org.springframework.core.ConfigurableObjectInputStream; -import org.springframework.core.NestedIOException; import org.springframework.util.ObjectUtils; import org.springframework.util.PatternMatchUtils; @@ -126,7 +125,7 @@ protected Class resolveClass(ObjectStreamClass classDesc) return objectInputStream.readObject(); } catch (ClassNotFoundException ex) { - throw new NestedIOException("Failed to deserialize object type", ex); + throw new IOException("Failed to deserialize object type", ex); } } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/SerializerMessageConverterTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/SerializerMessageConverterTests.java index 74eb71cf96..cf99c97e08 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/converter/SerializerMessageConverterTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/converter/SerializerMessageConverterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -33,7 +34,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.utils.test.TestUtils; -import org.springframework.core.NestedIOException; import org.springframework.core.serializer.DefaultDeserializer; import org.springframework.core.serializer.Deserializer; @@ -178,7 +178,7 @@ public void messageConversionExceptionForClassNotFound() throws Exception { body[10] = 'z'; assertThatThrownBy(() -> converter.fromMessage(message)) .isExactlyInstanceOf(MessageConversionException.class) - .hasCauseExactlyInstanceOf(NestedIOException.class); + .hasCauseExactlyInstanceOf(IOException.class); } } diff --git a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java index 2852ef2857..1a9fa36cd9 100644 --- a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java +++ b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,10 @@ package org.springframework.amqp.rabbit.junit; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -29,14 +31,28 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import org.apache.http.HttpHost; +import org.apache.http.client.AuthCache; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicAuthCache; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; + +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.http.client.support.BasicAuthenticationInterceptor; +import org.springframework.lang.Nullable; import org.springframework.util.Base64Utils; import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.http.client.Client; /** * A class that can be used to prevent integration tests from failing if the Rabbit broker application is @@ -372,8 +388,7 @@ private Channel createQueues(Connection connection) throws IOException, URISynta } } if (this.management) { - Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword); - if (!client.alivenessTest("/")) { + if (!alivenessTest()) { throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; " + "management not available"); } @@ -381,6 +396,31 @@ private Channel createQueues(Connection connection) throws IOException, URISynta return channel; } + private boolean alivenessTest() throws URISyntaxException { + URI uri = new URI(getAdminUri()) + .resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8)); + HttpHost host = new HttpHost(uri.getHost(), uri.getPort()); + RestTemplate template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() { + + @Override + @Nullable + protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) { + AuthCache cache = new BasicAuthCache(); + BasicScheme scheme = new BasicScheme(); + cache.put(host, scheme); + BasicHttpContext context = new BasicHttpContext(); + context.setAttribute(HttpClientContext.AUTH_CACHE, cache); + return context; + } + + }); + template.getInterceptors().add(new BasicAuthenticationInterceptor(this.adminUser, this.adminPassword)); + ResponseEntity response = template.exchange(uri, HttpMethod.GET, null, String.class); + return response.getStatusCode().equals(HttpStatus.OK) + ? response.getBody().equals("{\"status\":\"ok\"}") + : false; + } + public static boolean fatal() { String serversRequired = System.getenv(BROKER_REQUIRED); if (Boolean.parseBoolean(serversRequired)) { diff --git a/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java b/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java index 9c93f81d3f..d23b0190a5 100644 --- a/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java +++ b/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ * @since 2.0.2 * */ -@RabbitAvailable(queues = "rabbitAvailableTests.queue") +@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true) public class RabbitAvailableTests { @Test diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java index b0f914d7ba..f2e3ae98ca 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java @@ -18,8 +18,13 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -41,6 +46,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean; @@ -48,9 +55,10 @@ import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; @@ -97,11 +105,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException { assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue(); } + @SuppressWarnings("unchecked") @Test void queueOverAmqp() throws Exception { - Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api"); - QueueInfo queue = client.getQueue("/", "stream.created.over.amqp"); - assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream"); + WebClient client = WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest")) + .build(); + Map queue = queueInfo("stream.created.over.amqp"); + assertThat(((Map) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream"); + } + + private Map queueInfo(String queueName) throws URISyntaxException { + WebClient client = createClient("guest", "guest"); + URI uri = queueUri(queueName); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + private URI queueUri(String queue) throws URISyntaxException { + URI uri = new URI("http://localhost:" + managementPort() + "/api") + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private WebClient createClient(String adminUser, String adminPassword) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword)) + .build(); } @Configuration(proxyBeanMethods = false) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java index e209734ec6..83885e82cc 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java @@ -16,7 +16,7 @@ package org.springframework.amqp.rabbit.connection; -import java.net.MalformedURLException; +import java.net.URI; import java.net.URISyntaxException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Arrays; @@ -32,11 +32,10 @@ import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.io.Resource; +import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; - -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; +import org.springframework.util.ClassUtils; /** * A {@link RoutingConnectionFactory} that determines the node on which a queue is located and @@ -56,6 +55,13 @@ */ public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean { + private static final boolean USING_WEBFLUX; + + static { + USING_WEBFLUX = ClassUtils.isPresent("org.springframework.web.reactive.function.client.WebClient", + LocalizedQueueConnectionFactory.class.getClassLoader()); + } + private final Log logger = LogFactory.getLog(getClass()); private final Map nodeFactories = new HashMap(); @@ -84,6 +90,8 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi private final String trustStorePassPhrase; + private NodeLocator nodeLocator; + /** * @param defaultConnectionFactory the fallback connection factory to use if the queue * can't be located. @@ -190,6 +198,12 @@ private LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFacto this.trustStore = trustStore; this.keyStorePassPhrase = keyStorePassPhrase; this.trustStorePassPhrase = trustStorePassPhrase; + if (USING_WEBFLUX) { + this.nodeLocator = new WebFluxNodeLocator(); + } + else { + this.nodeLocator = new RestTemplateNodeLocator(); + } } private static Map nodesAddressesToMap(String[] nodes, String[] addresses) { @@ -200,6 +214,16 @@ private static Map nodesAddressesToMap(String[] nodes, String[] .collect(Collectors.toMap(SimpleImmutableEntry::getKey, SimpleImmutableEntry::getValue)); } + /** + * Set a {@link NodeLocator} to use to find the node address for the leader. + * @param nodeLocator the locator. + * @since 2.4.8 + */ + public void setNodeLocator(NodeLocator nodeLocator) { + Assert.notNull(nodeLocator, "'nodeLocator' cannot be null"); + this.nodeLocator = nodeLocator; + } + @Override public Connection createConnection() throws AmqpException { return this.defaultConnectionFactory.createConnection(); @@ -244,7 +268,8 @@ public void clearConnectionListeners() { public ConnectionFactory getTargetConnectionFactory(Object key) { String queue = ((String) key); queue = queue.substring(1, queue.length() - 1); - Assert.isTrue(!queue.contains(","), () -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key); + Assert.isTrue(!queue.contains(","), + () -> "Cannot use LocalizedQueueConnectionFactory with more than one queue: " + key); ConnectionFactory connectionFactory = determineConnectionFactory(queue); if (connectionFactory == null) { return this.defaultConnectionFactory; @@ -256,51 +281,12 @@ public ConnectionFactory getTargetConnectionFactory(Object key) { @Nullable private ConnectionFactory determineConnectionFactory(String queue) { - for (int i = 0; i < this.adminUris.length; i++) { - String adminUri = this.adminUris[i]; - if (!adminUri.endsWith("/api/")) { - adminUri += "/api/"; - } - try { - Client client = createClient(adminUri, this.username, this.password); - QueueInfo queueInfo = client.getQueue(this.vhost, queue); - if (queueInfo != null) { - String node = queueInfo.getNode(); - if (node != null) { - String uri = this.nodeToAddress.get(node); - if (uri != null) { - return nodeConnectionFactory(queue, node, uri); - } - if (this.logger.isDebugEnabled()) { - this.logger.debug("No match for node: " + node); - } - } - } - else { - throw new AmqpException("Admin returned null QueueInfo"); - } - } - catch (Exception e) { - this.logger.warn("Failed to determine queue location for: " + queue + " at: " + - adminUri + ": " + e.getMessage()); - } + ConnectionFactory cf = this.nodeLocator.locate(this.adminUris, this.nodeToAddress, this.vhost, this.username, + this.password, queue, this::nodeConnectionFactory); + if (cf == null) { + this.logger.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); } - this.logger.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); - return null; - } - - /** - * Create a client instance. - * @param adminUri the admin URI. - * @param username the username - * @param password the password. - * @return The client. - * @throws MalformedURLException if the URL is malformed - * @throws URISyntaxException if there is a syntax error. - */ - protected Client createClient(String adminUri, String username, String password) throws MalformedURLException, - URISyntaxException { - return new Client(adminUri, username, password); + return cf; } private synchronized ConnectionFactory nodeConnectionFactory(String queue, String node, String address) { @@ -372,4 +358,115 @@ public void destroy() { resetConnection(); } + /** + * Used to obtain a connection factory for the queue leader. + * + * @param the client type. + * @since 2.4.8 + */ + public interface NodeLocator { + + LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(NodeLocator.class)); + + /** + * Return a connection factory for the leader node for the queue. + * @param adminUris an array of admin URIs. + * @param nodeToAddress a map of node names to node addresses (AMQP). + * @param vhost the vhost. + * @param username the user name. + * @param password the password. + * @param queue the queue name. + * @param factoryFunction an internal function to find or create the factory. + * @return a connection factory, if the leader node was found; null otherwise. + */ + @Nullable + default ConnectionFactory locate(String[] adminUris, Map nodeToAddress, String vhost, + String username, String password, String queue, FactoryFinder factoryFunction) { + + T client = createClient(username, password); + + for (int i = 0; i < adminUris.length; i++) { + String adminUri = adminUris[i]; + if (!adminUri.endsWith("/api/")) { + adminUri += "/api/"; + } + try { + String uri = new URI(adminUri) + .resolve("/api/queues/").toString(); + Map queueInfo = restCall(client, uri, vhost, queue); + if (queueInfo != null) { + String node = (String) queueInfo.get("node"); + if (node != null) { + String nodeUri = nodeToAddress.get(node); + if (uri != null) { + close(client); + return factoryFunction.locate(queue, node, nodeUri); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("No match for node: " + node); + } + } + } + else { + throw new AmqpException("Admin returned null QueueInfo"); + } + } + catch (Exception e) { + LOGGER.warn("Failed to determine queue location for: " + queue + " at: " + + adminUri + ": " + e.getMessage()); + } + } + LOGGER.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); + close(client); + return null; + } + + /** + * Create a client for subsequent use. + * @param userName the user name. + * @param password the password. + * @return the client. + */ + T createClient(String userName, String password); + + /** + * Close the client. + * @param client the client. + */ + default void close(T client) { + } + + /** + * Retrieve a map of queue properties using the RabbitMQ Management REST API. + * @param client the client. + * @param baseUri the base uri. + * @param vhost the virtual host. + * @param queue the queue name. + * @return the map of queue properties. + * @throws URISyntaxException if the syntax is bad. + */ + @Nullable + Map restCall(T client, String baseUri, String vhost, String queue) + throws URISyntaxException; + + } + + /** + * Callback to determine the connection factory using the provided information. + * @since 2.4.8 + */ + @FunctionalInterface + public interface FactoryFinder { + + /** + * Locate or create a factory. + * @param queueName the queue name. + * @param node the node name. + * @param nodeUri the node URI. + * @return the factory. + */ + ConnectionFactory locate(String queueName, String node, String nodeUri); + + } + } diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java new file mode 100644 index 0000000000..056435925c --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateHolder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import org.springframework.web.client.RestTemplate; + +/** + * Holder for a {@link RestTemplate} and credentials. + * + * @author Gary Russell + * @since 2.4.8 + * + */ +class RestTemplateHolder { + + final String userName; // NOSONAR + + final String password; // NOSONAR + + RestTemplate template; // NOSONAR + + RestTemplateHolder(String userName, String password) { + this.userName = userName; + this.password = password; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java new file mode 100644 index 0000000000..761db1d13b --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RestTemplateNodeLocator.java @@ -0,0 +1,87 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import org.apache.http.HttpHost; +import org.apache.http.client.AuthCache; +import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.impl.auth.BasicScheme; +import org.apache.http.impl.client.BasicAuthCache; +import org.apache.http.protocol.BasicHttpContext; +import org.apache.http.protocol.HttpContext; + +import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.http.client.support.BasicAuthenticationInterceptor; +import org.springframework.lang.Nullable; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriUtils; + +/** + * A {@link NodeLocator} using the {@link RestTemplate}. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class RestTemplateNodeLocator implements NodeLocator { + + @Override + public RestTemplateHolder createClient(String userName, String password) { + return new RestTemplateHolder(userName, password); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + @Nullable + public Map restCall(RestTemplateHolder client, String baseUri, String vhost, String queue) + throws URISyntaxException { + + if (client.template == null) { + URI uri = new URI(baseUri); + HttpHost host = new HttpHost(uri.getHost(), uri.getPort()); + client.template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() { + + @Override + @Nullable + protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) { + AuthCache cache = new BasicAuthCache(); + BasicScheme scheme = new BasicScheme(); + cache.put(host, scheme); + BasicHttpContext context = new BasicHttpContext(); + context.setAttribute(HttpClientContext.AUTH_CACHE, cache); + return context; + } + + }); + client.template.getInterceptors().add(new BasicAuthenticationInterceptor(client.userName, client.password)); + } + URI uri = new URI(baseUri) + .resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue); + ResponseEntity response = client.template.exchange(uri, HttpMethod.GET, null, Map.class); + return response.getStatusCode().equals(HttpStatus.OK) ? response.getBody() : null; + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java new file mode 100644 index 0000000000..5179ea378a --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/WebFluxNodeLocator.java @@ -0,0 +1,73 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; + +/** + * A {@link NodeLocator} using the Spring WebFlux {@link WebClient}. + * + * @author Gary Russell + * @since 2.4.8 + * + */ +public class WebFluxNodeLocator implements NodeLocator { + + @Override + @Nullable + public Map restCall(WebClient client, String baseUri, String vhost, String queue) + throws URISyntaxException { + + URI uri = new URI(baseUri) + .resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue); + HashMap queueInfo = client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); // NOSONAR magic# + return queueInfo; + } + + /** + * Create a client instance. + * @param username the username + * @param password the password. + * @return The client. + */ + @Override + public WebClient createClient(String username, String password) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(username, password)) + .build(); + } + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java index 3b2bee26b4..ea1bd5baba 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java @@ -69,6 +69,7 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; import org.springframework.amqp.rabbit.connection.SimplePropertyValueConnectionNameStrategy; +import org.springframework.amqp.rabbit.core.NeedsManagementTests; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; @@ -144,8 +145,6 @@ import org.springframework.validation.annotation.Validated; import com.rabbitmq.client.Channel; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -180,7 +179,7 @@ "test.custom.argument", "test.arg.validation", "manual.acks.1", "manual.acks.2", "erit.batch.1", "erit.batch.2", "erit.batch.3", "erit.mp.arg" }, purgeAfterEach = false) -public class EnableRabbitIntegrationTests { +public class EnableRabbitIntegrationTests extends NeedsManagementTests { @Autowired private RabbitTemplate rabbitTemplate; @@ -837,11 +836,10 @@ public void deadLetterOnDefaultExchange() { this.rabbitTemplate.convertAndSend("amqp656", "foo"); assertThat(this.rabbitTemplate.receiveAndConvert("amqp656dlq", 10000)).isEqualTo("foo"); try { - Client rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); - QueueInfo amqp656 = rabbitRestClient.getQueue("/", "amqp656"); + Map amqp656 = await().until(() -> queueInfo("amqp656"), q -> q != null); if (amqp656 != null) { - assertThat(amqp656.getArguments().get("test-empty")).isEqualTo(""); - assertThat(amqp656.getArguments().get("test-null")).isEqualTo("undefined"); + assertThat(arguments(amqp656).get("test-empty")).isEqualTo(""); + assertThat(arguments(amqp656).get("test-null")).isEqualTo("undefined"); } } catch (Exception e) { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java index 117c7f4765..6d19ccfbbb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryIntegrationTests.java @@ -17,7 +17,9 @@ package org.springframework.amqp.rabbit.connection; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.AfterEach; @@ -34,7 +36,7 @@ * * @author Gary Russell */ -@RabbitAvailable(management = true) +@RabbitAvailable(management = true, queues = "local") public class LocalizedQueueConnectionFactoryIntegrationTests { private LocalizedQueueConnectionFactory lqcf; @@ -72,4 +74,19 @@ public void testConnect() throws Exception { admin.deleteQueue(queue.getName()); } + @Test + void findLocal() { + ConnectionFactory defaultCf = mock(ConnectionFactory.class); + LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultCf, + Map.of("rabbit@localhost", "localhost:5672"), new String[] { "http://localhost:15672" }, + "/", "guest", "guest", false, null); + ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]"); + RabbitAdmin admin = new RabbitAdmin(cf); + assertThat(admin.getQueueProperties("local")).isNotNull(); + lqcf.setNodeLocator(new RestTemplateNodeLocator()); + ConnectionFactory cf2 = lqcf.getTargetConnectionFactory("[local]"); + assertThat(cf2).isSameAs(cf); + lqcf.destroy(); + } + } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java index e7b55e55b5..9dc9246b20 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,11 +45,16 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.test.web.reactive.server.HttpHandlerConnector; +import org.springframework.web.reactive.function.client.WebClient; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; +import reactor.core.publisher.Mono; /** @@ -76,8 +81,8 @@ public void testFailOver() throws Exception { String username = "guest"; String password = "guest"; final AtomicBoolean firstServer = new AtomicBoolean(true); - final Client client1 = doCreateClient(adminUris[0], username, password, nodes[0]); - final Client client2 = doCreateClient(adminUris[1], username, password, nodes[1]); + final WebClient client1 = doCreateClient(adminUris[0], username, password, nodes[0]); + final WebClient client2 = doCreateClient(adminUris[1], username, password, nodes[1]); final Map mockCFs = new HashMap(); CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(1); @@ -86,17 +91,20 @@ public void testFailOver() throws Exception { LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultConnectionFactory, addresses, adminUris, nodes, vhost, username, password, false, null) { - @Override - protected Client createClient(String adminUri, String username, String password) { - return firstServer.get() ? client1 : client2; - } - @Override protected ConnectionFactory createConnectionFactory(String address, String node) { return mockCFs.get(address); } }; + lqcf.setNodeLocator(new WebFluxNodeLocator() { + + @Override + public WebClient createClient(String username, String password) { + return firstServer.get() ? client1 : client2; + } + + }); Map nodeAddress = TestUtils.getPropertyValue(lqcf, "nodeToAddress", Map.class); assertThat(nodeAddress.get("rabbit@foo")).isEqualTo(rabbit1); assertThat(nodeAddress.get("rabbit@bar")).isEqualTo(rabbit2); @@ -144,12 +152,20 @@ private boolean assertLog(List logRows, String expected) { return false; } - private Client doCreateClient(String uri, String username, String password, String node) { - Client client = mock(Client.class); - QueueInfo queueInfo = new QueueInfo(); - queueInfo.setNode(node); - given(client.getQueue("/", "q")).willReturn(queueInfo); - return client; + private WebClient doCreateClient(String uri, String username, String password, String node) { + ClientHttpConnector httpConnector = + new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + response.getHeaders().setContentType(MediaType.APPLICATION_JSON); + Mono json = Mono + .just(response.bufferFactory().wrap(("{\"node\":\"" + node + "\"}").getBytes())); + return response.writeWith(json) + .then(Mono.defer(response::setComplete)); + }); + + return WebClient.builder() + .clientConnector(httpConnector) + .build(); } @Test @@ -182,8 +198,8 @@ private ConnectionFactory mockCF(final String address, final CountDownLatch latc given(channel.isOpen()).willReturn(true, false); willAnswer(invocation -> { String tag = UUID.randomUUID().toString(); - consumers.put(address, invocation.getArgument(6)); - consumerTags.put(address, tag); + this.consumers.put(address, invocation.getArgument(6)); + this.consumerTags.put(address, tag); if (latch != null) { latch.countDown(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java index a79977dbfb..2854f8abc6 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.Binding; @@ -39,9 +38,7 @@ import org.springframework.amqp.core.QueueBuilder.Overflow; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; import org.springframework.amqp.rabbit.junit.RabbitAvailable; -import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowired; @@ -50,10 +47,6 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.ExchangeInfo; -import com.rabbitmq.http.client.domain.QueueInfo; - /** * * @author Gary Russell @@ -63,9 +56,7 @@ @SpringJUnitConfig @DirtiesContext @RabbitAvailable(management = true) -public class FixedReplyQueueDeadLetterTests { - - private static BrokerRunningSupport brokerRunning; +public class FixedReplyQueueDeadLetterTests extends NeedsManagementTests { @Autowired private RabbitTemplate rabbitTemplate; @@ -73,11 +64,6 @@ public class FixedReplyQueueDeadLetterTests { @Autowired private DeadListener deadListener; - @BeforeAll - static void setUp() { - brokerRunning = RabbitAvailableCondition.getBrokerRunning(); - } - @AfterAll static void tearDown() { brokerRunning.deleteQueues("all.args.1", "all.args.2", "all.args.3", "test.quorum"); @@ -98,10 +84,8 @@ void test() throws Exception { @Test void testQueueArgs1() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.1"), que -> que != null); - Map arguments = queue.getArguments(); + Map queue = await().until(() -> queueInfo("all.args.1"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -117,10 +101,8 @@ void testQueueArgs1() throws MalformedURLException, URISyntaxException, Interrup @Test void testQueueArgs2() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.2"), que -> que != null); - Map arguments = queue.getArguments(); + Map queue = await().until(() -> queueInfo("all.args.2"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -134,11 +116,9 @@ void testQueueArgs2() throws MalformedURLException, URISyntaxException, Interrup } @Test - void testQueueArgs3() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.3"), que -> que != null); - Map arguments = queue.getArguments(); + void testQueueArgs3() throws URISyntaxException { + Map queue = await().until(() -> queueInfo("all.args.3"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -150,19 +130,17 @@ void testQueueArgs3() throws MalformedURLException, URISyntaxException, Interrup assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy"); assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.random.getValue()); - ExchangeInfo exchange = client.getExchange("/", "dlx.test.requestEx"); - assertThat(exchange.getArguments().get("alternate-exchange")).isEqualTo("alternate"); + Map exchange = exchangeInfo("dlx.test.requestEx"); + assertThat(arguments(exchange).get("alternate-exchange")).isEqualTo("alternate"); } /* * Does not require a 3.8 broker - they are just arbitrary arguments. */ @Test - void testQuorumArgs() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "test.quorum"), que -> que != null); - Map arguments = queue.getArguments(); + void testQuorumArgs() { + Map queue = await().until(() -> queueInfo("test.quorum"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-queue-type")).isEqualTo("quorum"); assertThat(arguments.get("x-delivery-limit")).isEqualTo(10); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java new file mode 100644 index 0000000000..a854488588 --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java @@ -0,0 +1,96 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.core; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; + +import org.junit.jupiter.api.BeforeAll; + +import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; +import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; + +/** + * @author Gary Russell + * @since 2.4.8 + * + */ +public abstract class NeedsManagementTests { + + protected static BrokerRunningSupport brokerRunning; + + @BeforeAll + static void setUp() { + brokerRunning = RabbitAvailableCondition.getBrokerRunning(); + } + + protected Map queueInfo(String queueName) throws URISyntaxException { + WebClient client = createClient(brokerRunning.getAdminUser(), brokerRunning.getAdminPassword()); + URI uri = queueUri(queueName); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + protected Map exchangeInfo(String name) throws URISyntaxException { + WebClient client = createClient(brokerRunning.getAdminUser(), brokerRunning.getAdminPassword()); + URI uri = exchangeUri(name); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + @SuppressWarnings("unchecked") + protected Map arguments(Map infoMap) { + return (Map) infoMap.get("arguments"); + } + + private URI queueUri(String queue) throws URISyntaxException { + URI uri = new URI(brokerRunning.getAdminUri()) + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private URI exchangeUri(String queue) throws URISyntaxException { + URI uri = new URI(brokerRunning.getAdminUri()) + .resolve("/api/exchanges/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private WebClient createClient(String adminUser, String adminPassword) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword)) + .build(); + } + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index 1c504a7c78..d9831acd18 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.time.Duration; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.AfterEach; @@ -51,8 +52,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.ExchangeInfo; /** * @author Dave Syer @@ -62,7 +61,7 @@ * @author Artem Bilan */ @RabbitAvailable(management = true) -public class RabbitAdminIntegrationTests { +public class RabbitAdminIntegrationTests extends NeedsManagementTests { private final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); @@ -256,9 +255,9 @@ public void testDeleteExchangeWithInternalOption() throws Exception { exchange.setInternal(true); rabbitAdmin.declareExchange(exchange); - ExchangeInfo exchange2 = getExchange(exchangeName); - assertThat(exchange2.getType()).isEqualTo(ExchangeTypes.DIRECT); - assertThat(exchange2.isInternal()).isTrue(); + Map exchange2 = getExchange(exchangeName); + assertThat(exchange2.get("type")).isEqualTo(ExchangeTypes.DIRECT); + assertThat(exchange2.get("internal")).isEqualTo(Boolean.TRUE); boolean result = rabbitAdmin.deleteExchange(exchangeName); @@ -368,6 +367,7 @@ public void testQueueDeclareBad() { this.rabbitAdmin.deleteQueue(queue.getName()); } + @SuppressWarnings("unchecked") @Test public void testDeclareDelayedExchange() throws Exception { DirectExchange exchange = new DirectExchange("test.delayed.exchange"); @@ -413,18 +413,17 @@ public void testDeclareDelayedExchange() throws Exception { assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000)); assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L); - ExchangeInfo exchange2 = getExchange(exchangeName); - assertThat(exchange2.getArguments().get("x-delayed-type")).isEqualTo(ExchangeTypes.DIRECT); - assertThat(exchange2.getType()).isEqualTo("x-delayed-message"); + Map exchange2 = getExchange(exchangeName); + assertThat(arguments(exchange2).get("x-delayed-type")).isEqualTo(ExchangeTypes.DIRECT); + assertThat(exchange2.get("type")).isEqualTo("x-delayed-message"); this.rabbitAdmin.deleteQueue(queue.getName()); this.rabbitAdmin.deleteExchange(exchangeName); } - private ExchangeInfo getExchange(String exchangeName) throws Exception { - Client rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); + private Map getExchange(String exchangeName) throws Exception { return await().pollDelay(Duration.ZERO) - .until(() -> rabbitRestClient.getExchange("/", exchangeName), exch -> exch != null); + .until(() -> exchangeInfo(exchangeName), exch -> exch != null); } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java index 04b804ef48..9703c6c7cb 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java @@ -84,8 +84,6 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; /** * @author Mark Pollack @@ -99,7 +97,7 @@ * */ @RabbitAvailable(management = true) -public class RabbitAdminTests { +public class RabbitAdminTests extends NeedsManagementTests { @Test public void testSettingOfNullConnectionFactory() { @@ -373,17 +371,16 @@ public void testLeaderLocator() throws Exception { RabbitAdmin admin = new RabbitAdmin(cf); AnonymousQueue queue = new AnonymousQueue(); admin.declareQueue(queue); - Client client = new Client("http://guest:guest@localhost:15672/api"); AnonymousQueue queue1 = queue; - QueueInfo info = await().until(() -> client.getQueue("/", queue1.getName()), inf -> inf != null); - assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo("client-local"); + Map info = await().until(() -> queueInfo(queue1.getName()), inf -> inf != null); + assertThat(arguments(info).get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo("client-local"); queue = new AnonymousQueue(); queue.setLeaderLocator(null); admin.declareQueue(queue); AnonymousQueue queue2 = queue; - info = await().until(() -> client.getQueue("/", queue2.getName()), inf -> inf != null); - assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isNull(); + info = await().until(() -> queueInfo(queue2.getName()), inf -> inf != null); + assertThat(arguments(info).get(Queue.X_QUEUE_LEADER_LOCATOR)).isNull(); cf.destroy(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java deleted file mode 100644 index 4a574ad87f..0000000000 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.amqp.rabbit.core; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; - -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; - -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Exchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.junit.RabbitAvailable; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.BindingInfo; -import com.rabbitmq.http.client.domain.ExchangeInfo; -import com.rabbitmq.http.client.domain.QueueInfo; - -/** - * @author Gary Russell - * @author Artem Bilan - * - * @since 1.5 - * - */ -@RabbitAvailable(management = true) -public class RabbitRestApiTests { - - private final CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); - - private final Client rabbitRestClient; - - public RabbitRestApiTests() throws MalformedURLException, URISyntaxException { - this.rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); - } - - @AfterEach - public void tearDown() { - connectionFactory.destroy(); - } - - @Test - public void testExchanges() { - List list = this.rabbitRestClient.getExchanges(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testExchangesVhost() { - List list = this.rabbitRestClient.getExchanges("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindings() { - List list = this.rabbitRestClient.getBindings(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindingsVhost() { - List list = this.rabbitRestClient.getBindings("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testQueues() { - List list = this.rabbitRestClient.getQueues(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testQueuesVhost() { - List list = this.rabbitRestClient.getQueues("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindingsDetail() { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("alternate-exchange", ""); - Exchange exchange1 = new DirectExchange(UUID.randomUUID().toString(), false, true, args); - admin.declareExchange(exchange1); - Exchange exchange2 = new DirectExchange(UUID.randomUUID().toString(), false, true, args); - admin.declareExchange(exchange2); - Queue queue = admin.declareQueue(); - Binding binding1 = BindingBuilder - .bind(queue) - .to(exchange1) - .with("foo") - .and(args); - admin.declareBinding(binding1); - Binding binding2 = BindingBuilder - .bind(exchange2) - .to((DirectExchange) exchange1) - .with("bar"); - admin.declareBinding(binding2); - - List bindings = this.rabbitRestClient.getBindingsBySource("/", exchange1.getName()); - assertThat(bindings).hasSize(2); - assertThat(bindings.get(0).getSource()).isEqualTo(exchange1.getName()); - assertThat("foo").isIn(bindings.get(0).getRoutingKey(), bindings.get(1).getRoutingKey()); - BindingInfo qout = null; - BindingInfo eout = null; - if (bindings.get(0).getRoutingKey().equals("foo")) { - qout = bindings.get(0); - eout = bindings.get(1); - } - else { - eout = bindings.get(0); - qout = bindings.get(1); - } - assertThat(qout.getDestinationType()).isEqualTo("queue"); - assertThat(qout.getDestination()).isEqualTo(queue.getName()); - assertThat(qout.getArguments()).isNotNull(); - assertThat(qout.getArguments().get("alternate-exchange")).isEqualTo(""); - - assertThat(eout.getDestinationType()).isEqualTo("exchange"); - assertThat(eout.getDestination()).isEqualTo(exchange2.getName()); - - admin.deleteExchange(exchange1.getName()); - admin.deleteExchange(exchange2.getName()); - } - - @Test - public void testSpecificExchange() { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("alternate-exchange", ""); - Exchange exchange = new DirectExchange(UUID.randomUUID().toString(), true, true, args); - admin.declareExchange(exchange); - ExchangeInfo exchangeOut = this.rabbitRestClient.getExchange("/", exchange.getName()); - assertThat(exchangeOut.isDurable()).isTrue(); - assertThat(exchangeOut.isAutoDelete()).isTrue(); - assertThat(exchangeOut.getName()).isEqualTo(exchange.getName()); - assertThat(exchangeOut.getArguments()).isEqualTo(args); - admin.deleteExchange(exchange.getName()); - } - - @Test - public void testSpecificQueue() throws Exception { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("foo", "bar"); - Queue queue1 = QueueBuilder.nonDurable(UUID.randomUUID().toString()) - .autoDelete() - .withArguments(args) - .build(); - admin.declareQueue(queue1); - Queue queue2 = QueueBuilder.durable(UUID.randomUUID().toString()) - .withArguments(args) - .build(); - admin.declareQueue(queue2); - Channel channel = this.connectionFactory.createConnection().createChannel(false); - String consumer = channel.basicConsume(queue1.getName(), false, "", false, true, null, new DefaultConsumer(channel)); - QueueInfo qi = await().until(() -> this.rabbitRestClient.getQueue("/", queue1.getName()), - info -> info.getExclusiveConsumerTag() != null && !"".equals(info.getExclusiveConsumerTag())); - QueueInfo queueOut = this.rabbitRestClient.getQueue("/", queue1.getName()); - assertThat(queueOut.isDurable()).isFalse(); - assertThat(queueOut.isExclusive()).isFalse(); - assertThat(queueOut.isAutoDelete()).isTrue(); - assertThat(queueOut.getName()).isEqualTo(queue1.getName()); - assertThat(queueOut.getArguments()).isEqualTo(args); - assertThat(qi.getExclusiveConsumerTag()).isEqualTo(consumer); - channel.basicCancel(consumer); - channel.close(); - - queueOut = this.rabbitRestClient.getQueue("/", queue2.getName()); - assertThat(queueOut.isDurable()).isTrue(); - assertThat(queueOut.isExclusive()).isFalse(); - assertThat(queueOut.isAutoDelete()).isFalse(); - assertThat(queueOut.getName()).isEqualTo(queue2.getName()); - assertThat(queueOut.getArguments()).isEqualTo(args); - - admin.deleteQueue(queue1.getName()); - admin.deleteQueue(queue2.getName()); - } - - @Test - public void testDeleteExchange() { - String exchangeName = "testExchange"; - Exchange testExchange = new DirectExchange(exchangeName); - ExchangeInfo info = new ExchangeInfo(); - info.setArguments(testExchange.getArguments()); - info.setAutoDelete(testExchange.isAutoDelete()); - info.setDurable(testExchange.isDurable()); - info.setType(testExchange.getType()); - this.rabbitRestClient.declareExchange("/", testExchange.getName(), info); - ExchangeInfo exchangeToAssert = this.rabbitRestClient.getExchange("/", exchangeName); - assertThat(exchangeToAssert.getName()).isEqualTo(testExchange.getName()); - assertThat(exchangeToAssert.getType()).isEqualTo(testExchange.getType()); - this.rabbitRestClient.deleteExchange("/", testExchange.getName()); - assertThat(this.rabbitRestClient.getExchange("/", exchangeName)).isNull(); - } - -} diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 33028a9dd5..f7233e2162 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -794,6 +794,52 @@ public LocalizedQueueConnectionFactory queueAffinityCF( Notice that the first three parameters are arrays of `addresses`, `adminUris`, and `nodes`. These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node. +IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API. +Instead, by default, the `WebClient` from Spring Webflux is used if `spring-webflux` is on the class path; otherwise a `RestTemplate` is used. + +To add `WebFlux` to the class path: + +.Maven +==== +[source,xml,subs="+attributes"] +---- + + org.springframework.amqp + spring-rabbit + +---- +==== +.Gradle +==== +[source,groovy,subs="+attributes"] +---- +compile 'org.springframework.amqp:spring-rabbit' +---- +==== + +You can also use other REST technology by implementing `LocalizedQueueConnectionFactory.NodeLocator` and overriding its `createClient, ``restCall`, and optionally, `close` methods. + +==== +[source, java] +---- +lqcf.setNodeLocator(new NodeLocator() { + + @Override + public MyClient createClient(String userName, String password) { + ... + } + + @Override + public HashMap restCall(MyClient client, URI uri) { + ... + }); + +}); +---- +==== + +The framework provides the `WebFluxNodeLocator` and `RestTemplateNodeLocator`, with the default as discussed above. + [[cf-pub-conf-ret]] ===== Publisher Confirms and Returns