KAFKA-14481: Move LogSegment/LogSegments to storage module #14529
Conversation
Force-pushed from 32ac0c1 to db75c14.
```diff
- LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.lazyOffsetIndex().get().file()),
-         toPathIfExists(segment.lazyTimeIndex().get().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
+ LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.offsetIndex().file()),
+         toPathIfExists(segment.timeIndex().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
```
@satishd Is it intentional that we force the indexes to be materialized here? We could pass the file without materializing if that's better.
This metadata is passed to the RSM plugin, which is external to Kafka. I would like to hide the details (that we have a lazily materialized index) from the external RSM plugin and instead have a clean contract which states: "Kafka guarantees that these files will be present; RSM can pick them up and upload them". This provides clean decoupling, where Kafka <-> RSM plugin state sharing happens only via files.
That is why we need to materialize the indexes before giving them to the RSM plugin.
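As a rough illustration of that contract, here is a hedged sketch of an RSM-style consumer (not the real `RemoteStorageManager` interface, which also takes `RemoteLogSegmentMetadata` and throws `RemoteStorageException`; the `LogSegmentData` accessors are assumed from the diff above). The point is that the plugin only ever sees plain paths, so the files must exist and be fully written by the time they are handed over:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.kafka.server.log.remote.storage.LogSegmentData;

// Sketch: an external RSM relies only on the files being present,
// never on Kafka's in-memory (lazily materialized) index state.
public class FileCopyingRsmSketch {
    public void copySegment(LogSegmentData data, Path remoteDir) throws Exception {
        for (Path p : new Path[] { data.logSegment(), data.offsetIndex(), data.timeIndex() }) {
            // Kafka's contract: these files exist and are complete at this point.
            Files.copy(p, remoteDir.resolve(p.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```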
There is a cost to materializing, hence we should be sure it's needed. If that's the case here, then we're all good.
I have been thinking about this. I take back my original statement.
We want to ensure that the file we pass to the RSM plugin contains all the data present in the mmapped buffer, i.e. we should have flushed the MappedByteBuffer to the file using force(). In Kafka, when we close a segment, indexes are flushed asynchronously. Hence, it is possible that when we pass the file to RSM, the file doesn't yet contain the flushed data. This is a bug, but it is not related to the change we are trying to make here. I will create a separate JIRA for this.
RSM doesn't need to read the index file during archive, hence it's OK to pass just the file without materializing the index into in-memory mmapped buffers.
Hence, we should flush() the contents of the indexes into the file before this operation, but it is not necessary to materialize them (i.e. read the contents of the file into memory).
@satishd we require your opinion here.
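In plain `java.nio` terms, the flush/materialize distinction looks like the following minimal sketch (hypothetical file name; this is not Kafka's index class): the party that already owns the mapped buffer calls `force()` to flush, while whoever is handed the file afterwards needs only the path.

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class FlushVsMaterialize {
    public static void main(String[] args) throws Exception {
        File indexFile = new File("00000000000000000000.timeindex"); // hypothetical name
        try (RandomAccessFile raf = new RandomAccessFile(indexFile, "rw")) {
            raf.setLength(4096);
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            mmap.putLong(0, 42L); // entries are written through the mapped buffer

            // "flush": persist the dirty pages to the backing file.
            mmap.force();

            // "materialize" would mean mapping/reading the file contents back into memory.
            // A consumer that merely uploads the file needs neither - just the path:
            System.out.println("ready to hand off: " + indexFile.toPath());
        }
    }
}
```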
Perhaps we can file the JIRA and discuss it there.
@ijuma, Good catch! AFAIK, there is no need to load the lazyIndex to write the contents of the index files to remote storage.
@Divij Right, we do not need to materialize the indexes for RSM to write them to remote storage. Whenever a log segment is rolled over, the segment and its indexes are flushed to disk asynchronously. As the indexes are mmapped, any file reads fetch from the page cache, which is consistent with whatever has been written to memory. We can explore whether flushing is really needed. RLM has access to the segment, which can be flushed before the files are passed to RSM.
Filed https://issues.apache.org/jira/browse/KAFKA-15612 for follow-up discussion.
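The page-cache point can be demonstrated with a small standalone sketch (assuming a platform with a unified page cache, e.g. Linux): a write through a `MappedByteBuffer` is visible to an ordinary read of the same file even before `force()` is called, because both go through the same cached pages.

```java
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class PageCacheConsistency {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("index", ".tmp");
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw")) {
            raf.setLength(8);
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 8);
            mmap.putLong(0, 123456789L); // mmap write, deliberately no force()

            // A plain read of the same file observes the write via the page cache.
            long value = ByteBuffer.wrap(Files.readAllBytes(file)).getLong();
            System.out.println(value); // prints 123456789
        }
    }
}
```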
```diff
@@ -1945,26 +1947,17 @@ object UnifiedLog extends Logging {
       logOffsetsListener)
   }

-  def logFile(dir: File, offset: Long, suffix: String = ""): File = LogFileUtils.logFile(dir, offset, suffix)
```
This and other deleted methods were simply passing through to LogFileUtils and hence did not add enough value to retain.
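For illustration, a sketch of what a call site looks like after the cleanup (the directory path is hypothetical): callers use the utility directly rather than going through the removed `UnifiedLog` wrapper.

```java
import java.io.File;

import org.apache.kafka.storage.internals.log.LogFileUtils;

public class DirectCallExample {
    public static void main(String[] args) {
        File dir = new File("/tmp/kafka-logs/topic-0"); // hypothetical log directory
        // Previously reachable as UnifiedLog.logFile(dir, offset); now called directly:
        File segmentFile = LogFileUtils.logFile(dir, 0L, "");
        System.out.println(segmentFile); // .../00000000000000000000.log
    }
}
```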
```scala
case Level.WARN => logging.warn(e.getMessage, e)
case Level.INFO => logging.info(e.getMessage, e)
case Level.DEBUG => logging.debug(e.getMessage, e)
case Level.TRACE => logging.trace(e.getMessage, e)
```
Fixed a bug where we were not using the passed `logging`.
Do we want to add a test (perhaps for one of the functions that use this utility) which could have caught this? Could be done as a separate JIRA.
I vote for a separate JIRA.
A separate JIRA is good. Could be a great newbie task.
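A sketch of what such a newbie test might look like, using Mockito and the `Utils.swallow(logger, level, name, action)` call shape that appears later in this PR; the exact logged message is an assumption, so the verification is kept loose.

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.event.Level;

public class SwallowLoggingTest {
    @Test
    public void swallowShouldLogOnThePassedLogger() {
        Logger logger = mock(Logger.class);

        // Same call shape as LogSegment.close() further down in this PR.
        Utils.swallow(logger, Level.WARN, "op", () -> {
            throw new RuntimeException("boom");
        });

        // Would have failed before the fix, when the passed logger was ignored.
        verify(logger).warn(any(String.class), any(Throwable.class));
    }
}
```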
Force-pushed from ac459c9 to 1600c76.
```java
    return lazyTimeIndex.get();
}

public File timeIndexFile() {
```
We are leaking an implementation detail of the index here (the fact that it is backed by a file). IMO, this should not be a public method. If someone wants to access the underlying index file, they should ask the Index for it rather than the LogSegment, i.e. use `LogSegment.timeIndex().file()` instead.
It's public because it needs to be a public method (this is an internal class, though). What you suggested doesn't work because it forces materialization, and that would be a serious regression.
We are leaking the fact that the index is implemented using a lazy index implementation outside the segment.
Does the caller know that the File returned by timeIndexFile() may not be consistent with the in-memory state of the index? We are adding the responsibility on the caller to call flush() if it requires consistency. This means we are leaking the internal implementation of the index (being lazy) to the caller. This is concerning because it can cause bugs where authors using this method in other parts of the code may not understand that it could be eventually inconsistent.
If we really want to provide an index reference that doesn't require materialization, why not share the LazyIndex with the caller? Then the caller explicitly knows that this index is lazily evaluated.
Also, where are we using this function outside this class? Should this be private?
We are not leaking anything that is not already exposed. It would be different if that was not accessible via the overall interface already. The only discussion here is not whether this is exposed (it already is), but how to expose it.
> We are not leaking anything that is not already exposed. It would be different if that was not accessible via the overall interface already. The only discussion here is not whether this is exposed (it already is), but how to expose it.

Fair point. Let's tackle the "how" question in this PR and take up plugging the underlying file implementation leak separately later.
On the "how" part, can we keep the Index or LazyIndex as the component that chooses to expose internal implementation details (instead of LogSegment)? That means we would probably have a function here in LogSegment which returns the LazyIndex.
Thoughts?
`LazyIndex` exposes a larger surface area though. And it's confusing to expose both `LazyIndex` and the materialized index. The way it is now is actually simpler. Local indexes are file based and that's core to how they work. `LogSegment` gives you the file if that's all you need, or the materialized index. And `LazyIndex` is used internally to simplify the implementation.
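To make the shape concrete, here is a condensed sketch of the two `LogSegment` accessors being discussed (names taken from the snippets in this thread; `LazyIndex.file()` returning the handle without materializing is an assumption):

```java
// Condensed from LogSegment (sketch, not the full class):
public TimeIndex timeIndex() throws IOException {
    return lazyTimeIndex.get();  // materializes (mmaps) the index on first access
}

public File timeIndexFile() {
    return lazyTimeIndex.file(); // just the File handle; no materialization
}
```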
OK, could we add a javadoc on this method stating that the physical file pointed to by this File object may not be consistent with the in-memory copy of the index? It doesn't totally address my concern about a potential bug when someone uses this File and assumes it represents a consistent view of the index, but I am happy with a javadoc for now. The whole "index implemented by a file" thing needs to be hidden away, but that belongs in a separate JIRA.
The indexes are memory mapped, so in theory it should be consistent for reads that go through the page cache.
One last clarification: I am not opposed to improving the overall modularity of the classes in the storage layer. As you said, in a separate PR/discussion.
Thanks for the prompt review @divijvaidya! Please note that this is still in draft because there are still failing tests. Since it was still in draft mode, I have been force pushing and so on. In the future, please let me know if you are reviewing a draft PR so I can avoid force pushes (which are painful for you as the reviewer).
Force-pushed from 1600c76 to ee4ef39, then from 1f3b432 to 01d2179.
```scala
TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
  "Timed out waiting for deletion of old segments")
assertEquals(1, log.numberOfSegments)

cleaner.shutdown()
closeLog(log)
```
I noticed we did not close the log before reopening it a bit later. It didn't seem to cause any problems, but it made it more difficult to debug test failures (since the behavior is not clearly defined in this case).
Good catch!
```java
 * The first time this is invoked, it will result in a time index lookup (including potential materialization of
 * the time index).
 */
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
```
I added a `read` prefix to make it clearer that it does more than read the field `maxTimestampAndOffsetSoFar`. The conversion from Scala to Java had originally caused some code to use the field instead of the method, which led to subtly different behavior in some cases.
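A hedged reconstruction of the pattern (the exact body may differ; `TimeIndex.lastEntry()` is an assumption based on the javadoc above):

```java
private volatile TimestampOffset maxTimestampAndOffsetSoFar = TimestampOffset.UNKNOWN;

// Unlike reading the field directly, this may trigger a time index lookup
// (and hence materialization of the time index) on the first invocation.
public TimestampOffset readMaxTimestampAndOffsetSoFar() throws IOException {
    if (maxTimestampAndOffsetSoFar == TimestampOffset.UNKNOWN)
        maxTimestampAndOffsetSoFar = timeIndex().lastEntry();
    return maxTimestampAndOffsetSoFar;
}
```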
I believe the tests should pass this time, let's see.
The Java 11 build has 3 unrelated failures.
This is ready for review.
Test failures:
Java 8 passed and the failures look unrelated, but I kicked off another build to get more signal.
@ijuma Thanks for the PR. I will review it tomorrow.
Thank you for patiently answering my comments, Ismael. This looks good to me.
Thanks for the review and for paying close attention to code quality (modularity, readability, etc.) - it's important!
Thanks @ijuma for the PR covering the refactoring and the cleanup. LGTM.
```java
else if (e instanceof RuntimeException)
    throw (RuntimeException) e;
else
    throw new IllegalStateException("Unexpected exception thrown: " + e, e);
```
Good change to maintain the semantics while moving to Java.
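For context, the surrounding idiom presumably looks something like this (a sketch; the first branch and the `action` callable are assumptions): since Java requires checked exceptions to be declared, unchecked exceptions are rethrown as-is to preserve the original Scala behavior.

```java
try {
    action.run(); // hypothetical callable whose exceptions we are translating
} catch (Exception e) {
    if (e instanceof IOException)
        throw (IOException) e;
    else if (e instanceof RuntimeException)
        throw (RuntimeException) e;
    else
        throw new IllegalStateException("Unexpected exception thrown: " + e, e);
}
```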
```java
public void close() throws IOException {
    if (maxTimestampAndOffsetSoFar != TimestampOffset.UNKNOWN)
        Utils.swallow(LOGGER, Level.WARN, "maybeAppend", () -> timeIndex().maybeAppend(maxTimestampSoFar(), offsetOfMaxTimestampSoFar(), true));
    Utils.closeQuietly(lazyOffsetIndex, "offsetIndex", LOGGER);
```
Nice to use `closeQuietly` instead of `swallow` for closing here.
A few notes:

* Delete a few methods from `UnifiedLog` that were simply invoking the related method in `LogFileUtils`
* Fix `CoreUtils.swallow` to use the passed in `logging`
* Fix `LogCleanerParameterizedIntegrationTest` to close `log` before reopening
* Minor tweaks in `LogSegment` for readability

For broader context on this change, please check:

* KAFKA-14470: Move log layer to storage module

Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>