Skip to content

Commit

Permalink
KAFKA-16686 Wait for given offset in TopicBasedRemoteLogMetadataManag…
Browse files Browse the repository at this point in the history
…erTest (#15885)

Some tests in TopicBasedRemoteLogMetadataManagerTest flake because waitUntilConsumerCatchesUp may break early before consumer manager has caught up with all the events.

This PR adds an expected offsets for leader/follower metadataOffset partitions and ensures we wait for the offset to be at least equal to the argument to avoid flakyness.

Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
gaurav-narula committed May 15, 2024
1 parent 1e427c0 commit eb5559a
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -89,14 +90,22 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed;
private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;

/**
* The default constructor delegates to the internal one, starting the consumer thread and
* supplying an instance of RemotePartitionMetadataStore by default.
*/
public TopicBasedRemoteLogMetadataManager() {
this(true);
this(true, RemotePartitionMetadataStore::new);
}

// Visible for testing.
public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
/**
* Used in tests to dynamically configure the instance.
*/
TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
this.startConsumerThread = startConsumerThread;
this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier;
}

@Override
Expand Down Expand Up @@ -358,7 +367,7 @@ public void configure(Map<String, ?> configs) {

rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = new RemotePartitionMetadataStore();
remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get();
configured = true;
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
Expand All @@ -60,17 +61,30 @@ protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {

public void initialize(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread) {
initialize(topicIdPartitions, startConsumerThread, RemotePartitionMetadataStore::new);
}

public void initialize(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
// Call setup to start the cluster.
super.setUp(new EmptyTestInfo());

initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null);
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, remotePartitionMetadataStoreSupplier);
}

public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new);
}

public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner,
Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) {
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remotePartitionMetadataStoreSupplier) {
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) {
Expand Down
Loading

0 comments on commit eb5559a

Please sign in to comment.