Skip to content

Commit

Permalink
Fix intermittent KafkaCdiExtensionTest failures (#1816)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Bescos Gascon <jorge.bescos.gascon@oracle.com>
  • Loading branch information
jbescos committed May 19, 2020
1 parent 456e999 commit c9cc6f0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
Expand All @@ -49,6 +51,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -98,6 +101,7 @@ public String value() {
public static final String TEST_TOPIC_8 = "graph-done-8";
public static final String TEST_TOPIC_10 = "graph-done-10";
public static final String TEST_TOPIC_13 = "graph-done-13";
public static final String UNEXISTING_TOPIC = "unexistingTopic2";
private final String KAFKA_SERVER = kafkaResource.getKafkaConnectString();

protected Map<String, String> cdiConfig() {
Expand Down Expand Up @@ -211,11 +215,12 @@ protected Map<String, String> cdiConfig() {
p.putAll(Map.of(
"mp.messaging.outgoing.test-channel-12.connector", KafkaConnector.CONNECTOR_NAME,
"mp.messaging.outgoing.test-channel-12.bootstrap.servers", KAFKA_SERVER,
"mp.messaging.outgoing.test-channel-12.topic", "unexistingTopic",
"mp.messaging.outgoing.test-channel-12.topic", UNEXISTING_TOPIC,
"mp.messaging.outgoing.test-channel-12.max.block.ms", "1000",
"mp.messaging.outgoing.test-channel-12.backpressure.size", "1",
"mp.messaging.outgoing.test-channel-12.batch.size", "1",
"mp.messaging.outgoing.test-channel-12.acks", "all",
"mp.messaging.outgoing.test-channel-12.acks", "1",
"mp.messaging.outgoing.test-channel-12.retries", "0",
"mp.messaging.outgoing.test-channel-12.key.serializer", LongSerializer.class.getName(),
"mp.messaging.outgoing.test-channel-12.value.serializer", StringSerializer.class.getName())
);
Expand Down Expand Up @@ -351,6 +356,7 @@ void withBackPressureAndError() {
kafkaConsumingBean.restart();
testData = Arrays.asList("not a number");
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_5, Arrays.asList("error"));
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_5);
}

@Test
Expand Down Expand Up @@ -380,6 +386,7 @@ void someEventsNoAckWithOnePartition() {
kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel6.class).get();
// We should find the new message plus all previously unacknowledged ones
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_6, uncommit);
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_6);
}

@Test
Expand All @@ -406,6 +413,7 @@ void someEventsNoAckWithDifferentPartitions() {
kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel8.class).get();
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_8, Collections.emptyList(), uncommited + 1);
assertEquals(uncommited + 1, kafkaConsumingBean.consumed().size());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_8);
}

@Test
Expand All @@ -429,6 +437,7 @@ void kafkaSubscriberConnectionError() throws InterruptedException {
// As the channel is cancelled, we cannot wait till something happens. We need to explicitly wait some time.
Thread.sleep(1000);
assertEquals(Collections.emptyList(), kafkaConsumingBean.consumed());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_10);
}

@Test
Expand All @@ -444,6 +453,10 @@ void kafkaSubscriberSendError() throws InterruptedException {
// As the channel is cancelled, we cannot wait till something happens. We need to explicitly wait some time.
Thread.sleep(1000);
assertEquals(Collections.emptyList(), kafkaConsumingBean.consumed());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_13);
// Create the topic; otherwise the kafka-producer-network-thread repeatedly logs
// 'Error while fetching metadata with correlation id'
kafkaResource.getKafkaTestUtils().createTopic(UNEXISTING_TOPIC, 4, (short) 1);
}

private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
Expand All @@ -462,7 +475,15 @@ private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String>
try (Producer<Object, String> producer = new KafkaProducer<>(config)) {
LOGGER.fine(() -> "Producing " + testData.size() + " events");
//Send all test messages(async send means order is not guaranteed) and in parallel
testData.parallelStream().map(s -> new ProducerRecord<>(topic, s)).forEach(msg -> producer.send(msg));
List<Future<RecordMetadata>> sent = testData.parallelStream()
.map(s -> producer.send(new ProducerRecord<>(topic, s))).collect(Collectors.toList());
sent.stream().forEach(future -> {
try {
future.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail("Some of next messages were not sent in time: " + testData, e);
}
});
}
if (requested > 0) {
// Wait till records are delivered
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/kafka/src/test/resources/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$

#All log level details
.level=SEVERE
org.apache.kafka.level=WARNING
org.apache.kafka.level=FINE
io.helidon.level=FINE
com.salesforce.kafka.level=WARNING
com.salesforce.kafka.level=FINE

# Known issue with meta.properties in embedded kafka server
#kafka.server.BrokerMetadataCheckpoint.level=SEVERE
Expand Down

0 comments on commit c9cc6f0

Please sign in to comment.