Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15169: Added TestCase in RemoteIndexCache #14482

Merged
merged 10 commits into from
Oct 11, 2023
269 changes: 257 additions & 12 deletions core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,26 @@
package kafka.log.remote

import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}

import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.nio.file.{Files, Paths}
import java.util
import java.util.Collections
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
Expand Down Expand Up @@ -276,6 +279,8 @@ class RemoteIndexCacheTest {
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
"Failed to finish cleanup cache entry after invalidation")

// first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
verify(cacheEntry, times(2)).markForCleanup()
Expand Down Expand Up @@ -516,7 +521,7 @@ class RemoteIndexCacheTest {
def testClearCacheAndIndexFilesWhenResizeCache(): Unit = {

def getIndexFileFromRemoteCacheDir(suffix: String) = {
Files.walk(cache.cacheDir())
Files.walk(cache.cacheDir().toPath())
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
Expand All @@ -540,6 +545,8 @@ class RemoteIndexCacheTest {
"Failed to mark cache entry for cleanup after resizing cache.")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after resizing cache.")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
"Failed to finish cleanup cache entry after resizing cache.")

// verify no index files on remote cache dir
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iit2009060 @hudeqi , adding an isCleanFinished flag just for test is not a good solution. In this case, could we catch the exception in the getIndexFileFromRemoteCacheDir?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hudeqi Can you resolve the above comment ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon I have removed the isCleanFinished flag changes and catch the Exception in the method.
cc @hudeqi
Please review it .

Expand All @@ -554,23 +561,227 @@ class RemoteIndexCacheTest {
assertTrue(cache.internalCache().estimatedSize() == 0)
}

@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
createCorruptRemoteIndexCacheOffsetFile()
@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
// create Corrupted Index File in remote index cache
createCorruptedIndexFile(indexType, cache.cacheDir())
val entry = cache.getIndexEntry(rlsMetadata)
// Test would fail if it throws corrupt Exception
val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
// Test would fail if it throws Exception other than CorruptIndexException
val offsetIndexFile = entry.offsetIndex.file().toPath
val txnIndexFile = entry.txnIndex.file().toPath
val timeIndexFile = entry.timeIndex.file().toPath

val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)

assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)

// assert that parent directory for the index files is correct
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
s"offsetIndex=$offsetIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
s"txnIndex=$txnIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
s"timeIndex=$timeIndexFile is created under incorrect parent")

// file is corrupted it should fetch from remote storage again
verifyFetchIndexInvocation(count = 1)
}

@Test
def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
reset(rsm)
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupted index file
createCorruptTimeIndexOffsetFile(tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})

assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
// Current status
// (cache is null)
// RemoteCacheDir contain
// 1. Offset Index File is fine and not corrupted
// 2. Time Index File is corrupted
// What should be the code flow in next execution
// 1. No rsm call for fetching OffSet Index File.
// 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution.
showuon marked this conversation as resolved.
Show resolved Hide resolved
// 3. Transaction index file should be fetched from remote storage.
reset(rsm)
// delete all files created in tpDir
Files.walk(tpDir.toPath, 1)
.filter(Files.isRegularFile(_))
.forEach(path => Files.deleteIfExists(path))
// rsm should return no corrupted file in the 2nd execution
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
cache.getIndexEntry(rlsMetadata)
// rsm should not be called to fetch offset Index
verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
// Transaction index would be fetched again
// as previous getIndexEntry failed before fetchTransactionIndex
verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Txn index file was not corrupted. Can we add a comment to explain why are we fetching it again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

@Test
def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
showuon marked this conversation as resolved.
Show resolved Hide resolved
val remoteIndexCacheDir = cache.cacheDir()
val tempSuffix = ".tmptest"

def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}

def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
}

val entry = cache.getIndexEntry(rlsMetadata)
verifyFetchIndexInvocation(count = 1)
// copy files with temporary name
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))

cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanFinished,
"Failed to finish cleanup cache entry after invalidation")

