diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 36d21d24999c..3a3b4d2c91c1 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.RemoteIndexCache.{Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
+import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, Entry, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteDeletedSuffixIndexFileName, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
@@ -32,6 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
+import org.mockito.invocation.InvocationOnMock
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
@@ -39,7 +40,7 @@ import java.io.{File, FileInputStream, IOException, PrintWriter}
 import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util
 import java.util.{Collections, Optional}
-import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit}
 import scala.collection.mutable
 
 class RemoteIndexCacheTest {
@@ -138,8 +139,8 @@ class RemoteIndexCacheTest {
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -249,7 +250,7 @@
   }
 
   @Test
-  def testCacheEntryIsDeletedOnInvalidation(): Unit = {
+  def testCacheEntryIsDeletedOnRemoval(): Unit = {
     def getIndexFileFromDisk(suffix: String) = {
       Files.walk(tpDir.toPath)
         .filter(Files.isRegularFile(_))
@@ -271,8 +272,8 @@
     // no expired entries yet
     assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be zero at start of test")
 
-    // invalidate the cache. it should async mark the entry for removal
-    cache.internalCache.invalidate(internalIndexKey)
+    // Call the remove() function to mark the entry for removal
+    cache.remove(internalIndexKey)
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
@@ -672,6 +673,84 @@
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testConcurrentRemoveReadForCache(): Unit = {
+    // Create a spy cache entry
+    val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
+      time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)))
+
+    val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
+    cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)
+
+    assertCacheSize(1)
+
+    var entry: RemoteIndexCache.Entry = null
+
+    val latchForCacheRead = new CountDownLatch(1)
+    val latchForCacheRemove = new CountDownLatch(1)
+    val latchForTestWait = new CountDownLatch(1)
+
+    var markForCleanupCallCount = 0
+
+    doAnswer((invocation: InvocationOnMock) => {
+      markForCleanupCallCount += 1
+
+      if (markForCleanupCallCount == 1) {
+        // Signal the read task to unblock itself
+        latchForCacheRead.countDown()
+        // Wait for the signal to start renaming the files
+        latchForCacheRemove.await()
+        // Call the real markForCleanup() method to start renaming the files
+        invocation.callRealMethod()
+        // Signal the waiting test thread so that the test can complete
+        latchForTestWait.countDown()
+      }
+    }).when(spyEntry).markForCleanup()
+
+    val removeCache = (() => {
+      cache.remove(rlsMetadata.remoteLogSegmentId().id())
+    }): Runnable
+
+    val readCache = (() => {
+      // Wait for the signal to start the cache read
+      latchForCacheRead.await()
+      entry = cache.getIndexEntry(rlsMetadata)
+      // Signal the remove task to start renaming the files
+      latchForCacheRemove.countDown()
+    }): Runnable
+
+    val executor = Executors.newFixedThreadPool(2)
+    try {
+      val removeCacheFuture: Future[_] = executor.submit(removeCache: Runnable)
+      val readCacheFuture: Future[_] = executor.submit(readCache: Runnable)
+
+      // Verify that both tasks complete without any exception
+      removeCacheFuture.get()
+      readCacheFuture.get()
+
+      // Wait for the signal that the test can complete
+      latchForTestWait.await()
+
+      // We can't determine whether the read thread or the remove thread goes first. If:
+      // 1. the read thread goes first, the cache file should not exist and the cache size should be zero;
+      // 2. the remove thread goes first, the cache file should be present and the cache size should be one.
+      // Either way, we verify that the cache and the disk agree: if the entry exists in the cache, its
+      // index file should exist on disk, and if the entry does not exist, the file should not exist either.
+      if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent) {
+        assertCacheSize(1)
+      } else {
+        assertCacheSize(0)
+      }
+    } finally {
+      executor.shutdownNow()
+    }
+
+  }
+
   @Test
   def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
     reset(rsm)
@@ -679,9 +758,9 @@
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupted index file
         createCorruptTimeIndexOffsetFile(tpDir)
@@ -717,9 +796,9 @@
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
@@ -764,7 +843,7 @@
     Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
     Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
 
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -792,9 +871,9 @@
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         // Create corrupt index file return from RSM
         createCorruptedIndexFile(testIndexType, tpDir)
@@ -839,7 +918,7 @@
     // verify deleted file exists on disk
     assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
       s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
-    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+    cache.remove(rlsMetadata.remoteLogSegmentId().id())
 
     // wait until entry is marked for deletion
     TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
@@ -862,9 +941,9 @@
                                        = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
      baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
-    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
-    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
-    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))
+    val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir))
+    val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir))
     spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
   }
 
@@ -892,8 +971,8 @@
     }
   }
 
-  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
-    val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
+  private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
     txnIdxFile.createNewFile()
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
@@ -914,14 +993,14 @@
     return new TransactionIndex(100L, txnIdxFile)
   }
 
-  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
+  private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
+    new TimeIndex(remoteTimeIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 12)
   }
 
-  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata) = {
+  private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File) = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
-    new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 8)
+    new OffsetIndex(remoteOffsetIndexFile(dir, metadata), metadata.startOffset(), maxEntries * 8)
   }
 
   private def generateRemoteLogSegmentMetadata(size: Int,
@@ -969,9 +1048,9 @@
       .thenAnswer(ans => {
         val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
         val indexType = ans.getArgument[IndexType](1)
-        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
-        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
-        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata, tpDir)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata, tpDir)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata, tpDir)
         maybeAppendIndexEntries(offsetIdx, timeIdx)
         indexType match {
           case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 1502f61b6026..929dba356884 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -151,19 +151,16 @@ private Cache<Uuid, Entry> initEmptyCache(long maxSize) {
             .weigher((Uuid key, Entry entry) -> {
                 return (int) entry.entrySizeBytes;
             })
-            // removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
-            // evicted (means removal due to the policy)
-            .removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
+            // This listener is invoked each time an entry is automatically removed from the cache due to eviction. The cache invokes
+            // the listener as part of the atomic operation that removes the entry (refer: https://github.com/ben-manes/caffeine/wiki/Removal),
+            // hence, care must be taken to ensure that this operation is not expensive. Note that this listener is NOT invoked when the
+            // RemovalCause is EXPLICIT or REPLACED (e.g. on Cache.invalidate(), Cache.put() etc.). For a complete list see:
+            // https://github.com/ben-manes/caffeine/blob/0cef55168986e3816314e7fdba64cb0b996dd3cc/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java#L23
+            // Hence, for those scenarios, any cleanup required after removal from the cache must be performed manually.
+            .evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
                 // Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
                 if (entry != null) {
-                    try {
-                        entry.markForCleanup();
-                    } catch (IOException e) {
-                        throw new KafkaException(e);
-                    }
-                    if (!expiredIndexes.offer(entry)) {
-                        log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
-                    }
+                    enqueueEntryForCleanup(entry, key);
                 } else {
                     log.error("Received entry as null for key {} when the it is removed from the cache.", key);
                 }
@@ -187,7 +184,11 @@ public File cacheDir() {
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
-            internalCache.invalidate(key);
+            internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null removes the key from the cache
+                return null;
+            });
         } finally {
             lock.readLock().unlock();
         }
@@ -196,12 +197,27 @@
     public void removeAll(Collection<Uuid> keys) {
         lock.readLock().lock();
         try {
-            internalCache.invalidateAll(keys);
+            keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
+                enqueueEntryForCleanup(v, k);
+                // Returning null removes the key from the cache
+                return null;
+            }));
         } finally {
             lock.readLock().unlock();
        }
     }
 
+    private void enqueueEntryForCleanup(Entry entry, Uuid key) {
+        try {
+            entry.markForCleanup();
+            if (!expiredIndexes.offer(entry)) {
+                log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
+            }
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
     // Visible for testing
     public ShutdownableThread cleanerThread() {
         return cleanerThread;
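
For reference, a minimal standalone sketch (not part of this patch) of the Caffeine behavior the new comment and the remove()/removeAll() implementations rely on. It assumes Caffeine 3.x on the classpath; the class name and the String key/value types are made up for illustration. An evictionListener fires only for evictions the cache performs itself (causes where wasEvicted() is true, e.g. SIZE or EXPIRED), never for EXPLICIT or REPLACED removals, so cleanup on explicit removal must be triggered manually; doing it inside asMap().computeIfPresent(...) keeps the cleanup atomic with the removal, which is the pattern remove(Uuid) uses with enqueueEntryForCleanup above.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class EvictionListenerSketch {

    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(100)
                // Runs only when the cache itself evicts an entry
                // (cause.wasEvicted() == true); never for EXPLICIT or REPLACED.
                .evictionListener((String key, String value, RemovalCause cause) ->
                        System.out.println("evicted " + key + " due to " + cause))
                .build();

        cache.put("segment", "entry");

        // cache.invalidate("segment") would remove the entry with RemovalCause.EXPLICIT
        // and would NOT fire the evictionListener. To run cleanup atomically with the
        // removal, mutate through the underlying map instead:
        cache.asMap().computeIfPresent("segment", (k, v) -> {
            System.out.println("cleaning up " + k + " before it is removed");
            return null; // returning null removes the mapping
        });
    }
}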