Skip to content

Commit

Permalink
fix the condition in KafkaMetadataLog#highWatermark method
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalcph committed May 10, 2024
1 parent c01478b commit 10542d8
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ final class KafkaMetadataLog private (

override def highWatermark: LogOffsetMetadata = {
val hwm = log.fetchOffsetSnapshot.highWatermark
val segmentPosition: Optional[OffsetMetadata] = if (hwm.messageOffsetOnly) {
val segmentPosition: Optional[OffsetMetadata] = if (!hwm.messageOffsetOnly) {
Optional.of(SegmentPosition(hwm.segmentBaseOffset, hwm.relativePositionInSegment))
} else {
Optional.empty()
Expand Down
18 changes: 18 additions & 0 deletions core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,24 @@ final class KafkaMetadataLogTest {
)
}

@Test
def testHighWatermarkOffsetMetadata(): Unit = {
val numberOfRecords = 10
val epoch = 1
val log = buildMetadataLog(tempDir, mockTime)

append(log, numberOfRecords, epoch)
log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))

val highWatermarkMetadata = log.highWatermark
assertEquals(numberOfRecords, highWatermarkMetadata.offset)
assertTrue(highWatermarkMetadata.metadata.isPresent)

val segmentPosition = highWatermarkMetadata.metadata.get().asInstanceOf[SegmentPosition]
assertEquals(0, segmentPosition.baseOffset)
assertTrue(segmentPosition.relativePosition > 0)
}

@Test
def testCreateSnapshotBeforeLogStartOffset(): Unit = {
val numberOfRecords = 10
Expand Down

0 comments on commit 10542d8

Please sign in to comment.