From 5154e06bf54ba2443ffd73c52a52a5bb8f484647 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 17 May 2026 15:55:16 +0800 Subject: [PATCH 1/5] completed the feature --- .../storage/integration/AlterLogDirTest.java | 82 +++++++++++++++++-- 1 file changed, 74 insertions(+), 8 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index b9230b2081280..409096c83125d 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -16,24 +16,76 @@ */ package org.apache.kafka.tiered.storage.integration; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.TestKitDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; -import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import java.util.HashMap; +import java.util.Locale; import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage; -public final class AlterLogDirTest extends TieredStorageTestHarness { +public final class AlterLogDirTest { - @Override - public int brokerCount() { - return 2; + private static final String TEST_CLASS_NAME = "alterlogdirtest"; + private static final int BROKER_COUNT = 2; + + private static List clusterConfig() { + String storageDirPath = TestUtils + .tempDirectory("kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath(); + + Properties tieredProps = createPropsForRemoteStorage( + TEST_CLASS_NAME, + storageDirPath, + BROKER_COUNT, + 5, + new Properties() + ); + + Map serverProps = new HashMap<>(); + tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); + + // TestKit registers brokers under "EXTERNAL"; createPropsForRemoteStorage defaults to "PLAINTEXT" + serverProps.put( + RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME + ); + + return List.of(ClusterConfig.defaultBuilder() + .setTypes(Set.of(Type.KRAFT)) + .setBrokers(BROKER_COUNT) + .setDisksPerBroker(2) + .setServerProperties(serverProps) + .build()); } - @Override - protected void writeTestSpecifications(TieredStorageTestBuilder builder) { + @ClusterTemplate("clusterConfig") + public void testAlterLogDirWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception { + executeAlterLogDirTest(clusterInstance, GroupProtocol.CLASSIC); + } + + @ClusterTemplate("clusterConfig") + public void testAlterLogDirWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception { + executeAlterLogDirTest(clusterInstance, GroupProtocol.CONSUMER); + } + + private void executeAlterLogDirTest(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws Exception { final String topicB = "topicB"; final int p0 = 0; final int partitionCount = 1; @@ -43,8 +95,9 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { final int broker0 = 0; final int broker1 = 1; + TieredStorageTestBuilder builder = new TieredStorageTestBuilder(); builder - // create topicB with 1 partition and 1 RF + // create topicB with 1 partition and 2 RF .createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment, mkMap(mkEntry(p0, List.of(broker1, broker0))), enableRemoteLogStorage) // send records to partition 0 @@ -63,5 +116,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) { // consume from the beginning of the topic to read data from local and remote storage .expectFetchFromTieredStorage(broker0, topicB, p0, 3) .consume(topicB, p0, 0L, 4, 3); + + Map extraConsumerProps = Map.of( + ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT) + ); + try (TieredStorageTestContext context = new TieredStorageTestContext(clusterInstance, extraConsumerProps)) { + try { + for (TieredStorageTestAction action : builder.complete()) { + action.execute(context); + } + } finally { + context.printReport(System.out); + } + } } } From c70ffa222fc1d6dd1f0eb31ef6508dd1a7a5a9b0 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 18 May 2026 18:05:20 +0800 Subject: [PATCH 2/5] spotless --- .../kafka/tiered/storage/integration/AlterLogDirTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index 409096c83125d..bbb5c368fd77e 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -31,8 +31,8 @@ import org.apache.kafka.tiered.storage.specs.KeyValueSpec; import java.util.HashMap; -import java.util.Locale; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Properties; import java.util.Set; From f6307fa3182b8bdd8d2fc3f46f472aea32519e0b Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 18 May 2026 19:37:19 +0800 Subject: [PATCH 3/5] extract the config --- .../storage/integration/AlterLogDirTest.java | 29 ++----------------- .../TransactionsWithTieredStoreTest.java | 18 ++---------- .../storage/utils/TieredStorageTestUtils.java | 14 +++++++++ 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index bbb5c368fd77e..90233794ea641 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -21,25 +21,20 @@ import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.test.api.ClusterTemplate; -import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.test.api.Type; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; -import org.apache.kafka.test.TestUtils; import org.apache.kafka.tiered.storage.TieredStorageTestAction; import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; import org.apache.kafka.tiered.storage.TieredStorageTestContext; import org.apache.kafka.tiered.storage.specs.KeyValueSpec; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Properties; import java.util.Set; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage; +import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage; public final class AlterLogDirTest { @@ -47,31 +42,11 @@ public final class AlterLogDirTest { private static final int BROKER_COUNT = 2; private static List clusterConfig() { - String storageDirPath = TestUtils - .tempDirectory("kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath(); - - Properties tieredProps = createPropsForRemoteStorage( - TEST_CLASS_NAME, - storageDirPath, - BROKER_COUNT, - 5, - new Properties() - ); - - Map serverProps = new HashMap<>(); - tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); - - // TestKit registers brokers under "EXTERNAL"; createPropsForRemoteStorage defaults to "PLAINTEXT" - serverProps.put( - RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME - ); - return List.of(ClusterConfig.defaultBuilder() .setTypes(Set.of(Type.KRAFT)) .setBrokers(BROKER_COUNT) .setDisksPerBroker(2) - .setServerProperties(serverProps) + .setServerProperties(createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 5)) .build()); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java index 0eab6c6f6851a..dff3347248a4a 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java @@ -23,11 +23,9 @@ import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfig; import org.apache.kafka.common.test.api.ClusterTemplate; -import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.test.api.Type; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.config.ServerLogConfigs; -import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tiered.storage.integration.TransactionsTestHelper.TransactionHooks; import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; @@ -35,12 +33,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC; -import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage; +import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage; import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage; /** @@ -56,10 +53,7 @@ public class TransactionsWithTieredStoreTest { private static final int BROKER_COUNT = 3; private static Map baseServerProperties() { - String storageDirPath = TestUtils.tempDirectory( - "kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath(); - - Map serverProps = new HashMap<>(); + Map serverProps = createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 3); serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"); serverProps.put("offsets.topic.num.partitions", "1"); serverProps.put("transaction.state.log.num.partitions", "3"); @@ -70,14 +64,6 @@ private static Map baseServerProperties() { serverProps.put("auto.leader.rebalance.enable", "false"); serverProps.put("group.initial.rebalance.delay.ms", "0"); serverProps.put("transaction.abort.timed.out.transaction.cleanup.interval.ms", "200"); - - Properties tieredProps = createPropsForRemoteStorage( - TEST_CLASS_NAME, storageDirPath, BROKER_COUNT, 3, new Properties()); - tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); - - serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME); - return serverProps; } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index bc4ffdf238f38..ca0c2b2ba78df 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.record.internal.Record; +import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; @@ -164,6 +165,19 @@ public static Properties createPropsForRemoteStorage(String testClassName, return overridingProps; } + public static Map createServerPropsForRemoteStorage( + String testClassName, int brokerCount, int numRemoteLogMetadataPartitions) { + String storageDirPath = org.apache.kafka.test.TestUtils + .tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath(); + Properties tieredProps = createPropsForRemoteStorage( + testClassName, storageDirPath, brokerCount, numRemoteLogMetadataPartitions, new Properties()); + Map serverProps = new HashMap<>(); + tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); + serverProps.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME); + return serverProps; + } + public static Map createTopicConfigForRemoteStorage(boolean enableRemoteStorage, int maxRecordBatchPerSegment) { Map topicProps = new HashMap<>(); From 05b9fa38ed58f5b3a1ef8e277ccc461d6c42b484 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 19 May 2026 20:02:29 +0800 Subject: [PATCH 4/5] addressed by comment --- .../kafka/tiered/storage/integration/AlterLogDirTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index 90233794ea641..eb720d18c30d0 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -39,7 +39,7 @@ public final class AlterLogDirTest { private static final String TEST_CLASS_NAME = "alterlogdirtest"; - private static final int BROKER_COUNT = 2; + private static final int BROKER_COUNT = 3; private static List clusterConfig() { return List.of(ClusterConfig.defaultBuilder() From 58884926180a1dcf1efb6980c47da31dfdcea850 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Tue, 19 May 2026 21:19:27 +0800 Subject: [PATCH 5/5] addressed by comment --- .../tiered/storage/integration/AlterLogDirTest.java | 6 ++++-- .../tiered/storage/utils/TieredStorageTestUtils.java | 11 ++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java index eb720d18c30d0..d406d8e2ebad4 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java @@ -38,7 +38,6 @@ public final class AlterLogDirTest { - private static final String TEST_CLASS_NAME = "alterlogdirtest"; private static final int BROKER_COUNT = 3; private static List clusterConfig() { @@ -46,7 +45,10 @@ private static List clusterConfig() { .setTypes(Set.of(Type.KRAFT)) .setBrokers(BROKER_COUNT) .setDisksPerBroker(2) - .setServerProperties(createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 5)) + .setServerProperties(createServerPropsForRemoteStorage( + AlterLogDirTest.class.getSimpleName().toLowerCase(Locale.ROOT), + BROKER_COUNT, + 5)) .build()); } diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java index ca0c2b2ba78df..cd0d243da04e3 100644 --- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java @@ -166,15 +166,20 @@ public static Properties createPropsForRemoteStorage(String testClassName, } public static Map createServerPropsForRemoteStorage( - String testClassName, int brokerCount, int numRemoteLogMetadataPartitions) { + String testClassName, + int brokerCount, + int numRemoteLogMetadataPartitions + ) { String storageDirPath = org.apache.kafka.test.TestUtils .tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath(); Properties tieredProps = createPropsForRemoteStorage( testClassName, storageDirPath, brokerCount, numRemoteLogMetadataPartitions, new Properties()); Map serverProps = new HashMap<>(); tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString())); - serverProps.put(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, - TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME); + serverProps.put( + REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, + TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME + ); return serverProps; }