Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sometimes KafkaCdiExtensionTest fails #1816

Merged
merged 1 commit into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
Expand All @@ -49,6 +51,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -98,6 +101,7 @@ public String value() {
public static final String TEST_TOPIC_8 = "graph-done-8";
public static final String TEST_TOPIC_10 = "graph-done-10";
public static final String TEST_TOPIC_13 = "graph-done-13";
public static final String UNEXISTING_TOPIC = "unexistingTopic2";
private final String KAFKA_SERVER = kafkaResource.getKafkaConnectString();

protected Map<String, String> cdiConfig() {
Expand Down Expand Up @@ -211,11 +215,12 @@ protected Map<String, String> cdiConfig() {
p.putAll(Map.of(
"mp.messaging.outgoing.test-channel-12.connector", KafkaConnector.CONNECTOR_NAME,
"mp.messaging.outgoing.test-channel-12.bootstrap.servers", KAFKA_SERVER,
"mp.messaging.outgoing.test-channel-12.topic", "unexistingTopic",
"mp.messaging.outgoing.test-channel-12.topic", UNEXISTING_TOPIC,
"mp.messaging.outgoing.test-channel-12.max.block.ms", "1000",
"mp.messaging.outgoing.test-channel-12.backpressure.size", "1",
"mp.messaging.outgoing.test-channel-12.batch.size", "1",
"mp.messaging.outgoing.test-channel-12.acks", "all",
"mp.messaging.outgoing.test-channel-12.acks", "1",
"mp.messaging.outgoing.test-channel-12.retries", "0",
"mp.messaging.outgoing.test-channel-12.key.serializer", LongSerializer.class.getName(),
"mp.messaging.outgoing.test-channel-12.value.serializer", StringSerializer.class.getName())
);
Expand Down Expand Up @@ -351,6 +356,7 @@ void withBackPressureAndError() {
kafkaConsumingBean.restart();
testData = Arrays.asList("not a number");
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_5, Arrays.asList("error"));
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_5);
}

@Test
Expand Down Expand Up @@ -380,6 +386,7 @@ void someEventsNoAckWithOnePartition() {
kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel6.class).get();
// We should find the new message and all the previous not ACK
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_6, uncommit);
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_6);
}

@Test
Expand All @@ -406,6 +413,7 @@ void someEventsNoAckWithDifferentPartitions() {
kafkaConsumingBean = cdiContainer.select(AbstractSampleBean.Channel8.class).get();
produceAndCheck(kafkaConsumingBean, testData, TEST_TOPIC_8, Collections.emptyList(), uncommited + 1);
assertEquals(uncommited + 1, kafkaConsumingBean.consumed().size());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_8);
}

@Test
Expand All @@ -429,6 +437,7 @@ void kafkaSubscriberConnectionError() throws InterruptedException {
// As the channel is cancelled, we cannot wait till something happens. We need to explicitly wait some time.
Thread.sleep(1000);
assertEquals(Collections.emptyList(), kafkaConsumingBean.consumed());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_10);
}

@Test
Expand All @@ -444,6 +453,10 @@ void kafkaSubscriberSendError() throws InterruptedException {
// As the channel is cancelled, we cannot wait till something happens. We need to explicitly wait some time.
Thread.sleep(1000);
assertEquals(Collections.emptyList(), kafkaConsumingBean.consumed());
kafkaResource.getKafkaTestUtils().consumeAllRecordsFromTopic(TEST_TOPIC_13);
// We create the topic, otherwise the kafka-producer-network-thread gets
// 'Error while fetching metadata with correlation id' many times
kafkaResource.getKafkaTestUtils().createTopic(UNEXISTING_TOPIC, 4, (short) 1);
}

private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String> testData, String topic,
Expand All @@ -462,7 +475,15 @@ private void produceAndCheck(AbstractSampleBean kafkaConsumingBean, List<String>
try (Producer<Object, String> producer = new KafkaProducer<>(config)) {
LOGGER.fine(() -> "Producing " + testData.size() + " events");
//Send all test messages(async send means order is not guaranteed) and in parallel
testData.parallelStream().map(s -> new ProducerRecord<>(topic, s)).forEach(msg -> producer.send(msg));
List<Future<RecordMetadata>> sent = testData.parallelStream()
.map(s -> producer.send(new ProducerRecord<>(topic, s))).collect(Collectors.toList());
sent.stream().forEach(future -> {
try {
future.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
fail("Some of next messages were not sent in time: " + testData, e);
}
});
}
if (requested > 0) {
// Wait till records are delivered
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/kafka/src/test/resources/logging.properties
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$

#All log level details
.level=SEVERE
org.apache.kafka.level=WARNING
org.apache.kafka.level=FINE
io.helidon.level=FINE
com.salesforce.kafka.level=WARNING
com.salesforce.kafka.level=FINE

# Known issue with meta.properties in embedded kafka server
#kafka.server.BrokerMetadataCheckpoint.level=SEVERE
Expand Down