KAFKA-15479: Remote log segments should be considered once for retention breach #14407

Merged: 5 commits, Sep 25, 2023
core/src/main/java/kafka/log/remote/RemoteLogManager.java (90 changes: 46 additions & 44 deletions)
@@ -561,7 +561,6 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
         }
         cache.truncateFromEnd(endOffset);
     }
-
     return checkpoint;
 }

@@ -706,7 +705,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
         }
     }

-    private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
+    private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset)
+            throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
             CustomMetadataSizeLimitExceededException {
         File logFile = segment.log().file();
         String logFileName = logFile.getName();
@@ -832,13 +832,11 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
             remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
         }

-        private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+        private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
+            boolean shouldDeleteSegment = false;
             if (!retentionSizeData.isPresent()) {
-                return false;
+                return shouldDeleteSegment;
             }
-
-            boolean shouldDeleteSegment = false;
-
             // Assumption that segments contain size >= 0
             if (remainingBreachedSize > 0) {
                 long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
@@ -847,7 +845,6 @@ private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata met
                     shouldDeleteSegment = true;
                 }
             }
-
             if (shouldDeleteSegment) {
                 logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
                 logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
@@ -856,12 +853,12 @@ private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata met
             return shouldDeleteSegment;
         }

-        public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
+        public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
+            boolean shouldDeleteSegment = false;
             if (!retentionTimeData.isPresent()) {
-                return false;
+                return shouldDeleteSegment;
             }
-
-            boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
+            shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
             if (shouldDeleteSegment) {
                 remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
                 // It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
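Note: the time-based predicate reduces to comparing a segment's max timestamp against a cleanup watermark derived from the retention window; a segment is breached only when even its newest record is past retention. A tiny sketch with made-up values (the 7-day `retention.ms` is an assumption for illustration):

```java
public class RetentionTimeSketch {
    public static void main(String[] args) {
        long dayMs = 24 * 60 * 60 * 1000L;
        long nowMs = System.currentTimeMillis();
        long retentionMs = 7 * dayMs;                   // hypothetical retention.ms
        long cleanupUntilMs = nowMs - retentionMs;      // delete up to this timestamp
        long segmentMaxTimestampMs = nowMs - 8 * dayMs; // newest record is 8 days old
        boolean shouldDeleteSegment = segmentMaxTimestampMs <= cleanupUntilMs;
        System.out.println(shouldDeleteSegment); // true: the whole segment is expired
    }
}
```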
@@ -873,9 +870,9 @@ public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMeta
             return shouldDeleteSegment;
         }

-        private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
-                                                             long logStartOffset,
-                                                             NavigableMap<Integer, Long> leaderEpochEntries) {
+        private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata,
+                                                        long logStartOffset,
+                                                        NavigableMap<Integer, Long> leaderEpochEntries) {
             boolean shouldDeleteSegment = false;
             if (!leaderEpochEntries.isEmpty()) {
                 // Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
@@ -916,10 +913,8 @@ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                     new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
                             segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();
-
             // Delete the segment in remote storage.
             remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);
-
             // Publish delete segment finished event.
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
                     new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
@@ -932,7 +927,7 @@ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,

     }

-    private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
+    void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
         if (isCancelled() || !isLeader()) {
             logger.info("Returning from remote log segments cleanup as the task state is changed");
             return;
@@ -993,22 +988,24 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
                 return;
             }
             RemoteLogSegmentMetadata metadata = segmentsIterator.next();

+            if (segmentsToDelete.contains(metadata)) {
[Inline comment from the PR author on the line above: "This is the main fix."]

+                continue;
+            }
             // When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
             // as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
             // remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
             // the epochs present in the segment lies in the checkpoint file. It will always return false
             // since the checkpoint file was already truncated.
-            boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
+            boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
                     metadata, logStartOffset, epochWithOffsets);
             boolean isValidSegment = false;
             if (!shouldDeleteSegment) {
                 // check whether the segment contains the required epoch range within the current leader epoch lineage.
                 isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
                 if (isValidSegment) {
                     shouldDeleteSegment =
-                            remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
-                            remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
+                            remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
+                            remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
                 }
             }
             if (shouldDeleteSegment) {
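Note: the `segmentsToDelete.contains(metadata)` guard above is the heart of this change. Because segments are iterated per leader epoch, the same segment can surface more than once, and before this fix each appearance was counted against the retention budget again. A runnable toy illustration with hypothetical segment names and a uniform segment size, not the Kafka code:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ConsiderOnceSketch {
    public static void main(String[] args) {
        // seg-1 appears twice, standing in for a segment listed under two epochs.
        List<String> segmentsByEpoch = List.of("seg-0", "seg-1", "seg-1", "seg-2");
        Set<String> segmentsToDelete = new LinkedHashSet<>();
        long remainingBreachedSize = 300; // bytes still to reclaim
        long segmentSize = 100;           // uniform size, for the sketch

        for (String segment : segmentsByEpoch) {
            if (segmentsToDelete.contains(segment)) {
                continue; // the fix: an already-collected segment is not re-counted
            }
            if (remainingBreachedSize > 0) {
                remainingBreachedSize -= segmentSize;
                segmentsToDelete.add(segment);
            }
        }
        // Prints [seg-0, seg-1, seg-2]: each breached segment is counted once.
        // Without the contains() guard, seg-1's duplicate entry would consume
        // the remaining budget and seg-2 would wrongly survive.
        System.out.println(segmentsToDelete);
    }
}
```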
@@ -1018,6 +1015,27 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
                 }
             }

+        // Update log start offset with the computed value after retention cleanup is done
+        remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
+
+        // At this point in time we have updated the log start offsets, but not initiated a deletion.
+        // Either a follower has picked up the changes to the log start offset, or they have not.
+        // If the follower HAS picked up the changes, and they become the leader, this replica won't successfully complete
+        // the deletion.
+        // However, the new leader will correctly pick up all breaching segments as log-start-offset breaching ones
+        // and delete them accordingly.
+        // If the follower HAS NOT picked up the changes, and they become the leader, then they will go through this process
+        // again and delete them with the original deletion reason, i.e. size, time, or log-start-offset breach.
+        List<String> undeletedSegments = new ArrayList<>();
+        for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
+            if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
+                undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
+            }
+        }
+        if (!undeletedSegments.isEmpty()) {
+            logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
+        }
+
         // Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
         // to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
         // unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
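Note: the actual deletes happen only in this final pass, guarded by a predicate so that task cancellation or loss of leadership stops remote deletions mid-stream, and whatever is skipped gets reported. A minimal sketch of that pattern with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class GuardedDeleteSketch {
    // Only perform the side-effecting delete while the guard still holds.
    static boolean deleteIfAllowed(String segmentId, Predicate<String> guard) {
        if (!guard.test(segmentId)) {
            return false; // state changed (e.g. leadership lost); keep the segment
        }
        System.out.println("deleted " + segmentId); // stand-in for the remote delete
        return true;
    }

    public static void main(String[] args) {
        List<String> segmentsToDelete = List.of("seg-0", "seg-1");
        List<String> undeletedSegments = new ArrayList<>();
        boolean isLeader = true; // hypothetical task state, fixed for the sketch
        for (String id : segmentsToDelete) {
            if (!deleteIfAllowed(id, x -> isLeader)) {
                undeletedSegments.add(id);
            }
        }
        if (!undeletedSegments.isEmpty()) {
            System.out.println("could not delete: " + String.join(",", undeletedSegments));
        }
    }
}
```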
@@ -1040,27 +1058,6 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
                 }
             }
         }
-
-        // Update log start offset with the computed value after retention cleanup is done
-        remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));
-
-        // At this point in time we have updated the log start offsets, but not initiated a deletion.
-        // Either a follower has picked up the changes to the log start offset, or they have not.
-        // If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
-        // the deletion.
-        // However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
-        // and delete them accordingly.
-        // If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
-        // again and delete them with the original deletion reason i.e. size, time or log start offset breach.
-        List<String> undeletedSegments = new ArrayList<>();
-        for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
-            if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
-                undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
-            }
-        }
-        if (!undeletedSegments.isEmpty()) {
-            logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
-        }
     }

     private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
@@ -1179,7 +1176,12 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
             }
         }
         // segment end offset should be within the log end offset.
-        return segmentEndOffset < logEndOffset;
+        if (segmentEndOffset >= logEndOffset) {
+            LOGGER.debug("Segment {} end offset {} is more than log end offset {}.",
+                    segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset);
+            return false;
+        }
+        return true;
     }
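Note: the log end offset is exclusive (the next offset to be written), so a valid remote segment must end strictly below it; the patch keeps that check but now logs the rejection at debug level. A tiny sketch of the boundary with made-up offsets:

```java
public class EndOffsetCheckSketch {
    static boolean isWithinLogEnd(long segmentEndOffset, long logEndOffset) {
        return segmentEndOffset < logEndOffset;
    }

    public static void main(String[] args) {
        long logEndOffset = 100; // next offset to be written, i.e. exclusive
        // A segment ending at 99 only covers offsets the log actually has.
        System.out.println(isWithinLogEnd(99, logEndOffset));  // true
        // A segment claiming end offset 100 references data beyond the log,
        // so the check rejects it.
        System.out.println(isWithinLogEnd(100, logEndOffset)); // false
    }
}
```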
