diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index fce5f15331..7aa7343adc 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -820,7 +820,8 @@ protected class RecoverIterator implements Iterator { private final long windowLength; private final long skipRecordAtOffset; private long nextRecoverOffset; - private long firstInvalidOffset = -1; + private long maybeFirstInvalidCycle = -1; + private long maybeFirstInvalidOffset = -1; private RecoverResult next; public RecoverIterator(long nextRecoverOffset, long windowLength, long skipRecordAtOffset) { @@ -860,12 +861,18 @@ private boolean tryReadNextRecord() { if (next != null) { return true; } - while (firstInvalidOffset == -1 || nextRecoverOffset < firstInvalidOffset + windowLength) { + while (maybeFirstInvalidOffset == -1 || nextRecoverOffset < maybeFirstInvalidOffset + windowLength) { + long cycle = WALUtil.calculateCycle(nextRecoverOffset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY); + boolean skip = nextRecoverOffset == skipRecordAtOffset; try { - boolean skip = nextRecoverOffset == skipRecordAtOffset; ByteBuf nextRecordBody = readRecord(nextRecoverOffset, offset -> WALUtil.recordOffsetToPosition(offset, walHeader.getCapacity(), WAL_HEADER_TOTAL_CAPACITY)); RecoverResultImpl recoverResult = new RecoverResultImpl(nextRecordBody, nextRecoverOffset); nextRecoverOffset += RECORD_HEADER_SIZE + nextRecordBody.readableBytes(); + if (maybeFirstInvalidCycle != -1 && maybeFirstInvalidCycle != cycle) { + // we meet a valid record in the next cycle, so the "invalid" record we met before is not really invalid + maybeFirstInvalidOffset = -1; + maybeFirstInvalidCycle = -1; + } if (skip) { nextRecordBody.release(); continue; @@ -873,11 +880,12 @@ private boolean tryReadNextRecord() { next = recoverResult; return true; } catch (ReadRecordException e) { - if (firstInvalidOffset == -1 && WALUtil.isAligned(nextRecoverOffset) && nextRecoverOffset != skipRecordAtOffset) { - // first invalid offset - LOGGER.info("meet the first invalid offset during recovery. offset: {}, window: {}, detail: '{}'", - nextRecoverOffset, windowLength, e.getMessage()); - firstInvalidOffset = nextRecoverOffset; + if (maybeFirstInvalidOffset == -1 && WALUtil.isAligned(nextRecoverOffset) && !skip) { + maybeFirstInvalidCycle = cycle; + maybeFirstInvalidOffset = nextRecoverOffset; + // maybe the first invalid offset + LOGGER.info("maybe meet the first invalid offset during recovery. cycle: {}, offset: {}, window: {}, detail: '{}'", + maybeFirstInvalidCycle, maybeFirstInvalidOffset, windowLength, e.getMessage()); } nextRecoverOffset = e.getJumpNextRecoverOffset(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java index d306410ee8..7aeaf964b4 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/RecoverTool.java @@ -62,6 +62,7 @@ private void run(Config config) { private Iterator recover(WALHeader header, Config config) { long recoverOffset = config.offset != null ? config.offset : header.getTrimOffset(); long windowLength = header.getSlidingWindowMaxLength(); + // TODO: in this tool, we don't need to skip the record at the trimmed offset long skipRecordAtOffset = config.skipTrimmed ? header.getTrimOffset() : -1; return new RecoverIterator(recoverOffset, windowLength, skipRecordAtOffset); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java index 62f03ed17b..588215894c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java @@ -54,6 +54,11 @@ public static long recordOffsetToPosition(long offset, long physicalCapacity, lo return offset % capacity + headerSize; } + public static long calculateCycle(long offset, long physicalCapacity, long headerSize) { + long capacity = physicalCapacity - headerSize; + return offset / capacity; + } + public static long alignLargeByBlockSize(long offset) { return offset % BLOCK_SIZE == 0 ? offset : offset + BLOCK_SIZE - offset % BLOCK_SIZE; } diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java index 5213a33754..f4df06494f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/wal/BlockWALServiceTest.java @@ -412,11 +412,20 @@ public static Stream testRecoverFromDisasterData() { WALUtil.BLOCK_SIZE + 1, 100L, 180L, - 50L, + 30L, Arrays.asList(150L, 160L, 170L, 180L, 190L, 200L, 202L, 210L, 220L, 230L, 240L), Arrays.asList(190L, 200L, 202L, 210L, 220L, 230L), WALUtil.BLOCK_SIZE ).toArguments("round robin"), + new RecoverFromDisasterParam( + WALUtil.BLOCK_SIZE * 2 + 1, + 100L, + 192L, + 3L, + Arrays.asList(192L, 195L, /* no place for 198L, */ 200L, 203L, 206L, 209L, 212L, 215L), + Arrays.asList(195L, 200L, 203L, 206L, 209L, 212L, 215L), + WALUtil.BLOCK_SIZE + ).toArguments("round robin - no place for the last record"), new RecoverFromDisasterParam( WALUtil.BLOCK_SIZE + 1, 100L,