From 29dc74ff0b5e4de949a69473c88f72f1e279271d Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 10 Oct 2022 15:54:14 -0400 Subject: [PATCH] GH-1419: Remove RabbitMQ http-client Usage Resolves https://github.com/spring-projects/spring-amqp/issues/1419 Use Spring WebFlux instead, while allowing the user to choose some other technology in the `LocalizedQueueConnectionFactory`.. --- build.gradle | 18 +- .../rabbit/junit/BrokerRunningSupport.java | 33 ++- .../rabbit/junit/RabbitAvailableTests.java | 4 +- .../stream/listener/RabbitListenerTests.java | 47 +++- .../rabbit/connection/DefaultNodeLocator.java | 63 +++++ .../LocalizedQueueConnectionFactory.java | 148 +++++++---- .../EnableRabbitIntegrationTests.java | 22 +- .../LocalizedQueueConnectionFactoryTests.java | 67 +++-- .../core/FixedReplyQueueDeadLetterTests.java | 50 +--- .../rabbit/core/NeedsManagementTests.java | 96 ++++++++ .../core/RabbitAdminIntegrationTests.java | 25 +- .../amqp/rabbit/core/RabbitAdminTests.java | 13 +- .../amqp/rabbit/core/RabbitRestApiTests.java | 233 ------------------ src/reference/asciidoc/amqp.adoc | 36 +++ src/reference/asciidoc/whats-new.adoc | 3 + 15 files changed, 467 insertions(+), 391 deletions(-) create mode 100644 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java create mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java delete mode 100644 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java diff --git a/build.gradle b/build.gradle index e1874a35a6..63e94a5736 100644 --- a/build.gradle +++ b/build.gradle @@ -44,7 +44,7 @@ ext { assertkVersion = '0.24' awaitilityVersion = '4.2.0' commonsCompressVersion = '1.20' - commonsHttpClientVersion = '4.5.13' + commonsHttpClientVersion = '5.1.3' commonsPoolVersion = '2.11.1' googleJsr305Version = '3.0.2' hamcrestVersion = '2.2' @@ -62,7 +62,6 @@ ext { mockitoVersion = '4.8.0' rabbitmqStreamVersion = '0.8.0' rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.16.0' - rabbitmqHttpClientVersion = '3.12.1' reactorVersion = '2022.0.0-SNAPSHOT' snappyVersion = '1.1.8.4' springDataVersion = '2022.0.0-SNAPSHOT' @@ -384,11 +383,12 @@ project('spring-rabbit') { api project(':spring-amqp') api "com.rabbitmq:amqp-client:$rabbitmqVersion" - optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion" optionalApi 'org.springframework:spring-aop' api 'org.springframework:spring-context' api 'org.springframework:spring-messaging' api 'org.springframework:spring-tx' + optionalApi 'org.springframework:spring-web' + optionalApi 'org.springframework:spring-webflux' optionalApi 'io.projectreactor:reactor-core' optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' @@ -410,7 +410,7 @@ project('spring-rabbit') { testImplementation 'io.micrometer:micrometer-tracing-test' testImplementation 'io.micrometer:micrometer-tracing-integration-test' testRuntimeOnly 'org.springframework:spring-web' - testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" + testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion" testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' @@ -465,14 +465,13 @@ project('spring-rabbit-stream') { api project(':spring-rabbit') api "com.rabbitmq:stream-client:$rabbitmqStreamVersion" - optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion" testApi project(':spring-rabbit-junit') testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin' - testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion" + testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion" testRuntimeOnly "org.apache.commons:commons-compress:$commonsCompressVersion" testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion" testRuntimeOnly "org.lz4:lz4-java:$lz4Version" @@ -494,16 +493,15 @@ project('spring-rabbit-junit') { exclude group: 'org.hamcrest', module: 'hamcrest-core' } api "com.rabbitmq:amqp-client:$rabbitmqVersion" - api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") { - exclude group: 'org.springframework', module: 'spring-web' - } api 'org.springframework:spring-web' + api 'org.springframework:spring-webflux' api 'org.junit.jupiter:junit-jupiter-api' api "org.assertj:assertj-core:$assertjVersion" optionalApi "ch.qos.logback:logback-classic:$logbackVersion" optionalApi 'org.apache.logging.log4j:log4j-core' compileOnly 'org.apiguardian:apiguardian-api:1.0.0' - + testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core' + testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind' } } diff --git a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java index 2852ef2857..90b2d9f7e3 100644 --- a/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java +++ b/spring-rabbit-junit/src/main/java/org/springframework/amqp/rabbit/junit/BrokerRunningSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,11 @@ package org.springframework.amqp.rabbit.junit; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -30,13 +33,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; import org.springframework.util.Base64Utils; import org.springframework.util.StringUtils; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.http.client.Client; /** * A class that can be used to prevent integration tests from failing if the Rabbit broker application is @@ -372,8 +379,7 @@ private Channel createQueues(Connection connection) throws IOException, URISynta } } if (this.management) { - Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword); - if (!client.alivenessTest("/")) { + if (!alivenessTest()) { throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; " + "management not available"); } @@ -381,6 +387,25 @@ private Channel createQueues(Connection connection) throws IOException, URISynta return channel; } + private boolean alivenessTest() throws URISyntaxException { + WebClient client = WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword)) + .build(); + URI uri = new URI(getAdminUri()) + .resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8)); + HashMap result = client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); // NOSONAR magic# + if (result != null) { + return result.get("status").equals("ok"); + } + return false; + } + public static boolean fatal() { String serversRequired = System.getenv(BROKER_REQUIRED); if (Boolean.parseBoolean(serversRequired)) { diff --git a/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java b/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java index 9c93f81d3f..d23b0190a5 100644 --- a/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java +++ b/spring-rabbit-junit/src/test/java/org/springframework/amqp/rabbit/junit/RabbitAvailableTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,7 @@ * @since 2.0.2 * */ -@RabbitAvailable(queues = "rabbitAvailableTests.queue") +@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true) public class RabbitAvailableTests { @Test diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java index fa5f47f82c..ba4bd3e534 100644 --- a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java @@ -18,14 +18,18 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.Queue; @@ -42,6 +46,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean; @@ -50,9 +56,10 @@ import org.springframework.retry.interceptor.RetryOperationsInterceptor; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; import com.rabbitmq.stream.Address; import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.Message; @@ -99,12 +106,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException { assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue(); } + @SuppressWarnings("unchecked") @Test - @Disabled("Temporary until SF uses Micrometer snaps") void queueOverAmqp() throws Exception { - Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api"); - QueueInfo queue = client.getQueue("/", "stream.created.over.amqp"); - assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream"); + WebClient client = WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest")) + .build(); + Map queue = queueInfo("stream.created.over.amqp"); + assertThat(((Map) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream"); + } + + private Map queueInfo(String queueName) throws URISyntaxException { + WebClient client = createClient("guest", "guest"); + URI uri = queueUri(queueName); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + private URI queueUri(String queue) throws URISyntaxException { + URI uri = new URI("http://localhost:" + managementPort() + "/api") + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private WebClient createClient(String adminUser, String adminPassword) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword)) + .build(); } @Configuration(proxyBeanMethods = false) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java new file mode 100644 index 0000000000..06cad93098 --- /dev/null +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/DefaultNodeLocator.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.connection; + +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; + +import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Default {@link NodeLocator} using the Spring WebFlux {@link WebClient}. + * + * @author Gary Russell + * @since 2.4.8 + * + */ +public class DefaultNodeLocator implements NodeLocator { + + @Override + public HashMap restCall(String username, String password, URI uri) { + WebClient client = createClient(username, password); + HashMap queueInfo = client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); // NOSONAR magic# + return queueInfo; + } + + /** + * Create a client instance. + * @param username the username + * @param password the password. + * @return The client. + */ + protected WebClient createClient(String username, String password) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(username, password)) + .build(); + } + +} diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java index e209734ec6..45e551270a 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactory.java @@ -16,8 +16,8 @@ package org.springframework.amqp.rabbit.connection; -import java.net.MalformedURLException; -import java.net.URISyntaxException; +import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Arrays; import java.util.HashMap; @@ -32,11 +32,10 @@ import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.io.Resource; +import org.springframework.core.log.LogAccessor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; - -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; +import org.springframework.web.util.UriUtils; /** * A {@link RoutingConnectionFactory} that determines the node on which a queue is located and @@ -84,6 +83,8 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi private final String trustStorePassPhrase; + private NodeLocator nodeLocator = new DefaultNodeLocator(); + /** * @param defaultConnectionFactory the fallback connection factory to use if the queue * can't be located. @@ -200,6 +201,14 @@ private static Map nodesAddressesToMap(String[] nodes, String[] .collect(Collectors.toMap(SimpleImmutableEntry::getKey, SimpleImmutableEntry::getValue)); } + /** + * Set a {@link NodeLocator} to use to find the node address for the leader. + * @param nodeLocator the locator. + */ + public void setNodeLocator(NodeLocator nodeLocator) { + this.nodeLocator = nodeLocator; + } + @Override public Connection createConnection() throws AmqpException { return this.defaultConnectionFactory.createConnection(); @@ -256,51 +265,11 @@ public ConnectionFactory getTargetConnectionFactory(Object key) { @Nullable private ConnectionFactory determineConnectionFactory(String queue) { - for (int i = 0; i < this.adminUris.length; i++) { - String adminUri = this.adminUris[i]; - if (!adminUri.endsWith("/api/")) { - adminUri += "/api/"; - } - try { - Client client = createClient(adminUri, this.username, this.password); - QueueInfo queueInfo = client.getQueue(this.vhost, queue); - if (queueInfo != null) { - String node = queueInfo.getNode(); - if (node != null) { - String uri = this.nodeToAddress.get(node); - if (uri != null) { - return nodeConnectionFactory(queue, node, uri); - } - if (this.logger.isDebugEnabled()) { - this.logger.debug("No match for node: " + node); - } - } - } - else { - throw new AmqpException("Admin returned null QueueInfo"); - } - } - catch (Exception e) { - this.logger.warn("Failed to determine queue location for: " + queue + " at: " + - adminUri + ": " + e.getMessage()); - } + ConnectionFactory cf = this.nodeLocator.locate(this.adminUris, this.nodeToAddress, this.vhost, this.username, this.password, queue, this::nodeConnectionFactory); + if (cf == null) { + this.logger.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); } - this.logger.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); - return null; - } - - /** - * Create a client instance. - * @param adminUri the admin URI. - * @param username the username - * @param password the password. - * @return The client. - * @throws MalformedURLException if the URL is malformed - * @throws URISyntaxException if there is a syntax error. - */ - protected Client createClient(String adminUri, String username, String password) throws MalformedURLException, - URISyntaxException { - return new Client(adminUri, username, password); + return cf; } private synchronized ConnectionFactory nodeConnectionFactory(String queue, String node, String address) { @@ -372,4 +341,85 @@ public void destroy() { resetConnection(); } + /** + * Used to obtain a connection factory for the queue leader. + * @since 2.4.8 + */ + @FunctionalInterface + public interface NodeLocator { + + LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(NodeLocator.class)); + + /** + * Return a connection factory for the leader node for the queue. + * @param adminUris an array of admin URIs. + * @param nodeToAddress a map of node names to node addresses (AMQP). + * @param vhost the vhost. + * @param username the user name. + * @param password the password. + * @param queue the queue name. + * @param factoryFunction an internal function to find or create the factory. + * @return a connection factory, if the leader node was found; null otherwise. + */ + @Nullable + default ConnectionFactory locate(String[] adminUris, Map nodeToAddress, String vhost, String username, + String password, String queue, FactoryFinder factoryFunction) { + + for (int i = 0; i < adminUris.length; i++) { + String adminUri = adminUris[i]; + if (!adminUri.endsWith("/api/")) { + adminUri += "/api/"; + } + try { + URI uri = new URI(adminUri) + .resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + + queue); + HashMap queueInfo = restCall(username, password, uri); + if (queueInfo != null) { + String node = (String) queueInfo.get("node"); + if (node != null) { + String nodeUri = nodeToAddress.get(node); + if (uri != null) { + return factoryFunction.locate(queue, node, nodeUri); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("No match for node: " + node); + } + } + } + else { + throw new AmqpException("Admin returned null QueueInfo"); + } + } + catch (Exception e) { + LOGGER.warn("Failed to determine queue location for: " + queue + " at: " + + adminUri + ": " + e.getMessage()); + } + } + LOGGER.warn("Failed to determine queue location for: " + queue + ", using default connection factory"); + return null; + } + + HashMap restCall(String username, String password, URI uri); + + } + + /** + * Callback to determine the connection factory using the provided information. + * @since 2.4.8 + */ + @FunctionalInterface + public interface FactoryFinder { + + /** + * Locate or create a factory. + * @param queueName the queue name. + * @param node the node name. + * @param nodeUri the node URI. + * @return the factory. + */ + ConnectionFactory locate(String queueName, String node, String nodeUri); + + } + } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java index 45505f86be..db176c52e0 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java @@ -47,7 +47,6 @@ import org.aopalliance.intercept.MethodInvocation; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -68,6 +67,7 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionNameStrategy; import org.springframework.amqp.rabbit.connection.SimplePropertyValueConnectionNameStrategy; +import org.springframework.amqp.rabbit.core.NeedsManagementTests; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; @@ -141,10 +141,10 @@ import org.springframework.validation.Errors; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; import com.rabbitmq.client.Channel; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -180,7 +180,7 @@ "test.custom.argument", "test.arg.validation", "manual.acks.1", "manual.acks.2", "erit.batch.1", "erit.batch.2", "erit.batch.3", "erit.mp.arg" }, purgeAfterEach = false) -public class EnableRabbitIntegrationTests { +public class EnableRabbitIntegrationTests extends NeedsManagementTests { @Autowired private RabbitTemplate rabbitTemplate; @@ -833,16 +833,14 @@ public void testHeadersExchange() throws Exception { } @Test - @Disabled("Temporary until SF uses Micrometer snaps") public void deadLetterOnDefaultExchange() { this.rabbitTemplate.convertAndSend("amqp656", "foo"); assertThat(this.rabbitTemplate.receiveAndConvert("amqp656dlq", 10000)).isEqualTo("foo"); try { - Client rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); - QueueInfo amqp656 = rabbitRestClient.getQueue("/", "amqp656"); + Map amqp656 = await().until(() -> queueInfo("amqp656"), q -> q != null); if (amqp656 != null) { - assertThat(amqp656.getArguments().get("test-empty")).isEqualTo(""); - assertThat(amqp656.getArguments().get("test-null")).isEqualTo("undefined"); + assertThat(arguments(amqp656).get("test-empty")).isEqualTo(""); + assertThat(arguments(amqp656).get("test-null")).isEqualTo("undefined"); } } catch (Exception e) { @@ -850,6 +848,12 @@ public void deadLetterOnDefaultExchange() { } } + private WebClient createClient(String adminUser, String adminPassword) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword)) + .build(); + } + @Test @DirtiesContext public void returnExceptionWithRethrowAdapter() { diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java index e7b55e55b5..725b2c1e81 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/LocalizedQueueConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,20 +42,28 @@ import org.mockito.ArgumentCaptor; import org.mockito.internal.stubbing.answers.CallsRealMethods; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.test.web.reactive.server.HttpHandlerConnector; +import org.springframework.web.reactive.function.client.WebClient; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; +import reactor.core.publisher.Mono; /** * @author Gary Russell * @author Artem Bilan */ +@RabbitAvailable(queues = "local") public class LocalizedQueueConnectionFactoryTests { private final Map channels = new HashMap(); @@ -76,8 +84,8 @@ public void testFailOver() throws Exception { String username = "guest"; String password = "guest"; final AtomicBoolean firstServer = new AtomicBoolean(true); - final Client client1 = doCreateClient(adminUris[0], username, password, nodes[0]); - final Client client2 = doCreateClient(adminUris[1], username, password, nodes[1]); + final WebClient client1 = doCreateClient(adminUris[0], username, password, nodes[0]); + final WebClient client2 = doCreateClient(adminUris[1], username, password, nodes[1]); final Map mockCFs = new HashMap(); CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(1); @@ -86,17 +94,20 @@ public void testFailOver() throws Exception { LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultConnectionFactory, addresses, adminUris, nodes, vhost, username, password, false, null) { - @Override - protected Client createClient(String adminUri, String username, String password) { - return firstServer.get() ? client1 : client2; - } - @Override protected ConnectionFactory createConnectionFactory(String address, String node) { return mockCFs.get(address); } }; + lqcf.setNodeLocator(new DefaultNodeLocator() { + + @Override + protected WebClient createClient(String username, String password) { + return firstServer.get() ? client1 : client2; + } + + }); Map nodeAddress = TestUtils.getPropertyValue(lqcf, "nodeToAddress", Map.class); assertThat(nodeAddress.get("rabbit@foo")).isEqualTo(rabbit1); assertThat(nodeAddress.get("rabbit@bar")).isEqualTo(rabbit2); @@ -144,12 +155,32 @@ private boolean assertLog(List logRows, String expected) { return false; } - private Client doCreateClient(String uri, String username, String password, String node) { - Client client = mock(Client.class); - QueueInfo queueInfo = new QueueInfo(); - queueInfo.setNode(node); - given(client.getQueue("/", "q")).willReturn(queueInfo); - return client; + private WebClient doCreateClient(String uri, String username, String password, String node) { + ClientHttpConnector httpConnector = + new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + response.getHeaders().setContentType(MediaType.APPLICATION_JSON); + Mono json = Mono + .just(response.bufferFactory().wrap(("{\"node\":\"" + node + "\"}").getBytes())); + return response.writeWith(json) + .then(Mono.defer(response::setComplete)); + }); + + return WebClient.builder() + .clientConnector(httpConnector) + .build(); + } + + @Test + void findLocal() { + ConnectionFactory defaultCf = mock(ConnectionFactory.class); + LocalizedQueueConnectionFactory lqcf = new LocalizedQueueConnectionFactory(defaultCf, + Map.of("rabbit@localhost", "localhost:5672"), new String[] { "http://localhost:15672" }, + "/", "guest", "guest", false, null); + ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]"); + RabbitAdmin admin = new RabbitAdmin(cf); + assertThat(admin.getQueueProperties("local")).isNotNull(); + lqcf.destroy(); } @Test @@ -182,8 +213,8 @@ private ConnectionFactory mockCF(final String address, final CountDownLatch latc given(channel.isOpen()).willReturn(true, false); willAnswer(invocation -> { String tag = UUID.randomUUID().toString(); - consumers.put(address, invocation.getArgument(6)); - consumerTags.put(address, tag); + this.consumers.put(address, invocation.getArgument(6)); + this.consumerTags.put(address, tag); if (latch != null) { latch.countDown(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java index f7e862321d..f4dbce98ad 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java @@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.Binding; @@ -40,9 +38,7 @@ import org.springframework.amqp.core.QueueBuilder.Overflow; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; import org.springframework.amqp.rabbit.junit.RabbitAvailable; -import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowired; @@ -51,10 +47,6 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.ExchangeInfo; -import com.rabbitmq.http.client.domain.QueueInfo; - /** * * @author Gary Russell @@ -64,10 +56,7 @@ @SpringJUnitConfig @DirtiesContext @RabbitAvailable(management = true) -@Disabled("Temporary until SF uses Micrometer snaps") -public class FixedReplyQueueDeadLetterTests { - - private static BrokerRunningSupport brokerRunning; +public class FixedReplyQueueDeadLetterTests extends NeedsManagementTests { @Autowired private RabbitTemplate rabbitTemplate; @@ -75,11 +64,6 @@ public class FixedReplyQueueDeadLetterTests { @Autowired private DeadListener deadListener; - @BeforeAll - static void setUp() { - brokerRunning = RabbitAvailableCondition.getBrokerRunning(); - } - @AfterAll static void tearDown() { brokerRunning.deleteQueues("all.args.1", "all.args.2", "all.args.3", "test.quorum"); @@ -100,10 +84,8 @@ void test() throws Exception { @Test void testQueueArgs1() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.1"), que -> que != null); - Map arguments = queue.getArguments(); + Map queue = await().until(() -> queueInfo("all.args.1"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -119,10 +101,8 @@ void testQueueArgs1() throws MalformedURLException, URISyntaxException, Interrup @Test void testQueueArgs2() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.2"), que -> que != null); - Map arguments = queue.getArguments(); + Map queue = await().until(() -> queueInfo("all.args.2"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -136,11 +116,9 @@ void testQueueArgs2() throws MalformedURLException, URISyntaxException, Interrup } @Test - void testQueueArgs3() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "all.args.3"), que -> que != null); - Map arguments = queue.getArguments(); + void testQueueArgs3() throws URISyntaxException { + Map queue = await().until(() -> queueInfo("all.args.3"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-message-ttl")).isEqualTo(1000); assertThat(arguments.get("x-expires")).isEqualTo(200_000); assertThat(arguments.get("x-max-length")).isEqualTo(42); @@ -152,19 +130,17 @@ void testQueueArgs3() throws MalformedURLException, URISyntaxException, Interrup assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy"); assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.random.getValue()); - ExchangeInfo exchange = client.getExchange("/", "dlx.test.requestEx"); - assertThat(exchange.getArguments().get("alternate-exchange")).isEqualTo("alternate"); + Map exchange = exchangeInfo("dlx.test.requestEx"); + assertThat(arguments(exchange).get("alternate-exchange")).isEqualTo("alternate"); } /* * Does not require a 3.8 broker - they are just arbitrary arguments. */ @Test - void testQuorumArgs() throws MalformedURLException, URISyntaxException, InterruptedException { - Client client = new Client(brokerRunning.getAdminUri(), brokerRunning.getAdminUser(), - brokerRunning.getAdminPassword()); - QueueInfo queue = await().until(() -> client.getQueue("/", "test.quorum"), que -> que != null); - Map arguments = queue.getArguments(); + void testQuorumArgs() { + Map queue = await().until(() -> queueInfo("test.quorum"), que -> que != null); + Map arguments = arguments(queue); assertThat(arguments.get("x-queue-type")).isEqualTo("quorum"); assertThat(arguments.get("x-delivery-limit")).isEqualTo(10); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java new file mode 100644 index 0000000000..a854488588 --- /dev/null +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/NeedsManagementTests.java @@ -0,0 +1,96 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.rabbit.core; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; + +import org.junit.jupiter.api.BeforeAll; + +import org.springframework.amqp.rabbit.junit.BrokerRunningSupport; +import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.ExchangeFilterFunctions; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriUtils; + +/** + * @author Gary Russell + * @since 2.4.8 + * + */ +public abstract class NeedsManagementTests { + + protected static BrokerRunningSupport brokerRunning; + + @BeforeAll + static void setUp() { + brokerRunning = RabbitAvailableCondition.getBrokerRunning(); + } + + protected Map queueInfo(String queueName) throws URISyntaxException { + WebClient client = createClient(brokerRunning.getAdminUser(), brokerRunning.getAdminPassword()); + URI uri = queueUri(queueName); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + protected Map exchangeInfo(String name) throws URISyntaxException { + WebClient client = createClient(brokerRunning.getAdminUser(), brokerRunning.getAdminPassword()); + URI uri = exchangeUri(name); + return client.get() + .uri(uri) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .bodyToMono(new ParameterizedTypeReference>() { + }) + .block(Duration.ofSeconds(10)); + } + + @SuppressWarnings("unchecked") + protected Map arguments(Map infoMap) { + return (Map) infoMap.get("arguments"); + } + + private URI queueUri(String queue) throws URISyntaxException { + URI uri = new URI(brokerRunning.getAdminUri()) + .resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private URI exchangeUri(String queue) throws URISyntaxException { + URI uri = new URI(brokerRunning.getAdminUri()) + .resolve("/api/exchanges/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue); + return uri; + } + + private WebClient createClient(String adminUser, String adminPassword) { + return WebClient.builder() + .filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword)) + .build(); + } + +} diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index 85cdc2b05c..abf8f76cb3 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -22,11 +22,11 @@ import java.io.IOException; import java.time.Duration; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.amqp.AmqpIOException; @@ -52,8 +52,6 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.ExchangeInfo; /** * @author Dave Syer @@ -63,8 +61,7 @@ * @author Artem Bilan */ @RabbitAvailable(management = true) -@Disabled("Temporary until SF uses Micrometer snaps") -public class RabbitAdminIntegrationTests { +public class RabbitAdminIntegrationTests extends NeedsManagementTests { private final CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); @@ -258,9 +255,9 @@ public void testDeleteExchangeWithInternalOption() throws Exception { exchange.setInternal(true); rabbitAdmin.declareExchange(exchange); - ExchangeInfo exchange2 = getExchange(exchangeName); - assertThat(exchange2.getType()).isEqualTo(ExchangeTypes.DIRECT); - assertThat(exchange2.isInternal()).isTrue(); + Map exchange2 = getExchange(exchangeName); + assertThat(exchange2.get("type")).isEqualTo(ExchangeTypes.DIRECT); + assertThat(exchange2.get("internal")).isEqualTo(Boolean.TRUE); boolean result = rabbitAdmin.deleteExchange(exchangeName); @@ -370,6 +367,7 @@ public void testQueueDeclareBad() { this.rabbitAdmin.deleteQueue(queue.getName()); } + @SuppressWarnings("unchecked") @Test public void testDeclareDelayedExchange() throws Exception { DirectExchange exchange = new DirectExchange("test.delayed.exchange"); @@ -415,18 +413,17 @@ public void testDeclareDelayedExchange() throws Exception { assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000)); assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L); - ExchangeInfo exchange2 = getExchange(exchangeName); - assertThat(exchange2.getArguments().get("x-delayed-type")).isEqualTo(ExchangeTypes.DIRECT); - assertThat(exchange2.getType()).isEqualTo("x-delayed-message"); + Map exchange2 = getExchange(exchangeName); + assertThat(arguments(exchange2).get("x-delayed-type")).isEqualTo(ExchangeTypes.DIRECT); + assertThat(exchange2.get("type")).isEqualTo("x-delayed-message"); this.rabbitAdmin.deleteQueue(queue.getName()); this.rabbitAdmin.deleteExchange(exchangeName); } - private ExchangeInfo getExchange(String exchangeName) throws Exception { - Client rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); + private Map getExchange(String exchangeName) throws Exception { return await().pollDelay(Duration.ZERO) - .until(() -> rabbitRestClient.getExchange("/", exchangeName), exch -> exch != null); + .until(() -> exchangeInfo(exchangeName), exch -> exch != null); } /** diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java index d617d52773..8911191a74 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java @@ -85,8 +85,6 @@ import com.rabbitmq.client.AMQP.Queue.DeclareOk; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.QueueInfo; /** * @author Mark Pollack @@ -101,7 +99,7 @@ */ @RabbitAvailable(management = true) @Disabled("Temporary until SF uses Micrometer snaps") -public class RabbitAdminTests { +public class RabbitAdminTests extends NeedsManagementTests { @Test public void testSettingOfNullConnectionFactory() { @@ -375,17 +373,16 @@ public void testLeaderLocator() throws Exception { RabbitAdmin admin = new RabbitAdmin(cf); AnonymousQueue queue = new AnonymousQueue(); admin.declareQueue(queue); - Client client = new Client("http://guest:guest@localhost:15672/api"); AnonymousQueue queue1 = queue; - QueueInfo info = await().until(() -> client.getQueue("/", queue1.getName()), inf -> inf != null); - assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo("client-local"); + Map info = await().until(() -> queueInfo(queue1.getName()), inf -> inf != null); + assertThat(arguments(info).get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo("client-local"); queue = new AnonymousQueue(); queue.setLeaderLocator(null); admin.declareQueue(queue); AnonymousQueue queue2 = queue; - info = await().until(() -> client.getQueue("/", queue2.getName()), inf -> inf != null); - assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isNull(); + info = await().until(() -> queueInfo(queue2.getName()), inf -> inf != null); + assertThat(arguments(info).get(Queue.X_QUEUE_LEADER_LOCATOR)).isNull(); cf.destroy(); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java deleted file mode 100644 index a4e5178f8e..0000000000 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitRestApiTests.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.amqp.rabbit.core; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.awaitility.Awaitility.await; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -import org.springframework.amqp.AmqpException; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Exchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.core.QueueBuilder; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.junit.RabbitAvailable; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.DefaultConsumer; -import com.rabbitmq.http.client.Client; -import com.rabbitmq.http.client.domain.BindingInfo; -import com.rabbitmq.http.client.domain.ExchangeInfo; -import com.rabbitmq.http.client.domain.QueueInfo; - -/** - * @author Gary Russell - * @author Artem Bilan - * - * @since 1.5 - * - */ -@RabbitAvailable(management = true) -@Disabled("Temporary until SF uses Micrometer snaps") -public class RabbitRestApiTests { - - private final CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); - - private final Client rabbitRestClient; - - public RabbitRestApiTests() throws MalformedURLException, URISyntaxException { - this.rabbitRestClient = new Client("http://localhost:15672/api/", "guest", "guest"); - } - - @AfterEach - public void tearDown() { - connectionFactory.destroy(); - } - - @Test - public void testExchanges() { - List list = this.rabbitRestClient.getExchanges(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testExchangesVhost() { - List list = this.rabbitRestClient.getExchanges("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindings() { - List list = this.rabbitRestClient.getBindings(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindingsVhost() { - List list = this.rabbitRestClient.getBindings("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testQueues() { - List list = this.rabbitRestClient.getQueues(); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testQueuesVhost() { - List list = this.rabbitRestClient.getQueues("/"); - assertThat(list.size() > 0).isTrue(); - } - - @Test - public void testBindingsDetail() { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("alternate-exchange", ""); - Exchange exchange1 = new DirectExchange(UUID.randomUUID().toString(), false, true, args); - admin.declareExchange(exchange1); - Exchange exchange2 = new DirectExchange(UUID.randomUUID().toString(), false, true, args); - admin.declareExchange(exchange2); - Queue queue = admin.declareQueue(); - Binding binding1 = BindingBuilder - .bind(queue) - .to(exchange1) - .with("foo") - .and(args); - admin.declareBinding(binding1); - Binding binding2 = BindingBuilder - .bind(exchange2) - .to((DirectExchange) exchange1) - .with("bar"); - admin.declareBinding(binding2); - - List bindings = this.rabbitRestClient.getBindingsBySource("/", exchange1.getName()); - assertThat(bindings).hasSize(2); - assertThat(bindings.get(0).getSource()).isEqualTo(exchange1.getName()); - assertThat("foo").isIn(bindings.get(0).getRoutingKey(), bindings.get(1).getRoutingKey()); - BindingInfo qout = null; - BindingInfo eout = null; - if (bindings.get(0).getRoutingKey().equals("foo")) { - qout = bindings.get(0); - eout = bindings.get(1); - } - else { - eout = bindings.get(0); - qout = bindings.get(1); - } - assertThat(qout.getDestinationType()).isEqualTo("queue"); - assertThat(qout.getDestination()).isEqualTo(queue.getName()); - assertThat(qout.getArguments()).isNotNull(); - assertThat(qout.getArguments().get("alternate-exchange")).isEqualTo(""); - - assertThat(eout.getDestinationType()).isEqualTo("exchange"); - assertThat(eout.getDestination()).isEqualTo(exchange2.getName()); - - admin.deleteExchange(exchange1.getName()); - admin.deleteExchange(exchange2.getName()); - } - - @Test - public void testSpecificExchange() { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("alternate-exchange", ""); - Exchange exchange = new DirectExchange(UUID.randomUUID().toString(), true, true, args); - admin.declareExchange(exchange); - ExchangeInfo exchangeOut = this.rabbitRestClient.getExchange("/", exchange.getName()); - assertThat(exchangeOut.isDurable()).isTrue(); - assertThat(exchangeOut.isAutoDelete()).isTrue(); - assertThat(exchangeOut.getName()).isEqualTo(exchange.getName()); - assertThat(exchangeOut.getArguments()).isEqualTo(args); - admin.deleteExchange(exchange.getName()); - } - - @Test - public void testSpecificQueue() throws Exception { - RabbitAdmin admin = new RabbitAdmin(connectionFactory); - Map args = Collections.singletonMap("foo", "bar"); - Queue queue1 = QueueBuilder.nonDurable(UUID.randomUUID().toString()) - .autoDelete() - .withArguments(args) - .build(); - admin.declareQueue(queue1); - Queue queue2 = QueueBuilder.durable(UUID.randomUUID().toString()) - .withArguments(args) - .build(); - admin.declareQueue(queue2); - Channel channel = this.connectionFactory.createConnection().createChannel(false); - String consumer = channel.basicConsume(queue1.getName(), false, "", false, true, null, new DefaultConsumer(channel)); - QueueInfo qi = await().until(() -> this.rabbitRestClient.getQueue("/", queue1.getName()), - info -> info.getExclusiveConsumerTag() != null && !"".equals(info.getExclusiveConsumerTag())); - QueueInfo queueOut = this.rabbitRestClient.getQueue("/", queue1.getName()); - assertThat(queueOut.isDurable()).isFalse(); - assertThat(queueOut.isExclusive()).isFalse(); - assertThat(queueOut.isAutoDelete()).isTrue(); - assertThat(queueOut.getName()).isEqualTo(queue1.getName()); - assertThat(queueOut.getArguments()).isEqualTo(args); - assertThat(qi.getExclusiveConsumerTag()).isEqualTo(consumer); - channel.basicCancel(consumer); - channel.close(); - - queueOut = this.rabbitRestClient.getQueue("/", queue2.getName()); - assertThat(queueOut.isDurable()).isTrue(); - assertThat(queueOut.isExclusive()).isFalse(); - assertThat(queueOut.isAutoDelete()).isFalse(); - assertThat(queueOut.getName()).isEqualTo(queue2.getName()); - assertThat(queueOut.getArguments()).isEqualTo(args); - - admin.deleteQueue(queue1.getName()); - admin.deleteQueue(queue2.getName()); - } - - @Test - public void testDeleteExchange() { - String exchangeName = "testExchange"; - Exchange testExchange = new DirectExchange(exchangeName); - ExchangeInfo info = new ExchangeInfo(); - info.setArguments(testExchange.getArguments()); - info.setAutoDelete(testExchange.isAutoDelete()); - info.setDurable(testExchange.isDurable()); - info.setType(testExchange.getType()); - this.rabbitRestClient.declareExchange("/", testExchange.getName(), info); - ExchangeInfo exchangeToAssert = this.rabbitRestClient.getExchange("/", exchangeName); - assertThat(exchangeToAssert.getName()).isEqualTo(testExchange.getName()); - assertThat(exchangeToAssert.getType()).isEqualTo(testExchange.getType()); - this.rabbitRestClient.deleteExchange("/", testExchange.getName()); - // 6.0.0 REST compatibility -// assertThat(this.rabbitRestClient.getExchange("/", exchangeName)).isNull(); - RabbitTemplate template = new RabbitTemplate(this.connectionFactory); - assertThatExceptionOfType(AmqpException.class) - .isThrownBy(() -> template.execute(channel -> channel.exchangeDeclarePassive(exchangeName))) - .withCauseExactlyInstanceOf(IOException.class); - } - -} diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 910f41b8d9..c5d1b6773c 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -795,6 +795,42 @@ public LocalizedQueueConnectionFactory queueAffinityCF( Notice that the first three parameters are arrays of `addresses`, `adminUris`, and `nodes`. These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node. +IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API. +Instead, by default, the `WebClient` from Spring Webflux is used; it is not added to the class path by default. + +.Maven +==== +[source,xml,subs="+attributes"] +---- + + org.springframework.amqp + spring-rabbit + +---- +==== +.Gradle +==== +[source,groovy,subs="+attributes"] +---- +compile 'org.springframework.amqp:spring-rabbit' +---- +==== + +You can also use other REST technology by extending `DefaultNodeLocator` and overriding its `restCall` method. + +==== +[source, java] +---- +lqcf.setNodeLocator(new DefaultNodeLocator() { + + @Override + public HashMap restCall(String username, String password, URI uri) { + ... + }); +}); +---- +==== + [[cf-pub-conf-ret]] ===== Publisher Confirms and Returns diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index d11a5e719d..ebcbe08a35 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -43,3 +43,6 @@ See <> for more information. The default `addressShuffleMode` in `AbstractConnectionFactory` is now `RANDOM`. This results in connecting to a random host when multiple addresses are provided. See <> for more information. + +The `LocalizedQueueConnectionFactory` no longer uses the RabbitMQ `http-client` library to determine which node is the leader for a queue. +See <> for more information.