diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index b8e3d106664e..b917455a205e 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -55,6 +55,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -89,14 +90,22 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner; private final Set pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>()); private volatile boolean initializationFailed; + private final Supplier remoteLogMetadataManagerSupplier; + /** + * The default constructor delegates to the internal one, starting the consumer thread and + * supplying an instance of RemotePartitionMetadataStore by default. + */ public TopicBasedRemoteLogMetadataManager() { - this(true); + this(true, RemotePartitionMetadataStore::new); } - // Visible for testing. - public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) { + /** + * Used in tests to dynamically configure the instance. + */ + TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier remoteLogMetadataManagerSupplier) { this.startConsumerThread = startConsumerThread; + this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier; } @Override @@ -358,7 +367,7 @@ public void configure(Map configs) { rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs); rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount()); - remotePartitionMetadataStore = new RemotePartitionMetadataStore(); + remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get(); configured = true; log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig); diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java index abad6ea76760..9b48ebfc238c 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java @@ -34,10 +34,11 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; -import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; @@ -60,17 +61,30 @@ protected Map overrideRemoteLogMetadataManagerProps() { public void initialize(Set topicIdPartitions, boolean startConsumerThread) { + initialize(topicIdPartitions, startConsumerThread, RemotePartitionMetadataStore::new); + } + + public void initialize(Set topicIdPartitions, + boolean startConsumerThread, + Supplier remotePartitionMetadataStoreSupplier) { // Call setup to start the cluster. super.setUp(new EmptyTestInfo()); - initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null); + initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, remotePartitionMetadataStoreSupplier); } public void initializeRemoteLogMetadataManager(Set topicIdPartitions, boolean startConsumerThread, RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) { + initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new); + } + + public void initializeRemoteLogMetadataManager(Set topicIdPartitions, + boolean startConsumerThread, + RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner, + Supplier remotePartitionMetadataStoreSupplier) { String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); - topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) { + topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remotePartitionMetadataStoreSupplier) { @Override public void onPartitionLeadershipChanges(Set leaderPartitions, Set followerPartitions) { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 8c53ab8153ad..bb3def893cd6 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException; @@ -43,8 +42,15 @@ import java.util.HashMap; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters public class TopicBasedRemoteLogMetadataManagerTest { private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class); @@ -53,11 +59,13 @@ public class TopicBasedRemoteLogMetadataManagerTest { private final Time time = new MockTime(1); private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness(); + private RemotePartitionMetadataStore spyRemotePartitionMetadataEventHandler; @BeforeEach public void setup() { // Start the cluster and initialize TopicBasedRemoteLogMetadataManager. - remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true); + spyRemotePartitionMetadataEventHandler = spy(new RemotePartitionMetadataStore()); + remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true, () -> spyRemotePartitionMetadataEventHandler); } @AfterEach @@ -130,6 +138,20 @@ public void testNewPartitionUpdates() throws Exception { final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); + CountDownLatch initializationLatch = new CountDownLatch(2); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + initializationLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any()); + + CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(2); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + handleRemoteLogSegmentMetadataLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); + // Add segments for these partitions but an exception is received as they have not yet been subscribed. // These messages would have been published to the respective metadata topic partitions but the ConsumerManager // has not yet been subscribing as they are not yet registered. @@ -152,50 +174,17 @@ public void testNewPartitionUpdates() throws Exception { // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - waitUntilConsumerCatchesUp(newLeaderTopicIdPartition, newFollowerTopicIdPartition, 30_000L); + Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + verify(spyRemotePartitionMetadataEventHandler).markInitialized(newLeaderTopicIdPartition); + verify(spyRemotePartitionMetadataEventHandler).markInitialized(newFollowerTopicIdPartition); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(leaderSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(followerSegmentMetadata); Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext()); Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext()); } - private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition, - TopicIdPartition newFollowerTopicIdPartition, - long timeoutMs) throws TimeoutException { - int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition); - int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition); - - log.debug("Metadata partition for newLeaderTopicIdPartition: [{}], is: [{}]", newLeaderTopicIdPartition, leaderMetadataPartition); - log.debug("Metadata partition for newFollowerTopicIdPartition: [{}], is: [{}]", newFollowerTopicIdPartition, followerMetadataPartition); - - long sleepMs = 100L; - long time = System.currentTimeMillis(); - - while (true) { - if (System.currentTimeMillis() - time > timeoutMs) { - throw new TimeoutException("Timed out after " + timeoutMs + "ms "); - } - - // If both the leader and follower partitions are mapped to the same metadata partition then it should have at least - // 2 messages. That means, read offset should be >= 1 (including duplicate messages if any). - if (leaderMetadataPartition == followerMetadataPartition) { - if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) { - break; - } - } else { - // If the leader partition and the follower partition are mapped to different metadata partitions then - // each of those metadata partitions will have at least 1 message. That means, read offset should - // be >= 0 (including duplicate messages if any). - if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 0 || - topicBasedRlmm().readOffsetForPartition(followerMetadataPartition).orElse(-1L) >= 0) { - break; - } - } - - log.debug("Sleeping for: " + sleepMs); - Utils.sleep(sleepMs); - } - } - @Test public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); @@ -203,10 +192,24 @@ public void testRemoteLogSizeCalculationForUnknownTopicIdPartitionThrows() { } @Test - public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException { + public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws RemoteStorageException, TimeoutException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); + CountDownLatch initializationLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + initializationLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any()); + + CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(3); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + handleRemoteLogSegmentMetadataLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); + RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); RemoteLogSegmentMetadata secondSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), @@ -222,17 +225,35 @@ public void testRemoteLogSizeCalculationWithSegmentsOfTheSameEpoch() throws Remo // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L); + Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); Long remoteLogSize = topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0); Assertions.assertEquals(SEG_SIZE * 6, remoteLogSize); } @Test - public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException { + public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); + CountDownLatch initializationLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + initializationLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any()); + + CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(3); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + handleRemoteLogSegmentMetadataLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); @@ -249,17 +270,35 @@ public void testRemoteLogSizeCalculationWithSegmentsOfDifferentEpochs() throws R // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L); + Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(thirdSegmentMetadata); Assertions.assertEquals(SEG_SIZE, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 0)); Assertions.assertEquals(SEG_SIZE * 2, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 1)); Assertions.assertEquals(SEG_SIZE * 3, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 2)); } @Test - public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException { + public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() throws RemoteStorageException, TimeoutException, InterruptedException { TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("singleton", 0)); TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = topicBasedRlmm(); + CountDownLatch initializationLatch = new CountDownLatch(1); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + initializationLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).markInitialized(any()); + + CountDownLatch handleRemoteLogSegmentMetadataLatch = new CountDownLatch(2); + doAnswer(invocationOnMock -> { + Object result = invocationOnMock.callRealMethod(); + handleRemoteLogSegmentMetadataLatch.countDown(); + return result; + }).when(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(any()); RemoteLogSegmentMetadata firstSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); @@ -273,8 +312,12 @@ public void testRemoteLogSizeCalculationWithSegmentsHavingNonExistentEpochs() th // RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start // fetching those events and build the cache. - waitUntilConsumerCatchesUp(topicIdPartition, topicIdPartition, 30_000L); + Assertions.assertTrue(initializationLatch.await(30_000, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(handleRemoteLogSegmentMetadataLatch.await(30_000, TimeUnit.MILLISECONDS)); + verify(spyRemotePartitionMetadataEventHandler).markInitialized(topicIdPartition); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(firstSegmentMetadata); + verify(spyRemotePartitionMetadataEventHandler).handleRemoteLogSegmentMetadata(secondSegmentMetadata); Assertions.assertEquals(0, topicBasedRemoteLogMetadataManager.remoteLogSize(topicIdPartition, 9001)); }