KAFKA-16712: Fix race in TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest #15962

Merged (3 commits, May 16, 2024)
@@ -55,6 +55,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

@@ -91,21 +92,23 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager
private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
private volatile boolean initializationFailed;
private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;
private final Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction;

/**
* The default constructor delegates to the internal one, starting the consumer thread and
* supplying an instance of RemotePartitionMetadataStore by default.
* supplying an instance of RemoteLogMetadataTopicPartitioner and RemotePartitionMetadataStore by default.
*/
public TopicBasedRemoteLogMetadataManager() {
this(true, RemotePartitionMetadataStore::new);
this(true, RemoteLogMetadataTopicPartitioner::new, RemotePartitionMetadataStore::new);
}

/**
* Used in tests to dynamically configure the instance.
*/
TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitionerFunction, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
this.startConsumerThread = startConsumerThread;
this.remoteLogMetadataManagerSupplier = remoteLogMetadataManagerSupplier;
this.remoteLogMetadataTopicPartitionerFunction = remoteLogMetadataTopicPartitionerFunction;
}

@Override
@@ -366,7 +369,7 @@ public void configure(Map<String, ?> configs) {
log.info("Started configuring topic-based RLMM with configs: {}", configs);

rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
rlmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
rlmTopicPartitioner = remoteLogMetadataTopicPartitionerFunction.apply(rlmmConfig.metadataTopicPartitionsCount());
remotePartitionMetadataStore = remoteLogMetadataManagerSupplier.get();
configured = true;
log.info("Successfully configured topic-based RLMM with config: {}", rlmmConfig);
@@ -559,11 +562,6 @@ public TopicBasedRemoteLogMetadataManagerConfig config() {
return rlmmConfig;
}

// Visible for testing.
void setRlmTopicPartitioner(RemoteLogMetadataTopicPartitioner rlmTopicPartitioner) {
this.rlmTopicPartitioner = Objects.requireNonNull(rlmTopicPartitioner);
}

@Override
public void close() throws IOException {
// Close all the resources.
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
@@ -70,21 +71,21 @@ public void initialize(Set<TopicIdPartition> topicIdPartitions,
// Call setup to start the cluster.
super.setUp(new EmptyTestInfo());

initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, null, remotePartitionMetadataStoreSupplier);
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier);
}

public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) {
Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner) {
initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new);
}

public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
boolean startConsumerThread,
RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner,
Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner,
Supplier<RemotePartitionMetadataStore> remotePartitionMetadataStoreSupplier) {
String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remotePartitionMetadataStoreSupplier) {
topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread, remoteLogMetadataTopicPartitioner, remotePartitionMetadataStoreSupplier) {
@Override
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
Set<TopicIdPartition> followerPartitions) {
@@ -119,9 +120,6 @@ public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs);

topicBasedRemoteLogMetadataManager.configure(configs);
if (remoteLogMetadataTopicPartitioner != null) {
topicBasedRemoteLogMetadataManager.setRlmTopicPartitioner(remoteLogMetadataTopicPartitioner);
Contributor:
setRlmTopicPartitioner is now unused, so could you please remove it?

Contributor Author:
Addressed in 6880e9f

}
try {
waitUntilInitialized(60_000);
} catch (TimeoutException e) {
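A hedged usage sketch of the updated harness API: callers now hand over a factory for the partitioner rather than a pre-built instance (or null), so the harness can pass the configured metadata-partition count to the partitioner at configure() time. The harness variable and the empty partition set below are illustrative, and imports are omitted:

    // Default behaviour: let the manager build the partitioner from the configured partition count.
    harness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, RemoteLogMetadataTopicPartitioner::new);

    // Custom behaviour: a test-specific partitioner that pins every user partition to metadata partition 0.
    harness.initializeRemoteLogMetadataManager(Collections.emptySet(), true,
            numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
                @Override
                public int metadataPartition(TopicIdPartition topicIdPartition) {
                    return 0;
                }
            });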
@@ -21,13 +21,11 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -41,6 +39,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
@@ -108,7 +114,27 @@ public void testMultiplePartitionSubscriptions() throws Exception {
final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
final TopicIdPartition emptyTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(topicWithNoMessages, 0));

RemoteLogMetadataTopicPartitioner partitioner = new RemoteLogMetadataTopicPartitioner(10) {
final RemotePartitionMetadataStore spyRemotePartitionMetadataStore = spy(new RemotePartitionMetadataStore());

// Think of a Phaser as a CountDownLatch that also offers a "countUp" operation in addition to countDown.
// The "parties" in a Phaser are analogous to the "count". The awaiting semantics of a Phaser, however,
// differ slightly from a CountDownLatch: the test thread itself must also be accounted for when
// initialising the Phaser. (A standalone Phaser sketch follows this file's diff.)
Phaser initializationPhaser = new Phaser(2); // 1 to register test thread, 1 to register leaderTopicIdPartition
doAnswer(invocationOnMock -> {
Object result = invocationOnMock.callRealMethod();
initializationPhaser.arriveAndDeregister(); // similar to CountDownLatch::countDown
return result;
}).when(spyRemotePartitionMetadataStore).markInitialized(any());

Phaser handleRemoteLogSegmentMetadataPhaser = new Phaser(2); // 1 to register test thread, 1 to register leaderTopicIdPartition
doAnswer(invocationOnMock -> {
Object result = invocationOnMock.callRealMethod();
handleRemoteLogSegmentMetadataPhaser.arriveAndDeregister(); // similar to CountDownLatch::countDown
return result;
}).when(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(any());

remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, numMetadataTopicPartitions -> new RemoteLogMetadataTopicPartitioner(numMetadataTopicPartitions) {
@Override
public int metadataPartition(TopicIdPartition topicIdPartition) {
// Always return partition 0 except for noMessagesTopicIdPartition. So that, any new user
@@ -120,9 +146,7 @@ public int metadataPartition(TopicIdPartition topicIdPartition) {
return 0;
}
}
};

remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), true, partitioner);
}, () -> spyRemotePartitionMetadataStore);

// Add segments for these partitions but an exception is received as they have not yet been subscribed.
// These messages would have been published to the respective metadata topic partitions but the ConsumerManager
@@ -150,26 +174,31 @@

// RemoteLogSegmentMetadata events are already published, and topicBasedRlmm's consumer manager will start
// fetching those events and build the cache.
waitUntilConsumerCatchesUp(30_000L);
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS); // similar to CountDownLatch::await
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
verify(spyRemotePartitionMetadataStore).markInitialized(leaderTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(leaderSegmentMetadata);
clearInvocations(spyRemotePartitionMetadataStore);

// The leader partition's metadata would have been received as it is registered, but the follower partition
// is not yet registered, hence listing its segments throws an exception.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext());
Assertions.assertThrows(RemoteStorageException.class, () -> rlmm().listRemoteLogSegments(followerTopicIdPartition));

// Register follower partition
// Phaser::bulkRegister and Phaser::register provide the "countUp" feature
initializationPhaser.bulkRegister(2); // 1 for emptyTopicIdPartition and 1 for followerTopicIdPartition
handleRemoteLogSegmentMetadataPhaser.register(); // 1 for followerTopicIdPartition, emptyTopicIdPartition doesn't have a RemoteLogSegmentMetadata event
rlmm().onPartitionLeadershipChanges(Collections.singleton(emptyTopicIdPartition),
Collections.singleton(followerTopicIdPartition));

// In this state, all the metadata should be available in RLMM for both leader and follower partitions.
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
TestUtils.waitForCondition(() -> rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}
initializationPhaser.awaitAdvanceInterruptibly(initializationPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);
handleRemoteLogSegmentMetadataPhaser.awaitAdvanceInterruptibly(handleRemoteLogSegmentMetadataPhaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

private void waitUntilConsumerCatchesUp(long timeoutMs) throws TimeoutException, InterruptedException {
TestUtils.waitForCondition(() -> {
// If both the leader and follower partitions are mapped to the same metadata partition which is 0, it
// should have at least 2 messages. That means, read offset should be >= 1 (including duplicate messages if any).
return rlmm().readOffsetForPartition(0).orElse(-1L) >= 1;
}, timeoutMs, "Consumer did not catch up");
verify(spyRemotePartitionMetadataStore).markInitialized(followerTopicIdPartition);
verify(spyRemotePartitionMetadataStore).handleRemoteLogSegmentMetadata(followerSegmentMetadata);
// In this state, all the metadata should be available in RLMM for both leader and follower partitions.
Assertions.assertTrue(rlmm().listRemoteLogSegments(leaderTopicIdPartition).hasNext(), "No segments found");
Assertions.assertTrue(rlmm().listRemoteLogSegments(followerTopicIdPartition).hasNext(), "No segments found");
}
}
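Because the Phaser-as-resettable-latch idea carries the synchronization in this test, here is a small standalone sketch of the same pattern, with hypothetical worker threads standing in for the consumer task. arrive() records the calling party's arrival and returns the current phase, awaitAdvanceInterruptibly() blocks until every registered party has arrived in that phase, and register()/bulkRegister() later raise the number of outstanding parties, which a CountDownLatch cannot do:

    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;

    public class PhaserLatchSketch {
        public static void main(String[] args) throws Exception {
            // One party for the waiting (main) thread, one for the background task.
            Phaser phaser = new Phaser(2);

            Thread worker = new Thread(phaser::arriveAndDeregister); // "countDown": this party is done for good
            worker.start();

            // "await": wait until all registered parties have arrived in the current phase.
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

            // "countUp": register one more party for the next phase.
            phaser.register();
            Thread lateWorker = new Thread(phaser::arriveAndDeregister);
            lateWorker.start();
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), 30_000, TimeUnit.MILLISECONDS);

            worker.join();
            lateWorker.join();
        }
    }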
@@ -65,7 +65,7 @@ protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
}

private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, null);
remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, RemoteLogMetadataTopicPartitioner::new);
}

@AfterEach