From 7a10c2af8b07ca741e4a461162f03776fe9bae22 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 21 Jun 2022 08:25:38 -0400 Subject: [PATCH] GH-1465: Super Stream SAC - WIP --- build.gradle | 5 +- .../listener/StreamListenerContainer.java | 12 ++ .../stream/listener/SuperStreamSACTests.java | 133 ++++++++++++++++++ 3 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java diff --git a/build.gradle b/build.gradle index 2ce43489c6..43946404c6 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,7 @@ ext { micrometerVersion = '1.10.0-M2' micrometerTracingVersion = '1.0.0-M5' mockitoVersion = '4.5.1' - rabbitmqStreamVersion = '0.4.0' + rabbitmqStreamVersion = '0.6.0-SNAPSHOT' rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.13.1' rabbitmqHttpClientVersion = '3.12.1' reactorVersion = '2020.0.18' @@ -101,6 +101,7 @@ allprojects { } repositories { + mavenLocal() mavenCentral() maven { url 'https://repo.spring.io/libs-milestone' } if (version.endsWith('-SNAPSHOT')) { @@ -427,7 +428,7 @@ project('spring-rabbit-stream') { dependencies { api project(':spring-rabbit') - api "com.rabbitmq:stream-client:$rabbitmqStreamVersion" + api "com.rabbitmq:stream-client-sac:$rabbitmqStreamVersion" optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion" testApi project(':spring-rabbit-junit') diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java index 2a9684ccde..9a42c8ff96 100644 --- a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java @@ -96,6 +96,18 @@ public void setQueueNames(String... queueNames) { this.builder.stream(queueNames[0]); } + /** + * Enable Single Active Consumer on a Super Stream. + * @param superStream the stream. + * @param name the consumer name. + */ + public void superStream(String superStream, String name) { + Assert.notNull(superStream, "'superStream' cannot be null"); + this.builder.superStream(superStream) + .singleActiveConsumer() + .name(name); + } + /** * Get a {@link StreamMessageConverter} used to convert a * {@link com.rabbitmq.stream.Message} to a diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java new file mode 100644 index 0000000000..fc01730c9d --- /dev/null +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/SuperStreamSACTests.java @@ -0,0 +1,133 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Scope; +import org.springframework.rabbit.stream.config.SuperStream; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import com.rabbitmq.stream.Address; +import com.rabbitmq.stream.Environment; +import com.rabbitmq.stream.OffsetSpecification; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@SpringJUnitConfig +public class SuperStreamSACTests { // extends AbstractIntegrationTests + + @Test + void superStream(@Autowired ApplicationContext context, @Autowired RabbitTemplate template, + @Autowired Environment env, @Autowired Config config, @Autowired RabbitAdmin admin, + @Autowired Declarables declarables) throws InterruptedException { + + template.getConnectionFactory().createConnection(); + StreamListenerContainer container1 = context.getBean(StreamListenerContainer.class, env, "one"); + container1.start(); + StreamListenerContainer container2 = context.getBean(StreamListenerContainer.class, env, "two"); + container2.start(); + StreamListenerContainer container3 = context.getBean(StreamListenerContainer.class, env, "three"); + container3.start(); + template.convertAndSend("ss.sac.test", "0", "foo"); + template.convertAndSend("ss.sac.test", "1", "bar"); + template.convertAndSend("ss.sac.test", "2", "baz"); + assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(config.messages.keySet()).contains("one", "two", "three"); + clean(admin, declarables); + } + + private void clean(RabbitAdmin admin, Declarables declarables) { + declarables.getDeclarablesByType(Queue.class).forEach(queue -> admin.deleteQueue(queue.getName())); + declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName())); + } + + @Configuration + public static class Config { + + volatile Map messages = new ConcurrentHashMap<>(); + + volatile CountDownLatch latch = new CountDownLatch(3); + + @Bean + CachingConnectionFactory cf() { + return new CachingConnectionFactory("localhost"); //, amqpPort(); + } + + @Bean + RabbitAdmin admin(ConnectionFactory cf) { + return new RabbitAdmin(cf); + } + + @Bean + RabbitTemplate template(ConnectionFactory cf) { + return new RabbitTemplate(cf); + } + + @Bean + SuperStream superStream() { + return new SuperStream("ss.sac.test", 3); + } + + @Bean + static Environment environment() { + return Environment.builder() + .addressResolver(add -> new Address("localhost", 5552)) // streamPort())) + .build(); + } + + @Bean + @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) + StreamListenerContainer container(Environment env, String name) { + StreamListenerContainer container = new StreamListenerContainer(env); + container.superStream("ss.sac.test", "test"); + container.setupMessageListener(msg -> { + this.messages.put(name, msg); + this.latch.countDown(); + System.out.println(name + ":" + new String(msg.getBody())); + }); + container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last())); + container.setAutoStartup(false); + return container; + } + + } + +}