From 2f98c5ecc5506ed3d5e697bbe5b003b6ff506012 Mon Sep 17 00:00:00 2001 From: Liebing Date: Wed, 24 Sep 2025 15:26:46 +0800 Subject: [PATCH] [hotfix] Add bucket info for bucket-related logs --- .../org/apache/fluss/server/log/LocalLog.java | 38 +++++----- .../apache/fluss/server/log/LogLoader.java | 3 +- .../apache/fluss/server/log/LogTablet.java | 42 +++++++---- .../server/log/remote/LogTieringTask.java | 17 +++-- .../server/log/remote/RemoteLogManager.java | 5 +- .../apache/fluss/server/replica/Replica.java | 73 +++++++++++++------ .../fluss/server/replica/ReplicaManager.java | 4 +- 7 files changed, 119 insertions(+), 63 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java index 061c83ebb5..130b3855d9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java @@ -362,10 +362,11 @@ public FetchDataInfo read( throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( - "Reading maximum {} bytes at offset {} from log with total length {} bytes", + "Reading maximum {} bytes at offset {} from log with total length {} bytes for bucket {}", maxLength, readOffset, - segments.sizeInBytes()); + segments.sizeInBytes(), + tableBucket); } long startOffset = localLogStartOffset; @@ -471,21 +472,22 @@ LogSegment roll(Optional expectedNextOffset) throws IOException { // true for an active segment of size zero because one of the indexes is // "full" (due to _maxEntries == 0). LOG.warn( - "Trying to roll a new log segment with start offset " - + newOffset - + " =max(provided offset = " - + expectedNextOffset - + ", LEO = " - + getLocalLogEndOffset() - + ") while it already exists and is active with size 0." - + ", size of offset index: " - + activeSegment.offsetIndex().entries() - + "."); + "Trying to roll a new log segment for bucket {} with start offset {} " + + "=max(provided offset = {}, LEO = {}) while it already exists " + + "and is active with size 0, size of offset index: {}.", + tableBucket, + newOffset, + expectedNextOffset, + getLocalLogEndOffset(), + activeSegment.offsetIndex().entries()); LogSegment newSegment = createAndDeleteSegment( newOffset, activeSegment, SegmentDeletionReason.LOG_ROLL); updateLogEndOffset(getLocalLogEndOffset()); - LOG.info("Rolled new log segment at offset " + newOffset); + LOG.info( + "Rolled new log segment for bucket {} at offset {}", + tableBucket, + newOffset); return newSegment; } else { throw new FlussRuntimeException( @@ -520,9 +522,9 @@ LogSegment roll(Optional expectedNextOffset) throws IOException { for (File file : Arrays.asList(logFile, offsetIdxFile, timeIndexFile)) { if (file.exists()) { LOG.warn( - "Newly rolled segment file " - + file.getAbsolutePath() - + " already exists; deleting it first"); + "Newly rolled segment file {} for bucket {} already exists; deleting it first", + file.getAbsolutePath(), + tableBucket); Files.delete(file.toPath()); } } @@ -536,7 +538,7 @@ LogSegment roll(Optional expectedNextOffset) throws IOException { // metadata when log rolls. // The next offset should not change.
updateLogEndOffset(getLocalLogEndOffset()); - LOG.info("Rolled new log segment at offset " + newOffset); + LOG.info("Rolled new log segment for bucket {} at offset {}", tableBucket, newOffset); return newSegment; } @@ -547,7 +549,7 @@ LogSegment roll(Optional expectedNextOffset) throws IOException { * @return the list of segments that were scheduled for deletion */ List truncateFullyAndStartAt(long newOffset) throws IOException { - LOG.debug("Truncate and start at offset " + newOffset); + LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, tableBucket); checkIfMemoryMappedBufferClosed(); List segmentsToDelete = segments.values(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java index fac372305b..62d38581c5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java @@ -149,7 +149,8 @@ private void loadSegmentFiles() throws IOException { File logFile = FlussPaths.logFile(logTabletDir, offset); if (!logFile.exists()) { LOG.warn( - "Found an orphaned index file {}, with no corresponding log file.", + "Found an orphaned index file for bucket {}: {}, with no corresponding log file.", + logSegments.getTableBucket(), file.getAbsolutePath()); Files.deleteIfExists(file.toPath()); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index b4c8f1c9a0..bf75410e42 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -430,14 +430,18 @@ private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) { synchronized (lock) { if (newHighWatermark.getMessageOffset() < highWatermarkMetadata.getMessageOffset()) { LOG.warn( - "Non-monotonic update of high watermark from {} to {}", + "Non-monotonic update of high watermark from {} to {} for bucket {}", highWatermarkMetadata, - newHighWatermark); + newHighWatermark, + localLog.getTableBucket()); } highWatermarkMetadata = newHighWatermark; // TODO log offset listener to update log offset.
} - LOG.trace("Setting high watermark {}", newHighWatermark); + LOG.trace( + "Setting high watermark {} for bucket {}", + newHighWatermark, + localLog.getTableBucket()); } /** @@ -567,8 +571,9 @@ private void deleteSegments(long cleanUpToOffset) { long localLogStartOffset = localLog.getLocalLogStartOffset(); if (cleanUpToOffset < localLogStartOffset) { LOG.debug( - "Ignore the delete segments action while the input cleanUpToOffset {} " + "Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} " + "is smaller than the current localLogStartOffset {}", + getTableBucket(), cleanUpToOffset, localLogStartOffset); return; @@ -576,8 +581,9 @@ private void deleteSegments(long cleanUpToOffset) { if (cleanUpToOffset > getHighWatermark()) { LOG.warn( - "Ignore the delete segments action while the input cleanUpToOffset {} " + "Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} " + "is larger than the current highWatermark {}", + getTableBucket(), cleanUpToOffset, getHighWatermark()); return; @@ -716,11 +722,13 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader) // todo update the first unstable offset (which is used to compute lso) LOG.trace( - "Appended message set with last offset: {}, first offset {}, next offset: {} and messages {}", + "Appended message set with last offset: {}, first offset {}, next offset: {} " + + "and messages {} for bucket {}", appendInfo.lastOffset(), appendInfo.firstOffset(), localLog.getLocalLogEndOffset(), - validRecords); + validRecords, + getTableBucket()); if (localLog.unflushedMessages() >= logFlushIntervalMessages) { flush(false); @@ -787,11 +795,12 @@ private void flush(long offset, boolean includingOffset) throws IOException { if (flushOffset > localLog.getRecoveryPoint()) { if (LOG.isDebugEnabled()) { LOG.debug( - "Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}", + "Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}, for bucket {}", offset, includingOffsetStr, flushOffset, - localLog.unflushedMessages()); + localLog.unflushedMessages(), + getTableBucket()); } localLog.flush(flushOffset); @@ -810,7 +819,9 @@ private void maybeRoll(int messageSize, LogAppendInfo appendInfo) throws Excepti new RollParams(maxSegmentFileSize, appendInfo.lastOffset(), messageSize))) { if (LOG.isDebugEnabled()) { LOG.debug( - "Rolling new log segment (log_size = {}/{}), offset_index_size = {}/{}, time_index_size = {}/{}", + "Rolling new log segment for bucket {} (log_size = {}/{}), offset_index_size = {}/{}, " + + "time_index_size = {}/{}", + getTableBucket(), segment.getSizeInBytes(), maxSegmentFileSize, segment.offsetIndex().entries(), @@ -863,12 +874,13 @@ boolean truncateTo(long targetOffset) throws LogStorageException { if (targetOffset >= localLog.getLocalLogEndOffset()) { LOG.info( - "Truncate to {} has no effect as the largest offset in the log is {}.", + "Truncate to {} for bucket {} has no effect as the largest offset in the log is {}.", targetOffset, + getTableBucket(), localLog.getLocalLogEndOffset() - 1); return false; } else { - LOG.info("Truncating to offset {}", targetOffset); + LOG.info("Truncating to offset {} for bucket {}", targetOffset, getTableBucket()); synchronized (lock) { try { localLog.checkIfMemoryMappedBufferClosed(); @@ -902,7 +914,7 @@ boolean truncateTo(long targetOffset) throws LogStorageException { /** Delete all data in the log and start at the new offset. 
*/ void truncateFullyAndStartAt(long newOffset) throws LogStorageException { - LOG.debug("Truncate and start at offset {}", newOffset); + LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, getTableBucket()); synchronized (lock) { try { localLog.truncateFullyAndStartAt(newOffset); @@ -950,14 +962,14 @@ public List logSegments() { } public void close() { - LOG.debug("close log tablet"); + LOG.debug("close log tablet for bucket {}", getTableBucket()); synchronized (lock) { localLog.checkIfMemoryMappedBufferClosed(); writerExpireCheck.cancel(true); try { writerStateManager.takeSnapshot(); } catch (IOException e) { - LOG.error("Error while taking writer snapshot.", e); + LOG.error("Error while taking writer snapshot for bucket {}.", getTableBucket(), e); } localLog.close(); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java index 6df83faa4e..cdde3842dc 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java @@ -200,8 +200,9 @@ private List candidateToCopyLogSegments(LogTablet log) { long fromOffset = Math.max(copiedOffset + 1, log.localLogStartOffset()); candidateLogSegments = candidateLogSegments(log, fromOffset, highWatermark); LOG.debug( - "Candidate log segments: logLocalStartOffset: {}, copiedOffset: {}, " + "Candidate log segments for bucket {}: logLocalStartOffset: {}, copiedOffset: {}, " + "fromOffset: {}, highWatermark: {} and candidateLogSegments: {}", + tableBucket, log.localLogStartOffset(), copiedOffset, fromOffset, @@ -216,7 +217,8 @@ private List candidateToCopyLogSegments(LogTablet log) { } } else { LOG.debug( - "Skipping copying segments to remote, current read-offset:{}, and highWatermark:{}", + "Skipping copying segments for bucket {} to remote, current read-offset:{}, and highWatermark:{}", + tableBucket, copiedOffset, highWatermark); } @@ -314,7 +316,10 @@ public boolean tryToCommitRemoteLogManifest( remoteLogManifestPath = remoteLogStorage.writeRemoteLogManifestSnapshot(newRemoteLogManifest); } catch (Exception e) { - LOG.error("Write remote log manifest file to remote storage failed.", e); + LOG.error( + "Write remote log manifest file to remote storage failed for bucket {}.", + tableBucket, + e); return false; } @@ -364,8 +369,9 @@ public boolean tryToCommitRemoteLogManifest( // the commit failed with unexpected exception, like network error, we will // retry send. 
LOG.error( - "The {} time try to commit remote log manifest failed.", + "The {} time try to commit remote log manifest failed for bucket {}.", retrySendCommitTimes, + tableBucket, e); retrySendCommitTimes++; } @@ -458,8 +464,9 @@ private void deleteRemoteLogSegmentFiles( metricGroup.remoteLogDeleteRequests().inc(); } catch (Exception e) { LOG.error( - "Error occurred while deleting remote log segment files: {}, " + "Error occurred while deleting remote log segment files for bucket {}: {}, " + "the delete files operation will be skipped.", + tableBucket, remoteLogSegment, e); metricGroup.remoteLogDeleteErrors().inc(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java index d963f4fcfd..85f77e2b36 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java @@ -289,7 +289,10 @@ private void doHandleLeaderReplica( remoteLogStorage, coordinatorGateway, clock); - LOG.info("Created a new remote log task: {} and getting scheduled", task); + LOG.info( + "Created a new remote log task for bucket {}: {} and getting scheduled", + tableBucket, + task); ScheduledFuture future = rlManagerScheduledThreadPool.scheduleWithFixedDelay( task, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index e4e25de80f..0916730853 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -408,7 +408,10 @@ public void makeLeader(NotifyLeaderAndIsrData data) throws IOException { "the leader epoch %s in notify leader and isr data is smaller than the " + "current leader epoch %s for table bucket %s", requestLeaderEpoch, leaderEpoch, tableBucket); - LOG.warn("Ignore make leader because {}", errorMessage); + LOG.warn( + "Ignore make leader for bucket {} because {}", + tableBucket, + errorMessage); throw new FencedLeaderEpochException(errorMessage); } @@ -456,7 +459,10 @@ public boolean makeFollower(NotifyLeaderAndIsrData data) { "the leader epoch %s in notify leader and isr data is smaller than the " + "current leader epoch %s for table bucket %s", requestLeaderEpoch, leaderEpoch, tableBucket); - LOG.warn("Ignore make follower because {}", errorMessage); + LOG.warn( + "Ignore make follower for bucket {} because {}", + tableBucket, + errorMessage); throw new FencedLeaderEpochException(errorMessage); } @@ -559,7 +565,10 @@ private void createKv() { // resister the closeable registry for kv closeableRegistry.registerCloseable(closeableRegistryForKv); } catch (IOException e) { - LOG.warn("Fail to registry closeable registry for kv, it may cause resource leak.", e); + LOG.warn( + "Fail to register closeable registry for kv for bucket {}, it may cause resource leak.", + tableBucket, + e); } // init kv tablet and get the snapshot it uses to init if have any @@ -569,7 +578,11 @@ private void createKv() { snapshotUsed = initKvTablet(); break; } catch (Exception e) { - LOG.warn("Fail to init kv tablet, retrying for {} times", i, e); + LOG.warn( + "Fail to init kv tablet for bucket {}, retrying for {} times", + tableBucket, + i, + e); } } // start periodic kv snapshot @@ -642,7 +655,10 @@ private Optional initKvTablet() { checkNotNull(kvTablet, "kv tablet should not
be null."); restoreStartOffset = completedSnapshot.getLogOffset(); } else { - LOG.info("No snapshot found, restore from log."); + LOG.info( + "No snapshot found for {} of {}, restore from log.", + tableBucket, + physicalPath); // actually, kv manager always create a kv tablet since we will drop the kv // if it exists before init kv tablet kvTablet = @@ -830,7 +846,7 @@ private void startPeriodicKvSnapshot(@Nullable CompletedSnapshot completedSnapsh kvSnapshotManager.start(); closeableRegistryForKv.registerCloseable(kvSnapshotManager); } catch (Exception e) { - LOG.error("init kv periodic snapshot failed.", e); + LOG.error("init kv periodic snapshot for {} failed.", tableBucket, e); } } @@ -1003,7 +1019,11 @@ private boolean maybeIncrementLeaderHW(LogTablet leaderLog, long currentTimeMs) Optional oldWatermark = leaderLog.maybeIncrementHighWatermark(newHighWatermark); if (oldWatermark.isPresent()) { - LOG.debug("High watermark update from {} to {}.", oldWatermark.get(), newHighWatermark); + LOG.debug( + "High watermark update from {} to {} for bucket {}.", + oldWatermark.get(), + newHighWatermark, + tableBucket); return true; } else { return false; @@ -1080,9 +1100,10 @@ private void updateFollowerFetchState( } LOG.debug( - "Recorded replica {} log end offset (LEO) position {}.", + "Recorded replica {} log end offset (LEO) position {} for bucket {}.", localTabletServerId, - followerFetchOffsetMetadata.getMessageOffset()); + followerFetchOffsetMetadata.getMessageOffset(), + tableBucket); } private FollowerReplica getFollowerReplicaOrThrown(int followerId) { @@ -1534,7 +1555,7 @@ CompletableFuture submitAdjustIsr(IsrState.PendingIsrState propose private CompletableFuture submitAdjustIsr( IsrState.PendingIsrState proposedIsrState, CompletableFuture result) { - LOG.debug("Submitting ISR state change {}.", proposedIsrState); + LOG.debug("Submitting ISR state change {} for bucket {}.", proposedIsrState, tableBucket); adjustIsrManager .submit(tableBucket, proposedIsrState.sentLeaderAndIsr()) .whenComplete( @@ -1553,8 +1574,9 @@ private CompletableFuture submitAdjustIsr( // exactly, but we do know this response is out of date, // so we ignore it. LOG.debug( - "Ignoring failed ISR update to {} since we have already updated state to {}", + "Ignoring failed ISR update to {} for bucket {} since we have already updated state to {}", proposedIsrState, + tableBucket, isrState); } else if (leaderAndIsr != null) { hwIncremented.set( @@ -1600,8 +1622,9 @@ private boolean handleAdjustIsrUpdate( // Success from coordinator, still need to check a few things. if (leaderAndIsr.bucketEpoch() < bucketEpoch) { LOG.debug( - "Ignoring new ISR {} since we have a newer replica epoch {}", + "Ignoring new ISR {} for bucket {} since we have a newer replica epoch {}", leaderAndIsr, + tableBucket, bucketEpoch); return false; } else { @@ -1630,7 +1653,7 @@ private boolean handleAdjustIsrUpdate( try { return maybeIncrementLeaderHW(logTablet, clock.milliseconds()); } catch (IOException e) { - LOG.error("Failed to increment leader HW", e); + LOG.error("Failed to increment leader HW for bucket {}", tableBucket, e); return false; } } @@ -1656,34 +1679,39 @@ private boolean handleAdjustIsrError(IsrState.PendingIsrState proposedIsrState, // response. isrState = proposedIsrState.lastCommittedState(); LOG.info( - "Failed to adjust isr to {} since the adjust isr manager rejected the request with error {}. " + "Failed to adjust isr to {} for bucket {} since the adjust isr manager rejected the request with error {}. 
" + "Replica state has been reset to the latest committed state {}", proposedIsrState, + tableBucket, error, isrState); return false; case UNKNOWN_TABLE_OR_BUCKET_EXCEPTION: LOG.debug( - "Failed to adjust isr to {} since the coordinator doesn't know about this table or bucket. " + "Failed to adjust isr to {} for bucket {} since the coordinator doesn't know about this table or bucket. " + "Replica state may be out of sync, awaiting new the latest metadata.", - proposedIsrState); + proposedIsrState, + tableBucket); return false; case INVALID_UPDATE_VERSION_EXCEPTION: LOG.debug( - "Failed to adjust isr to {} because the request is invalid. Replica state may be out of sync, " + "Failed to adjust isr to {} for bucket {} because the request is invalid. Replica state may be out of sync, " + "awaiting new the latest metadata.", - proposedIsrState); + proposedIsrState, + tableBucket); return false; case FENCED_LEADER_EPOCH_EXCEPTION: LOG.debug( - "Failed to adjust isr to {} because the leader epoch is fenced which indicate this replica " + "Failed to adjust isr to {} for bucket {} because the leader epoch is fenced which indicate this replica " + "maybe no long leader. Replica state may be out of sync, awaiting new the latest metadata.", - proposedIsrState); + proposedIsrState, + tableBucket); return false; default: LOG.warn( - "Failed to adjust isr to {} due to unexpected error {}. Retrying.", + "Failed to adjust isr to {} for bucket {} due to unexpected error {}. Retrying.", proposedIsrState, + tableBucket, error); return true; } @@ -1863,7 +1891,8 @@ private void traceAckInfo(List curMaximalIsr, long requiredOffset) { }); LOG.trace( - "Progress awaiting ISR acks for offset {}, acked replicas: {}, awaiting replicas: {}", + "Progress awaiting ISR acks for bucket {} for offset {}, acked replicas: {}, awaiting replicas: {}", + tableBucket, requiredOffset, ackedReplicas.stream() .map(tuple -> "server-" + tuple.f0 + ":" + tuple.f1) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index f11fe1399f..8a5f856155 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -653,7 +653,9 @@ public void stopReplicas( + "The latest known leader epoch is %s for table bucket %s.", requestLeaderEpoch, currentLeaderEpoch, tb); LOG.warn( - "Ignore the stop replica request because {}", errorMessage); + "Ignore the stop replica request for bucket {} because {}", + tb, + errorMessage); result.add( new StopReplicaResultForBucket( tb,