
KAFKA-15481: Fix concurrency bug in RemoteIndexCache #14483

Merged
merged 16 commits into apache:trunk on Oct 23, 2023

Conversation

jeel2420
Contributor

@jeel2420 jeel2420 commented Oct 4, 2023

This PR fixes the concurrency bug in RemoteIndexCache.

(From Jira description)
RemoteIndexCache has a concurrency bug which leads to IOException while fetching data from remote tier.

Below are the events in timeline order:

  • Thread 1 (cache thread): invalidates the entry, removalListener is invoked async, so the files have not been renamed to "deleted" suffix yet.
  • Thread 2: (fetch thread): tries to find entry in cache, doesn't find it because it has been removed by 1, fetches the entry from S3, writes it to existing file (using replace existing)
  • Thread 1: async removalListener is invoked, acquires a lock on old entry (which has been removed from cache), it renames the file to "deleted" and starts deleting it
  • Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM returns an error as it won't allow creation of 2GB random access file.

Fix: Used an EvictionListener instead of a RemovalListener to perform the eviction synchronously in the Caffeine cache, and for manual removal used computeIfAbsent to rename the files and remove the key from the cache synchronously by returning null.
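For context, a minimal, hedged sketch of the Caffeine pattern described above (generic String keys and values, not the actual RemoteIndexCache code; manual removal goes through a compute method on asMap(), matching the code discussed below):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class EvictionListenerSketch {
        public static void main(String[] args) {
            // evictionListener is invoked synchronously as part of the eviction
            // (RemovalCause.wasEvicted() == true), unlike removalListener which runs asynchronously.
            Cache<String, String> cache = Caffeine.newBuilder()
                    .maximumSize(2)
                    .evictionListener((String key, String value, RemovalCause cause) ->
                            System.out.println("evicted " + key + " due to " + cause))
                    .build();

            cache.put("a", "1");
            cache.put("b", "2");
            cache.put("c", "3"); // exceeds maximumSize; an eviction is triggered during cache maintenance

            // Manual (explicit) removal is not seen by the evictionListener, so the cleanup runs
            // inside a compute method on asMap(): returning null removes the mapping atomically
            // with whatever cleanup happens in the lambda (e.g. renaming index files).
            cache.asMap().computeIfPresent("b", (k, v) -> {
                System.out.println("cleaning up " + k + " before removal");
                return null;
            });
        }
    }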

Added testConcurrentRemoveReadForCache to reproduce the bug by following the above timeline of events. The unit test case now passes with the fix.

Jira
https://issues.apache.org/jira/browse/KAFKA-15481

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@divijvaidya divijvaidya added the tiered-storage Pull requests associated with KIP-405 (Tiered Storage) label Oct 4, 2023
Contributor

@showuon showuon left a comment

@jeel2420 , thanks for the PR. Left some comments.

Comment on lines 598 to 600
assertCacheSize(1)
val entry = cache.getIndexEntry(rlsMetadata)
assertTrue(Files.exists(entry.offsetIndex().file().toPath))
Contributor

Please help me understand these lines. My understanding is:
In L598, we want to verify that cache.getIndexEntry will run after cache.remove completes. So that's where the cache size of 1 comes from.
If so, then in L599, why should we call getIndexEntry again before asserting in L600?

Contributor Author

Here we want to prove that the entry is present in the cache, so when we call getIndexEntry we should get the entry and the index files should be present in the cacheDir.

This reproduces the actual issue by replicating the scenario where we can have an inconsistency: the key is present in the cache but the reference files have already been renamed by that time.

@clolov clolov self-assigned this Oct 9, 2023
Contributor

@showuon showuon left a comment

Thanks for the update. Left some comments.

Comment on lines 140 to 143
// When resizing the cache, we always start with an empty cache. There are two main reasons:
// 1. Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
// When resizing the cache, we always start with an empty cache.
// Resizing the cache is not a high-frequency operation, and there is no need to fill the data in the old
// cache to the new cache in time when resizing inside.
// 2. Since the eviction of the caffeine cache is cleared asynchronously, it is possible that after the entry
// in the old cache is filled in the new cache, the old cache will clear the entry, and the data in the two caches
// will be inconsistent.
internalCache.invalidateAll();
removeAll(internalCache.asMap().keySet());
Contributor

OK, we can keep this change; there will be an update for this in #14511. cc @hudeqi

Contributor Author

Reverted the change

Comment on lines 160 to 158
// removeListener is invoked when either the entry is invalidated (means manual removal by the caller) or
// evictionListener is invoked when either the entry is invalidated (means manual removal by the caller) or
// evicted (means removal due to the policy)
Contributor

evictionListener will be invoked when evicted, as the doc said:

When the operation must be performed synchronously with eviction, use Caffeine.evictionListener(RemovalListener) instead. This listener will only be notified when RemovalCause.wasEvicted() is true. For an explicit removal, Cache.asMap() offers compute methods that are performed atomically.

Contributor Author

Done

Comment on lines 640 to 646
executor.submit(removeCache: Runnable)
executor.submit(readCache: Runnable)

// Wait for signal to complete the test
latchForTestWait.await()
val entry = cache.getIndexEntry(rlsMetadata)
assertTrue(Files.exists(entry.offsetIndex().file().toPath))
Contributor

We need to verify something between L644 and L645, that is: after the 2 threads have completed, what state are we expecting?

My understanding is, spyEntry.markForCleanup will get invoked before cache.getIndexEntry for the same entry; that means the entry update for getIndexEntry needs to wait until the entry removal completes, since they are both atomic operations. In this case, after L644, the new entry should exist in the cache and in storage. So, we don't actually need L645 to getIndexEntry. Is my understanding correct?

Contributor Author

Yes, you are right.
I have removed the cache.getIndexEntry call and am just checking that the new entry exists in the cache and in storage.

@jeel2420 jeel2420 requested a review from showuon October 13, 2023 14:33
Contributor

@showuon showuon left a comment

LGTM! Thanks for the fix.

@showuon
Contributor

showuon commented Oct 14, 2023

@jeel2420 , there's a test failing due to this change, please take a look.
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/10/

internalCache.asMap().computeIfPresent(key, (k, v) -> {
    try {
        v.markForCleanup();
        expiredIndexes.put(v);
Contributor

@showuon @jeel2420
Shouldn't we use expiredIndexes.offer() instead of expiredIndexes.put(),
as put() will block the operation if the queue is full?

Contributor

Use offer. Thanks.

Contributor

Do something like we did in the other place:

if (!expiredIndexes.offer(entry)) {
    log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
}
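As a side note, a tiny standalone illustration (plain java.util.concurrent, not Kafka code) of why offer() is preferred over put() for a bounded queue on this path:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class OfferVsPut {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("first");                     // succeeds; the queue is now full

            boolean added = queue.offer("second");  // returns false immediately instead of blocking
            System.out.println("offer succeeded: " + added);

            // queue.put("second");                 // would block the calling thread until space frees up
        }
    }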

Contributor

@showuon I was just thinking about this: if the offer fails, it will leave a pile of files with the .deleted suffix that never get deleted from the disk? Is that behaviour OK?

Contributor

@showuon @kamalcph @jeel2420 I think we are entering a deadlock state here.
During remove we try to take a read lock, but in markForCleanup we try to take a write lock. Won't that result in a deadlock?

Contributor

I was just thinking about this: if the offer fails, it will leave a pile of files with the .deleted suffix that never get deleted from the disk? Is that behaviour OK?

It's OK, they will be deleted on the next startup. Of course we could add a way to retry enqueueing them, but I think it's a pretty rare case, so we can keep it as is.

During remove we try to take a read lock, but in markForCleanup we try to take a write lock. Won't that result in a deadlock?

The read lock in the remove method is RemoteIndexCache#lock. The write lock in the markForCleanup method is Entry#lock, so they are different lock objects and there is no deadlock. I agree it's confusing to have the same name within the same parent class. Could you file another PR to update the lock variable name?
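To make the distinction concrete, a simplified sketch (field and method names are illustrative, not the exact Kafka code) of why the two locks cannot deadlock each other:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // The cache and each entry hold *separate* ReadWriteLock instances, so taking the
    // cache-level read lock in remove() and the entry-level write lock in markForCleanup()
    // does not create a lock cycle.
    class CacheLockSketch {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // RemoteIndexCache#lock

        static class Entry {
            private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); // Entry#lock

            void markForCleanup() {
                lock.writeLock().lock();
                try {
                    // rename index files to the ".deleted" suffix
                } finally {
                    lock.writeLock().unlock();
                }
            }
        }

        void remove(Entry entry) {
            lock.readLock().lock();
            try {
                entry.markForCleanup(); // acquires a different lock object
            } finally {
                lock.readLock().unlock();
            }
        }
    }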

Contributor

Yeah sure @showuon, let me do it. Should I create a separate ticket or another PR attached to the same ticket?

Contributor

@showuon showuon Oct 17, 2023

Another PR with a MINOR prefix, like this: #14559

Contributor Author

Using expiredIndexes.offer now

// Wait for signal to start renaming the files
latchForCacheRemove.await()
// Calling the markForCleanup() actual method to start renaming the files
invocation.callRealMethod()
Contributor

Why are we calling the actual rename of the file in callRealMethod? Correct me if my understanding is wrong here.
Two threads are defined:

  1. removeCache - executes the remove function of the cache.
  2. readCache - reads data from the cache while spyEntry.markForCleanup is being executed.

Operations:

  1. removeCache is triggered.
  2. spyEntry.markForCleanup is executed (the files are already renamed to .deleted).
  3. readCache executes and finishes because no lock is pending on the remove operation.
  4. It creates a new file for the entry again (fetched from remote storage, rather than served from the existing cache entry). (We should validate the number of calls to the rsm here.)
  5. After latchForCacheRemove.await(), why are we explicitly calling markForCleanup again?

Contributor

  1. readCache executed and finished because no lock is pending on the remove operation

No, that's not right. The lock is in the concurrent map inside the cache.
But checking it again, I found the lock is taken when "updating the entry". That is, we have 2 threads trying to do:

  1. create a new entry for the key
  2. remove the entry for the key

We can't be sure which one will complete first; all we can be sure of is that the 2 operations are atomic. So, after L646:

      // Wait for signal to complete the test
      latchForTestWait.await()

      // Here, we can't be sure whether the cache size is 0 or 1 at this point. I think all we can do is make sure that either
      // the cache size is 1 and the file exists (create went later), or the cache size is 0 and the file doesn't exist (remove went later).

      // So, maybe we verify with this:
      if (Files.exists(entry.offsetIndex().file().toPath)) {
        assertCacheSize(1)
      } else {
        assertCacheSize(0)
      }

WDYT?

Contributor

@showuon Yes, that seems correct; as we do not have much control over which thread gets executed first, we can assert the cache size conditionally.

There may also be a case where the remove and read operations have a bug and nothing happens at all, but this test case would still pass because the file still exists. Adding one more validation when the file exists would ensure the correctness of the test case:

if (Files.exists(entry.offsetIndex().file().toPath)) {
  assertCacheSize(1)
  // validate that the timestamp of the file created before running the `remove` and `read` operations concurrently is earlier than that of the file recreated after the operations

Contributor Author

Thank you @showuon @iit2009060 for your valuable comments. I will update the PR with the changes.

Contributor Author

Updated the test case.
@iit2009060 I tested the test case without the fix and it fails as expected, so I haven't added the timestamp check. Can you please tell me in which particular case the test could pass with the older code (without the fix)?

Contributor Author

Also, I found the two test cases below to be flaky, which doesn't seem to be related to this change.

  • testIndexFileAlreadyExistOnDiskButNotInCache
  • testCorrectnessForCacheAndIndexFilesWhenResizeCache

Contributor

@jeel2420 I am not seeing testIndexFileAlreadyExistOnDiskButNotInCache as a failure in the integration test run.

@hudeqi can you help here with testCorrectnessForCacheAndIndexFilesWhenResizeCache? I see we already handled NoSuchFileException.

Contributor Author

testIndexFileAlreadyExistOnDiskButNotInCache sometimes fails locally with the same failure mode as testCorrectnessForCacheAndIndexFilesWhenResizeCache.


// Wait for signal to start renaming the files
latchForCacheRemove.await()
// Calling the markForCleanup() actual method to start renaming the files
invocation.callRealMethod()
Contributor

@jeel2420 Why are we invoking invocation.callRealMethod() again here?
It is already called at line 712:
when(spyEntry).markForCleanup()

Contributor Author

invocation.callRealMethod() is called to invoke the real markForCleanup() after the read is called and before we start asserting, to make sure the indexes get renamed before we assert the results.

Contributor

@jeel2420 markForCleanup should be called only one time. We need to test the behaviour when concurrent read/remove operations happen on the cache for the same entry.
In the test we just need to assert the way @showuon suggested:

// So, maybe we verify with this:
if (Files.exists(entry.offsetIndex().file().toPath)) {
  assertCacheSize(1)
} else {
  assertCacheSize(0)
}

Calling markForCleanup twice will always result in a cacheSize of 0 eventually.

Contributor Author

@iit2009060 markForCleanup() is not actually getting called twice. Please see the mock: during the first execution of markForCleanup() I am calling the actual markForCleanup() method (i.e. the indexes get renamed), but for subsequent calls the mock does nothing, so the real markForCleanup() that renames the indexes is called only once, as expected.

I have verified this behaviour as well.
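For illustration, a hedged Mockito sketch of the mocking pattern described above (the Entry class, latch name, and helper are assumptions for the sketch, not the exact test code):

    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.spy;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;

    // The first call to markForCleanup() waits for the reader's signal and then delegates to the
    // real method; any later call is a no-op, so the files are renamed exactly once.
    class MarkForCleanupMockSketch {
        static class Entry {
            public void markForCleanup() { /* renames index files to the ".deleted" suffix */ }
        }

        static Entry buildSpy(CountDownLatch latchForCacheRemove) {
            Entry spyEntry = spy(new Entry());
            AtomicBoolean realMethodCalled = new AtomicBoolean(false);
            doAnswer(invocation -> {
                if (realMethodCalled.compareAndSet(false, true)) {
                    latchForCacheRemove.await();        // wait for the signal from the read thread
                    return invocation.callRealMethod(); // rename the files, exactly once
                }
                return null;                            // subsequent calls do nothing
            }).when(spyEntry).markForCleanup();
            return spyEntry;
        }
    }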

Contributor

@jeel2420 During a read, markForCleanup should not be called even once, as per the functionality.

Then why do we need to call it explicitly again here via invocation.callRealMethod()?
I am seeing two invocations of markForCleanup:

  1. }).when(spyEntry).markForCleanup() (line 712)
  2. invocation.callRealMethod() (line 708)

Contributor

@showuon showuon Oct 19, 2023

There are two times when markForCleanup is called.

  1. The remove function which we are calling in the removeCache Runnable.
  2. One at invocation.callRealMethod() (line 708).

You're right, but they are "different" markForCleanup calls.
For (1), markForCleanup is an injected (mocked) method for controlling the invocation order, which is why there are latch await/countDown calls.
For (2), it's the real markForCleanup method that renames the cache files.

The goal is to simulate the race condition that happened in KAFKA-15481.

Even when I tried running your test case locally, it always asserts with cacheSize 0, as the entry eventually gets deleted.

Yes, I think so. But in some cases it could be 1, if getIndexEntry runs later. Thread scheduling is decided by the OS; we can't guarantee which one will go first, right?

I think the goal of this test is to make sure the issue in KAFKA-15481 will not happen again. That's why I added this comment.

IMO we should read and remove concurrently in separate threads and validate the cacheSize based on the order of execution.

I'm not following you here. What we're doing in this test is reading and removing concurrently in separate threads. As for validating the cacheSize based on the order of execution: since we can't guarantee which thread will be executed first, we can't do this, right? If we could decide the execution order, then they wouldn't be running concurrently, would they?

We should not need to call it explicitly for this scenario.

Maybe you can show us what test you would create if it were you. Some pseudo code is enough. Thank you.

Contributor

@showuon

For (1), markForCleanup is an injected (mocked) method for controlling the invocation order, which is why there are latch await/countDown calls.

Do you mean this is a mock method and no rename would happen in this case?
Effectively, the functionality/logic of markForCleanup is called only one time?

I was thinking of something like this:

    val latchForTestWait = new CountDownLatch(2)
    val removeCache = (() => {
      cache.remove(rlsMetadata.remoteLogSegmentId().id())
      latchForTestWait.countDown()
    }): Runnable

    val readCache = (() => {
      entry = cache.getIndexEntry(rlsMetadata)
      // Signal the cache remove to start renaming the files
      latchForTestWait.countDown()
    }): Runnable

    val executor = Executors.newFixedThreadPool(2)
    try {
      executor.submit(removeCache: Runnable)
      executor.submit(readCache: Runnable)

      // Wait for signal to complete the test
      latchForTestWait.await()

      // validate cache size based on file existence:
      // if the offset file exists, validate that an rsm call happened (execution order was remove, read)
      // if cache size == 0, validate that no rsm call happened (execution order was read, remove)

In the test case mentioned in the jira KAFKA-15481, the execution order is remove, read, and the overall result is cache size 0, which is wrong because of the time gap between removing the entry and renaming the files. Here we are validating the same thing with the rsm call count: if the operations are atomic, the rsm fetch should happen and the files should be restored.
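If we did want to assert on the remote fetch count, a hedged Mockito sketch could look like this (the RemoteFetcher interface and fetchIndex method are illustrative placeholders, not the actual RemoteStorageManager API):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    class RsmCallCountSketch {
        // Illustrative stand-in for the remote storage fetch we want to count.
        interface RemoteFetcher {
            byte[] fetchIndex(String segmentId);
        }

        static void assertOutcome(RemoteFetcher rsm, boolean offsetFileExists) {
            if (offsetFileExists) {
                // remove ran first, so the read had to re-fetch the index from remote storage
                verify(rsm, times(1)).fetchIndex("segment-0");
            } else {
                // read ran first and was served from the cache; remove then deleted the entry
                verify(rsm, never()).fetchIndex("segment-0");
            }
        }

        public static void main(String[] args) {
            RemoteFetcher rsm = mock(RemoteFetcher.class);
            assertOutcome(rsm, false); // passes: the mock was never invoked
        }
    }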

Contributor

@showuon showuon Oct 19, 2023

Do you mean this is a mock method and no rename would happen in this case?
Effectively, the functionality/logic of markForCleanup is called only one time?

Correct.

// validate cache size based on file existence:
// if the offset file exists, validate that an rsm call happened (execution order was remove, read)
// if cache size == 0, validate that no rsm call happened (execution order was read, remove)

Yes, it's basically similar to what we have now. Injecting a mock implementation for markForCleanup is just to bring the 2 thread executions closer together. In the end, what we have now also invokes the real method, which is what you did above. I'm fine if you think we should validate the rsm call count, but again, they are basically testing the same thing.

Contributor

@showuon Yes, correct, it is testing the same thing, and I am also fine with it. But from a readability perspective, the one I propose is simpler to understand and does not require any future change if the markForCleanup function changes. I'll leave it to @jeel2420 to make a decision here.

Contributor Author

@iit2009060 As the current test case is able to reproduce the case mentioned in the jira KAFKA-15481, I think we should be fine.

The only reason to have the markForCleanup mock is to have control over the execution of the 2 threads.

Comment on lines 728 to 729
executor.submit(removeCache: Runnable)
executor.submit(readCache: Runnable)
Contributor

@jeel2420 , sorry, I had another look and found that we should also verify that these 2 threads throw no exceptions. In the issue description, without this fix, an IOException is thrown. So, we should verify there's no exception using the returned Future from executor.submit. Thanks.

Contributor Author

@showuon Nice catch. I'm now calling .get() on both task Future objects, so if there is any error the test will fail with that exception.
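For reference, a small standalone example (not the Kafka test itself) of how Future.get() surfaces a task's exception as an ExecutionException:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FutureSurfacesTaskException {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(2);
            Runnable okTask = () -> System.out.println("read path finished");
            Runnable failingTask = () -> { throw new RuntimeException("simulated failure in remove path"); };

            Future<?> ok = executor.submit(okTask);
            Future<?> failing = executor.submit(failingTask);
            try {
                ok.get();       // completes normally
                failing.get();  // rethrows the task's exception wrapped in an ExecutionException
            } catch (ExecutionException e) {
                System.out.println("task failed with: " + e.getCause());
            } finally {
                executor.shutdown();
            }
        }
    }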

Contributor

@showuon showuon left a comment

LGTM!

@showuon
Contributor

showuon commented Oct 20, 2023

@iit2009060 , do you have any comments to this PR?

@iit2009060
Contributor

@iit2009060 , do you have any comments to this PR?
@showuon No, I am good. Thanks @jeel2420 for addressing the review comments.

@jeel2420
Contributor Author

jeel2420 commented Oct 20, 2023

Thank you @showuon @iit2009060 for your review. Can we merge this PR now?

Contributor

@divijvaidya divijvaidya left a comment

I left a minor comment and we can merge after that.

Thank you all for the diligent review on this one.

Contributor

@divijvaidya divijvaidya left a comment

Thank you for fixing this, Jeel! I will wait for CI to pass before merging. Given that this fixes a bug in 3.6, I will also port this back to the 3.6 branch.

@clolov clolov removed their assignment Oct 20, 2023
@jeel2420
Contributor Author

@jeel2420 https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/16/testReport/junit/kafka.log.remote/RemoteIndexCacheTest/Build___JDK_8_and_Scala_2_12___testClearCacheAndIndexFilesWhenResizeCache__/ test is failing in CI. Please check the reason.

I found that it's a flaky test case. It fails sometimes when I tested locally as well. I will take a look into it.

Collaborator

@kamalcph kamalcph left a comment

LGTM, thanks for fixing this issue!

@divijvaidya
Contributor

divijvaidya commented Oct 23, 2023

Adding here the reason @jeel2420 mentioned as the cause of the flaky test (which is unrelated to this PR):

In the getIndexFileFromRemoteCacheDir function we basically run the code below to get the files from disk and check whether a file with a specific suffix is present or not.
      Files.walk(cache.cacheDir().toPath())
        .filter(Files.isRegularFile(_))
        .filter(path => path.getFileName.toString.endsWith(suffix))
        .findAny()
Here, after the remove call, the cacheDir can contain files with the .deleted suffix, and those files can be removed from the cacheDir at any time. So if a .deleted file gets deleted while the above code (which traverses the directory structure) is running, we can get an UncheckedIOException, which is not handled in the function.
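A minimal sketch (not the actual Kafka test code) of one way the helper could tolerate files disappearing mid-traversal, assuming retrying on UncheckedIOException is acceptable:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Optional;
    import java.util.stream.Stream;

    final class WalkRetrySketch {
        static Optional<Path> findFileWithSuffix(Path dir, String suffix) throws IOException {
            for (int attempt = 0; attempt < 3; attempt++) {
                try (Stream<Path> paths = Files.walk(dir)) {
                    return paths
                            .filter(Files::isRegularFile)
                            .filter(p -> p.getFileName().toString().endsWith(suffix))
                            .findAny();
                } catch (UncheckedIOException e) {
                    // a file (e.g. one with the ".deleted" suffix) vanished mid-traversal; retry the walk
                }
            }
            return Optional.empty();
        }
    }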

@divijvaidya divijvaidya merged commit 4612fe4 into apache:trunk Oct 23, 2023
1 check failed
@divijvaidya
Contributor

Created a JIRA for the flaky test https://issues.apache.org/jira/browse/KAFKA-15671

@hudeqi
Collaborator

hudeqi commented Oct 24, 2023

@jeel2420 https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/16/testReport/junit/kafka.log.remote/RemoteIndexCacheTest/Build___JDK_8_and_Scala_2_12___testClearCacheAndIndexFilesWhenResizeCache__/ test is failing in CI. Please check the reason.

I found that it's a flaky test case. It fails sometimes when I tested locally as well. I will take a look into it.

I have fixed it in trunk before; maybe this PR needs to merge the latest trunk and then it will pass. @jeel2420

@divijvaidya
Contributor

@hudeqi

@jeel2420 https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14483/16/testReport/junit/kafka.log.remote/RemoteIndexCacheTest/Build___JDK_8_and_Scala_2_12___testClearCacheAndIndexFilesWhenResizeCache__/ test is failing in CI. Please check the reason.

I found that it's a flaky test case. It fails sometimes when I tested locally as well. I will take a look into it.

I have fixed it in trunk before; maybe this PR needs to merge the latest trunk and then it will pass. @jeel2420

Hey @hudeqi
This is failing in trunk as well, not just in this PR. I have added details in https://issues.apache.org/jira/browse/KAFKA-15671. From a quick look, it seems like we are catching NoSuchFileException whereas it's being thrown as a root cause and not as the top-level exception. Since you are familiar with this, please help guide Brian (who has volunteered to fix this ticket) towards a fix. It would be great if you could review their code once they have a PR open.

private void enqueueEntryForCleanup(Entry entry, Uuid key) {
    try {
        entry.markForCleanup();
        if (!expiredIndexes.offer(entry)) {
Contributor

What is the max capacity set for the queue? Also, isn't there a problem if an entry is removed from the cache and marked for clean-up but never added to the expiration queue, as the corresponding index files would be leaked on the file system?

Contributor

Correct. It will be leaked with a ".deleted" suffix. It should be cleared at the next process restart at

// Delete any .deleted or .tmp files remained from the earlier run of the broker.
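For context, a hedged sketch (not the actual Kafka code) of the kind of startup cleanup that comment refers to:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    final class StartupCleanupSketch {
        // Any ".deleted" or ".tmp" files left over from a previous run are removed from the cache dir.
        static void cleanupLeftoverFiles(Path cacheDir) throws IOException {
            try (Stream<Path> paths = Files.list(cacheDir)) {
                paths.filter(p -> {
                    String name = p.getFileName().toString();
                    return name.endsWith(".deleted") || name.endsWith(".tmp");
                }).forEach(p -> {
                    try {
                        Files.deleteIfExists(p);
                    } catch (IOException e) {
                        // best-effort cleanup; leftover files would be retried on the next restart
                    }
                });
            }
        }
    }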

Contributor

I am not sure that reliance on start-up clean-up is satisfying enough, because there is no guarantee that the file system hasn't been saturated by the time the broker restarts, or that any clean-up operation will succeed. Also, it does not address the risk (albeit minor) of the unbounded queue.

Contributor

Fair enough. Would you like to create an improvement JIRA to discuss the alternative solutions to this?

        if (!expiredIndexes.offer(entry)) {
            log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
        }
    } catch (IOException e) {
Contributor

I am not sure the I/O exception is handled correctly here, i.e., that it follows the convention for I/O error handling in Kafka. Maybe the log dir failure channel should be triggered?

Contributor

(Please correct me if I am wrong.) That is not the behaviour for TS in Kafka, since we don't want to mark the local directory as offline if there is something wrong with the functioning of TS. As an example, on an IOException in copyLogSegment, we eat up the exception at

Contributor

@Hangleton Hangleton Nov 2, 2023

The cache stores its files in the first log directory defined in the server properties. Because the I/O exceptions captured here originate from a log directory, why shouldn't the pattern which applies to log directory failures in Kafka be valid here?

Regarding the I/O exceptions surfaced from copyLogSegment - are we talking about those which occur from within a plugin? If that is the case, then they are of a different nature, since they do not apply to log directories, and by design the plugin implementation needs to handle them, while the remote log manager provides a retry mechanism via the periodic scheduling of the RLM tasks. The idea is that, unlike file system I/O errors on a log directory, which directly put local data integrity at risk, transient I/O errors are common for clients transferring data to or from external services (e.g. a public cloud storage), for which availability recovers on its own, and durability is not compromised as long as the consistency of metadata updates is guaranteed. So the semantics of the I/O exception, the corresponding failure modes, and their implications are not the same.

Contributor

Fair point. Please create a JIRA for this.

Contributor

Good point @Hangleton! But I didn't see the JIRA created (maybe I missed it). Could you let me know the JIRA ticket number? Or should I create one for you?

divijvaidya pushed a commit that referenced this pull request Nov 16, 2023
RemoteIndexCache has a concurrency bug which leads to IOException while fetching data from remote tier.

The bug could be reproduced as per the following order of events:-

Thread 1 (cache thread): invalidates the entry, removalListener is invoked async, so the files have not been renamed to "deleted" suffix yet.
Thread 2: (fetch thread): tries to find entry in cache, doesn't find it because it has been removed by 1, fetches the entry from S3, writes it to existing file (using replace existing)
Thread 1: async removalListener is invoked, acquires a lock on old entry (which has been removed from cache), it renames the file to "deleted" and starts deleting it
Thread 2: Tries to create in-memory/mmapped index, but doesn't find the file and hence, creates a new file of size 2GB in AbstractIndex constructor. JVM returns an error as it won't allow creation of 2GB random access file.

This commit fixes the bug by using EvictionListener instead of RemovalListener to perform the eviction atomically with the file rename. It handles the manual removal (not handled by EvictionListener) by using computeIfAbsent() and enforcing atomic cache removal & file rename.

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Arpit Goyal
<goyal.arpit.91@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
anurag-harness pushed a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
anurag-harness added a commit to anurag-harness/kafka that referenced this pull request Feb 9, 2024
…156)


Co-authored-by: Jotaniya Jeel <31836443+jeel2420@users.noreply.github.com>
Labels
tiered-storage Pull requests associated with KIP-405 (Tiered Storage)
Projects
None yet
8 participants