diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index b9230b2081280..d406d8e2ebad4 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -16,24 +16,53 @@ */ package org.apache.kafka.tiered.storage.integration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; -import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.tiered.storage.specs.KeyValueSpec; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage; -public final class AlterLogDirTest extends TieredStorageTestHarness { +public final class AlterLogDirTest { - @Override - public int brokerCount() { - return 2; + private static final int BROKER_COUNT = 3; + + private static List clusterConfig() { + return List.of(ClusterConfig.defaultBuilder() + .setTypes(Set.of(Type.KRAFT)) + .setBrokers(BROKER_COUNT) + .setDisksPerBroker(2) + .setServerProperties(createServerPropsForRemoteStorage( + AlterLogDirTest.class.getSimpleName().toLowerCase(Locale.ROOT), + BROKER_COUNT, + 5)) + .build()); + } + + @ClusterTemplate("clusterConfig") + public void testAlterLogDirWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception { + executeAlterLogDirTest(clusterInstance, GroupProtocol.CLASSIC); } - @Override - protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + @ClusterTemplate("clusterConfig") + public void testAlterLogDirWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception { + executeAlterLogDirTest(clusterInstance, GroupProtocol.CONSUMER); + } + + private void executeAlterLogDirTest(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws Exception { final String topicB = "topicB"; final int p0 = 0; final int partitionCount = 1; @@ -43,8 +72,9 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { final int broker0 = 0; final int broker1 = 1; + TieredStorageTestBuilder builder = new TieredStorageTestBuilder(); builder - // create topicB with 1 partition and 1 RF + // create topicB with 1 partition and 2 RF .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment, mkMap(mkEntry(p0, List.of(broker1, broker0))), enableRemoteLogStorage) // send records to partition 0 @@ -63,5 +93,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { // consume from the beginning of the topic to read data from local and remote storage .expectFetchFromTieredStorage(broker0, topicB, p0, 3) .consume(topicB, p0, 0L, 4, 3); + + Map extraConsumerProps = Map.of( + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT) + ); + try (TieredStorageTestContext context = new TieredStorageTestContext(clusterInstance, extraConsumerProps)) { + try { + for (TieredStorageTestAction action : builder.complete()) { + action.execute(context); + } + } finally { + context.printReport(System.out); + } + } } } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java index 0eab6c6f6851a..dff3347248a4a 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java @@ -23,11 +23,9 @@ import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.test.api.ClusterTemplate; -import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.config.ServerLogConfigs; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tiered.storage.integration.TransactionsTestHelper.TransactionHooks; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; @@ -35,12 +33,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC; -import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage; +import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage; /** @@ -56,10 +53,7 @@ public class TransactionsWithTieredStoreTest { private static final int BROKER_COUNT = 3; private static Map baseServerProperties() { - String storageDirPath = TestUtils.tempDirectory( - "kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath(); - - Map serverProps = new HashMap<>(); + Map serverProps = createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 3); serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"); serverProps.put("offsets.topic.num.partitions", "1"); serverProps.put("transaction.state.log.num.partitions", "3"); @@ -70,14 +64,6 @@ private static Map baseServerProperties() { serverProps.put("auto.leader.rebalance.enable", "false"); serverProps.put("group.initial.rebalance.delay.ms", "0"); serverProps.put("transaction.abort.timed.out.transaction.cleanup.interval.ms", "200"); - - Properties tieredProps = createPropsForRemoteStorage( - TEST_CLASS_NAME, storageDirPath, BROKER_COUNT, 3, new Properties()); - tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); - - serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME); - return serverProps; } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index bc4ffdf238f38..cd0d243da04e3 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.record.internal.Record; +import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; @@ -164,6 +165,24 @@ public static Properties createPropsForRemoteStorage(String testClassName, return overridingProps; } + public static Map createServerPropsForRemoteStorage( + String testClassName, + int brokerCount, + int numRemoteLogMetadataPartitions + ) { + String storageDirPath = org.apache.kafka.test.TestUtils + .tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath(); + Properties tieredProps = createPropsForRemoteStorage( + testClassName, storageDirPath, brokerCount, numRemoteLogMetadataPartitions, new Properties()); + Map serverProps = new HashMap<>(); + tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); + serverProps.put( + REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME + ); + return serverProps; + } + public static Map createTopicConfigForRemoteStorage(boolean enableRemoteStorage, int maxRecordBatchPerSegment) { Map topicProps = new HashMap<>();