
KAFKA-16686: Wait for given offset in TopicBasedRemoteLogMetadataManagerTest #15885

Merged · 2 commits · May 15, 2024

Conversation

gaurav-narula (Contributor)

Some tests in TopicBasedRemoteLogMetadataManagerTest flake because waitUntilConsumerCatchesUp may break early, before the consumer manager has caught up with all the events.

This PR adds expected offsets for the leader/follower metadata partitions and ensures we wait until the read offset is at least equal to the given target, to avoid flakiness.

Refer to the Gradle Enterprise Report for more information on the flakiness.
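A minimal sketch of the idea, assuming the parameter names from the description above (the actual PR diff differs in detail): the test polls until the metadata partition's read offset reaches the expected target instead of breaking out as soon as any record has been consumed.

```java
// Sketch only: topicBasedRlmm() and readOffsetForPartition(...) come from the
// existing test class; the target-offset parameter is the new addition.
private void waitUntilConsumerCatchesUp(int leaderMetadataPartition,
                                        long targetLeaderMetadataPartitionOffset,
                                        long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
        // Wait until the read offset is at least the expected target offset.
        if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L)
                >= targetLeaderMetadataPartitionOffset) {
            return;
        }
        Thread.sleep(100L);
    }
    throw new AssertionError("Consumer did not reach offset " + targetLeaderMetadataPartitionOffset
            + " within " + timeoutMs + " ms");
}
```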

@gaurav-narula (Contributor Author)

CC: @clolov @satishd

@kamalcph kamalcph (Collaborator) left a comment

Thanks @gaurav-narula for the patch! Left minor comments to address.

We can rewrite this test to be concise. I'll file a separate ticket for this.

  if (leaderMetadataPartition == followerMetadataPartition) {
-     if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= 1) {
+     Assertions.assertEquals(targetLeaderMetadataPartitionOffset, targetFollowerMetadataPartitionOffset);
+     if (topicBasedRlmm().readOffsetForPartition(leaderMetadataPartition).orElse(-1L) >= targetLeaderMetadataPartitionOffset) {
kamalcph (Collaborator):

Previously we were waiting for >= 1; after this change, it is >= 0. This will make the test more flaky.

When the leader and follower partitions map to the same metadata partition, we have to wait for twice the number of messages.
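A hedged illustration of this point (the arithmetic is an assumption for illustration only, not the PR's code): when both roles share one metadata partition, the offset being waited for has to account for the records produced for both of them.

```java
// Illustration only: with a shared metadata partition, records for the leader
// and the follower both land on that partition, so the wait condition must
// cover both batches rather than either target alone.
long requiredOffset = (leaderMetadataPartition == followerMetadataPartition)
        ? targetLeaderMetadataPartitionOffset + targetFollowerMetadataPartitionOffset
        : targetLeaderMetadataPartitionOffset;
```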

@satishd satishd (Member) left a comment

Thanks @gaurav-narula for the PR, left a meta comment.

  Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newLeaderTopicIdPartition).hasNext());
  Assertions.assertTrue(topicBasedRlmm().listRemoteLogSegments(newFollowerTopicIdPartition).hasNext());
  }

  private void waitUntilConsumerCatchesUp(TopicIdPartition newLeaderTopicIdPartition,
                                          TopicIdPartition newFollowerTopicIdPartition,
-                                         long timeoutMs) throws TimeoutException {
+                                         long timeoutMs,
+                                         long targetLeaderMetadataPartitionOffset,
satishd (Member):

These parameters will not help much here, as this method was written for testNewPartitionUpdates but other tests in this class use the functionality with gaps. It is better to revisit those use cases and refactor this method accordingly.

@gaurav-narula force-pushed the KAFKA-16686 branch 3 times, most recently from d2684d3 to 8586c03 on May 12, 2024, 01:27
@gaurav-narula (Contributor Author)

Thanks for the feedback @kamalcph @satishd!

I've modified the tests so that we propagate a Consumer<RemoteLogMetadata> down to ConsumerTask and use it only for tests.

This allows us to replace waitUntilConsumerCatchesUp with TestUtils.waitForCondition and actually wait for the consumption of all the expected RemoteLogMetadata objects we set up in the tests, instead of relying on offsets, which is ambiguous.

Please have a look and let me know your thoughts!
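A minimal sketch of that shape, assuming a hypothetical onConsume callback wired into ConsumerTask and the usual TestUtils.waitForCondition overload (names are illustrative, not the exact diff):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.kafka.test.TestUtils;

// Sketch: the test records every RemoteLogMetadata the ConsumerTask hands to
// the callback, then waits until all expected events have been observed.
Set<RemoteLogMetadata> consumed = ConcurrentHashMap.newKeySet();
Consumer<RemoteLogMetadata> onConsume = consumed::add; // passed down to ConsumerTask, test-only

// ... publish the expected RemoteLogSegmentMetadata events via the manager under test ...

TestUtils.waitForCondition(
        () -> consumed.size() >= expectedEventCount,
        30_000L,
        "ConsumerTask did not consume all expected RemoteLogMetadata events");
```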

@@ -153,6 +173,7 @@ public void run() {

  private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
      final RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
+     onConsume.accept(remoteLogMetadata);
kamalcph (Collaborator):

This is not the correct way to capture the events. Assume that the test case doesn't want to process an event (the shouldProcess check returns false); we don't want that event to be captured.

Instead, we can have a setter method for RemotePartitionMetadataStore and pass a custom implementation, similar to DummyEventHandler, where we capture the event and delegate it to the real implementation.
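A rough sketch of that suggestion (the handler method name comes from this thread; the exact class hierarchy and signatures are assumptions, so treat it as illustrative):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

// Illustrative capturing delegate: it records only the events that actually
// reach the real store's handler (i.e. events that passed shouldProcess) and
// then delegates to the real implementation.
class CapturingRemotePartitionMetadataStore extends RemotePartitionMetadataStore {
    final List<RemoteLogSegmentMetadata> handled = new CopyOnWriteArrayList<>();

    @Override
    public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) {
        handled.add(segmentMetadata);                           // capture for assertions
        super.handleRemoteLogSegmentMetadata(segmentMetadata);  // delegate to the real store
    }
}
```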

gaurav-narula (Contributor Author):

Thanks for the suggestion! This made me realise there's also a possible race where, even after RemotePartitionMetadataStore::handleRemoteLogSegmentMetadata is invoked, the assertions on topicBasedRlmm().listRemoteLogSegments may fail because remoteLogMetadataCache.isInitialized() may return false.

Inspired by your suggestion to hook into RemotePartitionMetadataStore, I've modified TopicBasedRemoteLogMetadataManagerHarness to accept a spy object for it, which is passed down to ConsumerTask. The tests are modified to ensure handleRemoteLogSegmentMetadata and markInitialized are invoked an appropriate number of times.
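A minimal sketch of the resulting wait, assuming Mockito and illustrative expected counts (the harness wiring and the store's constructor are assumptions):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

// Sketch: the spy is handed to the harness; verify(..., timeout(...)) blocks
// until the store has seen the expected calls, so assertions on
// listRemoteLogSegments only run after the cache is populated and initialized.
RemotePartitionMetadataStore spyStore = spy(new RemotePartitionMetadataStore()); // no-arg constructor assumed
// ... wire spyStore into TopicBasedRemoteLogMetadataManagerHarness and publish segment metadata ...
verify(spyStore, timeout(30_000).times(2)).handleRemoteLogSegmentMetadata(any());
verify(spyStore, timeout(30_000).times(2)).markInitialized(any());
```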

kamalcph (Collaborator):

Thanks for updating the test, the approach LGTM!

Nit: Why are we using the supplier pattern instead of adding a setter to TopicBasedRemoteLogMetadataManager and marking it as visibleForTesting?

gaurav-narula (Contributor Author):

> Nit: Why are we using the supplier pattern instead of adding a setter to TopicBasedRemoteLogMetadataManager and marking it as visibleForTesting?

IIUC, you're alluding to something similar to what we do for remoteLogMetadataTopicPartitioner.

That can work, but I feel it's very easy to introduce a race inadvertently, since TopicBasedRemoteLogMetadataManager::configure spawns a thread:

initializationThread = KafkaThread.nonDaemon("RLMMInitializationThread", this::initializeResources);

In fact, remoteLogMetadataTopicPartitioner is prone to a race: if the test thread yields before line 109 is executed, the ProducerManager and ConsumerManager instances can get instantiated with the wrong remoteLogMetadataTopicPartitioner instance:

producerManager = new ProducerManager(rlmmConfig, rlmTopicPartitioner);

We can avoid it by invoking the setter before calling TopicBasedRemoteLogMetadataManager::configure, but I feel it's easier to enforce it by using a Supplier instead. Either way, I feel this race should be fixed as well now :)
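A short sketch of the Supplier-based wiring being argued for (the configs variable and exact constructor use are assumptions): because the dependency is fixed at construction and resolved inside configure, there is no window in which the initialization thread can observe a stale instance that a setter has not yet replaced.

```java
// Sketch: the test-provided store is already in place when configure() spawns
// the RLMMInitializationThread, so ProducerManager/ConsumerManager cannot be
// built against a different instance.
RemotePartitionMetadataStore spyStore = Mockito.spy(new RemotePartitionMetadataStore()); // no-arg constructor assumed
TopicBasedRemoteLogMetadataManager rlmm =
        new TopicBasedRemoteLogMetadataManager(true, () -> spyStore);
rlmm.configure(configs); // background initialization starts here, already holding spyStore
```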

@kamalcph kamalcph (Collaborator) left a comment

LGTM, thanks for fixing the flaky test!

@chia7712 chia7712 (Contributor) left a comment

@gaurav-narula nice fix!

@@ -89,14 +90,16 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager
      private volatile RemoteLogMetadataTopicPartitioner rlmTopicPartitioner;
      private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
      private volatile boolean initializationFailed;
+     private final Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier;

      public TopicBasedRemoteLogMetadataManager() {
chia7712 (Contributor):

Could you add a comment to say that this default constructor is required because we create RemoteLogMetadataManager dynamically?

gaurav-narula (Contributor Author):

Sure, addressed both in a8ba568
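For illustration, the requested comment on the default constructor might look roughly like this (the wording and the delegation shown are assumptions, not the committed change in a8ba568):

```java
/**
 * The public no-arg constructor is required because RemoteLogMetadataManager
 * implementations are instantiated dynamically, by class name, from the
 * broker configuration.
 */
public TopicBasedRemoteLogMetadataManager() {
    this(true, RemotePartitionMetadataStore::new); // delegation shown for illustration
}
```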

  }

  // Visible for testing.
- public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
+ public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread, Supplier<RemotePartitionMetadataStore> remoteLogMetadataManagerSupplier) {
chia7712 (Contributor):

It seems package-private is enough for testing, right?

@chia7712 chia7712 (Contributor) left a comment

LGTM

I have re-triggered QA. Will merge it if there are no objections.

@satishd satishd (Member) left a comment

Thanks @gaurav-narula for addressing the review comments, this approach LGTM.

Some tests in TopicBasedRemoteLogMetadataManagerTest flake because
`waitUntilConsumerCatchesUp` may break early, before the consumer manager has
caught up with all the events.

This change allows passing a spy object for `RemotePartitionMetadataStore`
down to `ConsumerTask`, which allows the test code to ensure the methods
on it were invoked an appropriate number of times before performing
assertions.

Refer to the
[Gradle Enterprise Report](https://ge.apache.org/scans/tests?search.timeZoneId=Europe%2FLondon&tests.container=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest)
for more information on the flakiness.
@gaurav-narula (Contributor Author)

@chia7712 looks like we still suffer from thread leaks in CI :( I've rebased from trunk to trigger CI again

@chia7712 (Contributor)

> looks like we still suffer from thread leaks in CI :( I've rebased from trunk to trigger CI again

I have noticed that too. so sad :(

@chia7712 chia7712 merged commit eb5559a into apache:trunk May 15, 2024
1 check failed
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
…erTest (apache#15885)

Some tests in TopicBasedRemoteLogMetadataManagerTest flake because waitUntilConsumerCatchesUp may break early, before the consumer manager has caught up with all the events.

This PR adds expected offsets for the leader/follower metadata partitions and ensures we wait until the offset is at least equal to the argument, to avoid flakiness.

Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…erTest (apache#15885)