Skip to content

Commit

Permalink
apply the logic only for the remote log enabled topics
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalcph committed Apr 28, 2024
1 parent c38a3ab commit 8c67c19
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
checkLogStartOffset(offset)
if (offset < localLogStartOffset()) {
new LogOffsetMetadata(offset)
if (remoteLogEnabled() && offset < localLogStartOffset()) {
new LogOffsetMetadata(offset, LogOffsetMetadata.REMOTE_LOG_UNKNOWN_OFFSET)
} else {
localLog.convertToOffsetMetadataOrThrow(offset)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class LogOffsetMetadata {

//TODO KAFKA-14484 remove once UnifiedLog has been moved to the storage module
private static final long UNIFIED_LOG_UNKNOWN_OFFSET = -1L;
public static final long REMOTE_LOG_UNKNOWN_OFFSET = -2L;

public static final LogOffsetMetadata UNKNOWN_OFFSET_METADATA = new LogOffsetMetadata(-1L, 0L, 0);

Expand All @@ -41,6 +42,11 @@ public LogOffsetMetadata(long messageOffset) {
this(messageOffset, UNIFIED_LOG_UNKNOWN_OFFSET, UNKNOWN_FILE_POSITION);
}

public LogOffsetMetadata(long messageOffset,
long segmentBaseOffset) {
this(messageOffset, segmentBaseOffset, UNKNOWN_FILE_POSITION);
}

public LogOffsetMetadata(long messageOffset,
long segmentBaseOffset,
int relativePositionInSegment) {
Expand All @@ -51,8 +57,10 @@ public LogOffsetMetadata(long messageOffset,

// check if this offset is already on an older segment compared with the given offset
public boolean onOlderSegment(LogOffsetMetadata that) {
if (messageOffsetOnly() || that.messageOffsetOnly())
if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
return false;
if (messageOffsetOnly())
throw new KafkaException(this + " cannot compare its segment info with " + that + " since it only has message offset info");

return this.segmentBaseOffset < that.segmentBaseOffset;
}
Expand All @@ -65,8 +73,10 @@ private boolean onSameSegment(LogOffsetMetadata that) {
// compute the number of bytes between this offset to the given offset
// if they are on the same segment and this offset precedes the given offset
public int positionDiff(LogOffsetMetadata that) {
if (messageOffsetOnly() || that.messageOffsetOnly())
if (this.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET || that.segmentBaseOffset == REMOTE_LOG_UNKNOWN_OFFSET)
return 1;
if (messageOffsetOnly())
throw new KafkaException(this + " cannot compare its segment position with " + that + " since it only has message offset info");
if (!onSameSegment(that))
throw new KafkaException(this + " cannot compare its segment position with " + that + " since they are not on the same segment");

Expand Down

0 comments on commit 8c67c19

Please sign in to comment.