Skip to content

Commit

Permalink
KAFKA-15169: Added TestCase in RemoteIndexCache (apache#14482)
Browse files Browse the repository at this point in the history
est Cases Covered

    1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present.
    2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution.
    3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169

Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
  • Loading branch information
iit2009060 authored and mjsax committed Nov 22, 2023
1 parent 82b168a commit bad8345
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 12 deletions.
258 changes: 247 additions & 11 deletions core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
Expand Up @@ -17,23 +17,26 @@
package kafka.log.remote

import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}

import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.nio.file.{Files, Paths}
import java.util
import java.util.Collections
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
Expand Down Expand Up @@ -524,23 +527,223 @@ class RemoteIndexCacheTest {
}
}

@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
createCorruptRemoteIndexCacheOffsetFile()
@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
// create Corrupted Index File in remote index cache
createCorruptedIndexFile(indexType, cache.cacheDir())
val entry = cache.getIndexEntry(rlsMetadata)
// Test would fail if it throws corrupt Exception
val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
// Test would fail if it throws Exception other than CorruptIndexException
val offsetIndexFile = entry.offsetIndex.file().toPath
val txnIndexFile = entry.txnIndex.file().toPath
val timeIndexFile = entry.timeIndex.file().toPath

val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)

assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)

// assert that parent directory for the index files is correct
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
s"offsetIndex=$offsetIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
s"txnIndex=$txnIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
s"timeIndex=$timeIndexFile is created under incorrect parent")

// file is corrupted it should fetch from remote storage again
verifyFetchIndexInvocation(count = 1)
}

@Test
def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
reset(rsm)
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupted index file
createCorruptTimeIndexOffsetFile(tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})

assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
// Current status
// (cache is null)
// RemoteCacheDir contain
// 1. Offset Index File is fine and not corrupted
// 2. Time Index File is corrupted
// What should be the code flow in next execution
// 1. No rsm call for fetching OffSet Index File.
// 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution.
// 3. Transaction index file should be fetched from remote storage.
reset(rsm)
// delete all files created in tpDir
Files.walk(tpDir.toPath, 1)
.filter(Files.isRegularFile(_))
.forEach(path => Files.deleteIfExists(path))
// rsm should return no corrupted file in the 2nd execution
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
cache.getIndexEntry(rlsMetadata)
// rsm should not be called to fetch offset Index
verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
// Transaction index would be fetched again
// as previous getIndexEntry failed before fetchTransactionIndex
verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
}

@Test
def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()
val tempSuffix = ".tmptest"

def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}

def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
}

val entry = cache.getIndexEntry(rlsMetadata)
verifyFetchIndexInvocation(count = 1)
// copy files with temporary name
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))

cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")

// restore index files
renameRemoteCacheIndexFileFromDisk(tempSuffix)
// validate cache entry for the above key should be null
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
cache.getIndexEntry(rlsMetadata)
// Index Files already exist ,rsm should not fetch them again.
verifyFetchIndexInvocation(count = 1)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
}

@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupt index file return from RSM
createCorruptedIndexFile(testIndexType, tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
}

@Test
def testConcurrentCacheDeletedFileExists(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()

def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}

val entry = cache.getIndexEntry(rlsMetadata)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")

// Simulating a concurrency issue where deleted files already exist on disk
// This happen when cleanerThread is slow and not able to delete index entries
// while same index Entry is cached again and invalidated.
// The new deleted file created should be replaced by existing deleted file.

// create deleted suffix file
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))

// verify deleted file exists on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")

cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")

// verify no index files on disk
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
}

private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
Expand Down Expand Up @@ -581,6 +784,22 @@ class RemoteIndexCacheTest {
new TransactionIndex(metadata.startOffset(), txnIdxFile)
}

private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
txnIdxFile.createNewFile()
val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
val abortedTxns = List(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
new AbortedTxn(3L, 32, 50, 40))
abortedTxns.foreach(txnIndex.append)
txnIndex.close()

// open the index with a different starting offset to fake invalid data
return new TransactionIndex(100L, txnIdxFile)
}

private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
Expand Down Expand Up @@ -616,12 +835,29 @@ class RemoteIndexCacheTest {
}
}

private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
private def createCorruptOffsetIndexFile(dir: File): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
pw.write("Hello, world")
// The size of the string written in the file is 12 bytes,
// but it should be multiple of Offset Index EntrySIZE which is equal to 8.
pw.close()
}

private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
pw.write("Hello, world1")
// The size of the string written in the file is 13 bytes,
// but it should be multiple of Time Index EntrySIZE which is equal to 12.
pw.close()
}

private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
if (indexType == IndexType.OFFSET) {
createCorruptOffsetIndexFile(dir)
} else if (indexType == IndexType.TIMESTAMP) {
createCorruptTimeIndexOffsetFile(dir)
} else if (indexType == IndexType.TRANSACTION) {
createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
}
}
}
Expand Up @@ -166,6 +166,11 @@ public Cache<Uuid, Entry> internalCache() {
return internalCache;
}

// Visible for testing
public File cacheDir() {
return cacheDir;
}

public void remove(Uuid key) {
lock.readLock().lock();
try {
Expand Down Expand Up @@ -674,4 +679,4 @@ public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata rem
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}

}
}

0 comments on commit bad8345

Please sign in to comment.