h1alexbel committed May 8, 2023
1 parent 5464d25 commit acd5890
Showing 1 changed file with 43 additions and 47 deletions.
90 changes: 43 additions & 47 deletions src/it/producer-consumer-api/src/test/java/EntryTest.java
@@ -23,20 +23,19 @@
*/

import io.github.eocqrs.kafka.Consumer;
import io.github.eocqrs.kafka.Dataized;
import io.github.eocqrs.kafka.Producer;
import io.github.eocqrs.kafka.admin.CreateTopics;
import io.github.eocqrs.kafka.consumer.KfConsumer;
import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams;
import io.github.eocqrs.kafka.data.KfData;
import io.github.eocqrs.kafka.parameters.AutoOffsetReset;
import io.github.eocqrs.kafka.parameters.BootstrapServers;
import io.github.eocqrs.kafka.parameters.ClientId;
import io.github.eocqrs.kafka.parameters.GroupId;
import io.github.eocqrs.kafka.parameters.KeyDeserializer;
import io.github.eocqrs.kafka.parameters.KeySerializer;
import io.github.eocqrs.kafka.parameters.KfFlexible;
import io.github.eocqrs.kafka.parameters.KfParams;
import io.github.eocqrs.kafka.parameters.Retries;
import io.github.eocqrs.kafka.parameters.ValueDeserializer;
import io.github.eocqrs.kafka.parameters.ValueSerializer;
import io.github.eocqrs.kafka.producer.KfCallback;
@@ -45,21 +44,15 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.cactoos.list.ListOf;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
@@ -73,23 +66,20 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.*;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;
/**
* @todo #236:30m/DEV Enable tests
* @todo #290:30m/DEV Split ITCases into multiple files
*/

/**
* Entry test cases.
*
* @author Ivan Ivanchuk (l3r8y@duck.com)
* @author Aliaksei Bialiauski (abialiauski.dev@gmail.com)
* @since 0.0.2
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@@ -99,6 +89,7 @@ final class EntryTest {
DockerImageName.parse("confluentinc/cp-kafka:7.3.0")
)
.withEnv("KAFKA_CREATE_TOPICS", "TEST-TOPIC")
.withEnv("auto.create.topics.enable", "true")
.withReuse(true)
.withLogConsumer(
new Slf4jLogConsumer(
@@ -140,7 +131,6 @@ void runsKafka() {
);
}

@Disabled
@Test
@Order(2)
void createsConsumerAndSubscribes() throws IOException {
@@ -159,11 +149,14 @@ void createsConsumerAndSubscribes() throws IOException {
)
)
) {
Assertions.assertDoesNotThrow(() -> consumer.subscribe(new ListOf<>("fake")));
Assertions.assertDoesNotThrow(
() -> consumer.subscribe(
"TEST-TOPIC"
)
);
}
}

@Disabled
@Test
@Order(3)
void createsProducerAndSendsData() throws IOException {
@@ -184,7 +177,7 @@ void createsProducerAndSendsData() throws IOException {
Assertions.assertDoesNotThrow(
() -> producer.send(
"fake-key",
new KfData<>("fake-data", "TEST-TOPIC", 1)
new KfData<>("fake-data", "FAKE-TOPIC", 1)
)
);
}
@@ -199,37 +192,41 @@ void createsProducerAndSendsMessage() throws Exception {
new CreateTopics(
admin,
new NewTopic("TEST-TOPIC", 1, (short) 1)
).value().get(30L, TimeUnit.SECONDS);
final KafkaProducer<String, String> producer = new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
EntryTest.servers,
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()
),
new StringSerializer(),
new StringSerializer()
);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
EntryTest.servers,
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"
),
new StringDeserializer(),
new StringDeserializer()
).value()
.get(30L, TimeUnit.SECONDS);
final Producer<String, String> producer = new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
new BootstrapServers(EntryTest.servers),
new ClientId(UUID.randomUUID().toString()),
new KeySerializer(StringSerializer.class.getName()),
new ValueSerializer(StringSerializer.class.getName())
)
)
)
);
consumer.subscribe(Collections.singletonList("TEST-TOPIC"));
producer.send(new ProducerRecord<>("TEST-TOPIC", "testcontainers", "rulezzz")).get();
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers(EntryTest.servers),
new GroupId("it-" + UUID.randomUUID()),
new AutoOffsetReset("earliest"),
new KeyDeserializer(StringDeserializer.class.getName()),
new ValueDeserializer(StringDeserializer.class.getName())
)
)
)
);
producer.send("testcontainers", new KfData<>("rulezzz", "TEST-TOPIC", 0));
Unreliables.retryUntilTrue(
10,
TimeUnit.SECONDS,
() -> {
final ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100L));
consumer.records("TEST-TOPIC", Duration.ofMillis(100L));
if (records.isEmpty()) {
return false;
}
Expand All @@ -243,7 +240,6 @@ void createsProducerAndSendsMessage() throws Exception {
consumer.unsubscribe();
}

@Disabled
@Test
@Order(4)
void createsProducerWithCallback() throws IOException {
@@ -262,13 +258,13 @@ void createsProducerWithCallback() throws IOException {
(recordMetadata, e) ->
MatcherAssert.assertThat(
recordMetadata.topic(),
Matchers.equalTo("TEST-TOPIC")
Matchers.equalTo("TEST-CALLBACK")
)
)
) {
producer.send(
"test-key",
new KfData<>("test-data", "TEST-TOPIC", 1)
new KfData<>("test-data", "TEST-CALLBACK", 1)
);
}
}
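
For reference, below is a minimal standalone sketch of the round trip that the reworked createsProducerAndSendsMessage test now performs through the eo-kafka wrappers instead of the raw KafkaProducer/KafkaConsumer clients. It is assembled only from constructors and methods that appear in this diff; the broker address is a placeholder (the test obtains it from the Testcontainers Kafka container), and the import paths for KfProducer and KfProducerParams are assumed by symmetry with the consumer-side packages visible in the other imports.

    import io.github.eocqrs.kafka.Consumer;
    import io.github.eocqrs.kafka.Producer;
    import io.github.eocqrs.kafka.consumer.KfConsumer;
    import io.github.eocqrs.kafka.consumer.settings.KfConsumerParams;
    import io.github.eocqrs.kafka.data.KfData;
    import io.github.eocqrs.kafka.parameters.AutoOffsetReset;
    import io.github.eocqrs.kafka.parameters.BootstrapServers;
    import io.github.eocqrs.kafka.parameters.ClientId;
    import io.github.eocqrs.kafka.parameters.GroupId;
    import io.github.eocqrs.kafka.parameters.KeyDeserializer;
    import io.github.eocqrs.kafka.parameters.KeySerializer;
    import io.github.eocqrs.kafka.parameters.KfFlexible;
    import io.github.eocqrs.kafka.parameters.KfParams;
    import io.github.eocqrs.kafka.parameters.ValueDeserializer;
    import io.github.eocqrs.kafka.parameters.ValueSerializer;
    import io.github.eocqrs.kafka.producer.KfProducer;           // assumed package, not shown in this excerpt
    import io.github.eocqrs.kafka.producer.settings.KfProducerParams; // assumed package, not shown in this excerpt
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.UUID;

    final class RoundTripSketch {

        public static void main(final String[] args) throws Exception {
            // Placeholder broker address; the test takes it from the Testcontainers Kafka container.
            final String servers = "localhost:9092";
            // Producer wired through eo-kafka parameter objects, as in the diff.
            final Producer<String, String> producer = new KfProducer<>(
                new KfFlexible<>(
                    new KfProducerParams(
                        new KfParams(
                            new BootstrapServers(servers),
                            new ClientId(UUID.randomUUID().toString()),
                            new KeySerializer(StringSerializer.class.getName()),
                            new ValueSerializer(StringSerializer.class.getName())
                        )
                    )
                )
            );
            // Consumer wired the same way.
            final Consumer<String, String> consumer = new KfConsumer<>(
                new KfFlexible<>(
                    new KfConsumerParams(
                        new KfParams(
                            new BootstrapServers(servers),
                            new GroupId("it-" + UUID.randomUUID()),
                            new AutoOffsetReset("earliest"),
                            new KeyDeserializer(StringDeserializer.class.getName()),
                            new ValueDeserializer(StringDeserializer.class.getName())
                        )
                    )
                )
            );
            // Send a value wrapped in KfData (value, topic, partition), then read it back.
            producer.send("testcontainers", new KfData<>("rulezzz", "TEST-TOPIC", 0));
            final ConsumerRecords<String, String> records =
                consumer.records("TEST-TOPIC", Duration.ofMillis(100L));
            records.forEach(rec -> System.out.println(rec.key() + " -> " + rec.value()));
            consumer.unsubscribe();
        }
    }

The practical effect of the change is that the integration test now exercises the library's own Producer and Consumer abstractions end to end, rather than verifying the behaviour of the underlying Apache Kafka clients.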