
KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion #15525

Merged: 12 commits, Apr 8, 2024

Conversation

@philipnee (Collaborator) commented Mar 12, 2024:

A subtle difference in the behavior of the two APIs causes failures with "Invalid negative timestamp".

In this PR, the list offsets response is processed differently depending on the API. For beginningOffsets/endOffsets, the offsets from the response are returned directly.

For offsetsForTimes, an OffsetAndTimestamp object is constructed for each requested TopicPartition before being returned.

The reason beginningOffsets and endOffsets differ: the response carries a -1 timestamp, which triggers the invalid timestamp exception because the original code tried to construct an OffsetAndTimestamp object before returning.
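The failure mode can be illustrated with a minimal standalone sketch. This is not the real Kafka class, just a stand-in mirroring the validation in OffsetAndTimestamp quoted later in this thread:

```java
// Stand-in for org.apache.kafka.clients.consumer.OffsetAndTimestamp,
// mirroring the validation that rejects the -1 sentinel timestamp
// carried by list-offsets responses for beginningOffsets/endOffsets.
class OffsetAndTimestampSketch {
    final long offset;
    final long timestamp;

    OffsetAndTimestampSketch(long offset, long timestamp) {
        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

public class NegativeTimestampDemo {
    public static void main(String[] args) {
        // offsetsForTimes: the response carries a real timestamp, so
        // construction succeeds.
        OffsetAndTimestampSketch ok = new OffsetAndTimestampSketch(42L, 1710000000000L);
        System.out.println("offsetsForTimes result ok, offset=" + ok.offset);

        // beginningOffsets/endOffsets: the response timestamp is -1,
        // so constructing the result object throws.
        try {
            new OffsetAndTimestampSketch(0L, -1L);
        } catch (IllegalArgumentException e) {
            System.out.println("beginningOffsets path failed: " + e.getMessage());
        }
    }
}
```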

This PR also adds the following previously missing pieces:

  1. Short-circuit both beginningOrEndOffsets calls
  2. Tests for both APIs (beginningOrEndOffsets, offsetsForTimes)
  3. Tests for this API, which we apparently lacked: note it is exercised in other integration tests, but those were added to test the async consumer

@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch 3 times, most recently from c103fdd to fc04cc0 Compare March 18, 2024 23:07
@philipnee philipnee changed the title Ctr end offsets shoud not throw KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion Mar 19, 2024
@philipnee philipnee marked this pull request as ready for review March 22, 2024 04:40
@philipnee (Collaborator, author):

Hey @lucasbru - Would it be possible to ask you to review this PR? Thanks!

@lucasbru (Member) left a comment:

mostly looking good to me. I'm just wondering if we could get by without using generics here, as there are just two cases.

@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch from bfadb18 to e76789d Compare March 25, 2024 20:15
@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch from e76789d to 2f973a3 Compare March 25, 2024 20:42
@philipnee (Collaborator, author):

Hey @lucasbru - Thanks for taking the time to review this PR. Let me know if there's anything to add to the PR.

@lucasbru (Member) left a comment:

Thanks for the update. One Q about timeout handling (sorry, didn't notice it before). Otherwise looking good to me.

Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = applicationEventHandler.addAndGet(

// shortcut the request if the timeout is zero.
if (timeout.isZero()) {
Member:

Sorry, only noticed this now, but the original consumer seems to send a list offset request even if the timeout is 0, and you are specifically introducing code to avoid that. Isn't that going against what Kirk is trying to achieve? cc @kirktrue

Member:

I've tried to dig into this a bit. Short-circuiting is definitely the right thing to do, since otherwise we'll run into a timeout exception. The old consumer is a bit weird in that it fires the list offsets request but never returns a result. I also couldn't find a cache that is influenced by the list offsets request, so what's the point of sending it?

Replicating the old consumer behavior would mean creating the event for the background thread, but not waiting for the result. We can consider changing the behavior here, but let's make sure we do it consciously.
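The two options being weighed here can be sketched in a minimal standalone form. The real applicationEventHandler.add/addAndGet live in Kafka's async consumer internals; the names and types below are simplified stand-ins:

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ZeroTimeoutSketch {
    // Stand-in for a completable application event carrying its own future.
    static class ListOffsetsEvent {
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
    }

    // Fire-and-forget (old consumer behavior): the request is still sent,
    // but the caller returns immediately without waiting on the future.
    static Map<String, Long> beginningOffsetsZeroTimeout(ListOffsetsEvent event) {
        enqueueToBackgroundThread(event);
        return Collections.emptyMap();
    }

    // Blocking path: wait for the background thread to complete the future
    // (the real code throws a TimeoutException if time runs out).
    static Map<String, Long> beginningOffsetsBlocking(ListOffsetsEvent event) {
        enqueueToBackgroundThread(event);
        return event.future.join();
    }

    static void enqueueToBackgroundThread(ListOffsetsEvent event) {
        // Simulate the background thread eventually completing the request.
        event.future.complete(Map.of("topic-0", 0L));
    }

    public static void main(String[] args) {
        System.out.println(beginningOffsetsZeroTimeout(new ListOffsetsEvent())); // {}
        System.out.println(beginningOffsetsBlocking(new ListOffsetsEvent()));    // {topic-0=0}
    }
}
```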

Collaborator (author):

Hey @lucasbru - It seems like one of the handlers would also update the subscription state upon completion. See the snippet below:

                public void onSuccess(ListOffsetResult value) {
                    synchronized (future) {
                        result.fetchedOffsets.putAll(value.fetchedOffsets);
                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);

                        offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
                    }
                }

I think addAndGet seems to be sufficient to handle such logic so I'll revert this code.

Collaborator (author):

Scratch the previous comment: addAndGet actually doesn't. We will need to explicitly return an empty result. See the code change.

@philipnee (Collaborator, author):

@lucasbru - Thanks again for reviewing the PR. Sorry about the misinterpretation of the short-circuiting logic; I've updated the beginningOrEndOffsets API here. It seems the right thing to do is to still send out the request but return immediately for a zero timeout (a bit strange, because it does throw a timeout exception when time runs out, which seems inconsistent).

public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, OffsetAndTimestamp>> {

public class ListOffsetsEvent extends CompletableApplicationEvent<Map<TopicPartition, Long>> {
Contributor:

Kind of a general comment looking for simplification: couldn't we just have an internal new class OffsetAndTimestampInternal (better named) that allows negatives and knows how to build an OffsetAndTimestamp? That seems to solve the problem without having to split ListOffsets into two events, with separate paths for beginning/endOffsets and offsetsForTimes, when in reality they have everything in common except for the object used to encapsulate the result (same result). The split paths leak down to the offsets manager, when at the request/response level the manager is responsible for, everything is the same for both paths. With this approach the change would be only at the API level, on the consumer, where the result of the event would build the map with Longs for beginning/endOffsets, or the map with OffsetAndTimestamp for offsetsForTimes (the data is the same; we just change how we return it).

Collaborator (author):

thanks, sounds like a good idea to me.

Member:

I'm personally not concerned about having two events, because they are very simple. The alternative is to have a common code-path that carries a requiresTimestamp boolean to differentiate behavior again, which isn't really any simpler. But I agree there is a certain amount of code duplication here that we could eliminate using your approach @lianetm , so I'm not against it.

@lucasbru (Member):

> @lucasbru - Thanks again for reviewing the PR. Sorry about the misinterpretation of the short-circuiting logic; I've updated the beginningOrEndOffsets API here. It seems the right thing to do is to still send out the request but return immediately for a zero timeout (a bit strange, because it does throw a timeout exception when time runs out, which seems inconsistent).

Yes, the behavior of the existing consumer is a bit curious, but it's not the only place where a zero duration is treated different from 0.000001s. Either way, we probably have to do it this way for compatibility. This part looks good to me now.

@philipnee (Collaborator, author) commented Mar 27, 2024:

@lucasbru - If I'm not mistaken, the current implementations of both beginningOrEndOffsets and offsetsForTimes need to send out a request upon getting a ZERO duration. It seems both code paths invoke this logic:

            // if timeout is set to zero, do not try to poll the network client at all
            // and return empty immediately; otherwise try to get the results synchronously
            // and throw timeout exception if it cannot complete in time
            if (timer.timeoutMs() == 0L)
                return result;

But offsetsForTimes in the AsyncConsumer seems to short-circuit it here:

            // If timeout is set to zero return empty immediately; otherwise try to get the results
            // and throw timeout exception if it cannot complete in time.
            if (timeout.toMillis() == 0L)
                return listOffsetsEvent.emptyResult();

            return applicationEventHandler.addAndGet(listOffsetsEvent, timer);

Here is the ticket: https://issues.apache.org/jira/browse/KAFKA-16433

@lucasbru (Member):

@philipnee Okay, thanks for creating the ticket. Not sure it's blocker priority, though. If it's a quick thing, you could address it in this PR.

Are you going to implement Lianet's suggestion?

@philipnee (Collaborator, author):

Hi @lucasbru - Let me address Lianet's comment in this PR and open a separate PR for the behavior inconsistency, as it requires some changes to the unit tests.

wip

wip

not working

refactor p1

wip

tmp

wip

tmp

Update OffsetsRequestManager.java

blah

clean up

catch the exception and do nothing
clean up

clean up

Update OffsetsRequestManagerTest.java
Update based on comments

wip
@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch 2 times, most recently from 6834721 to 0bde117 Compare March 29, 2024 04:54
Update AsyncKafkaConsumer.java
@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch from 0bde117 to 6fb9bd8 Compare March 29, 2024 04:58
@@ -946,7 +956,7 @@ public void testOffsetsForTimesWithZeroTimeout() {
@Test
public void testWakeupCommitted() {
consumer = newConsumer();
final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
Collaborator (author):

just cleaning up.

@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch 3 times, most recently from 0abf86d to 80e6903 Compare March 29, 2024 05:35
@philipnee (Collaborator, author):

@lucasbru - Thanks for taking the time to review this PR. It is ready for another pass.

Update ListOffsetsEvent.java

clean up

clean up
@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch from 80e6903 to 8205c79 Compare March 29, 2024 06:22

Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
if (timeout.isZero()) {
applicationEventHandler.add(listOffsetsEvent);
@lianetm (Contributor) commented Apr 2, 2024:

So if I get it right, we are intentionally leaving this? Generating an event to get offsets, when in the end we return right away without waiting for a response? I do get that the old consumer does it, and I could be missing the purpose, but it seems like an unneeded request, even considering the side effect of the onSuccess handler. The handler just updates the positions to reuse the offsets it just retrieved; it makes sense to reuse the result when we do need to make a request, but I wouldn't say we need to generate an unneeded event/request just for that when the user requested offsets with max-time-to-wait=0.

In any case, if we prefer to keep this, I would suggest two things:

  1. Add a comment explaining why (the handler), because it looks like weird overhead to add the event and then return,
  2. Be consistent and generate the event also in the offsetsForTimes case, before the early return (ln 1104). In the old consumer it's common logic, so both paths, offsetsForTimes and beginning/endOffsets, do the same request+return.

Collaborator (author):

Hi @lianetm, thanks for the comment. There's a ticket to align the behavior of the two APIs per your suggestions: https://issues.apache.org/jira/browse/KAFKA-16433. The plan is to do that in a separate PR.

Back to your first comment: it is not immediately obvious why people would use these two APIs with a zero timeout. The only sensible thing it does is update the local high watermark, as you mentioned. I think it is worth addressing this ambiguity after the 4.0 release, so I'll leave a comment per your request.

Contributor:

Thanks for the explanation! Totally ok to tackle it with that separate Jira.

import java.util.Optional;

/**
* Internal representation of {@link OffsetAndTimestamp}.
Contributor:

I would add : Internal representation of {@link OffsetAndTimestamp} that allows negative offsets

Collaborator (author):

Timestamps I assume.

Contributor:

Uhm... what OffsetAndTimestamp does not allow is negative offsets here, and that's the requirement this new one is removing. Am I missing something?

Contributor:

it's actually both! he he, so let's maybe add negative offsets and timestamps

Collaborator (author):

I think the problem is a negative timestamp in the response causing org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp. More specifically, this part was complaining:

if (timestamp < 0)
    throw new IllegalArgumentException("Invalid negative timestamp");

Contributor:

yes, agree that's the failure we noticed on the sys tests, but conceptually we're creating a new OffsetAndTimestampInternal class that is the same as the existing OffsetAndTimestamp, with the only difference that the former does not throw on negative offsets or negative timestamps, right? so for the class doc makes sense to mention it.
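For illustration, a sketch of what such an internal type might look like. The names and conversion method below are hypothetical stand-ins, not the actual Kafka implementation:

```java
import java.util.Optional;

public class InternalOffsetSketch {
    // Stand-in for the public OffsetAndTimestamp, which validates its inputs.
    static class PublicOffsetAndTimestamp {
        final long offset;
        final long timestamp;
        PublicOffsetAndTimestamp(long offset, long timestamp) {
            if (offset < 0) throw new IllegalArgumentException("Invalid negative offset");
            if (timestamp < 0) throw new IllegalArgumentException("Invalid negative timestamp");
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Internal representation: accepts the -1 sentinels from list-offsets responses.
    static class OffsetAndTimestampInternal {
        final long offset;
        final long timestamp;
        OffsetAndTimestampInternal(long offset, long timestamp) {
            this.offset = offset;       // no validation here: negatives allowed
            this.timestamp = timestamp;
        }
        long offset() { return offset; }

        // Convert only when the caller (offsetsForTimes) needs the public type,
        // skipping results that carry sentinel values.
        Optional<PublicOffsetAndTimestamp> buildOffsetAndTimestamp() {
            return offset >= 0 && timestamp >= 0
                ? Optional.of(new PublicOffsetAndTimestamp(offset, timestamp))
                : Optional.empty();
        }
    }

    public static void main(String[] args) {
        OffsetAndTimestampInternal fromEndOffsets = new OffsetAndTimestampInternal(5L, -1L);
        // beginningOffsets/endOffsets only need the Long offset:
        System.out.println(fromEndOffsets.offset());                              // 5
        // offsetsForTimes converts, skipping invalid sentinels:
        System.out.println(fromEndOffsets.buildOffsetAndTimestamp().isPresent()); // false
    }
}
```

With this shape, both APIs share one event and one manager path; only the final mapping at the consumer API level differs.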

static Map<TopicPartition, OffsetAndTimestamp> buildOffsetsForTimesResult(final Map<TopicPartition, Long> timestampsToSearch,
final Map<TopicPartition, ListOffsetData> fetchedOffsets) {
HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size());
static <T> Map<TopicPartition, T> buildListOffsetsResult(
Contributor:

This generic buildListOffsetsResult is currently only being used from buildOffsetsForTimesResult; was the intention to use it also from buildOffsetsForTimeInternalResult?

Collaborator (author):

good catch.
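A hypothetical sketch of how one generic builder could serve both paths; all names here are illustrative, not the actual Kafka code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class GenericResultSketch {
    // Stand-in for the fetched per-partition list-offsets data.
    record ListOffsetData(long offset, long timestamp) {}

    // One generic builder shared by both paths; the resultMapper decides
    // whether the caller gets a Long (beginning/endOffsets) or a richer
    // offset+timestamp object (offsetsForTimes).
    static <T> Map<String, T> buildListOffsetsResult(
            Map<String, Long> timestampsToSearch,
            Map<String, ListOffsetData> fetchedOffsets,
            BiFunction<String, ListOffsetData, T> resultMapper) {
        Map<String, T> result = new HashMap<>(timestampsToSearch.size());
        for (String tp : timestampsToSearch.keySet()) {
            ListOffsetData data = fetchedOffsets.get(tp);
            result.put(tp, data == null ? null : resultMapper.apply(tp, data));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ListOffsetData> fetched = Map.of("t-0", new ListOffsetData(7L, -1L));
        // beginning/endOffsets path: map each result to its Long offset only.
        Map<String, Long> offsetsOnly =
            buildListOffsetsResult(Map.of("t-0", -2L), fetched, (tp, d) -> d.offset());
        System.out.println(offsetsOnly.get("t-0")); // 7
    }
}
```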


@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
Contributor:

We have split the consumer tests into separate files grouped by feature, and there is now one PlaintextConsumerCommitTest, I would expect this test should go there.

Collaborator (author):

I think this is an error from rebase. so this should be removed from the PR. Thanks for catching this.


@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {
Contributor:

Including the func name we're testing (offsetsAndTimestamps) would probably make the test name clearer... maybe something around testOffsetsAndTimestampsTargetTimestamps?

@philipnee (Collaborator, author) commented Apr 2, 2024:

Maybe testFetchOffsetsForTime, which already implies searching at given timestamps.

@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends AbstractConsumerTest {
consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, startingOffset = 5, startingTimestamp = startingTimestamp)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
Contributor:

oh this one (and the one below) are related to partition's offsets, not committed offsets, so I would say they need to stay in the PlaintextConsumer, where you had them (I was only suggesting to move the testSubscribeAndCommitSync here, because it relates to committed offsets)

Collaborator (author):

Sorry - wasn't looking carefully at it. Putting things back to the original place.

Update AsyncKafkaConsumerTest.java

reverting unintentional changes
@philipnee philipnee force-pushed the ctr-end-offsets-shoud-not-throw branch from a6c9a42 to f6b7d48 Compare April 3, 2024 04:21
@philipnee (Collaborator, author):

Hi @lianetm - Much appreciated for the reviews. I think I've addressed your comments. LMK if there's anything more. cc @lucasbru

@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {
Contributor:

Is this one being removed intentionally? the suggestion was only to move it to the PlainTextConsumerCommit file, where all tests related to committing offsets are now. Ok for me if you think it's not worth keeping, but just to make sure it's intentional.

Collaborator (author):

This appears in the PR because I didn't rebase correctly, so it should be removed. You've actually moved the test here:

def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit = {

Contributor:

All good then, just wanted to make sure we're not losing it.

@lianetm (Contributor) commented Apr 3, 2024:

Hey @philipnee, thanks for the updates, just one minor comment left above.

@lianetm (Contributor) left a comment:

Thanks for the changes @philipnee! Test failures seem unrelated. LGTM.

@lucasbru lucasbru merged commit 4e0578f into apache:trunk Apr 8, 2024
1 check failed