Skip to content

Commit

Permalink
Address comments from third round of review
Browse files Browse the repository at this point in the history
  • Loading branch information
clolov committed Sep 11, 2023
1 parent 63da324 commit e96e096
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
9 changes: 3 additions & 6 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1001,9 +1001,6 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
// since the checkpoint file was already truncated.
boolean shouldDeleteSegment = remoteLogRetentionHandler.deleteLogStartOffsetBreachedSegments(
metadata, logStartOffset, epochWithOffsets);
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
}
boolean isValidSegment = false;
if (!shouldDeleteSegment) {
// check whether the segment contains the required epoch range with in the current leader epoch lineage.
Expand All @@ -1012,11 +1009,11 @@ private void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, Ex
shouldDeleteSegment =
remoteLogRetentionHandler.deleteRetentionTimeBreachedSegments(metadata) ||
remoteLogRetentionHandler.deleteRetentionSizeBreachedSegments(metadata);
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
}
}
}
if (shouldDeleteSegment) {
segmentsToDelete.add(metadata);
}
canProcess = shouldDeleteSegment || !isValidSegment;
}
}
Expand Down
34 changes: 30 additions & 4 deletions core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1626,8 +1626,8 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
return remoteLogMetadataManager;
}
}) {
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
task.convertToLeader(0);
RemoteLogManager.RLMTask leaderTask = remoteLogManager.new RLMTask(leaderTopicIdPartition, 128);
leaderTask.convertToLeader(0);

when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.logEndOffset()).thenReturn(200L);
Expand Down Expand Up @@ -1655,16 +1655,42 @@ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> {
// cancel the task so that we don't delete the second segment
task.cancel();
leaderTask.cancel();
return CompletableFuture.runAsync(() -> {
});
});

task.run();
leaderTask.run();

assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager, never()).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));

// test that the 2nd log segment will be deleted by the new leader
RemoteLogManager.RLMTask newLeaderTask = remoteLogManager.new RLMTask(followerTopicIdPartition, 128);
newLeaderTask.convertToLeader(1);

Iterator<RemoteLogSegmentMetadata> firstIterator = remoteLogSegmentMetadatas.iterator();
firstIterator.next();
Iterator<RemoteLogSegmentMetadata> secondIterator = remoteLogSegmentMetadatas.iterator();
secondIterator.next();
Iterator<RemoteLogSegmentMetadata> thirdIterator = remoteLogSegmentMetadatas.iterator();
thirdIterator.next();

when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition))
.thenReturn(firstIterator);
when(remoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition, 0))
.thenReturn(secondIterator)
.thenReturn(thirdIterator);

when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class)))
.thenAnswer(answer -> CompletableFuture.runAsync(() -> { }));

newLeaderTask.run();

assertEquals(200L, logStartOffset.get());
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(0));
verify(remoteStorageManager).deleteLogSegmentData(remoteLogSegmentMetadatas.get(1));
}
}

Expand Down

0 comments on commit e96e096

Please sign in to comment.