Skip to content

Commit

Permalink
KAFKA-15352: Update log-start-offset before initiating deletion of re…
Browse files Browse the repository at this point in the history
…mote segments (#14349)

This change is about the current leader updating the log-start-offset before the segments are deleted from remote storage. This will do a best-effort mechanism for followers to receive log-start-offset from the leader and they can update their log-start-offset before it becomes a leader. 

Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>
  • Loading branch information
clolov authored and satishd committed Sep 12, 2023
1 parent 2a56edc commit 4e831b9
Show file tree
Hide file tree
Showing 2 changed files with 251 additions and 40 deletions.
94 changes: 55 additions & 39 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -832,70 +832,64 @@ public RemoteLogRetentionHandler(Optional<RetentionSizeData> retentionSizeData,
remainingBreachedSize = retentionSizeData.map(sizeData -> sizeData.remainingBreachedSize).orElse(0L);
}

private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata metadata) {
if (!retentionSizeData.isPresent()) {
return false;
}

boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
if (remainingBytes >= 0) {
remainingBreachedSize = remainingBytes;
return true;
}
boolean shouldDeleteSegment = false;

// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
if (remainingBytes >= 0) {
remainingBreachedSize = remainingBytes;
shouldDeleteSegment = true;
}
}

return false;
});
if (isSegmentDeleted) {
if (shouldDeleteSegment) {
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
logger.info("Deleted remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
logger.info("About to delete remote log segment {} due to retention size {} breach. Log size after deletion will be {}.",
metadata.remoteLogSegmentId(), retentionSizeData.get().retentionSize, remainingBreachedSize + retentionSizeData.get().retentionSize);
}
return isSegmentDeleted;
return shouldDeleteSegment;
}

public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata)
throws RemoteStorageException, ExecutionException, InterruptedException {
public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata metadata) {
if (!retentionTimeData.isPresent()) {
return false;
}

boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
ignored -> metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
if (isSegmentDeleted) {
boolean shouldDeleteSegment = metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs;
if (shouldDeleteSegment) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
// are ascending with in an epoch.
logStartOffset = OptionalLong.of(metadata.endOffset() + 1);
logger.info("Deleted remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
logger.info("About to delete remote log segment {} due to retention time {}ms breach based on the largest record timestamp in the segment",
metadata.remoteLogSegmentId(), retentionTimeData.get().retentionMs);
}
return isSegmentDeleted;
return shouldDeleteSegment;
}

private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries)
throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
Integer firstEpoch = leaderEpochEntries.firstKey();
return metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
&& metadata.endOffset() < logStartOffset;
}
return false;
});
if (isSegmentDeleted) {
logger.info("Deleted remote log segment {} due to log-start-offset {} breach. " +
NavigableMap<Integer, Long> leaderEpochEntries) {
boolean shouldDeleteSegment = false;
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
Integer firstEpoch = leaderEpochEntries.firstKey();
shouldDeleteSegment = metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
&& metadata.endOffset() < logStartOffset;
}
if (shouldDeleteSegment) {
logger.info("About to delete remote log segment {} due to log-start-offset {} breach. " +
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
metadata.remoteLogSegmentId(), logStartOffset, leaderEpochEntries.firstEntry(),
metadata.endOffset(), metadata.segmentLeaderEpochs());
}
return isSegmentDeleted;
return shouldDeleteSegment;
}

// It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
Expand Down Expand Up @@ -989,6 +983,7 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
boolean canProcess = true;
List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
while (canProcess && epochIterator.hasNext()) {
Integer epoch = epochIterator.next();
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
Expand All @@ -1004,19 +999,22 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
// the epochs present in the segment lies in the checkpoint file. It will always return false
// since the checkpoint file was already truncated.
boolean isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!isSegmentDeleted) {
if (!shouldDeleteSegment) {
// check whether the segment contains the required epoch range with in the current leader epoch lineage.
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
isSegmentDeleted =
shouldDeleteSegment =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
}
}
canProcess = isSegmentDeleted || !isValidSegment;
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
}
canProcess = shouldDeleteSegment || !isValidSegment;
}
}

Expand Down Expand Up @@ -1045,6 +1043,24 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex

// Update log start offset with the computed value after retention cleanup is done
remoteLogRetentionHandler.logStartOffset.ifPresent(offset -> handleLogStartOffsetUpdate(topicIdPartition.topicPartition(), offset));

// At this point in time we have updated the log start offsets, but not initiated a deletion.
// Either a follower has picked up the changes to the log start offset, or they have not.
// If the follower HAS picked up the changes, and they become the leader this replica won't successfully complete
// the deletion.
// However, the new leader will correctly pick up all breaching segments as log start offset breaching ones
// and delete them accordingly.
// If the follower HAS NOT picked up the changes, and they become the leader then they will go through this process
// again and delete them with the original deletion reason i.e. size, time or log start offset breach.
List<String> undeletedSegments = new ArrayList<>();
for (RemoteLogSegmentMetadata segmentMetadata : segmentsToDelete) {
if (!remoteLogRetentionHandler.deleteRemoteLogSegment(segmentMetadata, x -> !isCancelled() && isLeader())) {
undeletedSegments.add(segmentMetadata.remoteLogSegmentId().toString());
}
}
if (!undeletedSegments.isEmpty()) {
logger.info("The following remote segments could not be deleted: {}", String.join(",", undeletedSegments));
}
}

private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
Expand Down

0 comments on commit 4e831b9

Please sign in to comment.