New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12552: Introduce LogSegments class abstracting the segments map #10401
KAFKA-12552: Introduce LogSegments class abstracting the segments map #10401
Conversation
@junrao, @dhruvilshah3, @satishd - this PR is ready for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kowshik : Thanks for the PR. Just one comment below.
* The active segment that is currently taking appends. | ||
*/ | ||
@threadsafe | ||
def activeSegment = lastEntry.get.getValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the same as lastSegment()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. That's a good question. I'll eliminate this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
5830b78
to
4d73714
Compare
Thanks for the review @junrao ! I've addressed the review comment(s) in 8f1e292 and 4d73714. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR looks reasonable to me. Thanks @kowshik.
/** | ||
* @return the base offsets of all segments | ||
*/ | ||
def baseOffsets: Seq[Long] = segments.values().asScala.map(_.baseOffset).toSeq |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should this return an Iterable
instead of Seq
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The suggestion feels reasonable to me. I've addressed it in 43bf65c.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kowshik : Thanks for the updated PR. LGTM. Do you know why the jenkins tests didn't run?
@junrao For jenkins, let me try bumping this PR with a rebase to see if that will start up jenkins. I'm not sure why there is a delay. It seems the |
43bf65c
to
9dbb679
Compare
@junrao The Jenkins run has started now. We can wait for the tests to pass. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @kowshik for the PR, LGTM.
@junrao The jenkins tests have finished. I checked the test failures, they seem to be unrelated to this PR:
|
…apache#10401) This PR is a precursor to the recovery logic refactor work (KAFKA-12553). In this PR, I've extracted the behavior surrounding segments map access within kafka.log.Log class into a new class: kafka.log.LogSegments. This class encapsulates a thread-safe navigable map of kafka.log.LogSegment instances and provides the required read and write behavior on the map. The Log class now encapsulates an instance of the LogSegments class. Couple advantages of this PR: Makes the Log class a bit more modular as it moves out certain private behavior thats otherwise within the Log class. This is a precursor to refactoring the recovery logic (KAFKA-12553). In the future, the logic for recovery and loading of segments from disk (during Log) init will reside outside the Log class. Such logic would need to instantiate and access an instance of the newly added LogSegments class. Tests: Added a new test suite: kafka.log.LogSegmentsTest covering the APIs of the newly introduced class. Reviewers: Satish Duggana <satishd@apache.org>, Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
…10684) In Log.collectAbortedTransactions() I've restored a previously used logic, such that it would handle the case where the starting segment could be null. This was the case previously, but the PR #10401 accidentally changed the behavior causing the code to assume that the starting segment won't be null. In Log.rebuildProducerState() I've removed usage of the allSegments local variable. The logic looks a bit simpler after I removed it. I've introduced a new LogSegments.higherSegments() API. This is now used to make the logic a bit more readable in Log. collectAbortedTransactions() and Log.deletableSegments() APIs. I've removed the unnecessary use of java.lang.Long in LogSegments class' segments map definition. I've converted a few LogSegments API from public to private, as they need not be public. Reviewers: Ismael Juma <ismael@juma.me.uk>, Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
This PR is a precursor to the recovery logic refactor work (KAFKA-12553).
In this PR, I've extracted the behavior surrounding segments map access within
kafka.log.Log
class into a new class:kafka.log.LogSegments
. This class encapsulates a thread-safe navigable map ofkafka.log.LogSegment
instances and provides the required read and write behavior on the map. TheLog
class now encapsulates an instance of theLogSegments
class.Couple advantages of this PR:
Log
class a bit more modular as it moves out certain private behavior thats otherwise within theLog
class.Log
) init will reside outside theLog
class. Such logic would need to instantiate and access an instance of the newly addedLogSegments
class.Tests:
Added a new test suite:
kafka.log.LogSegmentsTest
covering the APIs of the newly introduced class.