Skip to content

Commit

Permalink
KAFKA-15479: Remote log segments should be considered once for retention breach (apache#14407)
Browse files Browse the repository at this point in the history

When a remote log segment contains multiple epochs, it gets considered multiple times during retention breach checks by size/time/start-offset. This affects deletion by remote log retention size, as it deletes fewer segments than expected. This is a follow-up of KAFKA-15352

Reviewers: Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Satish Duggana <satishd@apache.org>
  • Loading branch information
kamalcph authored and mjsax committed Nov 22, 2023
1 parent 3f6eea1 commit f175319
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 207 deletions.
90 changes: 46 additions & 44 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Expand Up @@ -561,7 +561,6 @@ InMemoryLeaderEpochCheckpoint getLeaderEpochCheckpoint(UnifiedLog log, long star
}
cache.truncateFromEnd(endOffset);
}

return checkpoint;
}

Expand Down Expand Up @@ -706,7 +705,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
}
}

private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset) throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegmentBaseOffset)
throws InterruptedException, ExecutionException, RemoteStorageException, IOException,
CustomMetadataSizeLimitExceededException {
File logFile = segment.log().file();
String logFileName = logFile.getName();
Expand Down Expand Up @@ -832,13 +832,11 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
}

private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
private boolean isSegmentBreachedByRetentionSize(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
if (!retentionSizeData.isPresent()) {
return false;
return shouldDeleteSegment;
}

boolean shouldDeleteSegment = false;

// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
Expand All @@ -847,7 +845,6 @@ private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata met
shouldDeleteSegment = true;
}
}

if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
Expand All @@ -856,12 +853,12 @@ private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata met
return shouldDeleteSegment;
}

public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
public boolean isSegmentBreachedByRetentionTime(RemoteLogSegmentMetadata metadata) {
boolean shouldDeleteSegment = false;
if (!retentionTimeData.isPresent()) {
return false;
return shouldDeleteSegment;
}

boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
Expand All @@ -873,9 +870,9 @@ public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata meta
return shouldDeleteSegment;
}

private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries) {
private boolean isSegmentBreachByLogStartOffset(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries) {
boolean shouldDeleteSegment = false;
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
Expand Down Expand Up @@ -916,10 +913,8 @@ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, brokerId)).get();

// Delete the segment in remote storage.
remoteLogStorageManager.deleteLogSegmentData(segmentMetadata);

// Publish delete segment finished event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
Expand All @@ -932,7 +927,7 @@ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,

}

private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionException, InterruptedException {
if (isCancelled() || !isLeader()) {
logger.info("Returning from remote log segments cleanup as the task state is changed");
return;
Expand Down Expand Up @@ -993,22 +988,24 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
return;
}
RemoteLogSegmentMetadata metadata = segmentsIterator.next();

if (segmentsToDelete.contains(metadata)) {
continue;
}
// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
// the epochs present in the segment lies in the checkpoint file. It will always return false
// since the checkpoint file was already truncated.
boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
boolean shouldDeleteSegment = remoteLogRetentionHandler.isSegmentBreachByLogStartOffset(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!shouldDeleteSegment) {
// check whether the segment contains the required epoch range with in the current leader epoch lineage.
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
shouldDeleteSegment =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
remoteLogRetentionHandler.isSegmentBreachedByRetentionTime(metadata) ||
remoteLogRetentionHandler.isSegmentBreachedByRetentionSize(metadata);
}
}
if (shouldDeleteSegment) {
Expand All @@ -1018,6 +1015,27 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
}
}

// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

// At this point in time we have updated the log start offsets, but not initiated a deletion.
// Either a follower has picked up the changes to the log start offset, or they have not.
// If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
// the deletion.
// However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
// and delete them accordingly.
// If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
// again and delete them with the original deletion reason i.e. size, time or log start offset breach.
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}

// Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
// to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
// unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
Expand All @@ -1040,27 +1058,6 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
}
}
}

// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

// At this point in time we have updated the log start offsets, but not initiated a deletion.
// Either a follower has picked up the changes to the log start offset, or they have not.
// If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
// the deletion.
// However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
// and delete them accordingly.
// If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
// again and delete them with the original deletion reason i.e. size, time or log start offset breach.
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}
}

private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
Expand Down Expand Up @@ -1179,7 +1176,12 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
}
}
// segment end offset should be with in the log end offset.
return segmentEndOffset < logEndOffset;
if (segmentEndOffset >= logEndOffset) {
LOGGER.debug("Segment {} end offset {} is more than log end offset {}.",
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, logEndOffset);
return false;
}
return true;
}

/**
Expand Down

0 comments on commit f175319

Please sign in to comment.