
KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets #9590

Closed
wants to merge 20 commits

Conversation


@jolshan jolshan commented Nov 12, 2020

The current behavior for KafkaConsumer.beginningOffsets on compacted topics is to always return offset 0. However, when looking at the record with the earliest timestamp in the log with KafkaConsumer.offsetsForTimes, the offset is nonzero.

This PR seeks to correct this issue by updating the segment baseOffsets and the logStartOffset during compaction.
The new segment baseOffset will be the base offset of the first non-compacted batch, or the original baseOffset if the segment is empty.
Generally, all updates to logStartOffset must maintain the invariant logStartOffset <= end offset of the first batch that can be returned by Fetch.
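For illustration, a minimal consumer-side sketch of the discrepancy described above, assuming a compacted topic whose earliest surviving record has a nonzero offset (the object name, bootstrap address, and topic name are placeholders, not part of this PR):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object BeginningOffsetsDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // placeholder broker
  props.put("group.id", "kafka-7556-demo")         // placeholder group id

  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](
    props, new ByteArrayDeserializer, new ByteArrayDeserializer)
  val tp = new TopicPartition("compacted-topic", 0) // placeholder topic

  // beginningOffsets answers with the log start offset, which stays at 0 on a
  // compacted topic even after the first records have been cleaned away.
  val beginning = consumer.beginningOffsets(Collections.singleton(tp)).get(tp)

  // offsetsForTimes with timestamp 0 searches the log and returns the offset of
  // the first record that actually remains, which can be nonzero.
  val earliestByTime =
    consumer.offsetsForTimes(Collections.singletonMap(tp, java.lang.Long.valueOf(0L))).get(tp)

  println(s"beginningOffsets: $beginning, offsetsForTimes(0): ${earliestByTime.offset}")
  consumer.close()
}
```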

There are a few key changes/decisions to note that may not be immediately apparent:

  1. Producer Snapshot Behavior: Currently, during compaction, if the segment offset of a producer snapshot file is not among the new segment offsets, the snapshot is deleted. This leads to behavior like the following:

  • For example, we have segments with offsets 0 and 1, and we want to delete the first segment.
    The new segment has base offset 0, so the snapshot for segment 1 is deleted. Result: one segment, no snapshot files.

Under this PR, the new behavior is:

  • We have segments with offsets 0 and 1, and we want to delete the first segment.
    The new segment has base offset 1, so we do not delete its snapshot. Result: one segment, one snapshot file.

I was not sure whether this is behavior we want, so I'm pointing it out.

  2. High Watermark Compaction: I wasn't sure whether we allow compaction past the high watermark. It seems there is no check to prevent it, so I've added such a check in this PR.

@jsancio (Member) left a comment

Thanks for the PR @jolshan. Left some high-level comments and questions.

core/src/main/scala/kafka/log/Log.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)

jolshan commented Nov 16, 2020

I've updated the code to delay creating a new segment until there is a non-compacted record. If a segment is never created in cleanSegments, the old segments are simply deleted rather than replaced. I had to change some of the code surrounding transactionMetadata so that updating a segment's transactionIndex is delayed until the segment is actually created. Any aborted transactions will be added once the segment is created.


jolshan commented Nov 17, 2020

Also looks like the test I added may be flaky, so I'll take a look at that.

@chia7712 (Contributor)

(I'm still reading through this, so pardon me for asking a stupid question.)

If using timestamp=0 returns the correct offset, why not handle ListOffsetRequest.EARLIEST_TIMESTAMP the same way? For example (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1643), if the timestamp equals ListOffsetRequest.EARLIEST_TIMESTAMP, we pass 0 to find the offset. Would that work?
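A rough, self-contained sketch of that suggestion, for illustration only; the object name, the TimestampAndOffset placeholder, and the searchByTimestamp callback are stand-ins, not the actual Log internals:

```scala
object EarliestTimestampSketch {
  // Placeholder result type; the real code uses the client library's TimestampAndOffset.
  final case class TimestampAndOffset(timestamp: Long, offset: Long)
  // Sentinel value used by ListOffsetRequest.EARLIEST_TIMESTAMP.
  val EarliestTimestamp: Long = -2L

  // The idea: instead of answering EARLIEST_TIMESTAMP with the log start offset
  // directly, fall through to the normal timestamp search with a target of 0,
  // which lands on the first record that survived compaction.
  def fetchOffsetByTimestamp(targetTimestamp: Long,
                             searchByTimestamp: Long => Option[TimestampAndOffset]): Option[TimestampAndOffset] = {
    val effectiveTimestamp =
      if (targetTimestamp == EarliestTimestamp) 0L else targetTimestamp
    searchByTimestamp(effectiveTimestamp)
  }
}
```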


jolshan commented Nov 19, 2020

@chia7712 The code path you linked is the code path beginningOffsets uses. Are you suggesting removing ListOffsetRequest.EARLIEST_TIMESTAMP and replacing it with 0?

@chia7712 (Contributor)

Are you suggesting removing ListOffsetRequest.EARLIEST_TIMESTAMP and replacing it with 0?

Yep, it seems like a simple solution without many changes. However, I haven't fully understood this issue yet; this is just my guess.


jolshan commented Nov 19, 2020

@chia7712 I did think about this solution initially. I'm wondering if we do want to update the segment offsets and logStartOffsets correctly though. If that isn't as important then maybe we can go with the simpler solution. This solution removes empty segments and keeps the baseOffsets updated. If these baseOffsets are used in other places, then maybe this solution is better.


jolshan commented Dec 3, 2020

@chia7712 Were you able to take a closer look at this?

@@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
if (!retainedRecords.isEmpty()) {
if (writeOriginalBatch) {
batch.writeTo(bufferOutputStream);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
filterResult.updateRetainedBatchMetadata(batch, retainedRecords.get(0).offset(), retainedRecords.size(), false);


Hmm... It seems a little inconsistent to use the offset of the first record. When the batch is empty, we use the base offset of the batch. Shouldn't we do the same here? Otherwise we will end up with some batches which have base offset smaller than the segment base offset. Note that the base offset is always preserved by compaction.

@jolshan (Contributor, Author)

I see. I guess that the goal here was to give the actual offset of the first record. If we choose to give the batch offset instead, we may return an offset that is still not the actual first offset (but will be much closer than 0). If this behavior is acceptable we can go with this option, but it will not match the behavior of consumer.offsetsForTimes() with timestamp 0L, as mentioned in KAFKA-7556.


I don't think we have much choice given retention semantics. I think we have to keep the invariant that the segment base offset is less than or equal to the base offset of any batch contained in it. Note that these batches are still returned in Fetch requests, so it seems reasonable to let the log start offset reflect them.

@jolshan (Contributor, Author)

Just to clarify, I understand that there are issues with the segment base offset mismatching with the base offset.
But would there also be issues if the batch offset was lower than the log start offset as well?


I believe that a call to DeleteRecords can advance the start offset to an arbitrary offset which could be in the middle of a batch. I do not think today that we have any hard guarantees that we won't return batches or even records that are lower than the current log start offset. It might be worth experimenting with this to clarify what the behavior is today. With that said, given the fact that we always return the whole batch anyway, my preference would probably be to try and keep the log start offset aligned on a batch boundary.

@jolshan (Contributor, Author)

I see. I'll look into that behavior.

@jolshan (Contributor, Author)

Another option I considered that could give the exact offset without changing any offsets in the log would be to handle the listOffset request for the earliest timestamp on compacted topics differently. Instead of grabbing the log start offset, it could behave as though the request was a call for timestamp 0 and search for the first offset that is >=0 (as it does for any other timestamp search) Would this be a preferable solution if we wanted to ensure the exact offset?

@hachikuji Dec 16, 2020

I think we're on the right track really. We don't have to implement exactly what the jira describes. One thing to keep in mind is that the first record in the log might be a control record which a consumer won't return to applications anyway. Also note that the log start offset has implications for replication. Only the records above the log start offset will be replicated to a new replica, but all of the retained records after cleaning should be replicated. Hence we already have some constraints which make keeping the log start offset aligned with the first returnable record difficult.

I would just say that we define and document a reasonable invariant. I think it should be clear that we need to ensure the segment base offset is less than or equal to the base offset of the first batch contained in it. The invariant I would suggest is that the log start offset should be less than or equal to the end offset of the first batch that can be returned from Fetch. For the case of log cleaning, the simplest thing to do is keep the log start offset aligned with the base offset of the first batch. In the case of DeleteRecords, the log start offset will point to some offset within the first batch.

To summarize the invariants:

  • logStartOffset <= end offset of the first batch returned by Fetch; for cleaning, logStartOffset = segment base offset.
  • segment baseOffset <= base offset of the first record batch; in log cleaning, they should be equal.
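A small executable restatement of these two invariants, using simplified placeholder values and names rather than the actual Log internals:

```scala
object OffsetInvariantsSketch {
  // Illustrative only: check the two invariants for a hypothetical first segment.
  def checkInvariants(logStartOffset: Long,
                      segmentBaseOffset: Long,
                      firstBatchBaseOffset: Long,
                      firstBatchEndOffset: Long): Unit = {
    // The log start offset must not pass the end of the first fetchable batch.
    require(logStartOffset <= firstBatchEndOffset,
      "logStartOffset must be <= end offset of the first batch returned by Fetch")
    // A segment's base offset can never exceed the base offset of its first batch.
    require(segmentBaseOffset <= firstBatchBaseOffset,
      "segment baseOffset must be <= base offset of its first batch")
  }

  def main(args: Array[String]): Unit =
    // After cleaning, the log start offset and segment base offset are both aligned
    // with the first retained batch's base offset.
    checkInvariants(logStartOffset = 42L, segmentBaseOffset = 42L,
                    firstBatchBaseOffset = 42L, firstBatchEndOffset = 45L)
}
```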

jolshan commented Dec 29, 2020

Updated to match the invariants discussed above. I still need to look at a few other areas, like producer snapshots and the high watermark.


jolshan commented Dec 30, 2020

Updated to not clean past high watermark. If this should be its own ticket/PR, let me know.

@@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int,
transactionMetadata.addAbortedTransactions(abortedTransactions)

val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " +


This log message no longer makes sense. Is the intent to use cleanedSegment instead of currentSegment? Perhaps we should consider moving this log line to after the cleaning when we know what the new segment is.

@jolshan Feb 25, 2021

Hmmm, I don't think I really understood this log message at first. It does make sense to move it to the point where we know what the new segment is. The issue I had was what to do about including retainDeletesAndTxnMarkers. Maybe it makes sense to have one log line with the current segment and the deletion info, and a second log line later that says what the new segment's baseOffset is.


Not sure I follow what the complication is. We can still mention whether deletes/markers have been retained after the fact. It would be useful to clearly indicate when no records were retained from a segment though.

@jolshan (Contributor, Author)

That makes sense.

@jolshan Feb 25, 2021

I think I was concerned about putting this information in the try block and it not logging due to a LogSegmentOffsetOverflowException. But looking at it again, it's not a huge deal not to log in that case.

core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)

// swap in new segment
info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in log $log")
log.replaceSegments(List(cleaned), segments)


Would it make more sense to update log start offset in replaceSegments when we are protected by the lock? Otherwise, it seems like the log start offset will be out of sync with the segments until all of them have been cleaned and maybeIncrementLogStartOffset is called.

Note that deleteSegments does update the log start offset. Interestingly, SegmentDeletion is used for the reason in this case, and not SegmentCompaction.

@jolshan (Contributor, Author)

Right. I wasn't sure if it made sense to change the deleteSegments code to support the SegmentCompaction reason. The entire segment is being deleted here, but I see how this may be confusing when reading in the log.

@jolshan (Contributor, Author)

As for the first part about replaceSegments: it seems there are a few other places where we call this method. I'm wondering if we would want to update the logStartOffset in those cases too (probably), and what the reason should be. I'm also wondering if we should include the reason as a parameter to replaceSegments.


There are really only two cases for replaceSegments: log compaction, and segment splitting. We also call it in completeSwapOperations, but that is for recovery if the broker crashes in the middle of replaceSegments. It should be safe in either case to adjust the log start offset. Probably something similar to the deletion case which just ensures that the log start offset did not fall off the log:

maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion)

Ideally LogCompaction would be the reason, at least in the log compaction case. It's becoming cumbersome to pass through the reason in all cases it is used, but I'm not sure I see an alternative. Perhaps we can consider how to refactor this in the future. Maybe there should be some kind of context object which encapsulates the higher level operation that is being done (such as log compaction or retention enforcement).
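A self-contained toy sketch of the reason-as-parameter idea discussed here; the names and structure below are assumptions for illustration, not the Log.scala code in this PR:

```scala
// Toy model (not Log.scala) showing the shape of passing the higher-level reason
// through replaceSegments and bumping the log start offset under the same lock.
object ReplaceSegmentsSketch {
  sealed trait Reason
  case object SegmentDeletion extends Reason
  case object LogCompaction extends Reason // hypothetical reason discussed above

  final case class Segment(baseOffset: Long)

  final class ToyLog(private var segs: List[Segment], private var startOffset: Long) {
    private val lock = new Object

    def maybeIncrementLogStartOffset(newStart: Long, reason: Reason): Unit =
      if (newStart > startOffset) {
        startOffset = newStart
        println(s"log start offset -> $newStart ($reason)")
      }

    def replaceSegments(newSegments: List[Segment], oldSegments: List[Segment], reason: Reason): Unit =
      lock.synchronized {
        // Swap in the cleaned segment(s) and drop the ones they replace.
        segs = (segs.diff(oldSegments) ++ newSegments).sortBy(_.baseOffset)
        // Keep the log start offset in sync with the first remaining segment while
        // still holding the lock that performed the swap.
        maybeIncrementLogStartOffset(segs.head.baseOffset, reason)
      }
  }
}
```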

@@ -1094,6 +1119,8 @@ private[log] class CleanedTransactionMetadata {

// Output cleaned index to write retained aborted transactions
var cleanedIndex: Option[TransactionIndex] = None

var toAppend: List[AbortedTxn] = List.empty


Can you help me understand why we needed to change the logic for appending to the transaction index?

@jolshan (Contributor, Author)

I believe the issue is that this code expects the log to be created already but it is not. I think my intention was to delay this operation until the log is created. I'll double check this is actually the case.

if (destSegment.isEmpty) {
// create a new segment with a suffix appended to the name of the log and indexes
destSegment = Some(LogCleaner.createNewCleanedSegment(log, result.minOffset()))
transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)
@jolshan Feb 25, 2021

Re: changing transaction code
Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of cleanSegments. In that method, we now cannot guarantee the log has been created until cleanInto returns with a defined cleanedSegment. In cleanInto there is a chance to call onControlBatchRead, which previously just appended transactions but can no longer do so if cleanedIndex is not defined.

@jolshan (Contributor, Author)

It may be possible to call transactionMetadata.appendTransactionIndex() here though to append a little bit sooner.

@jolshan (Contributor, Author)

Another option is to check if the cleanedIndex exists and append immediately if it does, otherwise put it in toAppend. Then we apply the changes only when the cleanedIndex is first defined.
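A minimal sketch of that buffering idea, using simplified placeholder types rather than the actual CleanedTransactionMetadata and TransactionIndex internals:

```scala
// Placeholder stand-ins for the real kafka.log types, only for this sketch.
final case class AbortedTxn(producerId: Long, firstOffset: Long)
final class TxnIndex {
  def append(txn: AbortedTxn): Unit = println(s"appended $txn")
}

// Buffer aborted transactions until the cleaned index exists, then flush the
// backlog in order and append directly from then on.
final class DelayedTxnIndexAppends {
  private var cleanedIndex: Option[TxnIndex] = None
  private var toAppend: List[AbortedTxn] = List.empty

  def onAbortedTxn(txn: AbortedTxn): Unit = cleanedIndex match {
    case Some(index) => index.append(txn)          // index exists: append immediately
    case None        => toAppend = toAppend :+ txn // not created yet: remember for later
  }

  // Called once the cleaned segment (and therefore its transaction index) is created.
  def attachIndex(index: TxnIndex): Unit = {
    cleanedIndex = Some(index)
    toAppend.foreach(index.append)                 // flush the backlog in original order
    toAppend = List.empty
  }
}
```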

@hachikuji Feb 25, 2021

Ok, I think I understand. One thing that makes this a little confusing is the need to reverse the collection. Why don't we just build the index in order?

I am debating whether this is good enough. A potential problem is that we don't have a guarantee on the size of the index. In the common case, it should be small, but there is nothing to prevent an entire segment from being full of aborted transaction markers. Currently we wait until filterInto returns before creating the new segment, but maybe instead we could do it in checkBatchRetention after the first retained batch is observed?

@jolshan (Contributor, Author)

Oh I think the reverse is just because of how I append the transactions to the front of the list instead of the end. There's a better way to do that, so I'll fix it.

@jolshan (Contributor, Author)

Locally I have it happening when filterTo returns. I'll look for a better way to do it.

@jolshan Feb 25, 2021

I believe I can do it in onControlBatchRead directly. I'll add to toAppend while there is no cleanedIndex, and then add those buffered transactions in filterInto as soon as the cleanedIndex is created. From then on, I can just add the transactions in onControlBatchRead as before.

@hachikuji Apr 8, 2021

Sorry to keep pressing this, but do you have an update here? It would be nice to keep the behavior here on par with the existing implementation. Really if you think about the way this is working, we could write to the cleaned segment directly from filterTo instead of building a buffer in memory, but we can leave that as a follow-up improvement.

@hachikuji

@jolshan In Log.completeSwapOperations, we have the following comment:

      // We create swap files for two cases:
      // (1) Log cleaning where multiple segments are merged into one, and
      // (2) Log splitting where one segment is split into multiple.
      //
      // Both of these mean that the resultant swap segments be composed of the original set, i.e. the swap segment
      // must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
      // of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
      // do a replace with an existing segment.

This is no longer accurate following the change here. As I understand it, the swap recovery logic looks for each file with the .swap extension. It then tries to find all log segments which have been deleted by the compaction process by searching for any segments which have an end offset larger than the swap segment's base offset, but less than the offset of the first record in the swapped segment. This is handling the case when the broker crashes before it could delete the old segments.

The recovery algorithm is described in the comment above replaceSegments. I believe the logic still works correctly if we change it so that segments that are smaller than the first swap segment get removed. This should work because the swap suffix is removed in descending order, which means that the smallest swap file is the last one to be updated. I think we will need some test cases to ensure that the various cases work correctly.


jolshan commented Mar 4, 2021

@hachikuji thanks for the review. Just to clarify, you are suggesting updating/changing

val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
        segment.readNextOffset > swapSegment.baseOffset
      }

in Log.completeSwapOperations?
When you say "removed" do you mean the segments should be deleted or not included in oldSegments?
When you say "smaller" do you mean a lower offset?
My understanding is that the swap segment's baseOffset can now have a higher offset than the baseOffsets of the oldSegments it is replacing. So I'm thinking we are trying to delete all segments before the swap segment's baseOffset as well? Is there a case where we can't assume all segments before the baseOffset can be deleted?

And yes, will definitely add tests

@hachikuji

hachikuji commented Mar 4, 2021

@jolshan The idea is to mimic during recovery the same operation that the log cleaner was trying to do. So if we were trying to replace segments [0, 10, 20] with [15] when the operation failed, then we'd want to do the same thing during recovery. The recovery logic currently assumes that a replacement like this is not possible, so we need to adjust it. For example, we might be left with the following files on disk when the broker starts up:

0.log
10.log
20.log
30.log
40.log
15.swap
30.swap

We would want the recovery to generate two replacements: [0, 10, 20] -> 15 and [30, 40] -> 30. The tricky thing is that the recovery operation itself might fail, so we have to basically pick up the operation from the same point and continue with the same algorithm (preserving order and all)


jolshan commented Mar 4, 2021

@hachikuji In the example above, we could also have something like 35.swap too, right, since we are updating the baseOffset of each segment? (Which means we will need to look at the next segment's offsets as well.) Can we also assume that all of the log will be replaced? That is, 15.swap is the lowest swap file, so it must cover 0.log.

@hachikuji

@jolshan The active segment would not be replaced during cleaning at a minimum. I think the recovery logic we want is to scan backwards through the .swap files and collect the segments to replace. Maybe something like this (a rough sketch follows the list)?

For each swap file in reverse order:

  • If a next-earliest swap file exists, then collect any segments which have a base offset larger than the end offset of that next-earliest swap and an end offset less than that of the swap segment itself.
  • If no next-earliest swap file exists, then collect all segments which have an end offset less than that of the swap file itself.
  • Call replaceSegments with the swap file and the collected log segments.
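A rough sketch of that backwards scan over simplified (baseOffset, endOffset) pairs; the types and helper names are placeholders for this sketch, not the Log code:

```scala
object SwapRecoverySketch {
  // Placeholder model of a log segment or swap file, for this sketch only.
  final case class Seg(baseOffset: Long, endOffset: Long, name: String)

  // Walk the swap files from highest base offset to lowest; for each one, collect
  // the old segments it should replace, bounded below by the next-earliest swap file.
  def planRecovery(swapFiles: List[Seg], segments: List[Seg]): List[(Seg, List[Seg])] = {
    val swapsDescending = swapFiles.sortBy(s => -s.baseOffset)
    swapsDescending.zipWithIndex.map { case (swap, i) =>
      val nextEarliest = swapsDescending.lift(i + 1)
      val lowerBound = nextEarliest.map(_.endOffset).getOrElse(Long.MinValue)
      val toReplace = segments.filter { seg =>
        seg.baseOffset > lowerBound && seg.endOffset < swap.endOffset
      }
      (swap, toReplace) // the real code would call replaceSegments(swap, toReplace) here
    }
  }
}
```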


jolshan commented Mar 8, 2021

@hachikuji updated based on the conversations here + those we had offline. Quick summary is that we will only have one swap file upon crash in the log cleaning case, so we will just accept that there may be segment(s) before or after the cleaned segment that won't get replaced upon crash recovery, but will upon subsequent cleanings.

Two things about the latest commit:

  1. I decided that in the failure case the logStartOffset will be updated outside of replaceSegments. In the recovery path there is already code to update the logStartOffset; it must follow the update to the log end offset, so it made sense to handle it all there. Added a test for this scenario.

  2. (EDIT: fixed)
    Let me know if any of the tests seem unnecessary. I think we should definitely keep testRecoveryAfterCrashBaseOffsetUpdatedFirstSegmentChunk and testLogStartOffsetUpdatedUponFailureRecovery. Let me know if any others seem superfluous.

Added test for updating logStartOffset upon failure recovery.

jolshan commented Mar 9, 2021

Realized I left out one case. Going to add that and clean up tests.

@@ -760,6 +768,12 @@ class Log(@volatile private var _dir: File,
// must fall within the range of existing segment(s). If we cannot find such a segment, it means the deletion
// of that segment was successful. In such an event, we should simply rename the .swap to .log without having to
// do a replace with an existing segment.
//
// For case 1 (log cleaning), we may have old segments before or after the swap segment that were cleaned.


Can we clarify the comment above that "the swap segment must fall within the range of existing segment(s)"? I find it a bit confusing in the context of the new behavior.

@jolshan (Contributor, Author)

I think I'm a little confused too. My understanding of this line is that we can't have offsets in the swap segment that are outside the range of the segments in the log we started from. Is there a case with this new behavior that I'm missing where this isn't true? Or is it the "existing segments" part that is confusing?

@hachikuji Apr 8, 2021

Initially I interpreted the comment as suggesting a stronger statement about how each swapped segment related to the set of segments that it would replace. Perhaps it was really the comment that the "swap segments be composed of the original set" which confused me.

I would suggest that we use more precise language. For example, can we say that all offsets in the swapped segment must be present in the segments that are to be replaced? It would also be helpful to clarify the following sentence:

If we cannot find such a segment, it means the deletion of that segment was successful.

Really we are referring to a group of segments. How does the conclusion that deletion was successful follow from the fact that the swapped segment is "composed" of the original segments?

@jolshan (Contributor, Author)

Right. Ok, I see what you are saying now. I can clarify these points.
As for the deletion: I think the idea is that the segments that will be replaced (those that "compose" the swap segment) are marked for deletion after the swap segment is created. If a segment containing an offset in the range of the swap segment's offsets is missing, that means the deletion completed. Again, I can clarify this.

core/src/main/scala/kafka/log/Log.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/Log.scala (outdated review thread, resolved)
@@ -2393,6 +2407,10 @@ class Log(@volatile private var _dir: File,
}
// okay we are safe now, remove the swap suffix
sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))

// If not recovered swap file we need to increment logStartOffset here. Otherwise, we do this when loading the log.
if (!isRecoveredSwapFile)


Can you explain why this is necessary? I understand that there is logic to initialize the log start offset after loading segments, but why do we need a special check to prevent updating the log start offset here?

@jolshan Apr 8, 2021

I suppose we could remove the check and update the offset twice in the case of loading with swap files.

@jolshan (Contributor, Author)

Ah, I remember now. The high watermark is not updated until later, so this throws an error.

@jolshan (Contributor, Author)

Or rather, my test remembers.

core/src/main/scala/kafka/log/Log.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)

@@ -601,6 +601,9 @@ private[log] object LogCleanerManager extends Logging {
// the active segment is always uncleanable
Option(log.activeSegment.baseOffset),

// we do not want to clean past the high watermark
Option(log.highWatermark),


I agree this probably makes sense. It is surprising that it was not there already. Was there a specific reason related to this patch that you decided to do it here? An alternative by the way is to replace the high watermark and first unstable offset here with lastStableOffset, which would represent the lower bound of the two.

@jolshan (Contributor, Author)

This caused issues in tests: if we didn't update the highWatermark and tried to compact (and update the logStartOffset), we would throw an exception, since we can't move the logStartOffset above the high watermark. That made me think we also shouldn't compact beyond the high watermark. I'm good with using lastStableOffset.


In practice, we do not allow the log start offset to get ahead of the high watermark, so I am not sure that is a real issue. Still, I cannot think of a strong reason to allow cleaning above the high watermark, so maybe it is simpler to prevent it.

@jolshan Apr 9, 2021

I switched this to lastStableOffset, replacing the first unstable offset above.
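For context, a simplified sketch of deriving the first uncleanable offset as the minimum of such candidates; the shape of the real LogCleanerManager.cleanableOffsets code is paraphrased here, so treat it as an assumption:

```scala
object UncleanableOffsetSketch {
  // Paraphrased: the first uncleanable dirty offset is the smallest applicable upper
  // bound. Using lastStableOffset covers both the high watermark and the first
  // unstable offset, since it is the lower of the two.
  def firstUncleanableOffset(activeSegmentBaseOffset: Long,
                             lastStableOffset: Long,
                             minCompactionLagBound: Option[Long]): Long =
    Seq(
      Some(activeSegmentBaseOffset), // the active segment is always uncleanable
      Some(lastStableOffset),        // do not clean past committed, fully replicated data
      minCompactionLagBound          // respect min.compaction.lag.ms, if configured
    ).flatten.min
}
```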

/**
* @return the baseOffset of the first batch of retained records or -1 if no batches are retained
*/
public long baseOffsetOfFirstBatch() {


Would it be worth using OptionalLong?

@jolshan Apr 28, 2021

In the non-test code, this is only used when we have bytes in the segment (so it will not be -1). In tests, this is sometimes called when the value is -1. I switched to OptionalLong, but we can also switch back.

It is a little awkward, since we will need to call get() and there isn't really an alternative if the optional is empty. (I guess we could throw an error)

core/src/main/scala/kafka/log/Log.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleaner.scala (outdated review thread, resolved)
core/src/main/scala/kafka/log/LogCleanerManager.scala (outdated review thread, resolved)
TestUtils.sendRecordsWithKey(producer, 50, 50L, new TopicPartition(topic1, 1), "key")

// Sleep to allow compaction to take place.
Thread.sleep(25000)


Ouch. 25s of sleep is significant. I wonder if this test is overkill given testing we have in LogCleanerTest?

@jolshan (Contributor, Author)

Yeah. I was wondering about this too. I originally wrote this test to describe the exact case KAFKA-7556 is describing. I'll take a look at this again and see if we can bring it down.

@jolshan (Contributor, Author)

I was able to get it down to 15 seconds consistently (50 passing runs), but once we get to 13, it starts flaking. Not sure if that is good enough to keep the test.


I'd rather just remove it if we don't have a good way to keep it from flaking.

@jolshan (Contributor, Author)

Sounds good. On further inspection, it looks like log.cleaner.backoff.ms is 15 seconds, which is why this is happening. I could try to figure out how to change this property for this test, but it may be easier to remove it.
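If the property were overridden for this test, the broker-side override might look like the sketch below; the helper name and exactly where these properties get applied in the integration test are assumptions:

```scala
import java.util.Properties
import kafka.server.KafkaConfig

// Sketch: per-test broker property overrides that lower the cleaner backoff
// (default 15000 ms) so compaction retries quickly and the long sleep can shrink.
def cleanerTestOverrides(): Properties = {
  val props = new Properties()
  props.put(KafkaConfig.LogCleanerBackoffMsProp, "1000") // log.cleaner.backoff.ms
  props.put(KafkaConfig.LogCleanerEnableProp, "true")    // log.cleaner.enable
  props
}
```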


val lastCleanOffset = Some(0L)
val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.")
}

@Test
def testCleanableOffsetsForNoneWithLowerHighWatermark(): Unit = {


nit: the name is not very clear. What does None refer to?

@jolshan (Contributor, Author)

I think the None may have been taken from the test above. In that test, it was referring to "no minimum compaction lag settings active". Changed name and added a comment to describe what the test is doing.

assertTrue(distinctValuesBySegmentAfterClean.size < distinctValuesBySegmentBeforeClean.size)

// Drop the first segment from before cleaning since it was removed. Also subtract 1 from numCleanableSegments
val normalizedDistinctValuesBySegmentBeforeClean = distinctValuesBySegmentBeforeClean.drop(1)
@hachikuji Apr 28, 2021

The logic in this test case has become rather obscure after the change. Maybe we could do something simpler than comparing segment by segment. As far as I can tell, all the test is doing is ensuring that the first uncleanable offset is respected. Maybe a simpler test would just write the same key over and over and then assert that all records below the uncleanable offset are removed and all records above that offset are retained?

@jolshan (Contributor, Author)

I'll take another look at this one

@jolshan Apr 28, 2021

This test is a little tricky, but I've updated it. Now it only uses duplicate keys. It's a little confusing because the first uncleanable offset is not actually the point below which records are cleaned. The segments cleaned are the full segments below the uncleanable offset (so, in this case, the segments that lie entirely before the uncleanable offset). And even then, one record (the last record in the last cleaned segment) will be retained due to how cleaning works.


jolshan commented Jun 9, 2021

All 3 JDK versions are failing with something like this:

 Execution failed for task ':core:integrationTest'.
> Process 'Gradle Test Executor 127' finished with non-zero exit value 1
This problem might be caused by incorrect test process configuration.

They build locally, so I'm not sure what's up with Jenkins. I've been seeing this, with at least one JDK build failing, in many PRs in this repo.

@ijuma (Contributor)

ijuma commented Aug 23, 2021

What's remaining to get this merged?


jolshan commented Aug 23, 2021

I think just the merge conflicts, a final look-over, and an LGTM.

@jolshan jolshan closed this Apr 18, 2024