diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index e82c35ffb978..1424d1297ff4 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -32,14 +32,15 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
-import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.{Files, Paths}
+import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException}
+import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util
-import java.util.Collections
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.{Collections, Optional}
+import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit}
 import scala.collection.mutable
 
 class RemoteIndexCacheTest {
@@ -73,9 +74,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -152,8 +153,8 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -262,7 +263,7 @@ class RemoteIndexCacheTest {
   }
 
   @Test
-  def testCacheEntryIsDeletedOnInvalidation(): Unit = {
+  def testCacheEntryIsDeletedOnRemoval(): Unit = {
     def getIndexFileFromDisk(suffix: String) = {
       Files.walk(tpDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -284,8 +285,8 @@ class RemoteIndexCacheTest {
     // no expired entries yet
     assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be zero at start of test")
 
-    // invalidate the cache. it should async mark the entry for removal
-    cache.internalCache.invalidate(internalIndexKey)
+    // call remove function to mark the entry for removal
+    cache.remove(internalIndexKey)
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
@@ -304,13 +305,13 @@ class RemoteIndexCacheTest {
     verify(cacheEntry.txnIndex).renameTo(any(classOf[File]))
 
     // verify no index files on disk
-    assertFalse(getIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
       s"Offset index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
       s"Txn index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
       s"Time index file should not be present on disk at ${tpDir.toPath}")
-    assertFalse(getIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+    assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
      s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}")
   }
 
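The next hunk adds a regression test for the race between an explicit remove and a concurrent read. As background, here is a minimal, self-contained sketch (hypothetical class and variable names, not part of this patch) of the CountDownLatch hand-off pattern the test relies on to force a deterministic interleaving of two threads without sleeps:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LatchChoreographyDemo {
    public static void main(String[] args) throws Exception {
        // Each latch is a one-shot signal: countDown() releases every thread blocked in await().
        CountDownLatch readMayStart = new CountDownLatch(1);
        CountDownLatch removeMayFinish = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Future<?> remover = executor.submit(() -> {
            // Step 1: let the reader run while the remover is "paused" mid-operation.
            readMayStart.countDown();
            awaitUninterruptibly(removeMayFinish); // Step 3: resume only after the reader is done
            System.out.println("remove finishes last");
        });

        Future<?> reader = executor.submit(() -> {
            awaitUninterruptibly(readMayStart);    // Step 2: runs strictly between the remover's steps
            System.out.println("read runs in the middle");
            removeMayFinish.countDown();
        });

        remover.get();
        reader.get();
        executor.shutdownNow();
    }

    private static void awaitUninterruptibly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

In the test below, the same role is played by latchForCacheRead, latchForCacheRemove and latchForTestWait, with the pause point injected into markForCleanup() via a Mockito doAnswer.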
@@ -558,6 +559,84 @@ class RemoteIndexCacheTest {
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy cache entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the read task to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for the signal to start renaming the files
+        latchForCacheRemove.await()
+        // Call the real markForCleanup() to start renaming the files
+        invocation.callRealMethod()
+        // Signal the main test thread so that the test can complete
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for the signal to start the cache read
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the remove task to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      val removeCacheFuture: Future[_] = executor.submit(removeCache: Runnable)
+      val readCacheFuture: Future[_] = executor.submit(readCache: Runnable)
+
+      // Verify that both tasks complete without an exception
+      removeCacheFuture.get()
+      readCacheFuture.get()
+
+      // Wait for the signal that the test can complete
+      latchForTestWait.await()
+
+      // We can't determine whether the read or the remove completes last in the race, so:
+      // 1. If the read goes first (and the removal lands after it), the index file should not exist and the cache size should be zero.
+      // 2. If the remove goes first (and the read re-populates the cache), the index file should be present and the cache size should be one.
+      // In other words, we only assert that the cache entry and the index file on disk are consistent with each other.
+      if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent) {
+        assertCacheSize(1)
+      } else {
+        assertCacheSize(0)
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+
+  }
+
   @Test
   def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
     reset(rsm)
@@ -565,9 +644,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupted index file
         createCorruptTimeIndexOffsetFile(tpDir)
@@ -603,9 +682,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -629,13 +708,6 @@ class RemoteIndexCacheTest {
     val remoteIndexCacheDir = cache.cacheDir()
     val tempSuffix = ".tmptest"
 
-    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
-      Files.walk(remoteIndexCacheDir.toPath)
-        .filter(Files.isRegularFile(_))
-        .filter(path => path.getFileName.toString.endsWith(suffix))
-        .findAny()
-    }
-
     def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
       Files.walk(remoteIndexCacheDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -650,7 +722,7 @@ class RemoteIndexCacheTest {
     Files.copy(entry.txnIndex().file().toPath(),
       Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
     Files.copy(entry.timeIndex().file().toPath(),
       Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
 
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -666,9 +738,9 @@ class RemoteIndexCacheTest {
     // Index Files already exist ,rsm should not fetch them again.
     verifyFetchIndexInvocation(count = 1)
     // verify index files on disk
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
-    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
   }
 
   @ParameterizedTest
@@ -678,9 +750,9 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupt index file return from RSM
         createCorruptedIndexFile(testIndexType, tpDir)
@@ -725,7 +797,7 @@ class RemoteIndexCacheTest {
     // verify deleted file exists on disk
     assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
 
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -748,9 +820,9 @@ class RemoteIndexCacheTest {
                             = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(),
       brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
-    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
-    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
-    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir))
     spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
   }
 
@@ -778,8 +850,8 @@ class RemoteIndexCacheTest {
     }
   }
 
-  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
@@ -800,14 +872,14 @@ class RemoteIndexCacheTest {
     return new TransactionIndex(100L, txnIdxFile)
   }
 
-  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
+  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 12)
   }
 
-  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata) = {
+  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File) = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 8)
+    new OffsetIndex(remoteOffsetIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 8)
   }
 
   private def generateRemoteLogSegmentMetadata(size: Int,
@@ -860,4 +932,15 @@ class RemoteIndexCacheTest {
       createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
     }
   }
+
+  private def getIndexFileFromRemoteCacheDir(cache: RemoteIndexCache, suffix: String) = {
+    try {
+      Files.walk(cache.cacheDir().toPath())
+        .filter(Files.isRegularFile(_))
+        .filter(path => path.getFileName.toString.endsWith(suffix))
+        .findAny()
+    } catch {
+      case e @ (_ : NoSuchFileException | _ : UncheckedIOException) => Optional.empty()
+    }
+  }
 }
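A note on the getIndexFileFromRemoteCacheDir helper added above: it swallows NoSuchFileException and UncheckedIOException because the cleaner thread may delete the cache directory (or files inside it) while Files.walk is running. The former is thrown if the directory is already gone when the walk starts; the latter wraps I/O errors raised during lazy stream traversal. Here is a standalone Java sketch of the same defensive pattern (hypothetical names, assumptions mine, not part of this patch):

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.stream.Stream;

public class WalkRaceDemo {
    // Find any regular file under dir ending with suffix, tolerating concurrent deletion of dir.
    static Optional<Path> findFirstWithSuffix(Path dir, String suffix) {
        try (Stream<Path> paths = Files.walk(dir)) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(suffix))
                    .findAny();
        } catch (NoSuchFileException e) {
            return Optional.empty();          // dir vanished before the walk started
        } catch (UncheckedIOException e) {
            return Optional.empty();          // dir (or a child) vanished while the stream was consumed
        } catch (IOException e) {
            throw new UncheckedIOException(e); // any other I/O failure is unexpected here
        }
    }
}

Unlike the Scala helper, the sketch closes the stream with try-with-resources, which releases the directory handles Files.walk keeps open until the stream is closed.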
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 5cdaf77c2947..51b6ee3e5bc1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -132,19 +132,16 @@ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager,
 
         internalCache = Caffeine.newBuilder()
                 .maximumSize(maxSize)
-                // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
-                // evicted (means removal due to the policy)
-                .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+                // This listener is invoked each time an entry is automatically removed due to eviction. The cache will invoke this listener
+                // during the atomic operation to remove the entry (refer: https://github.com/ben-manes/caffeine/wiki/Removal),
+                // hence, care must be taken to ensure that this operation is not expensive. Note that this listener is not invoked when the
+                // RemovalCause from the cache is EXPLICIT or REPLACED (e.g. on Cache.invalidate(), Cache.put() etc.). For a complete list see:
+                // https://github.com/ben-manes/caffeine/blob/0cef55168986e3816314e7fdba64cb0b996dd3cc/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java#L23
+                // Hence, any operation required after removal from the cache must be performed manually for these scenarios.
+                .evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
                     // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
                     if (entry != null) {
-                        try {
-                            entry.markForCleanup();
-                        } catch (IOException e) {
-                            throw new KafkaException(e);
-                        }
-                        if (!expiredIndexes.offer(entry)) {
-                            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-                        }
+                        enqueueEntryForCleanup(entry, key);
                     } else {
                         log.error("Received entry as null for key {} when the it is removed from the cache.", key);
                     }
@@ -174,7 +171,11 @@ public File cacheDir() {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            });
         } finally {
             lock.readLock().unlock();
         }
     }
 
@@ -183,12 +184,27 @@ public void removeAll(Collection<Uuid> keys) {
         lock.readLock().lock();
         try {
-            internalCache.invalidateAll(keys);
+            keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null to remove the key from the cache
+                return null;
+            }));
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+        try {
+            entry.markForCleanup();
+            if (!expiredIndexes.offer(entry)) {
+                log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
     // Visible for testing
     public ShutdownableThread cleanerThread() {
         return cleanerThread;
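To summarize the core behavioral change: Caffeine's evictionListener fires only for automatic removals (e.g. RemovalCause.SIZE or EXPIRED) and runs atomically while the entry is being removed, whereas explicit calls such as Cache.invalidate() bypass it. That is why remove() and removeAll() now perform the cleanup themselves inside asMap().computeIfPresent(), which runs the function atomically and removes the mapping when it returns null. A minimal standalone sketch of both paths (hypothetical EvictionListenerDemo class, assuming Caffeine 3.x on the classpath, not part of this patch):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class EvictionListenerDemo {
    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(1)
                // Fires for automatic removals only (SIZE, EXPIRED, ...), never for EXPLICIT or REPLACED.
                .evictionListener((String key, String value, RemovalCause cause) ->
                        System.out.println("evicted " + key + " due to " + cause))
                .build();

        cache.put("a", "1");
        cache.invalidate("a");   // explicit removal: the eviction listener is NOT invoked

        cache.put("b", "2");
        cache.put("c", "3");     // exceeds maximumSize(1): one entry is evicted with cause SIZE
        cache.cleanUp();         // eviction runs as cache maintenance; force any pending work

        // Explicit removal with cleanup, mirroring RemoteIndexCache.remove():
        // computeIfPresent runs atomically and removes the mapping when the function returns null
        // (it is a no-op here if the eviction policy happened to evict "c" rather than "b").
        cache.asMap().computeIfPresent("c", (k, v) -> {
            System.out.println("cleaning up " + k + " before removal");
            return null;
        });
    }
}

Running cleanup inside computeIfPresent also closes the race the new testConcurrentRemoveReadForCache exercises: a concurrent getIndexEntry for the same key either sees the entry before the removal commits or re-fetches it afterwards, but never observes an entry whose files are being renamed underneath it.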