New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14522 Rewrite/Move of RemoteIndexCache to storage module. #13275
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had an early review, left some comments. Thanks.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
da17d44
to
90b914c
Compare
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
81ceeee
to
e49a357
Compare
Thanks @showuon for your review. Addressed the comments with the latest commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the PR. A few comments below.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
try { | ||
entry.markForCleanup(); | ||
} catch (IOException e) { | ||
throw new KafkaException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor declares throwing IOException. Why do we need to convert IOException to KafkaException? Ditto in other methods like init()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This exception is not directly thrown to the constructor. It is inside an anonymous inner class of LinkedHashMap#removeEldestEntry
. This method is not declared with IOException
.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
Thanks @junrao for your review. Addressed your comments inline and/or updated with the latest commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the updated PR. A few more comments.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/LogFileUtils.java
Outdated
Show resolved
Hide resolved
b290f00
to
22a9c72
Compare
Will take a look this week or next week. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd , thanks for the update. Left some comments.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the updated PR. Just a few minor comments.
cleanerThread.start(); | ||
} | ||
|
||
public LinkedBlockingQueue<Entry> expiredIndexes() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the next two methods are only used for tests. Should we expose them at the package level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are written at different package level in core module. We can revisit this once the tests are also moved to java and the respective module/package.
log.error("Cleaner thread received interruption but remote index cache is not closed", ex); | ||
throw new KafkaException(ex); | ||
} else { | ||
log.debug("Cleaner thread was interrupted on cache shutdown"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShutdownableThread already logs the shutdown.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is very specific to the interrupted scenario.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
// Delete any .deleted files remained from the earlier run of the broker. | ||
try (Stream<Path> paths = Files.list(cacheDir.toPath())) { | ||
paths.forEach(path -> { | ||
if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we clean up the tmp file too?
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
Cleanedup index file suffix usages and other minor cleaups
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
Thanks @junrao for the review comment. Addressed them with the latest commits and/or inline. |
Thanks @divijvaidya for the review. Addressed them with inline and/or with the latest commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a last round of minor comments. I am good to ship this PR once they are resolved.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Outdated
Show resolved
Hide resolved
Thanks @divijvaidya for the latest review. Addressed them with the latest commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few comments related to TxnIndex potentially being optional.
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
Show resolved
Hide resolved
@jeqo There is already KAFKA-14993 to address that. @kamalcph was working on that. I did not want to add those changes to this PR as it is tracked separately. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, thanks for the reminder. LGTM, thanks @satishd !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@satishd : Thanks for the updated PR. LGTM
A few tests are failed but those are not related to the changes in the PR. |
…he#13275) KAFKA-14522 Rewrite and Move of RemoteIndexCache to storage module. Cleanedup index file suffix usages and other minor cleanups Reviewers: Jun Rao <junrao@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>
KAFKA-14522 Rewrite/Move of RemoteIndexCache to the storage module.
Committer Checklist (excluded from commit message)