Skip to content

Commit

Permalink
Using Optional empty instead of nullable
Browse files Browse the repository at this point in the history
  • Loading branch information
iit2009060 committed Jan 3, 2024
1 parent ad76366 commit 5bef7e9
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/kafka/log/remote/RemoteLogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,10 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
try {
int startPos = 0;
RecordBatch firstBatch = null;

// Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
// It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
// And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
while (firstBatch == null && rlsMetadataOptional.isPresent()) {
remoteLogSegmentMetadata = rlsMetadataOptional.get();
// Search forward for the position of the last offset that is greater than or equal to the target offset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPar

public Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
Option<LeaderEpochFileCache> leaderEpochFileCacheOption) {
return Optional.ofNullable(null);
return Optional.empty();
}

int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
Expand Down Expand Up @@ -2175,9 +2175,6 @@ public void testReadForFirstBatchInLogCompaction() throws RemoteStorageException
RemoteLogSegmentMetadata segmentMetadata = mock(RemoteLogSegmentMetadata.class);
LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));

when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
.thenAnswer(a -> fileInputStream);
when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));

int fetchOffset = 0;
Expand Down

0 comments on commit 5bef7e9

Please sign in to comment.