Skip to content
Permalink
Browse files
(chores) camel-kafka: fix async test not correctly reporting commit f…
…ailures
  • Loading branch information
orpiske committed May 21, 2022
1 parent c2f6d79 commit 8c5c0d43774eb48eae5db272f9e4f868e76bfeb0
Showing 1 changed file with 15 additions and 1 deletion.
@@ -34,10 +34,14 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);

public static final String TOPIC = "testManualCommitTest";

@@ -57,6 +61,8 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo

private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;

private volatile int failCount;

@BeforeEach
public void before() {
Properties props = getDefaultProperties();
@@ -92,7 +98,13 @@ public void configure() {
KafkaManualCommit manual = e.getMessage().getBody(Exchange.class)
.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
assertNotNull(manual);
manual.commit();

try {
manual.commit();
} catch (Exception commitException) {
LOG.error("Failed to commit: {}", commitException.getMessage(), commitException);
failCount++;
}
});
from(from).routeId("bar").autoStartup(false).to(toBar);
}
@@ -139,6 +151,8 @@ public void kafkaManualCommit() throws Exception {
to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");

to.assertIsSatisfied(3000);

assertEquals(0, failCount, "There should have been 0 commit failures");
}

}

0 comments on commit 8c5c0d4

Please sign in to comment.