
KAFKA-15352: Update log-start-offset before initiating deletion of remote segments #14349

Merged
merged 3 commits into apache:trunk Sep 12, 2023

Conversation

@clolov clolov (Collaborator) commented Sep 6, 2023

This pull request changes the cleanup order to first update the log-start-offset and only then delete the remote segments.
In the previous version, if a read request arrived between us deleting a segment and updating the log-start-offset, we would not be able to service it.
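For context, a minimal self-contained sketch of the reordering (illustrative names, not the actual RemoteLogManager code):

class CleanupOrderSketch {
    private volatile long logStartOffset = 0L;

    // Old order (race-prone): a fetch arriving between the two steps targets a
    // segment that is already gone while the offset still claims it is readable.
    void cleanupOld(long newStartOffset, Runnable deleteRemoteSegments) {
        deleteRemoteSegments.run();
        logStartOffset = newStartOffset;
    }

    // New order (this PR): advance the offset first, so in-flight reads below it
    // are rejected as out-of-range instead of failing against a deleted segment.
    void cleanupNew(long newStartOffset, Runnable deleteRemoteSegments) {
        logStartOffset = newStartOffset;
        deleteRemoteSegments.run();
    }
}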

@clolov clolov marked this pull request as draft September 6, 2023 15:22
@satishd satishd added the tiered-storage Pull requests associated with KIP-405 (Tiered Storage) label Sep 7, 2023
@kamalcph kamalcph (Collaborator) left a comment

The changes LGTM. Could you please cover the patch with unit tests?

@@ -1006,6 +1005,10 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex

// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> true);
@kamalcph kamalcph (Collaborator) Sep 7, 2023

Can we AND the results of remoteLogRetentionHandler.deleteRemoteLogSegment across all the segments and log an error if it was not able to delete them?

remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader());
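For illustration, a self-contained sketch of that suggestion; the SegmentDeleter interface stands in for remoteLogRetentionHandler.deleteRemoteLogSegment, which is assumed here to report success as a boolean:

import java.util.List;
import java.util.function.Predicate;

class DeleteAllSketch {
    // Stand-in for remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, predicate).
    interface SegmentDeleter<T> {
        boolean delete(T segment, Predicate<T> shouldDelete);
    }

    // AND the per-segment results together and log once if any deletion failed.
    static <T> boolean deleteAll(List<T> segmentsToDelete, SegmentDeleter<T> deleter, Predicate<T> shouldDelete) {
        boolean allDeleted = true;
        for (T segment : segmentsToDelete) {
            // call delete first so a single failure does not short-circuit the remaining deletions
            allDeleted = deleter.delete(segment, shouldDelete) && allDeleted;
        }
        if (!allDeleted) {
            System.err.println("Failed to delete one or more remote log segments");
        }
        return allDeleted;
    }
}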

@kamalcph kamalcph (Collaborator) commented Sep 7, 2023

@divijvaidya

Could you please review this patch? Discussion thread:

This patch partially addresses case 2 mentioned in the KAFKA-15352 ticket. Only the leader moves the log-start-offset and starts deleting the remote log segments. If a leader switch happens in the middle of deleting the remote log segments, there can be two cases:

  1. The log-start-offset change was propagated to all the replicas, including the new leader. In this case, both the old leader and the new leader can delete the remote log segments, since the log-start-offset can only be updated with a monotonically increasing value (see the sketch after this list).
  2. The log-start-offset change was not propagated to the new leader. In this case, the new leader assumes a log-start-offset (with KAFKA-15351) whose segments may already have been deleted by the old leader, and the log-start-offset can be stale until the next segment deletion. We can address this case by ensuring that the log-start-offset propagates to all the replicas before deleting the remote log segments; we can take this up later (or) file a JIRA.
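A self-contained sketch of the monotonicity guard referenced in case 1 (illustrative only, not the actual broker code):

import java.util.concurrent.atomic.AtomicLong;

class MonotonicLogStartOffset {
    private final AtomicLong logStartOffset = new AtomicLong(0L);

    // Accept an update only if it moves the offset forward; a stale value
    // (e.g. from a deposed leader) is silently ignored.
    boolean maybeAdvance(long candidate) {
        long current = logStartOffset.get();
        while (candidate > current) {
            if (logStartOffset.compareAndSet(current, candidate)) {
                return true;
            }
            current = logStartOffset.get();
        }
        return false;
    }
}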

@clolov clolov marked this pull request as ready for review September 7, 2023 12:09
@divijvaidya divijvaidya (Contributor) left a comment

I have some meta questions:

  1. The race condition between the start offset getting updated, leadership moving, and deletion by the RLM is a tricky one. Is there a way to add integration tests for the different scenarios?

  2. The log start offset is updated only by expiration-related work. In this PR we update the log start offset even though we might not perform expiration. Isn't it safer to update the log start offset only if we are the leader? Yes, we will still have a case where leadership changes between updating the log start offset and deleting the segments, which is handled by this PR, but the probability of that happening is low since there are no compute-intensive statements between the two steps. My point is: should we check for leadership even before updating the log start offset?

@showuon showuon self-assigned this Sep 7, 2023
@satishd satishd (Member) left a comment

Thanks @clolov for the PR. Please rebase it with the latest trunk and resolve the conflicts to make it available for review.

@kamalcph kamalcph (Collaborator) commented Sep 7, 2023

My point is, should we check for leadership even before updating the log start offset?

Yes, this is the expectation, and it is already done inside the handleLogStartOffsetUpdate method:

public void handleLogStartOffsetUpdate(TopicPartition topicPartition, long remoteLogStartOffset) {
    if (isLeader()) {
        logger.debug("Updating {} with remoteLogStartOffset: {}", topicPartition, remoteLogStartOffset);
        updateRemoteLogStartOffset.accept(topicPartition, remoteLogStartOffset);
    }
}

@satishd satishd (Member) left a comment

Thanks @clolov for the PR. Had an initial review of the changes.

core/src/main/java/kafka/log/remote/RemoteLogManager.java (outdated; resolved)
@@ -179,6 +180,8 @@ public List<EpochEntry> read() {

private final UnifiedLog mockLog = mock(UnifiedLog.class);

private final List<Map<TopicPartition, Long>> events = new ArrayList<>();
Collaborator

Can we use a list of tuples instead of a list of maps?
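For illustration, a hypothetical shape of the list-of-tuples alternative, using Map.Entry as the tuple type since plain Java has none (names are illustrative, not the actual test code):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class EventsAsTuples {
    // One entry per log-start-offset event: (partition name, offset).
    private final List<Map.Entry<String, Long>> events = new ArrayList<>();

    void record(String topicPartition, long offset) {
        events.add(new AbstractMap.SimpleEntry<>(topicPartition, offset));
    }

    long offsetOf(int eventIndex) {
        return events.get(eventIndex).getValue();
    }
}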

.thenAnswer(answer -> {
// assert that log-start-offset has been moved accordingly
// we skip the first entry as it is the local replica ensuring it has the correct log start offset
assertEquals(200, events.get(1).get(leaderTopicIdPartition.topicPartition()));
@kamalcph kamalcph (Collaborator) Sep 8, 2023

Instead of asserting the log-start-offset via events, can we assert it similarly to the testLogStartOffsetUpdatedOnStartup method?

AtomicLong logStartOffset = new AtomicLong(0);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time,
        tp -> Optional.of(mockLog),
        (topicPartition, offset) -> logStartOffset.set(offset), // capture log-start-offset updates
        brokerTopicStats) {
    public RemoteLogMetadataManager createRemoteLogMetadataManager() {
        return remoteLogMetadataManager;
    }
}) {
    RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
    task.convertToLeader(0);
    task.run();
    assertEquals(200L, logStartOffset.get());
    verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
    verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
}

@clolov (Collaborator, Author)

I am happy to change it to this. The reason why I implemented it with events is that it allowed me to carry out the assertion before deletes were initiated. With your approach we assert that the log start offset has been updated, but we do not assert that it was updated before deletes were carried out. Would you still like me to change it to your proposal?

@clolov (Collaborator, Author)

Or do you mean that the problem is not with the location of the assertion, but that I do not need a list of events because a single AtomicLong will be sufficient?

Collaborator

Yes, a single AtomicLong will be sufficient. We can assert before and after calling the task.run() method.
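A self-contained illustration of that before/after pattern; the Runnable stands in for RLMTask.run(), and 200L matches the offset used in the test above:

import java.util.concurrent.atomic.AtomicLong;

public class BeforeAfterAssertSketch {
    public static void main(String[] args) {
        AtomicLong logStartOffset = new AtomicLong(0L);
        Runnable task = () -> logStartOffset.set(200L); // stand-in for the retention task

        if (logStartOffset.get() != 0L) throw new AssertionError("offset moved before the task ran");
        task.run();
        if (logStartOffset.get() != 200L) throw new AssertionError("offset not advanced by the task");
        System.out.println("before/after assertions passed");
    }
}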

@kamalcph kamalcph (Collaborator) left a comment

LGTM. Thanks @clolov for addressing the review comments.

@showuon showuon (Contributor) left a comment

Overall LGTM! Thanks for the fix. Left some comments.

Comment on lines 1015 to 1017
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
}
Contributor

nit: This if block and L1004-1006 could be moved before canProcess = isSegmentDeleted || !isValidSegment;. I.e.

boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
        metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;

if (!shouldDeleteSegment) {
    ...
    if (isValidSegment) {
        shouldDeleteSegment = remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) || remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
    }
}
if (shouldDeleteSegment) {
    segmentsToDelete.add(metadata);
}

canProcess = isSegmentDeleted || !isValidSegment;

@clolov (Collaborator, Author)

Thanks for the spot! Hopefully addressed in the next commit

Comment on lines +1665 to +1667
assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
@showuon showuon (Contributor) Sep 9, 2023

I think this is a good place to continue to verify the situation we want to protect:

// If the follower HAS picked up the changes and becomes the leader, this replica won't successfully complete the deletion.
// However, the new leader will correctly pick up all breaching segments as log-start-offset breaching ones
// and delete them accordingly.

// If the follower HAS NOT picked up the changes and becomes the leader, it will go through this process
// again and delete the segments with the original deletion reason, i.e. size, time, or log-start-offset breach.

So, I'm thinking we can continue the test with something like:

RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
task.convertToLeader(1);
....
task.run();

assertEquals(200L, logStartOffset.get());

// verify the 2nd log segment will be deleted by the new leader.
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));

WDYT?

@clolov (Collaborator, Author)

This is a great idea; apologies for not doing it myself. Hopefully the next commit addresses this.

@clolov clolov (Collaborator, Author) commented Sep 11, 2023

Thanks for the latest comments! I will review and respond today.

@divijvaidya divijvaidya (Contributor) left a comment

Please don't wait for comments from me to merge this. I am good with the code changes here.

@showuon showuon (Contributor) left a comment

LGTM! Thanks for the fix!

@showuon showuon (Contributor) commented Sep 12, 2023

@satishd , do you want to have another look?

@satishd satishd (Member) left a comment

Thanks @clolov, went through the source code changes and LGTM.

This change updates the log-start-offset before the segments are deleted from remote storage. It provides a best-effort mechanism for followers to receive the log-start-offset so they can update their own log-start-offset before becoming the leader.

I could not take a closer look at the tests. I do not want to block the PR; I am fine since others have already reviewed the tests.

@satishd satishd (Member) commented Sep 12, 2023

There are a few unrelated test failures. Merging it to trunk and 3.6.

@satishd satishd merged commit 7483991 into apache:trunk Sep 12, 2023
1 check failed
satishd pushed a commit that referenced this pull request Sep 12, 2023
…mote segments (#14349)

This change is about the current leader updating the log-start-offset before the segments are deleted from remote storage. It provides a best-effort mechanism for followers to receive the log-start-offset from the leader so they can update their own log-start-offset before becoming the leader.

Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
Cerchie pushed a commit to Cerchie/kafka that referenced this pull request Feb 22, 2024