Skip to content

Commit

Permalink
KAFKA-15410: Delete records with tiered storage integration test (4/4) (#14330)

Browse files Browse the repository at this point in the history

* Added the integration test for DELETE_RECORDS API for tiered storage enabled topic
* Added validation checks before removing remote log segments for log-start-offset breach

Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Christo Lolov <lolovc@amazon.com>
  • Loading branch information
kamalcph authored and satishd committed Sep 7, 2023
1 parent 522263d commit 946ab8f
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 45 deletions.
108 changes: 67 additions & 41 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Expand Up @@ -837,10 +837,10 @@ private boolean deleteRetentionSizeBreachedSegments(RemoteLogSegmentMetadata met
return false;
}

boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
// Assumption that segments contain size >= 0
if (remainingBreachedSize > 0) {
long remainingBytes = remainingBreachedSize - x.segmentSizeInBytes();
long remainingBytes = remainingBreachedSize - metadata.segmentSizeInBytes();
if (remainingBytes >= 0) {
remainingBreachedSize = remainingBytes;
return true;
Expand All @@ -864,7 +864,7 @@ public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata meta
}

boolean isSegmentDeleted = deleteRemoteLogSegment(metadata,
x -> x.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
ignored -> metadata.maxTimestampMs() <= retentionTimeData.get().cleanupUntilMs);
if (isSegmentDeleted) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
// It is fine to have logStartOffset as `metadata.endOffset() + 1` as the segment offset intervals
Expand All @@ -876,35 +876,48 @@ public boolean deleteRetentionTimeBreachedSegments(RemoteLogSegmentMetadata meta
return isSegmentDeleted;
}

private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata, long startOffset)
private boolean deleteLogStartOffsetBreachedSegments(RemoteLogSegmentMetadata metadata,
long logStartOffset,
NavigableMap<Integer, Long> leaderEpochEntries)
throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x -> startOffset > x.endOffset());
if (isSegmentDeleted && retentionSizeData.isPresent()) {
remainingBreachedSize = Math.max(0, remainingBreachedSize - metadata.segmentSizeInBytes());
logger.info("Deleted remote log segment {} due to log start offset {} breach", metadata.remoteLogSegmentId(), startOffset);
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored -> {
if (!leaderEpochEntries.isEmpty()) {
// Note that `logStartOffset` and `leaderEpochEntries.firstEntry().getValue()` should be same
Integer firstEpoch = leaderEpochEntries.firstKey();
return metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch <= firstEpoch)
&& metadata.endOffset() < logStartOffset;
}
return false;
});
if (isSegmentDeleted) {
logger.info("Deleted remote log segment {} due to log-start-offset {} breach. " +
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
metadata.remoteLogSegmentId(), logStartOffset, leaderEpochEntries.firstEntry(),
metadata.endOffset(), metadata.segmentLeaderEpochs());
}

return isSegmentDeleted;
}