// restore index files
renameRemoteCacheIndexFileFromDisk(tempSuffix)
// validate cache entry for the above key should be null
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
cache.getIndexEntry(rlsMetadata)
// Index Files already exist ,rsm should not fetch them again.
verifyFetchIndexInvocation(count = 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be 0 if there is going to be no fetch invocation?

Copy link
Contributor Author

@iit2009060 iit2009060 Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya To pre-create the valid index file in remote storage cache dir , I initially fetched it from the remote storage and then run the scenario. The 1 count is for pre-processing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. So that will confuse readers. Please do either:

  1. reset the mock rsm
    or
  2. run verifyFetchIndexInvocation(count = 1) again before L672, so that we can make sure the count is not increasing even we call cache.getIndexEntry again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iit2009060 , have you addressed this comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @showuon Thanks for identifying it , I missed it.
Changes Done.

// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
}

@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a bug in existing createOffsetIndexForSegmentMetadata and similar function. They create cache at the wrong place in the line:
new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata)
Instead of tpDir, it should point to the remote cache directory, which is a folder inside tpDir.

Please fix that and see if it impacts your test in any way.

Copy link
Contributor Author

@iit2009060 iit2009060 Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya But when we call get index entry , it will create indexes file under remote cache directory only. It will read file content from tpDir and create indexes in "remoteIndexCache" dir.
IMO "createOffsetIndexForSegmentMetadata" will create indexes file and tpDir act as a placeholder for files fetched from remoteStorage. If we change it to remote cache dir , all test cases will always have the file exist behaviour.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya We can keep this function i.e createOffsetIndexForSegmentMetadata to be used for indexes received from remote storage rather than remoteindexcache.

The existing test cases are working because they directly put values into the internalCache (Caffeine) rather than going through the getIndexEntry route.

In the above test case I want to corrupt the Index files received from remote storage rather than from remote index cache.
getIndexEntry does following steps

  1. Fetch indexes from remote Storage( i.e indexes stored in tpDir directory as per the mock)
  2. Manually Corrupt the indexes file stored in tpDir Directory after fetch from remote storage.
  3. It tries to copy indexes files in tpDir directory to remote index cache directory.
  4. Run sanity check on indexes stored in tpDir/remoteindexcache directory.
  5. It throws CorruptedIndexException.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have 3 variants of this test where we are corrupting all 3 indexes one by one. You can use @Parameterized to run those scenarios in the same test.

Copy link
Contributor Author

@iit2009060 iit2009060 Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@divijvaidya Is it possible to parametrize based on function as a parameter ? Otherwise I need to write if else condition in the same test ?
Corrupting Time and Offset Index is easy based on entry_size , any suggestion how to do it for TransactionIndex ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iit2009060 , if / else is fine with me, just make the test clear.

For txnIndex corruption, I usually check other tests if they did something similar as we want. Here's a good reference:

def testSanityCheck(): Unit = {
val abortedTxns = List(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
new AbortedTxn(3L, 32, 50, 40))
abortedTxns.foreach(index.append)
index.close()
// open the index with a different starting offset to fake invalid data
val reopenedIndex = new TransactionIndex(100L, file)
assertThrows(classOf[CorruptIndexException], () => reopenedIndex.sanityCheck())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupt index file return from RSM
createCorruptedIndexFile(testIndexType, tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
}

@Test
def testConcurrentCacheDeletedFileExists(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()

def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}

val entry = cache.getIndexEntry(rlsMetadata)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")

// Simulating a concurrency issue where deleted files already exist on disk
// This happen when cleanerThread is slow and not able to delete index entries
// while same index Entry is cached again and invalidated.
// The new deleted file created should be replaced by existing deleted file.

// create deleted suffix file
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))

// verify deleted file exists on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")

cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean started doesn't mean that index has been deleted. There could be a case where next line executes even though file has not been deleted yet. I would suggest to wrap the next assertions in waitUntilTrue(). It would make the test non-flaky.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

"Failed to cleanup cache entry after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanFinished,
"Failed to finish cleanup cache entry after invalidation")

// verify no index files on disk
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
}

private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
Expand Down Expand Up @@ -611,6 +822,22 @@ class RemoteIndexCacheTest {
new TransactionIndex(metadata.startOffset(), txnIdxFile)
}

