Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.it.testcontainers.pubsub.outbox;

import io.dapr.client.DaprClient;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.it.testcontainers.DaprClientFactory;
import io.dapr.testcontainers.Component;
import io.dapr.testcontainers.DaprContainer;
import io.dapr.testcontainers.DaprLogLevel;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG;

@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
classes = {
TestPubsubOutboxApplication.class
}
)
@Testcontainers
@Tag("testcontainers")
public class DaprPubSubOutboxIT {

private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubOutboxIT.class);
private static final Network DAPR_NETWORK = Network.newNetwork();
private static final Random RANDOM = new Random();
private static final int PORT = RANDOM.nextInt(1000) + 8000;
private static final String APP_FOUND_MESSAGE_PATTERN = ".*application discovered on port.*";

private static final String PUBSUB_APP_ID = "pubsub-dapr-app";
private static final String PUBSUB_NAME = "pubsub";

// topics
private static final String TOPIC_PRODUCT_CREATED = "product.created";
private static final String STATE_STORE_NAME = "kvstore";

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG)
.withAppName(PUBSUB_APP_ID)
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.in-memory", "v1", Map.of(
"outboxPublishPubsub", PUBSUB_NAME,
"outboxPublishTopic", TOPIC_PRODUCT_CREATED
)))
.withComponent(new Component(PUBSUB_NAME, "pubsub.in-memory", "v1", Collections.emptyMap()))
.withDaprLogLevel(DaprLogLevel.DEBUG)
.withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String()))
.withAppChannelAddress("host.testcontainers.internal")
.withAppPort(PORT);

/**
* Expose the Dapr ports to the host.
*
* @param registry the dynamic property registry
*/
@DynamicPropertySource
static void daprProperties(DynamicPropertyRegistry registry) {
registry.add("dapr.http.endpoint", DAPR_CONTAINER::getHttpEndpoint);
registry.add("dapr.grpc.endpoint", DAPR_CONTAINER::getGrpcEndpoint);
registry.add("server.port", () -> PORT);
}


@BeforeEach
public void setUp() {
org.testcontainers.Testcontainers.exposeHostPorts(PORT);
}


