@@ -362,10 +362,11 @@ public FetchDataInfo read(
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Reading maximum {} bytes at offset {} from log with total length {} bytes",
"Reading maximum {} bytes at offset {} from log with total length {} bytes for bucket {}",
maxLength,
readOffset,
segments.sizeInBytes());
segments.sizeInBytes(),
tableBucket);
}

long startOffset = localLogStartOffset;
@@ -471,21 +472,22 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
// true for an active segment of size zero because one of the indexes is
// "full" (due to _maxEntries == 0).
LOG.warn(
"Trying to roll a new log segment with start offset "
+ newOffset
+ " =max(provided offset = "
+ expectedNextOffset
+ ", LEO = "
+ getLocalLogEndOffset()
+ ") while it already exists and is active with size 0."
+ ", size of offset index: "
+ activeSegment.offsetIndex().entries()
+ ".");
"Trying to roll a new log segment for bucket {} with start offset {} "
+ "=max(provided offset = {}, LEO = {}) while it already exists "
+ "and is active with size 0, size of offset index: {}.",
tableBucket,
newOffset,
expectedNextOffset,
getLocalLogEndOffset(),
activeSegment.offsetIndex().entries());
LogSegment newSegment =
createAndDeleteSegment(
newOffset, activeSegment, SegmentDeletionReason.LOG_ROLL);
updateLogEndOffset(getLocalLogEndOffset());
LOG.info("Rolled new log segment at offset " + newOffset);
LOG.info(
"Rolled new log segment for bucket {} at offset {}",
tableBucket,
newOffset);
return newSegment;
} else {
throw new FlussRuntimeException(
Expand Down Expand Up @@ -520,9 +522,9 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
for (File file : Arrays.asList(logFile, offsetIdxFile, timeIndexFile)) {
if (file.exists()) {
LOG.warn(
"Newly rolled segment file "
+ file.getAbsolutePath()
+ " already exists; deleting it first");
"Newly rolled segment file {} for bucket {} already exists; deleting it first",
file.getAbsolutePath(),
tableBucket);
Files.delete(file.toPath());
}
}
@@ -536,7 +538,7 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
// metadata when log rolls.
// The next offset should not change.
updateLogEndOffset(getLocalLogEndOffset());
LOG.info("Rolled new log segment at offset " + newOffset);
LOG.info("Rolled new log segment for bucket {} at offset {}", tableBucket, newOffset);
return newSegment;
}

@@ -547,7 +549,7 @@ LogSegment roll(Optional<Long> expectedNextOffset) throws IOException {
* @return the list of segments that were scheduled for deletion
*/
List<LogSegment> truncateFullyAndStartAt(long newOffset) throws IOException {
LOG.debug("Truncate and start at offset " + newOffset);
LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, tableBucket);

checkIfMemoryMappedBufferClosed();
List<LogSegment> segmentsToDelete = segments.values();
@@ -149,7 +149,8 @@ private void loadSegmentFiles() throws IOException {
File logFile = FlussPaths.logFile(logTabletDir, offset);
if (!logFile.exists()) {
LOG.warn(
"Found an orphaned index file {}, with no corresponding log file.",
"Found an orphaned index file {} for bucket {}, with no corresponding log file.",
file.getAbsolutePath(),
logSegments.getTableBucket());
Files.deleteIfExists(file.toPath());
}
@@ -430,14 +430,18 @@ private void updateHighWatermarkMetadata(LogOffsetMetadata newHighWatermark) {
synchronized (lock) {
if (newHighWatermark.getMessageOffset() < highWatermarkMetadata.getMessageOffset()) {
LOG.warn(
"Non-monotonic update of high watermark from {} to {}",
"Non-monotonic update of high watermark from {} to {} for bucket {}",
highWatermarkMetadata,
newHighWatermark);
newHighWatermark,
localLog.getTableBucket());
}
highWatermarkMetadata = newHighWatermark;
// TODO log offset listener to update log offset.
}
LOG.trace("Setting high watermark {}", newHighWatermark);
LOG.trace(
"Setting high watermark {} for bucket {}",
newHighWatermark,
localLog.getTableBucket());
}

/**
@@ -567,17 +571,19 @@ private void deleteSegments(long cleanUpToOffset) {
long localLogStartOffset = localLog.getLocalLogStartOffset();
if (cleanUpToOffset < localLogStartOffset) {
LOG.debug(
"Ignore the delete segments action while the input cleanUpToOffset {} "
"Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} "
+ "is smaller than the current localLogStartOffset {}",
getTableBucket(),
cleanUpToOffset,
localLogStartOffset);
return;
}

if (cleanUpToOffset > getHighWatermark()) {
LOG.warn(
"Ignore the delete segments action while the input cleanUpToOffset {} "
"Ignore the delete segments action for bucket {} while the input cleanUpToOffset {} "
+ "is larger than the current highWatermark {}",
getTableBucket(),
cleanUpToOffset,
getHighWatermark());
return;
@@ -716,11 +722,13 @@ private LogAppendInfo append(MemoryLogRecords records, boolean appendAsLeader)
// todo update the first unstable offset (which is used to compute lso)

LOG.trace(
"Appended message set with last offset: {}, first offset {}, next offset: {} and messages {}",
"Appended message set with last offset: {}, first offset {}, next offset: {} "
+ "and messages {} for bucket {}",
appendInfo.lastOffset(),
appendInfo.firstOffset(),
localLog.getLocalLogEndOffset(),
validRecords);
validRecords,
getTableBucket());

if (localLog.unflushedMessages() >= logFlushIntervalMessages) {
flush(false);
@@ -787,11 +795,12 @@ private void flush(long offset, boolean includingOffset) throws IOException {
if (flushOffset > localLog.getRecoveryPoint()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}",
"Flushing log up to offset {} ({}) with recovery point {}, unflushed: {}, for bucket {}",
offset,
includingOffsetStr,
flushOffset,
localLog.unflushedMessages());
localLog.unflushedMessages(),
getTableBucket());
}

localLog.flush(flushOffset);
@@ -810,7 +819,9 @@ private void maybeRoll(int messageSize, LogAppendInfo appendInfo) throws Excepti
new RollParams(maxSegmentFileSize, appendInfo.lastOffset(), messageSize))) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Rolling new log segment (log_size = {}/{}), offset_index_size = {}/{}, time_index_size = {}/{}",
"Rolling new log segment for bucket {} (log_size = {}/{}), offset_index_size = {}/{}, "
+ "time_index_size = {}/{}",
getTableBucket(),
segment.getSizeInBytes(),
maxSegmentFileSize,
segment.offsetIndex().entries(),
@@ -863,12 +874,13 @@ boolean truncateTo(long targetOffset) throws LogStorageException {

if (targetOffset >= localLog.getLocalLogEndOffset()) {
LOG.info(
"Truncate to {} has no effect as the largest offset in the log is {}.",
"Truncate to {} for bucket {} has no effect as the largest offset in the log is {}.",
targetOffset,
getTableBucket(),
localLog.getLocalLogEndOffset() - 1);
return false;
} else {
LOG.info("Truncating to offset {}", targetOffset);
LOG.info("Truncating to offset {} for bucket {}", targetOffset, getTableBucket());
synchronized (lock) {
try {
localLog.checkIfMemoryMappedBufferClosed();
@@ -902,7 +914,7 @@ boolean truncateTo(long targetOffset) throws LogStorageException {

/** Delete all data in the log and start at the new offset. */
void truncateFullyAndStartAt(long newOffset) throws LogStorageException {
LOG.debug("Truncate and start at offset {}", newOffset);
LOG.debug("Truncate and start at offset {} for bucket {}", newOffset, getTableBucket());
synchronized (lock) {
try {
localLog.truncateFullyAndStartAt(newOffset);
@@ -950,14 +962,14 @@ public List<LogSegment> logSegments() {
}

public void close() {
LOG.debug("close log tablet");
LOG.debug("close log tablet for bucket {}", getTableBucket());
synchronized (lock) {
localLog.checkIfMemoryMappedBufferClosed();
writerExpireCheck.cancel(true);
try {
writerStateManager.takeSnapshot();
} catch (IOException e) {
LOG.error("Error while taking writer snapshot.", e);
LOG.error("Error while taking writer snapshot for bucket {}.", getTableBucket(), e);
}
localLog.close();
}
@@ -200,8 +200,9 @@ private List<EnrichedLogSegment> candidateToCopyLogSegments(LogTablet log) {
long fromOffset = Math.max(copiedOffset + 1, log.localLogStartOffset());
candidateLogSegments = candidateLogSegments(log, fromOffset, highWatermark);
LOG.debug(
"Candidate log segments: logLocalStartOffset: {}, copiedOffset: {}, "
"Candidate log segments for bucket {}: logLocalStartOffset: {}, copiedOffset: {}, "
+ "fromOffset: {}, highWatermark: {} and candidateLogSegments: {}",
tableBucket,
log.localLogStartOffset(),
copiedOffset,
fromOffset,
@@ -216,7 +217,8 @@ private List<EnrichedLogSegment> candidateToCopyLogSegments(LogTablet log) {
}
} else {
LOG.debug(
"Skipping copying segments to remote, current read-offset:{}, and highWatermark:{}",
"Skipping copying segments for bucket {} to remote, current read-offset:{}, and highWatermark:{}",
tableBucket,
copiedOffset,
highWatermark);
}
@@ -314,7 +316,10 @@ public boolean tryToCommitRemoteLogManifest(
remoteLogManifestPath =
remoteLogStorage.writeRemoteLogManifestSnapshot(newRemoteLogManifest);
} catch (Exception e) {
LOG.error("Write remote log manifest file to remote storage failed.", e);
LOG.error(
"Write remote log manifest file to remote storage failed for bucket {}.",
tableBucket,
e);
return false;
}

@@ -364,8 +369,9 @@ public boolean tryToCommitRemoteLogManifest(
// the commit failed with unexpected exception, like network error, we will
// retry send.
LOG.error(
"The {} time try to commit remote log manifest failed.",
"The {} time try to commit remote log manifest failed for bucket {}.",
retrySendCommitTimes,
tableBucket,
e);
retrySendCommitTimes++;
}
@@ -458,8 +464,9 @@ private void deleteRemoteLogSegmentFiles(
metricGroup.remoteLogDeleteRequests().inc();
} catch (Exception e) {
LOG.error(
"Error occurred while deleting remote log segment files: {}, "
"Error occurred while deleting remote log segment files: {} for bucket {}, "
+ "the delete files operation will be skipped.",
remoteLogSegment,
tableBucket,
e);
metricGroup.remoteLogDeleteErrors().inc();
@@ -289,7 +289,10 @@ private void doHandleLeaderReplica(
remoteLogStorage,
coordinatorGateway,
clock);
LOG.info("Created a new remote log task: {} and getting scheduled", task);
LOG.info(
"Created a new remote log task for table-bucket{}: {} and getting scheduled",
tableBucket,
task);
ScheduledFuture<?> future =
rlManagerScheduledThreadPool.scheduleWithFixedDelay(
task,
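The log statements in this diff use SLF4J-style parameterized logging: each `{}` placeholder is filled by the argument at the same position, and a `Throwable` passed as the final argument does not bind to a placeholder but is printed as a stack trace after the formatted message. A minimal sketch of that pattern is below, assuming an SLF4J `Logger` on the classpath; the `tableBucket` and offset values are illustrative stand-ins, not taken from the Fluss codebase.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

    public static void main(String[] args) {
        // Illustrative stand-ins for the values used in the log calls above.
        String tableBucket = "TableBucket{tableId=1, bucket=0}";
        long newOffset = 1024L;

        // Placeholders are filled strictly in argument order, so this prints:
        // "Rolled new log segment for bucket TableBucket{tableId=1, bucket=0} at offset 1024"
        LOG.info("Rolled new log segment for bucket {} at offset {}", tableBucket, newOffset);

        // A trailing Throwable does not consume a placeholder; SLF4J appends
        // its stack trace after the formatted message.
        Exception e = new RuntimeException("simulated failure");
        LOG.error("Error while taking writer snapshot for bucket {}.", tableBucket, e);
    }
}
```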