/**
 * Creates a transaction index file for the given segment metadata and deliberately
 * corrupts it: valid aborted-transaction entries are appended starting at
 * `metadata.startOffset()`, then the index is re-opened with a different start
 * offset (100L) so that `sanityCheck()` on the returned index reports corruption.
 *
 * @param dir      directory in which the transaction index file is created
 * @param metadata segment metadata used to name the index file and pick the real start offset
 * @return a [[TransactionIndex]] whose on-disk data is inconsistent with its start offset
 */
private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
  val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
  txnIdxFile.createNewFile()
  val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
  val abortedTxns = List(
    new AbortedTxn(0L, 0, 10, 11),
    new AbortedTxn(1L, 5, 15, 13),
    new AbortedTxn(2L, 18, 35, 25),
    new AbortedTxn(3L, 32, 50, 40))
  abortedTxns.foreach(txnIndex.append)
  txnIndex.close()

  // Re-open the index with a different starting offset to fake invalid data;
  // no `return` keyword needed — the last expression is the result.
  new TransactionIndex(100L, txnIdxFile)
}

private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
Expand Down Expand Up @@ -680,11 +907,29 @@ class RemoteIndexCacheTest {
})
}

private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
/**
 * Writes a corrupt offset index file for `rlsMetadata` into the given directory.
 * The payload is 12 bytes, deliberately not a multiple of the offset index
 * entry size (8 bytes), so a sanity check on the index will fail.
 *
 * @param dir directory in which the corrupt offset index file is written
 */
private def createCorruptOffsetIndexFile(dir: File): Unit = {
  val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
  try {
    // 12 bytes — intentionally not a multiple of the offset index entry size (8).
    pw.write("Hello, world")
  } finally {
    // Close in `finally` so the writer is not leaked if write() fails.
    pw.close()
  }
}

/**
 * Writes a corrupt time index file for `rlsMetadata` into the given directory.
 * The payload is 13 bytes, deliberately not a multiple of the time index
 * entry size (12 bytes), so a sanity check on the index will fail.
 *
 * @param dir directory in which the corrupt time index file is written
 */
private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
  val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
  try {
    // 13 bytes — intentionally not a multiple of the time index entry size (12).
    pw.write("Hello, world1")
  } finally {
    // Close in `finally` so the writer is not leaked if write() fails.
    pw.close()
  }
}

/**
 * Creates a corrupted index file of the requested type in `dir`.
 * Only OFFSET, TIMESTAMP and TRANSACTION are supported; any other index type
 * is deliberately ignored, since these tests never corrupt them.
 *
 * @param indexType which kind of index file to corrupt
 * @param dir       directory in which the corrupt index file is created
 */
private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
  // Pattern match is idiomatic Scala for dispatching on an enum value
  // (clearer and exhaustiveness-friendly compared to an if/else chain).
  indexType match {
    case IndexType.OFFSET => createCorruptOffsetIndexFile(dir)
    case IndexType.TIMESTAMP => createCorruptTimeIndexOffsetFile(dir)
    case IndexType.TRANSACTION => createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
    case _ => // LEADER_EPOCH / PRODUCER_SNAPSHOT: intentionally not corrupted.
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ public Cache<Uuid, Entry> internalCache() {
}

// Visible for testing
public Path cacheDir() {
return cacheDir.toPath();
public File cacheDir() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we file a ticket to move the RemoteIndexCacheTest to storage module under the same package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kamalcph I have just started understanding of the kafka ecosystem , Can you help me understand the rational behind it , Then i can create a ticket with details.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can find more details about the intention to move the source code to Java in KAFKA-14524

return cacheDir;
}

public void remove(Uuid key) {
Expand Down Expand Up @@ -489,6 +489,8 @@ public static class Entry implements AutoCloseable {

private boolean cleanStarted = false;

private boolean cleanFinished = false;

private boolean markedForCleanup = false;

private final long entrySizeBytes;
Expand Down Expand Up @@ -520,6 +522,11 @@ public boolean isCleanStarted() {
return cleanStarted;
}

// Visible for testing
public boolean isCleanFinished() {
return cleanFinished;
}

// Visible for testing
public boolean isMarkedForCleanup() {
return markedForCleanup;
Expand Down Expand Up @@ -597,6 +604,7 @@ public void cleanup() throws IOException {
});

tryAll(actions);
cleanFinished = true;
}
} finally {
lock.writeLock().unlock();
Expand Down