From b4ce673a2cdf90358361e73a46ae38a7fcd18c54 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 24 Mar 2025 14:37:14 +0800 Subject: [PATCH 1/2] [server] Fix CorruptIndexException for cached remote log index --- .../log/remote/RemoteLogIndexCache.java | 31 +++++++++++++------ .../log/remote/RemoteLogIndexCacheTest.java | 31 +++++++++++++++++++ 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java index 126ff9f31b..00b1e3b665 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java @@ -406,16 +406,27 @@ private void init() throws IOException { if (Files.exists(offsetIndexFile.toPath()) && Files.exists(timestampIndexFile.toPath())) { long offset = offsetFromRemoteIndexCacheFileName(indexFileName); - OffsetIndex offsetIndex = - new OffsetIndex(offsetIndexFile, offset, Integer.MAX_VALUE, false); - offsetIndex.sanityCheck(); - - TimeIndex timeIndex = - new TimeIndex(timestampIndexFile, offset, Integer.MAX_VALUE, false); - timeIndex.sanityCheck(); - - Entry entry = new Entry(offsetIndex, timeIndex); - internalCache.put(remoteSegmentId, entry); + try { + OffsetIndex offsetIndex = + new OffsetIndex( + offsetIndexFile, offset, Integer.MAX_VALUE, false); + offsetIndex.sanityCheck(); + + TimeIndex timeIndex = + new TimeIndex( + timestampIndexFile, offset, Integer.MAX_VALUE, false); + timeIndex.sanityCheck(); + + Entry entry = new Entry(offsetIndex, timeIndex); + internalCache.put(remoteSegmentId, entry); + } catch (CorruptIndexException e) { + LOG.debug( + "Remote offset/time log index is corrupt, delete corrupt index.", + e); + // let's delete offset index & time index + Files.deleteIfExists(offsetIndexFile.toPath()); + Files.deleteIfExists(timestampIndexFile.toPath()); + } } else { // Delete all of them if any one of those indexes is not available for a // specific segment id. diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCacheTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCacheTest.java index 34ba8d6cd4..637d6abb7d 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCacheTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCacheTest.java @@ -31,7 +31,10 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.UUID; @@ -106,6 +109,34 @@ void testFetchTimeIndexFromRemoteLogStorage(boolean partitionTable) throws Excep assertThat(offsetPosition.getOffset()).isEqualTo(resultOffset); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testInitWithCorruptIndex(boolean partitionTable) throws Exception { + LogTablet logTablet = makeLogTabletAndAddSegments(partitionTable); + // 1. first upload one segment to remote. + RemoteLogSegment remoteLogSegment = copyLogSegmentToRemote(logTablet, remoteLogStorage, 0); + + rlIndexCache = new RemoteLogIndexCache(1024 * 1024L, remoteLogStorage, tempDir); + File file = rlIndexCache.getIndexEntry(remoteLogSegment).timeIndex().file(); + // mock corrupt index + try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.APPEND)) { + for (int i = 0; i < 12; i++) { + fileChannel.write(ByteBuffer.wrap(new byte[] {0})); + } + } + + // re-initialize index cache and lookup offset + rlIndexCache = new RemoteLogIndexCache(1024 * 1024L, remoteLogStorage, tempDir); + TimeIndex timeIndex = rlIndexCache.getIndexEntry(remoteLogSegment).timeIndex(); + TimestampOffset timestampOffset = timeIndex.entry(0); + + OffsetIndex offsetIndex = rlIndexCache.getIndexEntry(remoteLogSegment).offsetIndex(); + OffsetPosition offsetPosition = offsetIndex.lookup(timestampOffset.offset); + long resultOffset = + rlIndexCache.lookupOffsetForTimestamp(remoteLogSegment, timestampOffset.timestamp); + assertThat(offsetPosition.getOffset()).isEqualTo(resultOffset); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testPositionForNonExistingIndexFromRemoteStorage(boolean partitionTable) throws Exception { From 8d7524027e2e37b890cf34cfbd66a19b78b4c27b Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Mon, 24 Mar 2025 15:43:31 +0800 Subject: [PATCH 2/2] address comments --- .../fluss/server}/exception/CorruptIndexException.java | 6 +++--- .../java/com/alibaba/fluss/server/log/AbstractIndex.java | 2 +- .../main/java/com/alibaba/fluss/server/log/OffsetIndex.java | 2 +- .../main/java/com/alibaba/fluss/server/log/TimeIndex.java | 2 +- .../fluss/server/log/remote/RemoteLogIndexCache.java | 2 +- .../java/com/alibaba/fluss/server/log/TimeIndexTest.java | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) rename {fluss-common/src/main/java/com/alibaba/fluss => fluss-server/src/main/java/com/alibaba/fluss/server}/exception/CorruptIndexException.java (84%) diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/CorruptIndexException.java b/fluss-server/src/main/java/com/alibaba/fluss/server/exception/CorruptIndexException.java similarity index 84% rename from fluss-common/src/main/java/com/alibaba/fluss/exception/CorruptIndexException.java rename to fluss-server/src/main/java/com/alibaba/fluss/server/exception/CorruptIndexException.java index 05110aadcb..fbad7c25d9 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/exception/CorruptIndexException.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/exception/CorruptIndexException.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.alibaba.fluss.exception; +package com.alibaba.fluss.server.exception; import com.alibaba.fluss.annotation.PublicEvolving; @@ -25,7 +25,7 @@ * @since 0.1 */ @PublicEvolving -public class CorruptIndexException extends RetriableException { +public class CorruptIndexException extends RuntimeException { public CorruptIndexException(String message) { super(message); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java index 666a523af1..ebe7448f9d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java @@ -16,8 +16,8 @@ package com.alibaba.fluss.server.log; -import com.alibaba.fluss.exception.CorruptIndexException; import com.alibaba.fluss.exception.IndexOffsetOverflowException; +import com.alibaba.fluss.server.exception.CorruptIndexException; import com.alibaba.fluss.utils.FileUtils; import com.alibaba.fluss.utils.IOUtils; import com.alibaba.fluss.utils.OperatingSystem; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java index 679ce9fc70..6502d0213f 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java @@ -16,9 +16,9 @@ package com.alibaba.fluss.server.log; -import com.alibaba.fluss.exception.CorruptIndexException; import com.alibaba.fluss.exception.IndexOffsetOverflowException; import com.alibaba.fluss.exception.InvalidOffsetException; +import com.alibaba.fluss.server.exception.CorruptIndexException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java index 4fbd266699..247a51718b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java @@ -16,8 +16,8 @@ package com.alibaba.fluss.server.log; -import com.alibaba.fluss.exception.CorruptIndexException; import com.alibaba.fluss.exception.InvalidOffsetException; +import com.alibaba.fluss.server.exception.CorruptIndexException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java index 00b1e3b665..33cd22497a 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/RemoteLogIndexCache.java @@ -17,10 +17,10 @@ package com.alibaba.fluss.server.log.remote; import com.alibaba.fluss.annotation.VisibleForTesting; -import com.alibaba.fluss.exception.CorruptIndexException; import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.exception.RemoteStorageException; import com.alibaba.fluss.remote.RemoteLogSegment; +import com.alibaba.fluss.server.exception.CorruptIndexException; import com.alibaba.fluss.server.log.OffsetIndex; import com.alibaba.fluss.server.log.OffsetPosition; import com.alibaba.fluss.server.log.StorageAction; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/TimeIndexTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/TimeIndexTest.java index 86683790a5..fff0f00def 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/TimeIndexTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/TimeIndexTest.java @@ -16,8 +16,8 @@ package com.alibaba.fluss.server.log; -import com.alibaba.fluss.exception.CorruptIndexException; import com.alibaba.fluss.exception.InvalidOffsetException; +import com.alibaba.fluss.server.exception.CorruptIndexException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test;