From 87057497f6a502997a44d02c8bd8fd65796c5cee Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 16 Jun 2022 11:23:09 -0400 Subject: [PATCH] GH-1465: Part 1: Provision Super Streams over AMQP See https://github.com/spring-projects/spring-amqp/issues/1465 Inspired by https://github.com/rabbitmq/rabbitmq-stream-java-client/blob/29b1a3eca72c8e3f719db4fd9fafdf503f37ea20/src/test/java/com/rabbitmq/stream/impl/TestUtils.java#L244-L276 --- .../rabbit/stream/config/SuperStream.java | 63 +++++++++++++++ .../config/SuperStreamProvisioningTests.java | 77 +++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java create mode 100644 spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java diff --git a/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java new file mode 100644 index 0000000000..cceeb2821f --- /dev/null +++ b/spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/SuperStream.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.Binding.DestinationType; +import org.springframework.amqp.core.Declarable; +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; + +/** + * Create Super Stream Topology {@link Declarable}s. + * + * @author Gary Russell + * @since 3.0 + * + */ +public class SuperStream extends Declarables { + + /** + * Create a Super Stream with the provided parameters. + * @param name the stream name. + * @param partitions the number of partitions. + */ + public SuperStream(String name, int partitions) { + super(declarables(name, partitions)); + } + + private static Declarable[] declarables(String name, int partitions) { + List declarables = new ArrayList<>(); + String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new); + declarables.add(new DirectExchange(name, true, false, Map.of("x-super-stream", true))); + for (int i = 0; i < partitions; i++) { + String rk = rks[i]; + Queue q = new Queue(name + "-" + rk, true, false, false, Map.of("x-queue-type", "stream")); + declarables.add(q); + declarables.add(new Binding(q.getName(), DestinationType.QUEUE, name, rk, + Map.of("x-stream-partition-order", i))); + } + return declarables.toArray(new Declarable[0]); + } + +} diff --git a/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java new file mode 100644 index 0000000000..4384089543 --- /dev/null +++ b/spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/config/SuperStreamProvisioningTests.java @@ -0,0 +1,77 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.rabbit.stream.config; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.core.Declarables; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.rabbit.stream.listener.AbstractIntegrationTests; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 3.0 + * + */ +@SpringJUnitConfig +public class SuperStreamProvisioningTests extends AbstractIntegrationTests { + + @Test + void provision(@Autowired Declarables declarables, @Autowired CachingConnectionFactory cf, + @Autowired RabbitAdmin admin) { + + assertThat(declarables.getDeclarables()).hasSize(7); + cf.createConnection(); + List queues = declarables.getDeclarablesByType(Queue.class); + assertThat(queues).extracting(que -> que.getName()).contains("test-0", "test-1", "test-2"); + queues.forEach(que -> admin.deleteQueue(que.getName())); + declarables.getDeclarablesByType(DirectExchange.class).forEach(ex -> admin.deleteExchange(ex.getName())); + } + + @Configuration + public static class Config { + + @Bean + CachingConnectionFactory cf() { + return new CachingConnectionFactory("localhost"); + } + + @Bean + RabbitAdmin admin(ConnectionFactory cf) { + return new RabbitAdmin(cf); + } + + @Bean + SuperStream superStream() { + return new SuperStream("test", 3); + } + + } + +}