// It removes the segments beyond the current leader's earliest epoch. Those segments are considered as
// unreferenced because they are not part of the current leader epoch lineage.
private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry, RemoteLogSegmentMetadata metadata) throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, x ->
x.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earliestEpochEntry,
RemoteLogSegmentMetadata metadata)
throws RemoteStorageException, ExecutionException, InterruptedException {
boolean isSegmentDeleted = deleteRemoteLogSegment(metadata, ignored ->
metadata.segmentLeaderEpochs().keySet().stream().allMatch(epoch -> epoch < earliestEpochEntry.epoch));
if (isSegmentDeleted) {
logger.info("Deleted remote log segment {} due to leader epoch cache truncation. Current earliest epoch: {}, segmentEndOffset: {} and segmentEpochs: {}",
logger.info("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}",
metadata.remoteLogSegmentId(), earliestEpochEntry, metadata.endOffset(), metadata.segmentLeaderEpochs().keySet());
}

// No need to update the log-start-offset as these epochs/offsets are earlier to that value.
return isSegmentDeleted;
}

private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata, Predicate<RemoteLogSegmentMetadata> predicate)
throws RemoteStorageException, ExecutionException, InterruptedException {
if (predicate.test(segmentMetadata)) {
logger.info("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
logger.debug("Deleting remote log segment {}", segmentMetadata.remoteLogSegmentId());
// Publish delete segment started event.
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
Expand All @@ -917,10 +930,9 @@ private boolean deleteRemoteLogSegment(RemoteLogSegmentMetadata segmentMetadata,
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(
new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(), time.milliseconds(),
segmentMetadata.customMetadata(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, brokerId)).get();
logger.info("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
logger.debug("Deleted remote log segment {}", segmentMetadata.remoteLogSegmentId());
return true;
}

return false;
}

Expand Down Expand Up @@ -967,7 +979,6 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
LeaderEpochFileCache leaderEpochCache = leaderEpochCacheOption.get();
// Build the leader epoch map by filtering the epochs that do not have any records.
NavigableMap<Integer, Long> epochWithOffsets = buildFilteredLeaderEpochMap(leaderEpochCache.epochWithOffsets());
Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();

long logStartOffset = log.logStartOffset();
long logEndOffset = log.logEndOffset();
Expand All @@ -977,34 +988,48 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex

RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
boolean isSegmentDeleted = true;
while (isSegmentDeleted && epochIterator.hasNext()) {
boolean canProcess = true;
while (canProcess && epochIterator.hasNext()) {
Integer epoch = epochIterator.next();
Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
while (isSegmentDeleted && segmentsIterator.hasNext()) {
while (canProcess && segmentsIterator.hasNext()) {
if (isCancelled() || !isLeader()) {
logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
return;
}
RemoteLogSegmentMetadata metadata = segmentsIterator.next();

// check whether the segment contains the required epoch range with in the current leader epoch lineage.
if (isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets)) {
isSegmentDeleted =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(metadata, logStartOffset);
// When the log-start-offset is moved by the user, the leader-epoch-checkpoint file gets truncated
// as per the log-start-offset. Until the rlm-cleaner-thread runs in the next iteration, those
// remote log segments won't be removed. The `isRemoteSegmentWithinLeaderEpoch` validates whether
// the epochs present in the segment lies in the checkpoint file. It will always return false
// since the checkpoint file was already truncated.
boolean isSegmentDeleted = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
metadata, logStartOffset, epochWithOffsets);
boolean isValidSegment = false;
if (!isSegmentDeleted) {
// check whether the segment contains the required epoch range with in the current leader epoch lineage.
isValidSegment = isRemoteSegmentWithinLeaderEpochs(metadata, logEndOffset, epochWithOffsets);
if (isValidSegment) {
isSegmentDeleted =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
}
}
canProcess = isSegmentDeleted || !isValidSegment;
}
}

// Remove the remote log segments whose segment-leader-epochs are less than the earliest-epoch known
// to the leader. This will remove the unreferenced segments in the remote storage. This is needed for
// unclean leader election scenarios as the remote storage can have epochs earlier to the current leader's
// earliest leader epoch.
Optional<EpochEntry> earliestEpochEntryOptional = leaderEpochCache.earliestEntry();
if (earliestEpochEntryOptional.isPresent()) {
EpochEntry earliestEpochEntry = earliestEpochEntryOptional.get();
Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream().filter(x -> x < earliestEpochEntry.epoch).iterator();
Iterator<Integer> epochsToClean = remoteLeaderEpochs.stream()
.filter(remoteEpoch -> remoteEpoch < earliestEpochEntry.epoch)
.iterator();
while (epochsToClean.hasNext()) {
int epoch = epochsToClean.next();
Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
Expand Down Expand Up @@ -1093,8 +1118,9 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata
Integer segmentFirstEpoch = segmentLeaderEpochs.firstKey();
Integer segmentLastEpoch = segmentLeaderEpochs.lastKey();
if (segmentFirstEpoch < leaderEpochs.firstKey() || segmentLastEpoch > leaderEpochs.lastKey()) {
LOGGER.debug("[{}] Remote segment {} is not within the partition leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
LOGGER.debug("Segment {} is not within the partition leader epoch lineage. " +
"Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.remoteLogSegmentId(), segmentLeaderEpochs, leaderEpochs);
return false;
}

Expand All @@ -1104,38 +1130,38 @@ public static boolean isRemoteSegmentWithinLeaderEpochs(RemoteLogSegmentMetadata

// If segment's epoch does not exist in the leader epoch lineage then it is not a valid segment.
if (!leaderEpochs.containsKey(epoch)) {
LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " +
"Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
return false;
}

// Segment's first epoch's offset should be more than or equal to the respective leader epoch's offset.
if (epoch == segmentFirstEpoch && offset < leaderEpochs.get(epoch)) {
LOGGER.debug("[{}] Remote segment {}'s first epoch {}'s offset is less than leader epoch's offset {}.",
segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
LOGGER.debug("Segment {} first epoch {} offset is less than leader epoch offset {}.",
segmentMetadata.remoteLogSegmentId(), epoch, leaderEpochs.get(epoch));
return false;
}

// Segment's end offset should be less than or equal to the respective leader epoch's offset.
if (epoch == segmentLastEpoch) {
Map.Entry<Integer, Long> nextEntry = leaderEpochs.higherEntry(epoch);
if (nextEntry != null && segmentEndOffset > nextEntry.getValue() - 1) {
LOGGER.debug("[{}] Remote segment {}'s end offset {} is more than leader epoch's offset {}.",
segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1);
LOGGER.debug("Segment {} end offset {} is more than leader epoch offset {}.",
segmentMetadata.remoteLogSegmentId(), segmentEndOffset, nextEntry.getValue() - 1);
return false;
}
}

// Next segment epoch entry and next leader epoch entry should be same to ensure that the segment's epoch
// is within the leader epoch lineage.
if (epoch != segmentLastEpoch && !leaderEpochs.higherEntry(epoch).equals(segmentLeaderEpochs.higherEntry(epoch))) {
LOGGER.debug("[{}] Remote segment {}'s epoch {} is not within the leader epoch lineage. Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.topicIdPartition(), segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
LOGGER.debug("Segment {} epoch {} is not within the leader epoch lineage. " +
"Remote segment epochs: {} and partition leader epochs: {}",
segmentMetadata.remoteLogSegmentId(), epoch, segmentLeaderEpochs, leaderEpochs);
return false;
}

}

// segment end offset should be with in the log end offset.
return segmentEndOffset < logEndOffset;
}
Expand Down Expand Up @@ -1286,7 +1312,7 @@ private FetchDataInfo addAbortedTransactions(long startOffset,

OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
.map(position -> position.offset).orElse(segmentMetadata.endOffset() + 1);

final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();

Expand Down
Expand Up @@ -323,7 +323,7 @@ public TieredStorageTestBuilder deleteRecords(String topic,
Integer partition,
Long beforeOffset) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
actions.add(new DeleteRecordsAction(topicPartition, beforeOffset));
actions.add(new DeleteRecordsAction(topicPartition, beforeOffset, buildDeleteSegmentSpecList(topic)));
return this;
}

Expand Down Expand Up @@ -377,6 +377,10 @@ private ProducableSpec getOrCreateProducable(String topic,

private DeleteTopicAction buildDeleteTopicAction(String topic,
Boolean shouldDelete) {
return new DeleteTopicAction(topic, buildDeleteSegmentSpecList(topic), shouldDelete);
}

private List<RemoteDeleteSegmentSpec> buildDeleteSegmentSpecList(String topic) {
List<RemoteDeleteSegmentSpec> deleteSegmentSpecList = deletables.entrySet()
.stream()
.filter(e -> e.getKey().topic().equals(topic))
Expand All @@ -389,7 +393,7 @@ private DeleteTopicAction buildDeleteTopicAction(String topic,
})
.collect(Collectors.toList());
deleteSegmentSpecList.forEach(spec -> deletables.remove(spec.getTopicPartition()));
return new DeleteTopicAction(topic, deleteSegmentSpecList, shouldDelete);
return deleteSegmentSpecList;
}
}

0 comments on commit 946ab8f

Please sign in to comment.