KAFKA-16686: Improve waiting in TopicBasedRemoteLogMetadataManagerTest
Some tests in TopicBasedRemoteLogMetadataManagerTest are flaky because
`waitUntilConsumerCatchesUp` may return early, before the consumer manager
has caught up with all the events.

This change allows passing a spy `RemotePartitionMetadataStore` down to
`ConsumerTask`, so the test code can verify that its methods were invoked
the expected number of times before performing assertions.

Refer to the
[Gradle Enterprise report](https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest)
for more information on the flakiness.
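
For illustration, a minimal sketch of the spy-and-latch pattern this change enables (not code from this commit; the variable names, the latch count of 2, and the `times(2)` verification are hypothetical, while `RemotePartitionMetadataStore`, `handleRemoteLogSegmentMetadata`, and the `Supplier`-based `initialize` overload come from the diff below):

```java
// Sketch only: assumes Mockito and JUnit 5 on the classpath, with the usual
// static imports (spy, doAnswer, any, verify, times, assertTrue).
RemotePartitionMetadataStore spyStore = spy(new RemotePartitionMetadataStore());

// Inject the spy through the new Supplier-accepting initialize overload.
remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, () -> spyStore);

// Count down once per consumed metadata event; the expected count (2 here)
// depends on how many events the test publishes.
CountDownLatch latch = new CountDownLatch(2);
doAnswer(invocation -> {
    Object result = invocation.callRealMethod();
    latch.countDown();
    return result;
}).when(spyStore).handleRemoteLogSegmentMetadata(any());

// ... publish RemoteLogSegmentMetadata events via the manager under test ...

// Block until the consumer has actually processed the events, then assert.
assertTrue(latch.await(30_000, TimeUnit.MILLISECONDS));
verify(spyStore, times(2)).handleRemoteLogSegmentMetadata(any());
```

Compared with polling read offsets, the latch ties the wait to the exact side effects under test, so assertions run only after the consumer has processed every expected event.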
gaurav-narula committed May 12, 2024
1 parent 643db43 commit eb6fd01
Showing 3 changed files with 113 additions and 53 deletions.
TopicBasedRemoteLogMetadataManager.java
@@ -55,6 +55,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 /**
@@ -89,14 +90,16 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
     private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
     private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
     private volatile boolean initializationFailed;
+    private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;
 
     public TopicBasedRemoteLogMetadataManager() {
-        this(true);
+        this(true, RemotePartitionMetadataStore::new);
     }
 
     // Visible for testing.
-    public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
+    public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
         this.startConsumerThread = startConsumerThread;
+        this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier;
     }
 
     @Override
@@ -358,7 +361,7 @@ public void configure(Map<String, ?> configs) {
 
         rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
         rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
-        remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+        remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get();
         configured = true;
         log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
 
TopicBasedRemoteLogMetadataManagerHarness.java
@@ -34,10 +34,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
@@ -60,17 +61,30 @@ protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
 
     public void initialize(Set<TopicIdPartition> topicIdPartitions,
                            boolean startConsumerThread) {
+        initialize(topicIdPartitions, startConsumerThread, RemotePartitionMetadataStore::new);
+    }
+
+    public void initialize(Set<TopicIdPartition> topicIdPartitions,
+                           boolean startConsumerThread,
+                           Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
         // Call setup to start the cluster.
         super.setUp(new EmptyTestInfo());
 
-        initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null);
+        initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, remotePartitionMetadataStoreSupplier);
     }
 
     public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
                                                    boolean startConsumerThread,
                                                    RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
+        initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new);
+    }
+
+    public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
+                                                   boolean startConsumerThread,
+                                                   RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner,
+                                                   Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
         String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
-        topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) {
+        topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remotePartitionMetadataStoreSupplier) {
             @Override
             public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
                                                      Set<TopicIdPartition> followerPartitions) {
TopicBasedRemoteLogMetadataManagerTest.java
@@ -23,7 +23,6 @@
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
@@ -43,8 +42,15 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 public class TopicBasedRemoteLogMetadataManagerTest {
     private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class);
@@ -53,11 +59,13 @@ public class TopicBasedRemoteLogMetadataManagerTest {
 
     private final Time time = new MockTime(1);
     private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
+    private RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler;
 
     @BeforeEach
     public void setup() {
         // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
+        spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore());
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, () -> spyRemotePartitionMetadataEventHandler);
     }
 
     @AfterEach
@@ -130,6 +138,20 @@ public void testNewPartitionUpdates() throws Exception {
         final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
         final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
 
+        CountDownLatch initializationLatch = new CountDownLatch(2);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            initializationLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any());
+
+        CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(2);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            handleRemoteLogSegmentMetadataLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
+
         // Add segments for these partitions, but an exception is received as they have not yet been subscribed.
         // These messages would have been published to the respective metadata topic partitions, but the ConsumerManager
         // has not yet subscribed to them as they are not yet registered.
@@ -152,61 +174,42 @@
 
         // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L);
+        Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+        verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition);
+        verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
         Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
         Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
     }
 
-    private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition,
-                                            TopicIdPartition newFollowerTopicIdPartition,
-                                            long timeoutMs) throws TimeoutException {
-        int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
-        int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);
-
-        log.debug("Metadata partition for newLeaderTopicIdPartition: [{}], is: [{}]", newLeaderTopicIdPartition, leaderMetadataPartition);
-        log.debug("Metadata partition for newFollowerTopicIdPartition: [{}], is: [{}]", newFollowerTopicIdPartition, followerMetadataPartition);
-
-        long sleepMs = 100L;
-        long time = System.currentTimeMillis();
-
-        while (true) {
-            if (System.currentTimeMillis() - time > timeoutMs) {
-                throw new TimeoutException("Timed out after " + timeoutMs + "ms ");
-            }
-
-            // If both the leader and follower partitions are mapped to the same metadata partition then it should have at least
-            // 2 messages. That means, read offset should be >= 1 (including duplicate messages if any).
-            if (leaderMetadataPartition == followerMetadataPartition) {
-                if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
-                    break;
-                }
-            } else {
-                // If the leader partition and the follower partition are mapped to different metadata partitions then
-                // each of those metadata partitions will have at least 1 message. That means, read offset should
-                // be >= 0 (including duplicate messages if any).
-                if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 ||
-                    topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) {
-                    break;
-                }
-            }
-
-            log.debug("Sleeping for: " + sleepMs);
-            Utils.sleep(sleepMs);
-        }
-    }
-
     @Test
     public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() {
         TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
         Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> topicBasedRlmm().remoteLogSize(topicIdPartition, 0));
     }
 
     @Test
-    public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException {
+    public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException, InterruptedException {
         TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
         TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
 
+        CountDownLatch initializationLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            initializationLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any());
+
+        CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(3);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            handleRemoteLogSegmentMetadataLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
+
         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
             0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
@@ -222,17 +225,35 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException, InterruptedException {
 
         // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+        Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+        verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
         Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0);
 
         Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize);
     }
 
     @Test
-    public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException {
+    public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
         TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
         TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+        CountDownLatch initializationLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            initializationLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any());
+
+        CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(3);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            handleRemoteLogSegmentMetadataLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
 
         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
             0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
@@ -249,17 +270,35 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
 
         // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+        Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+        verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata);
         Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0));
         Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1));
         Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2));
     }
 
     @Test
-    public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException {
+    public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
         TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0));
         TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm();
+        CountDownLatch initializationLatch = new CountDownLatch(1);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            initializationLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any());
+
+        CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(2);
+        doAnswer(invocationOnMock -> {
+            Object result = invocationOnMock.callRealMethod();
+            handleRemoteLogSegmentMetadataLatch.countDown();
+            return result;
+        }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any());
 
         RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()),
             0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
@@ -273,8 +312,12 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException {
 
         // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
         // fetching those events and build the cache.
-        waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L);
+        Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS));
+        Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS));
+
+        verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata);
+        verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata);
         Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001));
     }
 
