Skip to content

Commit

Permalink
debug
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Apr 17, 2024
1 parent 4954c57 commit 54c0e9e
Showing 1 changed file with 61 additions and 33 deletions.
Expand Up @@ -16,7 +16,14 @@
*/
package org.apache.kafka.tools.consumer.group;

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -29,8 +36,8 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.Defaults;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Collections;
import java.util.Map;
Expand All @@ -42,19 +49,40 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest {
@Tag("integration")
@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1")
})
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
private final ClusterInstance clusterInstance;
public static final String TOPIC = "foo";
public static final String GROUP = "test.group";

DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { // Constructor injections
this.clusterInstance = clusterInstance;
}

String[] getArgs(String group, String topic) {
return new String[] {
"--bootstrap-server", bootstrapServers(listenerName()),
"--bootstrap-server", clusterInstance.bootstrapServers(),
"--delete-offsets",
"--group", group,
"--topic", topic
};
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsNonExistingGroup(String quorum) {
ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs(args);

return new ConsumerGroupCommand.ConsumerGroupService(
opts,
Collections.singletonMap(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE))
);
}

@ClusterTest
public void testDeleteOffsetsNonExistingGroup() {
String group = "missing.group";
String topic = "foo:1";
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic));
Expand All @@ -63,51 +91,47 @@ public void testDeleteOffsetsNonExistingGroup(String quorum) {
assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
createTopic(TOPIC);
testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
createTopic(TOPIC);
testWithStableConsumerGroup(TOPIC, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
testWithStableConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
testWithStableConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
createTopic(TOPIC);
testWithEmptyConsumerGroup(TOPIC, 0, 0, Errors.NONE);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
createTopic(TOPIC);
testWithEmptyConsumerGroup(TOPIC, -1, 0, Errors.NONE);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
testWithEmptyConsumerGroup("foobar", 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}

@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly(String quorum) {
@ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
testWithEmptyConsumerGroup("foobar", -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
}

Expand Down Expand Up @@ -193,7 +217,7 @@ private void withEmptyConsumerGroup(Runnable body) {
}

private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName()));
config.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "-1");
config.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
config.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
Expand All @@ -202,7 +226,7 @@ private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
}

private Consumer<byte[], byte[]> createConsumer(Properties config) {
config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers(listenerName()));
config.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
config.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
Expand All @@ -213,4 +237,8 @@ private Consumer<byte[], byte[]> createConsumer(Properties config) {

return new KafkaConsumer<>(config);
}

private void createTopic(String topic) {
TestUtils.createTopicWithAdminRaw(clusterInstance.createAdminClient(), topic, 1, 1, scala.collection.immutable.Map$.MODULE$.empty(), new Properties());
}
}

0 comments on commit 54c0e9e

Please sign in to comment.