KAFKA-15388: Handling remote segment read in case of log compaction #15060
Conversation
Force-pushed from 1b245e1 to fc956c3
Force-pushed from fc956c3 to e1a24df
Thank you for the contribution! I will aim to provide a review throughout the day!
I made a first pass, let me know if the comments don't make sense - thanks once again!
- private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+ // visible for testing.
+ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
                                                             Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
done
);

when(rsmManager.fetchLogSegment(any(), anyInt())).thenReturn(fileInputStream);
when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(), anyInt(), anyLong())).thenReturn(Optional.of(segmentMetadata), Optional.of(segmentMetadata));
Isn't one Optional.of(segmentMetadata) enough?
Thanks for pointing that out. This may not even be required, as I already mocked the fetchRemoteLogSegmentMetadata function.
int startPos = 0;
RecordBatch firstBatch = null;
while (firstBatch == null && rlsMetadataOptional.isPresent()) {
I am not super hung up on this, but don't you just need to look forward once? I.e., do you have a guarantee that if you do not find the offset in this segment then you are bound to find it in the next? If that is the case, can you just look in the next segment and not use a while loop?
@clolov We need to use a while loop because the next segment in the iteration may also be fully log compacted, so we may need to keep moving forward until we find a batch. Check the example I attached in the PR description, where 0.log and 6.log are both fully log compacted and the next batch exists only in 07.log.
Similar logic is used when fetching data from the local log segment:
} else segmentOpt = segments.higherSegment(baseOffset)
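The forward scan discussed in this thread can be sketched in isolation. Below is a minimal, self-contained simulation of the idea; `SegmentMeta` and `findFirstBatch` are hypothetical stand-ins for the real `RemoteLogSegmentMetadata` lookup, not the actual Kafka types:

```java
import java.util.List;
import java.util.OptionalLong;

public class CompactedSegmentScan {

    // Hypothetical stand-in for a remote segment's metadata: its base offset
    // plus the batch base offsets that survived compaction.
    public record SegmentMeta(long baseOffset, List<Long> survivingBatchOffsets) {}

    // Mirrors the while-loop in the PR: keep moving to the next segment until
    // a batch at or after the target offset is found, because several
    // consecutive segments may have been fully compacted away.
    public static OptionalLong findFirstBatch(List<SegmentMeta> segments, long targetOffset) {
        for (SegmentMeta segment : segments) {
            for (long batchOffset : segment.survivingBatchOffsets()) {
                if (batchOffset >= targetOffset) {
                    return OptionalLong.of(batchOffset);
                }
            }
            // No surviving batch in this segment: fall through to the next one.
        }
        return OptionalLong.empty(); // ran out of remote segments
    }

    public static void main(String[] args) {
        // 0.log and 6.log fully compacted; the first surviving batch is in 07.log.
        List<SegmentMeta> segments = List.of(
            new SegmentMeta(0, List.of()),
            new SegmentMeta(6, List.of()),
            new SegmentMeta(7, List.of(7L, 9L)));
        System.out.println(findFirstBatch(segments, 1)); // prints OptionalLong[7]
    }
}
```

A single look-ahead would miss the batch here, since two consecutive segments are empty after compaction.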
Huh, interesting, I shall circle back to this, maybe I am not as familiar with compaction as I thought I was. To be honest, I was expecting that segments 0 and 6 would be merged into one by compaction - I was under the impression that it doesn't leave empty segments.
@clolov I believe this merging of segments happens in the background, but before the merging starts we disable compaction on the topic to make it ready for tiered storage, which is why the merging of segments never happens.
Apologies for the delay, okay, this reasoning makes sense to me, but even if I am wrong the while loop won't really cause a performance impact, so I am happy with it!
Force-pushed from 01277d5 to ad76366
@clolov @divijvaidya Do you have any additional review comments?
@clolov Can we merge the request?
LGTM, thanks for the patch!
This patch handles the FETCH request for previously compacted topic segments uploaded to remote storage. We also have to go through the upload and deletion paths and the RemoteLogMetadataCache internal state for both normal and unclean-leader-election scenarios.
@@ -2065,6 +2066,11 @@ public Optional<RemoteLogSegmentMetadata> fetchRemoteLogSegmentMetadata(TopicPar
    return Optional.of(segmentMetadata);
}

public Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
                                                                  Option<LeaderEpochFileCache> leaderEpochFileCacheOption) {
    return Optional.ofNullable(null);
Can we use Optional.empty() instead?
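For context on the suggestion: `Optional.ofNullable(null)` and `Optional.empty()` produce the same empty `Optional` (`ofNullable` returns `empty()` for a null input); `empty()` simply states the intent directly. A quick illustration:

```java
import java.util.Optional;

public class EmptyOptionalDemo {
    public static void main(String[] args) {
        Optional<String> viaNullable = Optional.ofNullable(null); // what the placeholder returned
        Optional<String> viaEmpty = Optional.empty();             // states the intent directly

        // Both are empty and compare equal; ofNullable(null) delegates
        // to Optional.empty() internally.
        System.out.println(viaNullable.isPresent());      // prints false
        System.out.println(viaNullable.equals(viaEmpty)); // prints true
    }
}
```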
done.
LeaderEpochFileCache cache = mock(LeaderEpochFileCache.class);
when(cache.epochForOffset(anyLong())).thenReturn(OptionalInt.of(1));

when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
There are two remote-storage-manager mocks: rsmManager (local) and remoteStorageManager (global). Can we discard the latter one?
done
Force-pushed from 177c7cf to 17deeb0
Force-pushed from 17deeb0 to 5bef7e9
Thank you for fixing this. Looks good to me. I will wait for CI to be stable before merging this.
Thanks @iit2009060 for the PR.
Let us say there are two segments in remote storage and subsequent segments in local storage:
remote-seg-10[10, 20]
remote-seg-21[21, 30] : offsets 25 to 30 are compacted.
local-seg-31[31, 40]
local-seg-41[41, 90]
local-seg-91[91, 99] (active segment)
Updated the above: the targeted offset can be in any of the following local log segments, not only the active segment.
When a fetch request comes for offsets within [25, 30], it should move to the local segment, as those offsets might have been compacted earlier. Did you also cover this scenario in this PR?
@satishd I have not tested this case explicitly.
@iit2009060 You may have mistyped it as controller, but it does not have any role in the fetch path here.
@satishd I am trying to reproduce the case where the last offsets of a segment earlier than the active segment are compacted away.
@satishd Do you have steps in mind to regenerate the above scenario?
Yes @satishd, correct. I am still going through the code but need a few inputs.
@satishd @divijvaidya @kamalcph I am able to reproduce the above scenario using the retention feature instead of log compaction. The overall problem is that we send MemoryRecords.EMPTY when we are unable to find the offset, even though the active segment can still have the data. Consider the scenario below.
In general this is happening because the next fetch request offset increments only when there is a record list, but the remote fetch implementation by default sends empty records when it is unable to find the records. We should create a separate JIRA to track this. The issue is not specific to log compaction; it will happen whenever we send empty records from remote storage.
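The stuck-consumer behaviour described here can be modelled with a toy fetch loop. This is a simplified sketch; `brokerFetch` and `pollOnce` are hypothetical helpers, not Kafka code. The position only advances when the response carries records, so a remote read that keeps returning empty records for a compacted offset pins the consumer in place even though later offsets exist locally:

```java
import java.util.List;

public class StuckFetchDemo {
    // Toy broker: offsets at or below remoteEndOffset are served from the
    // remote tier, where compaction may have removed every matching record
    // (mimicking a MemoryRecords.EMPTY response); later offsets come from
    // the local log.
    public static List<Long> brokerFetch(List<Long> remoteRecords, long remoteEndOffset,
                                         List<Long> localRecords, long fetchOffset) {
        if (fetchOffset <= remoteEndOffset) {
            return remoteRecords.stream().filter(o -> o >= fetchOffset).toList();
        }
        return localRecords.stream().filter(o -> o >= fetchOffset).toList();
    }

    // Toy consumer poll: the fetch position advances only when records are
    // returned; an empty response leaves it unchanged, so the consumer spins.
    public static long pollOnce(List<Long> remoteRecords, long remoteEndOffset,
                                List<Long> localRecords, long position) {
        List<Long> records = brokerFetch(remoteRecords, remoteEndOffset, localRecords, position);
        if (records.isEmpty()) {
            return position; // stuck: the same fetch will be issued again
        }
        return records.get(records.size() - 1) + 1; // advance past the last record
    }

    public static void main(String[] args) {
        // The remote tier once held offsets 0-5, all compacted away; 6-9 are local.
        List<Long> remote = List.of();
        List<Long> local = List.of(6L, 7L, 8L, 9L);
        System.out.println(pollOnce(remote, 5, local, 1)); // prints 1 (stuck)
        System.out.println(pollOnce(remote, 5, local, 6)); // prints 10
    }
}
```

This mirrors the manual test above: a fetch at offset 6 proceeds normally, while a fetch at offset 1 is routed to the remote tier, gets empty records, and never advances.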
@satishd @divijvaidya I have created a separate ticket to track the scenarios where RemoteFetch returns empty records.
An empty records response is sent back to the consumer so that it can retry the request if needed. In a non-compacted topic, the records are expected to be present in a contiguous manner. Shall we list the scenarios where the record offsets won't be contiguous for a regular (non-compacted) topic, or write a unit test to reproduce this problem?
@divijvaidya @satishd @kamalcph
@iit2009060 KAFKA-15388 is about addressing fetch requests for tiered-storage-enabled topics that have compacted offsets in a partition before their cleanup policy is changed to delete. This PR only covers the case where the next offset is available within the remote log segments. But there is a case where the offset is available in the subsequent local segments (not only in the active local segment but in any of the local-only segments) that have not yet been copied to remote storage. The solution does not address KAFKA-15388 completely. I am fine to merge this and have a followup PR for the mentioned scenario to resolve KAFKA-15388, addressing compacted topics before tiered storage is enabled.
@satishd Yes, as you mentioned, the current PR does not cover reading from local log segments, i.e. when data is not available in remote log segments, and the reason can be
@iit2009060 We can sync up offline to understand your scenario better.
@satishd Check the 5th step mentioned in the comment where I produced some data. In the 7th step I made offset requests with both 1 and 6; offset 6 worked fine, but the request with offset 1 goes through the step
The default configuration is latest in the console consumer. Even with the latest configuration, should it work?
@iit2009060 In step 7, it is anticipated that setting the fetch offset to 1 should follow the specified process to trigger an
Sure.
@satishd I will try to write an integration or unit test to depict the scenario I mentioned. However, as far as I noticed, the additional messages I produced in step 5 could not be retrieved with a fetch request at offset 1, but fetching proceeded with offset 6.
Thanks @kamalcph for the discussion.
@satishd We can merge the current PR if it is feasible. I will continue to work on it and see if I can reproduce the case you mentioned around log compaction, and will open a separate PR once the issue is reproducible.
@satishd @kamalcph @divijvaidya I am able to reproduce the issue which @satishd mentioned; I needed to introduce a delay into the movement of segments to remote storage through a hacky change.
@satishd @divijvaidya @kamalcph Let me know if you have any implementation suggestions to fix this use case.
@iit2009060 As we discussed earlier in the comment, this issue is only applicable to the compact policy, and the behaviour with the retention policy is as expected. As I mentioned in my earlier comment, I am fine to merge this PR, which partially addresses the problem of the compact policy by iterating through the remote log segments, and we can have a followup PR for the remaining cases. We can discuss the possible cases and solutions in KAFKA-16088.
LGTM. It covers KAFKA-15388 partially by covering the next available segments if they exist in remote storage.
We have a follow-up to cover local segments with KAFKA-16088.
Merging it to trunk as the test failures are unrelated to this change.
…pache#15060) Fetching from remote log segment implementation does not handle the topics that had retention policy as compact earlier and changed to delete. It always assumes record batch will exist in the required segment for the requested offset. But there is a possibility where the requested offset is the last offset of the segment and has been removed due to log compaction. Then it requires iterating over the next higher segment for further data as it has been done for local segment fetch request. This change partially addresses the above problem by iterating through the remote log segments to find the respective segment for the target offset. Reviewers: Satish Duggana <satishd@apache.org>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Christo Lolov <lolovc@amazon.com>
Problem
Fetching from the remote log segment implementation does not handle log compaction cases. It always assumes a record batch will exist in the required segment for the requested offset. But there is a possibility that the requested offset is the last offset of the segment and has been removed due to log compaction. Then it requires iterating over the next higher segment for further data, as is done for the local segment fetch request.
Fixes
Testing Strategy:
Steps followed for testing manually
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic arpit
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name arpit --add-config segment.bytes=100
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name arpit --add-config cleanup.policy=compact
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name arpit --delete-config cleanup.policy=compact
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name arpit --add-config remote.storage.enable=true
cc @divijvaidya @satishd @showuon