Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,53 @@
*/
package org.apache.kafka.tiered.storage.integration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.KeyValueSpec;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;

public final class AlterLogDirTest extends TieredStorageTestHarness {
public final class AlterLogDirTest {

@Override
public int brokerCount() {
return 2;
private static final int BROKER_COUNT = 3;

private static List<ClusterConfig> clusterConfig() {
return List.of(ClusterConfig.defaultBuilder()
.setTypes(Set.of(Type.KRAFT))
.setBrokers(BROKER_COUNT)
.setDisksPerBroker(2)
.setServerProperties(createServerPropsForRemoteStorage(
AlterLogDirTest.class.getSimpleName().toLowerCase(Locale.ROOT),
BROKER_COUNT,
5))
.build());
}

@ClusterTemplate("clusterConfig")
public void testAlterLogDirWithClassicGroupProtocol(ClusterInstance clusterInstance) throws Exception {
executeAlterLogDirTest(clusterInstance, GroupProtocol.CLASSIC);
}

@Override
protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
@ClusterTemplate("clusterConfig")
public void testAlterLogDirWithConsumerGroupProtocol(ClusterInstance clusterInstance) throws Exception {
executeAlterLogDirTest(clusterInstance, GroupProtocol.CONSUMER);
}

private void executeAlterLogDirTest(ClusterInstance clusterInstance, GroupProtocol groupProtocol) throws Exception {
final String topicB = "topicB";
final int p0 = 0;
final int partitionCount = 1;
Expand All @@ -43,8 +72,9 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
final int broker0 = 0;
final int broker1 = 1;

TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
builder
// create topicB with 1 partition and 1 RF
// create topicB with 1 partition and 2 RF
.createTopic(topicB, partitionCount, replicationFactor, maxBatchCountPerSegment,
mkMap(mkEntry(p0, List.of(broker1, broker0))), enableRemoteLogStorage)
// send records to partition 0
Expand All @@ -63,5 +93,18 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
// consume from the beginning of the topic to read data from local and remote storage
.expectFetchFromTieredStorage(broker0, topicB, p0, 3)
.consume(topicB, p0, 0L, 4, 3);

Map<String, Object> extraConsumerProps = Map.of(
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)
);
try (TieredStorageTestContext context = new TieredStorageTestContext(clusterInstance, extraConsumerProps)) {
try {
for (TieredStorageTestAction action : builder.complete()) {
action.execute(context);
}
} finally {
context.printReport(System.out);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,21 @@
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.test.api.ClusterTemplate;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tiered.storage.integration.TransactionsTestHelper.TransactionHooks;
import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createServerPropsForRemoteStorage;
import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;

/**
Expand All @@ -56,10 +53,7 @@ public class TransactionsWithTieredStoreTest {
private static final int BROKER_COUNT = 3;

private static Map<String, String> baseServerProperties() {
String storageDirPath = TestUtils.tempDirectory(
"kafka-remote-tier-" + TEST_CLASS_NAME).getAbsolutePath();

Map<String, String> serverProps = new HashMap<>();
Map<String, String> serverProps = createServerPropsForRemoteStorage(TEST_CLASS_NAME, BROKER_COUNT, 3);
serverProps.put(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
serverProps.put("offsets.topic.num.partitions", "1");
serverProps.put("transaction.state.log.num.partitions", "3");
Expand All @@ -70,14 +64,6 @@ private static Map<String, String> baseServerProperties() {
serverProps.put("auto.leader.rebalance.enable", "false");
serverProps.put("group.initial.rebalance.delay.ms", "0");
serverProps.put("transaction.abort.timed.out.transaction.cleanup.interval.ms", "200");

Properties tieredProps = createPropsForRemoteStorage(
TEST_CLASS_NAME, storageDirPath, BROKER_COUNT, 3, new Properties());
tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString()));

serverProps.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME);

return serverProps;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
Expand Down Expand Up @@ -164,6 +165,24 @@ public static Properties createPropsForRemoteStorage(String testClassName,
return overridingProps;
}

public static Map<String, String> createServerPropsForRemoteStorage(
String testClassName,
int brokerCount,
int numRemoteLogMetadataPartitions
) {
String storageDirPath = org.apache.kafka.test.TestUtils
.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
Properties tieredProps = createPropsForRemoteStorage(
testClassName, storageDirPath, brokerCount, numRemoteLogMetadataPartitions, new Properties());
Map<String, String> serverProps = new HashMap<>();
tieredProps.forEach((k, v) -> serverProps.put(k.toString(), v.toString()));
serverProps.put(
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME
);
return serverProps;
}

public static Map<String, String> createTopicConfigForRemoteStorage(boolean enableRemoteStorage,
int maxRecordBatchPerSegment) {
Map<String, String> topicProps = new HashMap<>();
Expand Down
Loading