diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java index abb3b5976973d..8ae4457533274 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java @@ -399,6 +399,10 @@ public boolean isPollerRunning() { return topicPoller.isRunning(); } + public boolean isCacheReady() { + return cacheReadyLatch.getCount() == 0; + } + private class TopicPoller implements Runnable { private final Logger log = LoggerFactory.getLogger(this.getClass()); diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java index cfb5ddd600677..4ad1a0f57b738 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java @@ -89,6 +89,7 @@ private void sendMessages(long count) { @Test @DisplayName("Checks that half of the messages pass and duplicates are blocked") public void testFirstPassFiltersAsExpected() { + await().until(() -> kafkaIdempotentRepository.isCacheReady()); int count = 10; sendMessages(count); diff --git a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/RedpandaTransactionsEnabledContainer.java b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/RedpandaTransactionsEnabledContainer.java index af6517f00c6ce..86e2018545838 100644 --- a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/RedpandaTransactionsEnabledContainer.java +++ b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/RedpandaTransactionsEnabledContainer.java @@ -22,7 +22,7 @@ public class RedpandaTransactionsEnabledContainer extends RedpandaContainer { - public static final String DEFAULT_REDPANDA_CONTAINER = "docker.redpanda.com/vectorized/redpanda:v22.3.9"; + public static final String DEFAULT_REDPANDA_CONTAINER = "docker.redpanda.com/vectorized/redpanda:v22.3.10"; public static final String REDPANDA_CONTAINER = System.getProperty("itest.redpanda.container.image", DEFAULT_REDPANDA_CONTAINER); public static final int REDPANDA_PORT = 9092; @@ -33,10 +33,7 @@ public RedpandaTransactionsEnabledContainer(String image) { protected void containerIsStarting(InspectContainerResponse containerInfo) { super.containerIsStarting(containerInfo); - String command = "#!/bin/bash\n"; - // enable transactions - command += "/usr/bin/rpk redpanda config set redpanda.enable_transactions true\n"; command += "/usr/bin/rpk redpanda start --mode dev-container "; command += "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ";