Skip to content

Commit

Permalink
KAFKA-16686: Improve waiting in TopicBasedRemoteLogMetadataManagerTest
Browse files Browse the repository at this point in the history
Some tests in TopicBasedRemoteLogMetadataManagerTest flake because
`waitUntilConsumerCatchesUp` may break early before consumer manager has
caught up with all the events.

This change allows passing a `Consumer<RemoteLogMetadata>` to the
`ConsumerTask` and modifies `waitUntilConsumerCatchesUp` to ensure
we wait until the consumer has caught up with all the
`RemoteLogMetadata` objects we expect in each test.

Refer
[Gradle Enterprise Report](https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest)
for more information on flakyness.
  • Loading branch information
gaurav-narula committed May 12, 2024
1 parent 643db43 commit d2684d3
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,6 +32,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/**
* This class manages the consumer thread viz {@link ConsumerTask} that polls messages from the assigned metadata topic partitions.
Expand All @@ -50,7 +52,8 @@ public class ConsumerManager implements Closeable {
public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Time time) {
Time time,
Consumer<RemoteLogMetadata> onConsume) {
this.rlmmConfig = rlmmConfig;
this.time = time;

Expand All @@ -62,7 +65,8 @@ public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
consumer,
100L,
300_000L,
time
time,
onConsume
);
consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,40 @@ class ConsumerTask implements Runnable, Closeable {
private long lastFailedFetchOffsetsTimestamp;
// The interval between retries to fetch the start and end offsets for the metadata partitions after a failed fetch.
private final long offsetFetchRetryIntervalMs;
private final java.util.function.Consumer<RemoteLogMetadata> onConsume;

public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Consumer<byte[], byte[]> consumer,
long pollTimeoutMs,
long offsetFetchRetryIntervalMs,
Time time) {
this(
remotePartitionMetadataEventHandler,
topicPartitioner,
consumer,
pollTimeoutMs,
offsetFetchRetryIntervalMs,
time,
_unused -> { }
);
}

public ConsumerTask(RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
RemoteLogMetadataTopicPartitioner topicPartitioner,
Consumer<byte[], byte[]> consumer,
long pollTimeoutMs,
long offsetFetchRetryIntervalMs,
Time time,
java.util.function.Consumer<RemoteLogMetadata> onConsume) {
this.consumer = consumer;
this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
this.pollTimeoutMs = pollTimeoutMs;
this.offsetFetchRetryIntervalMs = offsetFetchRetryIntervalMs;
this.time = Objects.requireNonNull(time);
this.uninitializedAt = time.milliseconds();
this.onConsume = onConsume;
}

@Override
Expand Down Expand Up @@ -153,6 +173,7 @@ public void run() {

private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
onConsume.accept(remoteLogMetadata);
if (shouldProcess(remoteLogMetadata, record.offset())) {
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -88,15 +89,17 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private final Consumer<RemoteLogMetadata> onConsume;
private volatile boolean initializationFailed;

public TopicBasedRemoteLogMetadataManager() {
this(true);
this(true, _unused -> { });
}

// Visible for testing.
public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Consumer<RemoteLogMetadata> onConsume) {
this.startConsumerThread = startConsumerThread;
this.onConsume = onConsume;
}

@Override
Expand Down Expand Up @@ -419,7 +422,7 @@ private void initializeResources() {
lock.writeLock().lock();
try {
producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time);
consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmTopicPartitioner, time, onConsume);
if (startConsumerThread) {
consumerManager.startConsumerThread();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,10 +35,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
Expand All @@ -59,18 +61,28 @@ protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
}

public void initialize(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread) {
boolean startConsumerThread
) {
initialize(topicIdPartitions, startConsumerThread, _unused -> { });
}

public void initialize(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
Consumer<RemoteLogMetadata> onConsume
) {
// Call setup to start the cluster.
super.setUp(new EmptyTestInfo());

initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null);
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, onConsume);
}

public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner,
Consumer<RemoteLogMetadata> onConsume
) {
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) {
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, onConsume) {
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public int metadataPartition(TopicIdPartition topicIdPartition) {
}
};

remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner);
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner, _unused -> { });

// Add segments for these partitions but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions but the ConsumerManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
}

private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null);
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null, _unused -> { });
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
Expand All @@ -39,10 +40,14 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
Expand All @@ -53,11 +58,15 @@ public class TopicBasedRemoteLogMetadataManagerTest {

private final Time time = new MockTime(1);
private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
private Set<RemoteLogMetadata> remoteLogMetadataSet;

@BeforeEach
public void setup() {
// Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
remoteLogMetadataSet = ConcurrentHashMap.newKeySet();
remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, remoteLogMetadata -> {
remoteLogMetadataSet.add(remoteLogMetadata);
});
}

@AfterEach
Expand Down Expand Up @@ -152,20 +161,14 @@ public void testNewPartitionUpdates() throws Exception {

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L);
waitUntilConsumerCatchesUp(new HashSet<>(Arrays.asList(leaderSegmentMetadata, followerSegmentMetadata)), 30_000L);

Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
}

private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition,
TopicIdPartition newFollowerTopicIdPartition,
private void waitUntilConsumerCatchesUp(Set<RemoteLogMetadata> expected,
long timeoutMs) throws TimeoutException {
int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);

log.debug("Metadata partition for newLeaderTopicIdPartition: [{}], is: [{}]", newLeaderTopicIdPartition, leaderMetadataPartition);
log.debug("Metadata partition for newFollowerTopicIdPartition: [{}], is: [{}]", newFollowerTopicIdPartition, followerMetadataPartition);

long sleepMs = 100L;
long time = System.currentTimeMillis();
Expand All @@ -175,20 +178,8 @@ private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartiti
throw new TimeoutException("Timed out after " + timeoutMs + "ms ");
}

// If both the leader and follower partitions are mapped to the same metadata partition then it should have at least
// 2 messages. That means, read offset should be >= 1 (including duplicate messages if any).
if (leaderMetadataPartition == followerMetadataPartition) {
if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
break;
}
} else {
// If the leader partition and the follower partition are mapped to different metadata partitions then
// each of those metadata partitions will have at least 1 message. That means, read offset should
// be >= 0 (including duplicate messages if any).
if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 ||
topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) {
break;
}
if (remoteLogMetadataSet.equals(expected)) {
return;
}

log.debug("Sleeping for: " + sleepMs);
Expand All @@ -203,7 +194,7 @@ public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException {
public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException, InterruptedException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

Expand All @@ -222,15 +213,15 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
waitUntilConsumerCatchesUp(new HashSet<>(Arrays.asList(firstSegmentMetadata, secondSegmentMetadata, thirdSegmentMetadata)), 30_000L);

Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);

Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException {
public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

Expand All @@ -249,15 +240,15 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
waitUntilConsumerCatchesUp(new HashSet<>(Arrays.asList(firstSegmentMetadata, secondSegmentMetadata, thirdSegmentMetadata)), 30_000L);

Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
}

@Test
public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException {
public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();

Expand All @@ -273,7 +264,7 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
waitUntilConsumerCatchesUp(new HashSet<>(Arrays.asList(firstSegmentMetadata, secondSegmentMetadata)), 30_000L);

Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
}
Expand Down

0 comments on commit d2684d3

Please sign in to comment.