New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15169: Added TestCase in RemoteIndexCache #14482
Conversation
def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ | ||
val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please follow style guide used in rest of the code. For example, space between = and {.
You can find more information here: https://kafka.apache.org/coding-guide.html
I remember your previous PR also had similar comments. I would recommend to set your IDE to use this style and automatically format newly added code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks , I have enabled on IDE to format it on save everytime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
def testOffsetIndexFileAlreadyExistOnDiskButNotInCache(): Unit ={ | ||
val remoteIndexCacheDir = new File(tpDir,RemoteIndexCache.DIR_NAME) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please create a getter in the RemoteIndexCache to get this dir, add javadoc for it // visible for testing
and use that here. It will make this test independent of the remote cache location in case we change that in future,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Files.walk(remoteIndexCacheDir.toPath) | ||
.filter(Files.isRegularFile(_)) | ||
.filter(path => path.getFileName.toString.endsWith(suffix)) | ||
.forEach(f => Files.move(f,f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of files.move, please use our custom utility function Utils.atomicMoveWithFallback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// rsm should not be called again | ||
// instead files exist on disk | ||
// should be used | ||
verifyFetchIndexInvocation(count = 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be 0 if there is going to be no fetch invocation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@divijvaidya To pre-create the valid index file in remote storage cache dir , I initially fetched it from the remote storage and then run the scenario. The 1 count is for pre-processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. So that will confuse readers. Please do either:
- reset the mock rsm
or - run
verifyFetchIndexInvocation(count = 1)
again before L672, so that we can make sure the count is not increasing even we callcache.getIndexEntry
again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 , have you addressed this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @showuon Thanks for identifying it , I missed it.
Changes Done.
// The size of the string written in the file is 12 bytes, | ||
// but it should be multiple of Offset Index EntrySIZE which is equal to 8. | ||
pw.close() | ||
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a bug in existing createOffsetIndexForSegmentMetadata
and similar function. They create cache at the wrong place in the line:
new OffsetIndex(remoteOffsetIndexFile(tpDir, metadata)
Instead of tpDir
, it should point to remote cache director which is a folder inside tpDir
.
Please fix that and see if it impacts your test in any way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@divijvaidya But when we call get index entry , it will create indexes file under remote cache directory only. It will read file content from tpDir and create indexes in "remoteIndexCache" dir.
IMO "createOffsetIndexForSegmentMetadata" will create indexes file and tpDir act as a placeholder for files fetched from remoteStorage. If we change it to remote cache dir , all test cases will always have the file exist behaviour.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@divijvaidya We can keep this function i.e createOffsetIndexForSegmentMetadata to be used for indexes received from remote storage rather than remoteindexcache.
The existing test case are working because they directly put values into internalCache(Caeffine) rather than the getIndexEntry route.
In the above test case I want to corrupt the Index files received from remote storage rather than from remote index cache.
getIndexEntry does following steps
- Fetch indexes from remote Storage( i.e indexes stored in tpDir directory as per the mock)
- Manually Corrupt the indexes file stored in tpDir Directory after fetch from remote storage.
- It tries to copy indexes files in tpDir directory to remote index cache directory.
- Run sanity check on indexes stored in tpDir/remoteindexcache directory.
- It throws CorruptedIndexException.
// The size of the string written in the file is 12 bytes, | ||
// but it should be multiple of Offset Index EntrySIZE which is equal to 8. | ||
pw.close() | ||
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we have 3 variants of this test where we are corruption all 3 indexes one by one. You can use @Parameterized
to run those scenarios in same test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@divijvaidya Is it possible to parametrize based on function as a parameter ? Otherwise I need to write if else condition in the same test ?
Corrupting Time and Offset Index is easy based on entry_size , any suggestion how to do it for TransactionIndex ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 , if / else is fine with me, just make the test clear.
For txnIndex corruption, I usually check other tests if they did something similar as we want. Here's a good reference:
kafka/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
Lines 62 to 73 in fbc39bc
def testSanityCheck(): Unit = { | |
val abortedTxns = List( | |
new AbortedTxn(0L, 0, 10, 11), | |
new AbortedTxn(1L, 5, 15, 13), | |
new AbortedTxn(2L, 18, 35, 25), | |
new AbortedTxn(3L, 32, 50, 40)) | |
abortedTxns.foreach(index.append) | |
index.close() | |
// open the index with a different starting offset to fake invalid data | |
val reopenedIndex = new TransactionIndex(100L, file) | |
assertThrows(classOf[CorruptIndexException], () => reopenedIndex.sanityCheck()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@divijvaidya @showuon done
// wait until entry is marked for deletion | ||
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup, | ||
"Failed to mark cache entry for cleanup after invalidation") | ||
TestUtils.waitUntilTrue(() => entry.isCleanStarted, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clean started doesn't mean that index has been deleted. There could be a case where next line executes even though file has not been deleted yet. I would suggest to wrap the next assertions in waitUntilTrue()
. It would make the test non-flaky.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 , thanks for the PR. Left some comments.
|
||
// restore index files | ||
renameRemoteCacheIndexFileFromDisk(tempSuffix) | ||
// validate cache entry for the above key should be null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: additional space before null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// The size of the string written in the file is 12 bytes, | ||
// but it should be multiple of Offset Index EntrySIZE which is equal to 8. | ||
pw.close() | ||
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 , if / else is fine with me, just make the test clear.
For txnIndex corruption, I usually check other tests if they did something similar as we want. Here's a good reference:
kafka/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
Lines 62 to 73 in fbc39bc
def testSanityCheck(): Unit = { | |
val abortedTxns = List( | |
new AbortedTxn(0L, 0, 10, 11), | |
new AbortedTxn(1L, 5, 15, 13), | |
new AbortedTxn(2L, 18, 35, 25), | |
new AbortedTxn(3L, 32, 50, 40)) | |
abortedTxns.foreach(index.append) | |
index.close() | |
// open the index with a different starting offset to fake invalid data | |
val reopenedIndex = new TransactionIndex(100L, file) | |
assertThrows(classOf[CorruptIndexException], () => reopenedIndex.sanityCheck()) |
99bf0ad
to
220dfb6
Compare
// Index Files already exist | ||
// rsm should not be called again | ||
// instead files exist on disk | ||
// should be used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make these 4 lines into 2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}) | ||
cache.getIndexEntry(rlsMetadata) | ||
// No exception should occur | ||
// Offset rsm 0,TimeIndex 1 , TxnIndex 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is offset rsm
? OffsetIndex?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the description. Yes it was OffsetIndex
// However cache fetch failed | ||
// but it has already created offset and time index file in remote cache dir. | ||
// Current status | ||
// ( cache is null) | ||
// RemoteCacheDir contain | ||
// 1. Offset Index File which is fine and not corrupted | ||
// 2. Time Index File which is corrupted | ||
// What should be the code flow in next execution | ||
// 1. No rsm call for fetching OffSet Index File. | ||
// 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove addition space between words.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// 1. Offset Index File which is fine and not corrupted | ||
// 2. Time Index File which is corrupted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The word which
can be removed:
// 1. Offset Index File is fine and not corrupted
// 2. Time Index File is corrupted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@showuon @iit2009060 please don't wait for my review on this one. I might not get to it until next week. |
@showuon Can you review @divijvaidya comments. I have taken care of all the reviews. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 , overall LGTM, left some refactor comments.
@ParameterizedTest | ||
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) | ||
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { | ||
// create Corrupt Index File in remote index cache |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// create Corrupted Index File in remote index cache
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format is wrong:
if (indexType == IndexType.OFFSET) {
createCorruptOffsetIndexFile(cache.cacheDir())
} else if (indexType == IndexType.TIMESTAMP) {
createCorruptTimeIndexOffsetFile(cache.cacheDir())
} else if (indexType == IndexType.TRANSACTION) {
createCorruptTxnIndexForSegmentMetadata(cache.cacheDir(), rlsMetadata)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// assert that parent directory for the index files is correct | ||
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString, | ||
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent") | ||
s"offsetIndex=entry.offsetIndex().file().toPath is created under incorrect parent") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be s"offsetIndex=$offsetIndexFile is created under incorrect parent")
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
val timeIdx = createTimeIndexForSegmentMetadata(metadata) | ||
val txnIdx = createTxIndexForSegmentMetadata(metadata) | ||
maybeAppendIndexEntries(offsetIdx, timeIdx) | ||
// Create corrupt index file which would be returned by rsm |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Create corrupt index file which would be returned by rsm
->
// Create corrupted index file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// However cache fetch failed | ||
// but it has already created offset and time index file in remote cache dir. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These 2 lines can be removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// No exception should occur | ||
// rsm should not be called for offset Index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// No exception should occur
<- should be removed
// rsm should not be called for offset Index
-> // rsm should not be called to fetch offset Index
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// Index Files already exist ,rsm should not be called again | ||
// instead files exist on disk should be used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Index Files already exist, rsm should not fetch them again.
and remove L654
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// rsm should not be called again | ||
// instead files exist on disk | ||
// should be used | ||
verifyFetchIndexInvocation(count = 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. So that will confuse readers. Please do either:
- reset the mock rsm
or - run
verifyFetchIndexInvocation(count = 1)
again before L672, so that we can make sure the count is not increasing even we callcache.getIndexEntry
again.
if (testIndexType == IndexType.OFFSET) { | ||
createCorruptOffsetIndexFile(tpDir) | ||
} | ||
else if (testIndexType == IndexType.TIMESTAMP) { | ||
createCorruptTimeIndexOffsetFile(tpDir) | ||
} | ||
else if (testIndexType == IndexType.TRANSACTION) { | ||
txnIdx = createCorruptTxnIndexForSegmentMetadata(tpDir, rlsMetadata) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
format. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And since these logic repeat again, we can extract them into another helper method. Ex:
private void createCorruptedIndexFile(IndexType type) {
if (testIndexType == IndexType.OFFSET) {
createCorruptOffsetIndexFile(tpDir)
.....
}
Then, in the test, we can just call createCorruptedIndexFile(testIndexType)
to do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@showuon done as mentioned. |
4edf149
to
db648c1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Just a minor formatting comment.
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) | ||
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { | ||
// create Corrupted Index File in remote index cache | ||
createCorruptedIndexFile(indexType,cache.cacheDir()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: space after comma (indexType, cache.cacheDir()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick fix. Let's wait for the CI build completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@showuon Unrelated test failures.
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/9/tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are adding new tests, can we create a new RemoteIndexCacheTest under storage
module and start writing the tests in java
?
The source code is in Java but the test is in scala. We can move the existing tests in a separate PR.
@@ -166,6 +166,11 @@ public Cache<Uuid, Entry> internalCache() { | |||
return internalCache; | |||
} | |||
|
|||
// Visible for testing | |||
public File cacheDir() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we file a ticket to move the RemoteIndexCacheTest to storage
module under the same package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kamalcph I have just started understanding of the kafka ecosystem , Can you help me understand the rational behind it , Then i can create a ticket with details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can find more details about the intention to move the source code to Java in KAFKA-14524
683ab3a
to
f953439
Compare
@kamalcph Can i create a ticket for same and work on it after PR merge. This has been pending from last week and already approved.
|
@showuon @kamalcph @divijvaidya I have rebased changes and resolved conflict on the changes merged by |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes in the patch, LGTM. We can rewrite it to Java in a separate patch.
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION")) | ||
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = { | ||
// create Corrupted Index File in remote index cache | ||
createCorruptedIndexFile(indexType, cache.cacheDir()) | ||
val entry = cache.getIndexEntry(rlsMetadata) | ||
// Test would fail if it throws corrupt Exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we update this comment to?
Test would fail it it throws exception other than CorruptIndexException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// rsm should not be called to fetch offset Index | ||
verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET)) | ||
verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP)) | ||
verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Txn index file was not corrupted. Can we add a comment to explain why are we fetching it again?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
f953439
to
951decd
Compare
@iit2009060 , there are compilation failure, could you fix it? |
on it. |
@hudeqi @showuon There is one test case failing related to the above change #14381, can you help me resolving it , in the local it is not failing ? |
hi, I think this may be the reason: when executing |
Yes sure @hudeqi . This seems to be a case of flaky behaviour i.e. race condition ? |
I think so, please cherry-pick this commit: hudeqi@854c773, thanks. @iit2009060 |
cfe4ee5
to
414bcbb
Compare
@hudeqi This is given me bad object/revision . Can you have the command handy to do cherry pick from another fork ? |
|
Unrelated test failures |
@showuon Can we merge this changes ? |
TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished, | ||
"Failed to finish cleanup cache entry after resizing cache.") | ||
|
||
// verify no index files on remote cache dir | ||
TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@iit2009060 @hudeqi , adding an isCleanFinished
flag just for test is not a good solution. In this case, could we catch the exception in the getIndexFileFromRemoteCacheDir
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hudeqi Can you resolve the above comment ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for the update.
Failed tests are unrelated and also failed in trunk build. |
est Cases Covered 1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present. 2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution. 3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169 Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
est Cases Covered 1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present. 2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution. 3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169 Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
Test Cases Covered
Committer Checklist (excluded from commit message)