KAFKA-14993: Improve TransactionIndex instance handling while copying to and fetching from RSM #14363
… to and fetching from RSM.
22b82d3
to
07378c3
Compare
Thanks @abhijeetk88 for the PR. Left a comment for clarification.
Thanks @abhijeetk88 for addressing the review comments.
@@ -123,7 +123,7 @@ InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
 * Returns the index for the respective log segment of {@link RemoteLogSegmentMetadata}.
 * <p>
 * Note: The transaction index may not exist because of no transactional records.
I am fine with leaving this comment for clarity, even though the param doc already clarifies that RemoteResourceNotFoundException
is thrown when any index is not found in remote storage.
Looks good to me, too.
cc @ivanyu, as you requested it for 3.6.0
Thanks @abhijeetk88 for addressing the comments. LGTM.
@@ -382,6 +385,8 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
    try {
        return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
    } catch (RemoteResourceNotFoundException e) {
        return null;
We should add a comment here to explain why we treat txnIndex differently. For example:
Don't throw an exception, since the transaction index may not exist when the segment has no transactional records.
Added.
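The pattern under review can be sketched in isolation as follows. This is a minimal illustration, not the code from the PR: `IndexNotFoundException`, `IndexKind`, `RemoteStore`, and `fetchTxnIndexOrNull` are hypothetical stand-ins for `RemoteResourceNotFoundException`, `IndexType`, the remote storage manager, and the lambda passed to `loadIndexFile`.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

final class TxnIndexFetchSketch {

    /** Hypothetical stand-in for RemoteResourceNotFoundException. */
    static final class IndexNotFoundException extends Exception {
        IndexNotFoundException(String message) {
            super(message);
        }
    }

    /** Hypothetical, simplified set of index types. */
    enum IndexKind { OFFSET, TIMESTAMP, TRANSACTION }

    /** Hypothetical, simplified remote store keyed by a segment id. */
    interface RemoteStore {
        InputStream fetchIndex(String segmentId, IndexKind kind) throws IndexNotFoundException;
    }

    /** Returns the transaction index stream, or null when the remote store has none. */
    static InputStream fetchTxnIndexOrNull(RemoteStore store, String segmentId) {
        try {
            return store.fetchIndex(segmentId, IndexKind.TRANSACTION);
        } catch (IndexNotFoundException e) {
            // Don't propagate: a segment without transactional records
            // legitimately has no transaction index.
            return null;
        }
    }

    public static void main(String[] args) {
        // A store that has every index except the transaction index.
        RemoteStore noTxnIndex = (segmentId, kind) -> {
            if (kind == IndexKind.TRANSACTION) {
                throw new IndexNotFoundException("no transaction index for " + segmentId);
            }
            return new ByteArrayInputStream(new byte[0]);
        };
        System.out.println(fetchTxnIndexOrNull(noTxnIndex, "segment-0") == null);
    }
}
```

Only the transaction index is treated this way in the sketch; a missing offset or time index would still surface as an exception to the caller.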
@@ -451,7 +456,7 @@ public static class Entry implements AutoCloseable {

private final OffsetIndex offsetIndex;
private final TimeIndex timeIndex;
private final TransactionIndex txnIndex;
private final Optional<TransactionIndex> txnIndexOpt;
We should make it clear again why we treat txnIndex differently. Please add a comment. Thanks.
Added
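As a rough illustration of what an optional transaction index means for readers of a cache entry, the sketch below wraps a possibly-null index in an `Optional` and falls back to an empty result. `CacheEntrySketch`, `TxnIndex`, and `abortedTxnOffsets` are hypothetical names chosen for this example and are not the types used in `RemoteIndexCache`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class CacheEntrySketch {

    /** Hypothetical, minimal stand-in for a transaction index. */
    interface TxnIndex {
        List<Long> abortedTxnOffsets();
    }

    // Optional because a segment without transactional records has no transaction index.
    private final Optional<TxnIndex> txnIndexOpt;

    CacheEntrySketch(TxnIndex txnIndexOrNull) {
        this.txnIndexOpt = Optional.ofNullable(txnIndexOrNull);
    }

    /** Returns the aborted transaction offsets, or an empty list when there is no index. */
    List<Long> abortedTxnOffsets() {
        return txnIndexOpt.map(TxnIndex::abortedTxnOffsets).orElse(Collections.emptyList());
    }

    public static void main(String[] args) {
        System.out.println(new CacheEntrySketch(null).abortedTxnOffsets());
        System.out.println(new CacheEntrySketch(() -> Collections.singletonList(42L)).abortedTxnOffsets());
    }
}
```

Callers never see a null index in this sketch; an absent index simply behaves like one with no aborted transactions.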
Thank you @abhijeetk88!
@showuon Thanks for the review. I have addressed your comments. Please take a look.
LGTM!
LGTM
Overall LGTM, left a minor comment to address. Thanks!
During the initial load in the init() method, we load existing on-disk entries into the cache only when this condition holds:
if (Files.exists(offsetIndexFile.toPath()) &&
    Files.exists(timestampIndexFile.toPath()) &&
    Files.exists(txnIndexFile.toPath()))
Since txnIndex is optional, in most cases this will end up deleting the pre-existing index files and loading them again from remote storage. That is the safe choice, because we don't know whether the txnIndex is missing because it genuinely doesn't exist for this segment or because of some error.
May I suggest that we use an empty file on disk to indicate that the txnIndex is genuinely absent (and not missing because of an error)? This would make the interface more deterministic, since we could then distinguish the genuine case from the error case.
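The empty-file suggestion above could look roughly like this. It is a sketch under assumptions, not the change that was merged: `storeTxnIndex` and `canReuseFromDisk` are hypothetical helpers, the file names in `main` are illustrative, and `java.nio.file.Path` is used in place of the `File` objects in the quoted condition.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class EmptyTxnIndexMarkerSketch {

    /**
     * Writes the fetched transaction index bytes to the local cache file.
     * A null argument means "remote storage has no transaction index";
     * in that case an empty file is written as a marker.
     */
    static void storeTxnIndex(Path txnIndexFile, byte[] bytesOrNull) throws IOException {
        Files.write(txnIndexFile, bytesOrNull == null ? new byte[0] : bytesOrNull);
    }

    /**
     * The three-file check can stay unchanged: with the marker in place,
     * a missing transaction index file now only indicates an incomplete entry.
     */
    static boolean canReuseFromDisk(Path offsetIndexFile, Path timeIndexFile, Path txnIndexFile) {
        return Files.exists(offsetIndexFile)
            && Files.exists(timeIndexFile)
            && Files.exists(txnIndexFile);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("index-cache-sketch");
        Path offset = Files.createFile(dir.resolve("segment.index"));
        Path time = Files.createFile(dir.resolve("segment.timeindex"));
        Path txn = dir.resolve("segment.txnindex");

        storeTxnIndex(txn, null); // the segment has no transactional records
        System.out.println(canReuseFromDisk(offset, time, txn));
    }
}
```

With this approach a restart does not need to re-fetch entries for segments that never had a transaction index, because the empty marker file satisfies the existence check.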
@@ -382,6 +388,9 @@ private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteL
TransactionIndex txnIndex = loadIndexFile(txnIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
    try {
        return remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION);
    } catch (RemoteResourceNotFoundException e) {
        // Don't throw an exception since the transaction index may not exist because of no transactional records.
        return null;
Please add a Javadoc to this method documenting that null is expected.
Do you mean we should add javadoc to the below method? This one is the caller and the second parameter could receive null input stream.
private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
Function<File, T> readIndex) throws IOException {
The method is private. Should we still add a javadoc?
Yes, please. I know it is not necessary for private methods, but it would help future implementors understand that they are expected to handle nulls returned by this method.
I got confused by this statement:
This one is the caller and the second parameter could receive null input stream.
It's the third parameter. Can you place one parameter per line for readability?
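Both requests in this thread (a Javadoc that documents the null case, and one parameter per line) might look roughly like the sketch below. The body is an assumption made for illustration and is not the implementation in `RemoteIndexCache`; a `String` segment id and `java.nio.file.Path` stand in for `RemoteLogSegmentMetadata` and `File`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;

final class LoadIndexFileSketch {

    /**
     * Loads an index from the local cache file, fetching it from remote storage first if needed.
     *
     * @param file             local cache file for the index
     * @param segmentId        hypothetical stand-in for the segment metadata
     * @param fetchRemoteIndex fetches the index from remote storage; may return null
     *                         when the index legitimately does not exist
     * @param readIndex        parses the local file into an index object
     * @return the parsed index, or null when fetchRemoteIndex returned null;
     *         callers are expected to handle the null case
     */
    static <T> T loadIndexFile(
            Path file,
            String segmentId,
            Function<String, InputStream> fetchRemoteIndex,
            Function<Path, T> readIndex) throws IOException {
        if (!Files.exists(file)) {
            // try-with-resources tolerates a null resource and simply skips close().
            try (InputStream in = fetchRemoteIndex.apply(segmentId)) {
                if (in == null) {
                    return null;
                }
                Files.copy(in, file);
            }
        }
        return readIndex.apply(file);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("load-index-sketch");
        Long size = loadIndexFile(
                dir.resolve("segment.txnindex"),
                "segment-0",
                id -> new ByteArrayInputStream(new byte[] {1, 2, 3}),
                path -> path.toFile().length());
        System.out.println(size);
    }
}
```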
@divijvaidya I do not have a strong opinion on that; I am fine either way. Sure, we can create an empty file in the file cache if there is no txnIndex for a specific segment.
Nice catch! Yes, this is indeed an issue. Regarding your suggestion, @divijvaidya, I was thinking about how we could distinguish between a "real" empty txnIndex file and an absent txnIndex file. Then I realized we don't have to care about that, because in both cases we basically do a no-op when reading the file. So, I'm +1 to store an empty txnIndex file instead of
d01fc72
to
778d05a
Compare
Thanks @abhijeetk88 for addressing the latest review comments in the updated PR, which includes the test changes for checking empty files. LGTM.
@divijvaidya @showuon I have addressed your comments. Please take a look.
The changes look clean with the latest commit. Thanks, @abhijeetk88, for working on this! LGTM.
The test failures are unrelated; merging it to trunk and 3.6.
… to and fetching from RSM (#14363)
- Updated the contract for RSM's fetchIndex to throw a RemoteResourceNotFoundException instead of returning an empty InputStream when it does not have a TransactionIndex.
- Updated the LocalTieredStorage implementation to adhere to the new contract.
- Added Unit Tests for the change.
Reviewers: Satish Duggana <satishd@apache.org>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
In this PR, we updated the contract for RSM's fetchIndex to throw a RemoteResourceNotFoundException when it does not have a TransactionIndex, instead of returning an empty InputStream.
Also updated the LocalTieredStorage implementation to adhere to the new contract.
Added unit tests for the change.
Committer Checklist (excluded from commit message)