KAFKA-15481: Fix concurrency bug in RemoteIndexCache #14483

Merged · 16 commits · Oct 23, 2023
Changes from 1 commit
114 changes: 88 additions & 26 deletions core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -22,13 +22,14 @@ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{DIR_NAME, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.invocation.InvocationOnMock
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}

@@ -132,7 +133,7 @@ class RemoteIndexCacheTest {
// this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
assertEquals(offsetPosition1.position, resultPosition)
verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET))

// this should not cause fetching index from RemoteStorageManager as it is already fetched earlier
reset(rsm)
@@ -196,21 +197,21 @@ class RemoteIndexCacheTest {
// Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
verifyFetchIndexInvocation(count = 1)
verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))

// Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count to 2
cache.getIndexEntry(metadataList.head)
cache.getIndexEntry(metadataList(1))
assertCacheSize(2)
verifyFetchIndexInvocation(count = 2)
verifyFetchIndexInvocation(count = 2, indexTypes = Seq(IndexType.OFFSET))

// Getting index for metadataList.last should call rsm#fetchIndex
// to populate this entry one of the other 2 entries will be evicted. We don't know which one since it's based on
// a probabilistic formula for Window TinyLfu. See docs for RemoteIndexCache
assertNotNull(cache.getIndexEntry(metadataList.last))
assertAtLeastOnePresent(cache, metadataList(1).remoteLogSegmentId().id(), metadataList.head.remoteLogSegmentId().id())
assertCacheSize(2)
verifyFetchIndexInvocation(count = 3)
verifyFetchIndexInvocation(count = 3, indexTypes = Seq(IndexType.OFFSET))

// getting index for last expired entry should call rsm#fetchIndex as that entry was expired earlier
val missingEntryOpt = {
@@ -222,7 +223,7 @@
assertFalse(missingEntryOpt.isEmpty)
cache.getIndexEntry(missingEntryOpt.get)
assertCacheSize(2)
verifyFetchIndexInvocation(count = 4)
verifyFetchIndexInvocation(count = 4, indexTypes = Seq(IndexType.OFFSET))
}

@Test
@@ -237,7 +238,7 @@
assertCacheSize(0)
cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
verifyFetchIndexInvocation(count = 1)
verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))

cache.close()

@@ -259,7 +260,7 @@
}

@Test
def testCacheEntryIsDeletedOnInvalidation(): Unit = {
def testCacheEntryIsDeletedOnRemoval(): Unit = {
def getIndexFileFromDisk(suffix: String) = {
Files.walk(tpDir.toPath)
.filter(Files.isRegularFile(_))
@@ -281,14 +282,8 @@
// no expired entries yet
assertEquals(0, cache.expiredIndexes.size, "expiredIndex queue should be zero at start of test")

// invalidate the cache. it should async mark the entry for removal
cache.internalCache.invalidate(internalIndexKey)

// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => cacheEntry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
// call remove function to mark the entry for removal
cache.remove(internalIndexKey)

// first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
verify(cacheEntry, times(2)).markForCleanup()
@@ -378,7 +373,7 @@ class RemoteIndexCacheTest {
// getIndex for first time will call rsm#fetchIndex
cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET))
reset(rsm)

// Simulate a concurrency situation where one thread is reading the entry already present in the cache (cache hit)
@@ -443,15 +438,15 @@ class RemoteIndexCacheTest {
// Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
cache.getIndexEntry(metadataList.head)
assertCacheSize(1)
verifyFetchIndexInvocation(count = 1)
verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))

// Here a new key metadataList(1) is invoked, that should call rsm#fetchIndex, making the count to 2
cache.getIndexEntry(metadataList(1))
assertCacheSize(2)
// Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
cache.getIndexEntry(metadataList(1))
assertCacheSize(2)
verifyFetchIndexInvocation(count = 2)
verifyFetchIndexInvocation(count = 2, indexTypes = Seq(IndexType.OFFSET))

// Here a new key metadataList(2) is invoked, that should call rsm#fetchIndex
// The cache max size is 2, it will remove one entry and keep the overall size to 2
@@ -460,7 +455,7 @@
// Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
cache.getIndexEntry(metadataList(2))
assertCacheSize(2)
verifyFetchIndexInvocation(count = 3)
verifyFetchIndexInvocation(count = 3, indexTypes = Seq(IndexType.OFFSET))

// Close the cache
cache.close()
@@ -538,7 +533,75 @@
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
// file is corrupted it should fetch from remote storage again
verifyFetchIndexInvocation(count = 1)
verifyFetchIndexInvocation(count = 1, indexTypes = Seq(IndexType.OFFSET))
}

@Test
def testConcurrentRemoveReadForCache(): Unit = {
// Create a spy Cache Entry
val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))

val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata))
val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata))
val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata))

val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex))
cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry)

assertCacheSize(1)

val latchForCacheRead = new CountDownLatch(1)
val latchForCacheRemove = new CountDownLatch(1)
val latchForTestWait = new CountDownLatch(1)

var markForCleanupCallCount = 0

doAnswer((invocation: InvocationOnMock) => {
markForCleanupCallCount += 1

if (markForCleanupCallCount == 1) {
// Signal the CacheRead to unblock itself
latchForCacheRead.countDown()
// Wait for signal to start renaming the files
latchForCacheRemove.await()
// Calling the markForCleanup() actual method to start renaming the files
invocation.callRealMethod()
Contributor:

@jeel2420 Why are we invoking invocation.callRealMethod() again here? It is already called at line 712: when(spyEntry).markForCleanup()

Contributor Author:

invocation.callRealMethod() is called to invoke the real markForCleanup() after the read has happened and before we start asserting, to make sure the indexes get renamed before we assert the results.

Contributor:

@jeel2420 markForCleanUp should be called only one time. We need to test the behaviour when concurrent read/remove happens on the cache for the same entry. In the test we just need to assert the way @showuon suggested:

// So, maybe we verify with this:
if (Files.exists(entry.offsetIndex().file().toPath)) {
  assertCacheSize(1)
} else {
  assertCacheSize(0)
}

Calling markForCleanUp twice will always result in cache size 0 eventually.

Contributor Author:

@iit2009060 markForCleanUp() is not getting called twice. Please see the mock: during the first execution of markForCleanUp() I call the actual markForCleanup() function (i.e. the indexes get renamed), but for subsequent calls the mock does nothing, so the real markForCleanup() that renames the indexes is called only once, as expected.

I have verified this behaviour as well.

Contributor:

@jeel2420 During a read, markForCleanUp should not be called even once, as per the functionality. Then why do we need to call it explicitly again via invocation.callRealMethod()? I am seeing two invocations of markForCleanUp:

  1. }).when(spyEntry).markForCleanup() (line 712)
  2. invocation.callRealMethod() (line 708)

showuon commented Oct 19, 2023:

> There are two times when markForCleanUp is called.
> 1. The remove function which we are calling in the removeCache Runnable.
> 2. One at invocation.callRealMethod() (line 708).

You're right, but they are "different" markForCleanUps. For (1), markForCleanUp is an injected method for controlling the invocation order, which is why there are latch await/countDown calls. For (2), it's the real markForCleanUp method that renames the cache files. The goal is to simulate the race condition that happened in KAFKA-15481.

> Even when I tried running your test case locally, it always asserts cacheSize 0, as the entry eventually gets deleted.

Yes, I think so. But in some cases it could be 1, if getEntry goes after. Thread scheduling is decided by the OS; we can't be sure which thread will go first, right? I think the goal of this test is to make sure the issue in KAFKA-15481 will not happen again. That's why I added this comment.

> IMO we should read and remove concurrently in separate threads and validate the cacheSize based on the order of execution.

I'm not following you here. What we're doing in this test is reading and removing concurrently in separate threads. As for validating the cacheSize based on the order of execution: since we can't be sure which thread will execute first, we can't do that, right? If we could decide the execution order, they would not be running concurrently, is that right?

> We should not need to call it explicitly for this scenario.

Maybe you can show us what test you would create if it were you. Some pseudocode is enough. Thank you.

Contributor:

@showuon

> For (1), markForCleanUp is an injected method for controlling the invocation order. So there are latch await/countDown calls.

Do you mean this is a mock method and no rename would happen in this case? Effectively, the functionality/logic of markForCleanUp is called only one time?

I was thinking something like this:

var entry: RemoteIndexCache.Entry = null
val latchForTestWait = new CountDownLatch(2)

val removeCache = (() => {
  cache.remove(rlsMetadata.remoteLogSegmentId().id())
  latchForTestWait.countDown()
}): Runnable

val readCache = (() => {
  entry = cache.getIndexEntry(rlsMetadata)
  latchForTestWait.countDown()
}): Runnable

val executor = Executors.newFixedThreadPool(2)
try {
  executor.submit(removeCache: Runnable)
  executor.submit(readCache: Runnable)

  // Wait for both tasks to complete before asserting
  latchForTestWait.await()

  // Validate the cache size based on file existence:
  // if the offset file exists, assert cache size == 1 and verify an rsm
  // fetch happened (execution order was remove, then read);
  // if the cache size == 0, verify no rsm fetch happened (execution
  // order was read, then remove)
} finally {
  executor.shutdownNow()
}

In the test case mentioned in the jira KAFKA-15481, the execution order is remove, then read, and the overall result is cache size 0, which is wrong because of the time gap between removing the entry and renaming the files. Here we are validating the same thing with the rsm call count: if the two operations are atomic, the rsm fetch should happen and the files should be restored.

showuon commented Oct 19, 2023:

> Do you mean this is a mock method and no rename would happen in this case? Effectively, the functionality/logic of markForCleanUp is called only one time?

Correct.

> // validate cache size based on the file existence
> if offset file exists validate this
> // validate rsm call should happen if (execution order is remove, read)
> if cache size == 0
> // validate no rsm call should happen if (execution order is read, remove)

Yes, this is basically similar to what we have now. Injecting a mock implementation for markForCleanUp just brings the two thread executions closer together. In the end, what we have now invokes the real method, which is what you did above. I'm fine with validating the rsm call count if you prefer, but again, they are basically testing the same thing.

Contributor:

@showuon Yes, correct: it is testing the same thing, and I am also fine with it. But from a readability perspective, the version I propose is simpler to understand and does not require any future change if the markForCleanUp function changes. I'll leave it to @jeel2420 to make a decision here.

Contributor Author:

@iit2009060 Since the current test case is able to reproduce the case mentioned in the jira KAFKA-15481, I think we should be fine. The only reason to have the markForCleanUp mock is to have control over the execution of the two threads.

// Signal TestWait to unblock itself so that test can be completed
latchForTestWait.countDown()
} else {
// Subsequent call for markForCleanup method
latchForCacheRead.countDown()
latchForCacheRemove.countDown()
}
}).when(spyEntry).markForCleanup()

val removeCache = (() => {
cache.remove(rlsMetadata.remoteLogSegmentId().id())
}): Runnable

val readCache = (() => {
// Wait for signal to start CacheRead
latchForCacheRead.await()
cache.getIndexEntry(rlsMetadata)
// Signal the CacheRemove to start renaming the files
latchForCacheRemove.countDown()
}): Runnable

val executor = Executors.newFixedThreadPool(2)
try {
executor.submit(removeCache: Runnable)
executor.submit(readCache: Runnable)
Contributor:

@jeel2420, sorry, I had another look and found we should also verify that these two threads throw no exceptions. In the issue description, without this fix an IOException is thrown. So we should verify there's no exception, using the Futures returned from executor.submit. Thanks.

Contributor Author:

@showuon Nice catch. I'm now calling .get() on both task Future objects, so if there is any error the test will fail with that exception.
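For context on this pattern: ExecutorService.submit() captures any exception thrown by the task inside the returned Future, and the failure only surfaces when get() is called, wrapped in an ExecutionException. A minimal standalone sketch (the class name and the simulated failure, standing in for the IOException from the issue, are illustrative):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // The task's failure is captured by the Future rather than thrown here
        Runnable failing = () -> {
            throw new RuntimeException("simulated IOException from a renamed index file");
        };
        Future<?> task = executor.submit(failing);
        try {
            task.get(); // the failure only surfaces here, wrapped in ExecutionException
        } catch (ExecutionException e) {
            System.out.println("task failed: " + e.getCause().getMessage());
        } finally {
            executor.shutdownNow();
        }
    }
}
```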


// Wait for signal to complete the test
latchForTestWait.await()
assertCacheSize(1)
val entry = cache.getIndexEntry(rlsMetadata)
assertTrue(Files.exists(entry.offsetIndex().file().toPath))
Contributor:

Please help me understand these lines. My understanding is: in L598 we want to verify that cache.getIndexEntry runs after cache.remove completes, and that's where the cacheSize of 1 comes from. If so, then in L599, why should we call getIndexEntry again before asserting L600?

Contributor Author:

Here we want to prove that the entry is present in the cache, so when we call getIndexEntry we should get the entry, and the index files should be present in the cacheDir.

This reproduces the actual issue by replicating the scenario where we can end up with an inconsistency: the key is present in the cache but the referenced files have already been renamed by that time.

} finally {
executor.shutdownNow()
}

}
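To make the failure mode concrete for readers arriving from the JIRA: before this fix, remove() unmapped the key immediately and deferred the file rename to a background thread, so a concurrent read could reload an entry from files that were about to disappear. The following is a hypothetical, minimal model of that window, not Kafka code; all class, method, and key names are invented for illustration:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RaceSketch {
    static final ConcurrentHashMap<String, Path> cache = new ConcurrentHashMap<>();
    static final ExecutorService cleaner = Executors.newSingleThreadExecutor();

    // Pre-fix behaviour: the key disappears first, the rename happens later.
    static void removeBuggy(String key) {
        Path indexFile = cache.remove(key); // key is gone immediately
        if (indexFile != null) {
            cleaner.submit(() -> {
                try {
                    // deferred cleanup: the file is renamed some time later
                    Files.move(indexFile, indexFile.resolveSibling(indexFile.getFileName() + ".deleted"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }

    // A concurrent read that misses the cache reloads from the on-disk file.
    // If the deferred rename runs after this load, the caller holds an entry
    // whose backing file no longer exists, and later I/O on it fails.
    static Path read(String key, Path onDiskFile) {
        return cache.computeIfAbsent(key, k -> onDiskFile);
    }

    public static void main(String[] args) throws Exception {
        Path onDisk = Files.createTempFile("00000000000000000000", ".index");
        cache.put("segment-0", onDisk);
        removeBuggy("segment-0");                  // unmaps now, renames later
        Path reloaded = read("segment-0", onDisk); // may re-cache the doomed file
        Thread.sleep(200);                         // give the deferred rename time to run
        System.out.println("entry cached, but file exists? " + Files.exists(reloaded));
        cleaner.shutdown();
    }
}
```

The production fix closes this window by performing the cleanup inside computeIfPresent (see the RemoteIndexCache.java diff below), so the rename and the unmapping are atomic with respect to a concurrent load of the same key.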

private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
@@ -568,27 +631,26 @@
}

private def verifyFetchIndexInvocation(count: Int,
indexTypes: Seq[IndexType] =
Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
indexTypes: Seq[IndexType]): Unit = {
for (indexType <- indexTypes) {
verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
}
}

private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TransactionIndex = {
val txnIdxFile = remoteTransactionIndexFile(tpDir, metadata)
val txnIdxFile = remoteTransactionIndexFile(new File(tpDir, DIR_NAME), metadata)
txnIdxFile.createNewFile()
new TransactionIndex(metadata.startOffset(), txnIdxFile)
}

private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
new TimeIndex(remoteTimeIndexFile(new File(tpDir, DIR_NAME), metadata), metadata.startOffset(), maxEntries * 12)
}

private def createOffsetIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata) = {
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 8)
new OffsetIndex(remoteOffsetIndexFile(new File(tpDir, DIR_NAME), metadata), metadata.startOffset(), maxEntries * 8)
}

private def generateRemoteLogSegmentMetadata(size: Int,
org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -134,7 +134,7 @@ public RemoteIndexCache(int maxSize, RemoteStorageManager remoteStorageManager,
.maximumSize(maxSize)
// removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
// evicted (means removal due to the policy)
.removalListener((Uuid key, Entry entry, RemovalCause cause) -> {
.evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
// Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
if (entry != null) {
try {
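The crux of the one-line change above is the semantic difference between Caffeine's two listeners: removalListener is invoked asynchronously after an entry has been removed and fires for every RemovalCause, whereas evictionListener runs atomically inside the eviction operation and fires only for policy-driven removals (size, expiry), never for explicit invalidation. A minimal standalone sketch of the difference, assuming Caffeine 3.x on the classpath (the class name and printed output are illustrative):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;

public class ListenerSemanticsDemo {
    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder()
                .maximumSize(1)
                // Runs atomically inside the eviction operation, and only for
                // policy-driven removals; explicit invalidate() never triggers it.
                .evictionListener((String key, String value, RemovalCause cause) ->
                        System.out.println("evicted " + key + ", cause=" + cause))
                .build();

        cache.put("a", "1");
        cache.put("b", "2");   // over maximumSize: an entry becomes an eviction candidate
        cache.invalidate("a"); // explicit removal: the evictionListener is NOT invoked
        cache.cleanUp();       // run pending maintenance so any size eviction fires now
    }
}
```

Since manual removals no longer reach the listener, the remove() and removeAll() methods in the next hunks take over cleanup for explicit removals, and do it atomically.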
@@ -169,7 +169,16 @@ public Cache<Uuid, Entry> internalCache() {
public void remove(Uuid key) {
lock.readLock().lock();
try {
internalCache.invalidate(key);
internalCache.asMap().computeIfPresent(key, (k, v) -> {
try {
v.markForCleanup();
v.cleanup();
} catch (IOException e) {
throw new KafkaException(e);
}
// Returning null to remove the key from the cache
return null;
});
} finally {
lock.readLock().unlock();
}
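Why computeIfPresent makes this safe: the map backing the cache runs the remapping function atomically per key, and a concurrent load of the same key (a computeIfAbsent under the hood) blocks until the function returns, so "rename the files, then drop the key" can no longer interleave with a reload. A minimal standalone sketch of the idiom using a plain ConcurrentHashMap; the names are invented for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;

public class AtomicRemoveDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        map.put("segment", "index-entry");

        // The remapping function runs atomically for this key: a concurrent
        // computeIfAbsent("segment", ...) blocks until it returns, so cleanup
        // and unmapping cannot interleave with a reload of the same key.
        map.computeIfPresent("segment", (key, value) -> {
            System.out.println("cleaning up " + value); // stand-in for markForCleanup()/cleanup()
            return null; // returning null removes the mapping
        });

        System.out.println(map.containsKey("segment")); // false
    }
}
```

Returning null from the remapping function removes the key, which is why the cleanup and the removal happen as a single atomic step here, unlike the old invalidate() path where cleanup ran later on the cleaner thread.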
@@ -178,7 +187,16 @@ public void remove(Uuid key) {
public void removeAll(Collection<Uuid> keys) {
lock.readLock().lock();
try {
internalCache.invalidateAll(keys);
keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
try {
v.markForCleanup();
v.cleanup();
} catch (IOException e) {
throw new KafkaException(e);
}
// Returning null to remove the key from the cache
return null;
}));
} finally {
lock.readLock().unlock();
}