Skip to content

Commit

Permalink
Pulled the changes from the trunk that were made in RemotreIndexCache…
Browse files Browse the repository at this point in the history
….scala

Other minor changes
  • Loading branch information
satishd committed Jul 9, 2023
1 parent 8105464 commit 2d8271e
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 107 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@
<suppress checks="CyclomaticComplexity"
files="(LogValidator|RemoteLogManagerConfig).java"/>
<suppress checks="NPathComplexity"
files="LogValidator.java"/>
files="(LogValidator|RemoteIndexCache).java"/>
<suppress checks="ParameterNumber"
files="(LogAppendInfo|RemoteLogManagerConfig).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
*/
package kafka.log.remote

import kafka.log.UnifiedLog
import kafka.log.remote.RemoteIndexCache.{RemoteLogIndexCacheCleanerThread, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
Expand Down Expand Up @@ -62,7 +61,7 @@ class RemoteIndexCacheTest {

val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))

cache = new RemoteIndexCache(rsm, tpDir.toString)

Expand All @@ -72,12 +71,12 @@ class RemoteIndexCacheTest {
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val trxIdx = createTxIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
Expand All @@ -94,7 +93,7 @@ class RemoteIndexCacheTest {
Utils.delete(logDir)
// Verify no lingering threads. It is important to have this as the very last statement in the @aftereach
// because this may throw an exception and prevent cleanup after it
TestUtils.assertNoNonDaemonThreads(RemoteIndexCache.RemoteLogIndexCacheCleanerThread)
TestUtils.assertNoNonDaemonThreads(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD)
}

@Test
Expand All @@ -113,11 +112,11 @@ class RemoteIndexCacheTest {
assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)

// assert that parent directory for the index files is correct
assertEquals(RemoteIndexCache.DirName, offsetIndexFile.getParent.getFileName.toString,
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DirName, txnIndexFile.getParent.getFileName.toString,
assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
s"txnIndex=$txnIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DirName, timeIndexFile.getParent.getFileName.toString,
assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
s"timeIndex=$timeIndexFile is created under incorrect parent")
}

Expand Down Expand Up @@ -242,9 +241,9 @@ class RemoteIndexCacheTest {
val cacheEntry = generateSpyCacheEntry()

// verify index files on disk
assertTrue(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
assertTrue(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
assertTrue(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")
assertTrue(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${tpDir.toPath}")
assertTrue(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${tpDir.toPath}")
assertTrue(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${tpDir.toPath}")

// add the spied entry into the cache, it will overwrite the non-spied entry
cache.internalCache.put(internalIndexKey, cacheEntry)
Expand Down Expand Up @@ -272,11 +271,11 @@ class RemoteIndexCacheTest {
verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))

// verify no index files on disk
assertFalse(getIndexFileFromDisk(UnifiedLog.IndexFileSuffix).isPresent,
assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(UnifiedLog.TxnIndexFileSuffix).isPresent,
assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(UnifiedLog.TimeIndexFileSuffix).isPresent,
assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${tpDir.toPath}")
assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
Expand All @@ -287,7 +286,7 @@ class RemoteIndexCacheTest {
// cache is empty at beginning
assertTrue(cache.internalCache.asMap().isEmpty)
// verify that cleaner thread is running
TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
// create a new entry
val spyEntry = generateSpyCacheEntry()
// an exception should not close the cleaner thread
Expand All @@ -297,19 +296,19 @@ class RemoteIndexCacheTest {
// trigger cleanup
cache.internalCache.invalidate(key)
// wait for cleanup to start
TestUtils.waitUntilTrue(() => spyEntry.cleanStarted, "Failed while waiting for clean up to start")
TestUtils.waitUntilTrue(() => spyEntry.isCleanStarted, "Failed while waiting for clean up to start")
// Give the thread cleaner thread some time to throw an exception
Thread.sleep(100)
// Verify that Cleaner thread is still running even when exception is thrown in doWork()
var threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
var threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
assertEquals(1, threads.size,
s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")

// close the cache properly
cache.close()

// verify that the thread is closed properly
threads = TestUtils.numThreadsRunning(RemoteLogIndexCacheCleanerThread, isDaemon = true)
threads = TestUtils.numThreadsRunning(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, isDaemon = true)
assertTrue(threads.isEmpty, s"Found unexpected ${threads.size} threads=${threads.map(t => t.getName).mkString(", ")}")
// if the thread is correctly being shutdown it will not be running
assertFalse(cache.cleanerThread.isRunning, "Unexpected thread state=running. Check error logs.")
Expand Down

0 comments on commit 2d8271e

Please sign in to comment.