diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java new file mode 100644 index 0000000000..423ae05e55 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/DaprPubSubOutboxIT.java @@ -0,0 +1,130 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.it.testcontainers.pubsub.outbox; + +import io.dapr.client.DaprClient; +import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.State; +import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.it.testcontainers.DaprClientFactory; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import org.assertj.core.api.Assertions; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, + classes = { + TestPubsubOutboxApplication.class + } +) +@Testcontainers +@Tag("testcontainers") +public class DaprPubSubOutboxIT { + + private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubOutboxIT.class); + private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final Random RANDOM = new Random(); + private static final int PORT = RANDOM.nextInt(1000) + 8000; + private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*"; + + private static final String PUBSUB_APP_ID = "pubsub-dapr-app"; + private static final String PUBSUB_NAME = "pubsub"; + + // topics + private static final String TOPIC_PRODUCT_CREATED = "product.created"; + private static final String STATE_STORE_NAME = "kvstore"; + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName(PUBSUB_APP_ID) + .withNetwork(DAPR_NETWORK) + .withComponent(new Component(STATE_STORE_NAME, "state.in-memory", "v1", Map.of( + "outboxPublishPubsub", PUBSUB_NAME, + "outboxPublishTopic", TOPIC_PRODUCT_CREATED + ))) + .withComponent(new Component(PUBSUB_NAME, "pubsub.in-memory", "v1", Collections.emptyMap())) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal") + .withAppPort(PORT); + + /** + * Expose the Dapr ports to the host. + * + * @param registry the dynamic property registry + */ + @DynamicPropertySource + static void daprProperties(DynamicPropertyRegistry registry) { + registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint); + registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint); + registry.add("server.port", () -> PORT); + } + + + @BeforeEach + public void setUp() { + org.testcontainers.Testcontainers.exposeHostPorts(PORT); + } + + + @Test + public void shouldPublishUsingOutbox() throws Exception { + Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER); + + try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) { + + ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest(STATE_STORE_NAME); + + Product pencil = new Product("Pencil", 1.50); + State state = new State<>( + pencil.getId(), pencil, null + ); + + TransactionalStateOperation operation = new TransactionalStateOperation<>( + TransactionalStateOperation.OperationType.UPSERT, state + ); + + transactionRequest.setOperations(List.of(operation)); + + client.executeStateTransaction(transactionRequest).block(); + + Awaitility.await().atMost(Duration.ofSeconds(10)) + .ignoreExceptions() + .untilAsserted(() -> Assertions.assertThat(ProductWebhookController.EVENT_LIST).isNotEmpty()); + } + } + +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/Product.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/Product.java new file mode 100644 index 0000000000..62c37d320c --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/Product.java @@ -0,0 +1,63 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.outbox; + +import java.util.UUID; + +public class Product { + private String id; + private String name; + private double price; + + public Product() { + } + + public Product(String name, double price) { + this.id = UUID.randomUUID().toString(); + this.name = name; + this.price = price; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public double getPrice() { + return price; + } + + public void setPrice(double price) { + this.price = price; + } + + @Override + public String toString() { + return "Product{" + + "id='" + id + '\'' + + ", name='" + name + '\'' + + ", price=" + price + + '}'; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/ProductWebhookController.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/ProductWebhookController.java new file mode 100644 index 0000000000..283dabf887 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/ProductWebhookController.java @@ -0,0 +1,37 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.outbox; + +import io.dapr.Topic; +import io.dapr.client.domain.CloudEvent; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +@RestController +@RequestMapping("/webhooks/products") +public class ProductWebhookController { + + public static final List> EVENT_LIST = new CopyOnWriteArrayList<>(); + + @PostMapping("/created") + @Topic(name = "product.created", pubsubName = "pubsub") + public void handleEvent(@RequestBody CloudEvent cloudEvent) { + System.out.println("Received product.created event: " + cloudEvent.getData()); + EVENT_LIST.add(cloudEvent); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/TestPubsubOutboxApplication.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/TestPubsubOutboxApplication.java new file mode 100644 index 0000000000..88a64f6582 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/outbox/TestPubsubOutboxApplication.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.dapr.it.testcontainers.pubsub.outbox; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class TestPubsubOutboxApplication { + public static void main(String[] args) { + SpringApplication.run(TestPubsubOutboxApplication.class, args); + } +} diff --git a/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrderDTO.java b/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrderDTO.java new file mode 100644 index 0000000000..68868e4310 --- /dev/null +++ b/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrderDTO.java @@ -0,0 +1,31 @@ +package io.dapr.springboot.examples.producer; + +public class OrderDTO { + + private String id; + private String item; + private Integer amount; + + public OrderDTO() { + } + + public OrderDTO(String id, String item, Integer amount) { + this.id = id; + this.item = item; + this.amount = amount; + } + + public String getId() { + return id; + } + + + public String getItem() { + return item; + } + + public Integer getAmount() { + return amount; + } + +} diff --git a/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrdersRestController.java b/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrdersRestController.java index 90384b8c05..15a1854635 100644 --- a/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrdersRestController.java +++ b/spring-boot-examples/producer-app/src/main/java/io/dapr/springboot/examples/producer/OrdersRestController.java @@ -13,6 +13,10 @@ package io.dapr.springboot.examples.producer; +import io.dapr.client.DaprClient; +import io.dapr.client.domain.ExecuteStateTransactionRequest; +import io.dapr.client.domain.State; +import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.spring.data.repository.config.EnableDaprRepositories; import io.dapr.spring.messaging.DaprMessagingTemplate; import org.slf4j.Logger; @@ -24,11 +28,13 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import java.util.List; + @RestController @EnableDaprRepositories public class OrdersRestController { - private final Logger logger = LoggerFactory.getLogger(OrdersRestController.class); + private static final Logger logger = LoggerFactory.getLogger(OrdersRestController.class); @Autowired private OrderRepository repository; @@ -36,6 +42,9 @@ public class OrdersRestController { @Autowired private DaprMessagingTemplate messagingTemplate; + @Autowired + private DaprClient daprClient; + /** * Store orders from customers. * @param order from the customer @@ -51,6 +60,28 @@ public String storeOrder(@RequestBody Order order) { return "Order Stored and Event Published"; } + @PostMapping("/orders/outbox") + public String storeOrderOutbox(@RequestBody Order order) { + logger.info("Storing Order with Outbox: {}", order); + ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest("kvstore-outbox"); + + State state = new State<>( + order.getId(), order, null + ); + + TransactionalStateOperation operation = new TransactionalStateOperation<>( + TransactionalStateOperation.OperationType.UPSERT, state + ); + + transactionRequest.setOperations(List.of(operation)); + + daprClient.executeStateTransaction(transactionRequest).block(); + + logger.info("Order Stored with Outbox: {}", order); + + return "Order Stored with Outbox"; + } + @GetMapping("/orders") public Iterable getAll() { return repository.findAll(); diff --git a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/DaprTestContainersConfig.java b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/DaprTestContainersConfig.java index 95603a36f1..5de8d19504 100644 --- a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/DaprTestContainersConfig.java +++ b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/DaprTestContainersConfig.java @@ -41,7 +41,7 @@ public class DaprTestContainersConfig { static final String CONNECTION_STRING = "host=postgres user=postgres password=password port=5432 connect_timeout=10 database=dapr_db_repository"; static final Map STATE_STORE_PROPERTIES = createStateStoreProperties(); - + static final Map STATE_STORE_OUTBOX_PROPERTIES = createStateStoreOutboxProperties(); static final Map BINDING_PROPERTIES = Collections.singletonMap("connectionString", CONNECTION_STRING); @@ -118,9 +118,8 @@ public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer p .withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES)) .withComponent(new Component("kvbinding", "bindings.postgresql", "v1", BINDING_PROPERTIES)) .withComponent(new Component("pubsub", "pubsub.rabbitmq", "v1", rabbitMqProperties)) + .withComponent(new Component("kvstore-outbox", "state.postgresql", "v1", STATE_STORE_OUTBOX_PROPERTIES)) .withSubscription(new Subscription("app", "pubsub", "topic", "/subscribe")) -// .withDaprLogLevel(DaprLogLevel.DEBUG) -// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) .withAppPort(8080) .withAppHealthCheckPath("/actuator/health") .withAppChannelAddress("host.testcontainers.internal") @@ -139,5 +138,14 @@ private static Map createStateStoreProperties() { return result; } + private static Map createStateStoreOutboxProperties() { + Map result = new HashMap<>(); + result.put("connectionString", CONNECTION_STRING); + result.put("outboxPublishPubsub", "pubsub"); + result.put("outboxPublishTopic", "outbox-topic"); + + return result; + } + } diff --git a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/ProducerAppIT.java b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/ProducerAppIT.java index 24fa34c6fb..a0c561b931 100644 --- a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/ProducerAppIT.java +++ b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/ProducerAppIT.java @@ -21,6 +21,7 @@ import io.dapr.testcontainers.DaprContainer; import io.restassured.RestAssured; import io.restassured.http.ContentType; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -67,12 +68,32 @@ void setUp() { } + @AfterEach + void cleanUp() { + controller.getAllEvents().clear(); + } @Test - void testOrdersEndpointAndMessaging() throws InterruptedException, IOException { + void testOrdersOutboxEndpointAndMessaging() { + OrderDTO order = new OrderDTO("outbox-order-123", "Lorem ipsum", 1000); + + given().contentType(ContentType.JSON) + .body(order) + .when() + .post("/orders/outbox") + .then() + .statusCode(200); + + await().atMost(Duration.ofSeconds(15)) + .until(controller.getAllEvents()::size, equalTo(1)); + + } + @Test + void testOrdersEndpointAndMessaging() { + OrderDTO order = new OrderDTO("abc-123", "the mars volta LP", 1); given().contentType(ContentType.JSON) - .body("{ \"id\": \"abc-123\",\"item\": \"the mars volta LP\",\"amount\": 1}") + .body(order) .when() .post("/orders") .then() @@ -118,7 +139,7 @@ void testOrdersEndpointAndMessaging() throws InterruptedException, IOException { } @Test - void testCustomersWorkflows() throws InterruptedException, IOException { + void testCustomersWorkflows() { given().contentType(ContentType.JSON) .body("{\"customerName\": \"salaboy\"}") diff --git a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/TestSubscriberRestController.java b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/TestSubscriberRestController.java index 0f39dd9a97..0f69e28aa2 100644 --- a/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/TestSubscriberRestController.java +++ b/spring-boot-examples/producer-app/src/test/java/io/dapr/springboot/examples/producer/TestSubscriberRestController.java @@ -38,6 +38,15 @@ public void subscribe(@RequestBody CloudEvent cloudEvent){ events.add(cloudEvent); } + @PostMapping("outbox-subscribe") + @Topic(pubsubName = "pubsub", name = "outbox-topic") + public void outboxSubscribe(@RequestBody CloudEvent cloudEvent) { + // we are receiving the Order with CloudEvent as String due to the + // following issue https://github.com/dapr/java-sdk/issues/1580 + logger.info("Outbox Order Event Received: " + cloudEvent.getData()); + events.add(cloudEvent); + } + public List getAllEvents() { return events; }