Skip to content

Commit

Permalink
vars
Browse files Browse the repository at this point in the history
  • Loading branch information
h1alexbel committed Jun 21, 2023
1 parent 0a90d87 commit e14de76
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ final class ProducerConsumerTest extends KafkaITCase {

@Test
void createsProducerAndSendsMessage() throws Exception {
final String topic = "TEST-TOPIC";
final String key = "eo-kafka";
final String value = "rulezzz";
final Producer<String, String> producer =
new KfProducer<>(
new KfFlexible<>(
Expand Down Expand Up @@ -90,20 +93,20 @@ void createsProducerAndSendsMessage() throws Exception {
)
)
);
producer.send("testcontainers", new KfData<>("rulezzz", "TEST-TOPIC", 0));
producer.send(key, new KfData<>(value, topic, 0));
Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
final ConsumerRecords<String, String> records =
consumer.records("TEST-TOPIC", Duration.ofMillis(100L));
consumer.records(topic, Duration.ofMillis(100L));
if (records.isEmpty()) {
return false;
}
assertThat(records)
.hasSize(1)
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
.containsExactly(tuple("TEST-TOPIC", "testcontainers", "rulezzz"));
.containsExactly(tuple(topic, key, value));
return true;
}
);
Expand Down

0 comments on commit e14de76

Please sign in to comment.