@Test
public void shouldPublishUsingOutbox() throws Exception {
Wait.forLogMessage(APP_FOUND_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER);

try (DaprClient client = DaprClientFactory.createDaprClientBuilder(DAPR_CONTAINER).build()) {

ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest(STATE_STORE_NAME);

Product pencil = new Product("Pencil", 1.50);
State<Product> state = new State<>(
pencil.getId(), pencil, null
);

TransactionalStateOperation<Product> operation = new TransactionalStateOperation<>(
TransactionalStateOperation.OperationType.UPSERT, state
);

transactionRequest.setOperations(List.of(operation));

client.executeStateTransaction(transactionRequest).block();

Awaitility.await().atMost(Duration.ofSeconds(10))
.ignoreExceptions()
.untilAsserted(() -> Assertions.assertThat(ProductWebhookController.EVENT_LIST).isNotEmpty());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.outbox;

import java.util.UUID;

public class Product {
private String id;
private String name;
private double price;

public Product() {
}

public Product(String name, double price) {
this.id = UUID.randomUUID().toString();
this.name = name;
this.price = price;
}

public String getId() {
return id;
}

public void setId(String id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public double getPrice() {
return price;
}

public void setPrice(double price) {
this.price = price;
}

@Override
public String toString() {
return "Product{" +
"id='" + id + '\'' +
", name='" + name + '\'' +
", price=" + price +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.outbox;

import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@RestController
@RequestMapping("/webhooks/products")
public class ProductWebhookController {

public static final List<CloudEvent<Product>> EVENT_LIST = new CopyOnWriteArrayList<>();

@PostMapping("/created")
@Topic(name = "product.created", pubsubName = "pubsub")
public void handleEvent(@RequestBody CloudEvent cloudEvent) {
System.out.println("Received product.created event: " + cloudEvent.getData());
EVENT_LIST.add(cloudEvent);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/
package io.dapr.it.testcontainers.pubsub.outbox;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestPubsubOutboxApplication {
public static void main(String[] args) {
SpringApplication.run(TestPubsubOutboxApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.dapr.springboot.examples.producer;

public class OrderDTO {

private String id;
private String item;
private Integer amount;

public OrderDTO() {
}

public OrderDTO(String id, String item, Integer amount) {
this.id = id;
this.item = item;
this.amount = amount;
}

public String getId() {
return id;
}


public String getItem() {
return item;
}

public Integer getAmount() {
return amount;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@

package io.dapr.springboot.examples.producer;

import io.dapr.client.DaprClient;
import io.dapr.client.domain.ExecuteStateTransactionRequest;
import io.dapr.client.domain.State;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.spring.data.repository.config.EnableDaprRepositories;
import io.dapr.spring.messaging.DaprMessagingTemplate;
import org.slf4j.Logger;
Expand All @@ -24,18 +28,23 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@EnableDaprRepositories
public class OrdersRestController {

private final Logger logger = LoggerFactory.getLogger(OrdersRestController.class);
private static final Logger logger = LoggerFactory.getLogger(OrdersRestController.class);

@Autowired
private OrderRepository repository;

@Autowired
private DaprMessagingTemplate<Order> messagingTemplate;

@Autowired
private DaprClient daprClient;

/**
* Store orders from customers.
* @param order from the customer
Expand All @@ -51,6 +60,28 @@ public String storeOrder(@RequestBody Order order) {
return "Order Stored and Event Published";
}

@PostMapping("/orders/outbox")
public String storeOrderOutbox(@RequestBody Order order) {
logger.info("Storing Order with Outbox: {}", order);
ExecuteStateTransactionRequest transactionRequest = new ExecuteStateTransactionRequest("kvstore-outbox");

State<Order> state = new State<>(
order.getId(), order, null
);

TransactionalStateOperation<Order> operation = new TransactionalStateOperation<>(
TransactionalStateOperation.OperationType.UPSERT, state
);

transactionRequest.setOperations(List.of(operation));

daprClient.executeStateTransaction(transactionRequest).block();

logger.info("Order Stored with Outbox: {}", order);

return "Order Stored with Outbox";
}

@GetMapping("/orders")
public Iterable<Order> getAll() {
return repository.findAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class DaprTestContainersConfig {
static final String CONNECTION_STRING =
"host=postgres user=postgres password=password port=5432 connect_timeout=10 database=dapr_db_repository";
static final Map<String, String> STATE_STORE_PROPERTIES = createStateStoreProperties();

static final Map<String, String> STATE_STORE_OUTBOX_PROPERTIES = createStateStoreOutboxProperties();
static final Map<String, String> BINDING_PROPERTIES = Collections.singletonMap("connectionString", CONNECTION_STRING);


Expand Down Expand Up @@ -118,9 +118,8 @@ public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> p
.withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))
.withComponent(new Component("kvbinding", "bindings.postgresql", "v1", BINDING_PROPERTIES))
.withComponent(new Component("pubsub", "pubsub.rabbitmq", "v1", rabbitMqProperties))
.withComponent(new Component("kvstore-outbox", "state.postgresql", "v1", STATE_STORE_OUTBOX_PROPERTIES))
.withSubscription(new Subscription("app", "pubsub", "topic", "/subscribe"))
// .withDaprLogLevel(DaprLogLevel.DEBUG)
// .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
.withAppPort(8080)
.withAppHealthCheckPath("/actuator/health")
.withAppChannelAddress("host.testcontainers.internal")
Expand All @@ -139,5 +138,14 @@ private static Map<String, String> createStateStoreProperties() {
return result;
}

private static Map<String, String> createStateStoreOutboxProperties() {
Map<String, String> result = new HashMap<>();
result.put("connectionString", CONNECTION_STRING);
result.put("outboxPublishPubsub", "pubsub");
result.put("outboxPublishTopic", "outbox-topic");

return result;
}


}
Loading
Loading