
KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist #11760

Merged: 10 commits into apache:trunk on Mar 28, 2022

Conversation

@tim-patterson (Contributor)

Adds a unit test for task assignment in the case where no caught-up nodes exist.
Existing unit and integration tests verify that no other behaviour has changed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@tim-patterson (Contributor Author)

@vvcephei

@tim-patterson tim-patterson changed the title Kafka 13600 Kafka Streams - Fall back to most caught up client if no caught up clients exist KAFKA-13600 Kafka Streams - Fall back to most caught up client if no caught up clients exist Feb 15, 2022
@tim-patterson tim-patterson changed the title KAFKA-13600 Kafka Streams - Fall back to most caught up client if no caught up clients exist KAFKA-13600: Kafka Streams - Fall back to most caught up client if no caught up clients exist Feb 15, 2022
@cadonna (Contributor) commented Feb 15, 2022

Thank you for the PR @tim-patterson! I am going to have a look at it tomorrow!

@cadonna (Contributor) left a comment

Thank you for the PR @tim-patterson !

I looked at the production code. I like your approach! I left some comments.

I will look at the test code this week or next.

final Map<UUID, ClientState> clientStates,
final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
final Map<TaskId, List<UUID>> tasksToClientByLag) {
final List<UUID> taskClients = requireNonNull(tasksToClientByLag.get(task), "uninitialized map");
cadonna (Contributor):

Suggested change
final List<UUID> taskClients = requireNonNull(tasksToClientByLag.get(task), "uninitialized map");
final List<UUID> taskClients = requireNonNull(tasksToClientByLag.get(task), "uninitialized list");

Comment on lines 119 to 131
if (sourceClient == null) {
    sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
}

if (sourceClient == null) {
    sourceClient = requireNonNull(
        mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
        "Tried to move task to more caught-up client but none exist"
    );
}

if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
    // there's not a standby available to take over the task, so we'll schedule a warmup instead
cadonna (Contributor):

This part of the code is a bit hard to read. Once it is clear after line 117 that there is a client with a standby (i.e., sourceClient != null), you could execute the content of the else branch on line 140. If there is no such client, you can then check for a caught-up client and, failing that, the most caught-up client. Something like:

if (sourceClient != null) {
    swapStandbyAndActive(...)
}  else {
    sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
    if (sourceClient == null) {
        sourceClient = requireNonNull(
            mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
            "Tried to move task to more caught-up client but none exist"
        );
    }
    moveActiveAndTryToWarmUp(...)
}
caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));

tim-patterson (Contributor Author):

I sort of originally did something like that, but a unit test caused a runtime assertion.

Basically there are ~5 cases:

  1. Caught up client with standby
  2. Caught up client without standby
  3. Not Caught up client with standby
  4. Not Caught up client without standby
  5. No client found (runtime assertion).

So the sourceClient from mostCaughtUpEligibleClient(...) may actually need to call swapStandbyAndActive(...), depending on whether it actually has a standby assigned to it.

I guess we can do something like

if (sourceClient != null) {
    swapStandbyAndActive(...)
}  else {
    sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
    if (sourceClient != null) {
        moveActiveAndTryToWarmUp(...)
    } else {
        sourceClient = requireNonNull(
            mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination),
            "Tried to move task to more caught-up client but none exist"
        );
        if (clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
            swapStandbyAndActive(...)
        } else {
            moveActiveAndTryToWarmUp(...)
        }
    }
}
caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));

cadonna (Contributor):

Ah, I totally missed case 3.

What about the following:

if (sourceClient != null) {
    swapStandbyAndActive(...);
}  else {
    sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
    if (sourceClient != null) {
        moveActiveAndTryToWarmUp(...);
    } else {
        sourceClient = mostCaughtUpEligibleClient(tasksToClientByLag, movement.task, movement.destination);
        if (sourceClient != null) {
            if (clientStates.get(sourceClient).hasStandbyTask(movement.task)) {
                swapStandbyAndActive(...);
            } else {
                moveActiveAndTryToWarmUp(...);
            }
        } else {
            throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
        }
    }
}
caughtUpClientsByTaskLoad.offerAll(asList(sourceClient, movement.destination));

I think it makes the cases more explicit in the code.

cadonna (Contributor):

We could even make it more explicit by introducing methods like:

private boolean tryToSwapStandbyAndActiveOnCaughtUpClient(...);
private boolean tryToMoveToCaughtUpClientAndTryToWarmUp(...);
private boolean tryToSwapStandbyAndActiveOnMostCaughtUpClient(...);
private boolean tryToMoveToMostCaughtUpClientAndTryToWarmUp(...);

But that is optional and up to you.
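For illustration, here is a minimal, self-contained sketch of the boolean-returning "tryTo..." pattern suggested here (stand-in types and bodies, not the actual TaskMovement code): each helper attempts exactly one case, reports whether it applied, and the first success short-circuits the rest.

import java.util.UUID;

// Hypothetical, simplified stand-in: illustrates only the short-circuiting chain of
// boolean "tryTo..." helpers, not the real TaskMovement logic or signatures.
public class TryChainSketch {

    // Each helper handles exactly one case and reports whether it applied.
    static boolean tryToSwapStandbyAndActiveOnCaughtUpClient(final UUID caughtUpClientWithStandby) {
        if (caughtUpClientWithStandby == null) {
            return false;
        }
        System.out.println("swap standby and active on " + caughtUpClientWithStandby);
        return true;
    }

    static boolean tryToMoveActiveToCaughtUpClientAndTryToWarmUp(final UUID caughtUpClient) {
        if (caughtUpClient == null) {
            return false;
        }
        System.out.println("move active to caught-up client " + caughtUpClient);
        return true;
    }

    static boolean tryToMoveActiveToMostCaughtUpClient(final UUID mostCaughtUpClient) {
        if (mostCaughtUpClient == null) {
            return false;
        }
        System.out.println("move active to most caught-up client " + mostCaughtUpClient);
        return true;
    }

    public static void main(final String[] args) {
        // Only the fallback case applies in this toy run; the first success stops the chain.
        final boolean moved =
            tryToSwapStandbyAndActiveOnCaughtUpClient(null)
                || tryToMoveActiveToCaughtUpClientAndTryToWarmUp(null)
                || tryToMoveActiveToMostCaughtUpClient(UUID.randomUUID());
        if (!moved) {
            throw new IllegalStateException("Tried to move task to more caught-up client but none exist");
        }
    }
}

The final version of the PR (quoted further down in this thread) chains helpers in the same way.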

tim-patterson (Contributor Author):

I've done it with the nested if/elses but stopped reusing the same sourceClient variable to try to make it a little clearer.
Let me know what you think.

cadonna (Contributor):

I think it is fine as you did it. However, the method is already over 100 lines long, and it was already quite long before you added your part. Maybe the code would benefit from a bit of refactoring. WDYT?

tim-patterson (Contributor Author):

Sure, let me have a bit of a play around.
All these methods being static means that there is a lot of state to pass as method arguments to each call,
so I'm unsure how much of a win we'll get from extracting smaller bits of code into separate methods.
Maybe some closures/local functions might help (see the sketch below)...
I'll have a bit of a play and get back to you :)
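As a rough, hypothetical illustration of the closures/local-functions idea (stand-in names, not the actual assignor code): a local lambda can capture the shared state once, so the per-case logic doesn't need the full argument list threaded through every static helper.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical sketch: the lambda closes over the shared lag map, so callers
// only pass the argument that actually varies between calls.
public class ClosureSketch {

    public static void main(final String[] args) {
        final Map<UUID, Long> lagPerClient = new HashMap<>();
        final UUID client = UUID.randomUUID();
        lagPerClient.put(client, 0L);

        // Local "function" capturing lagPerClient instead of taking it as a parameter.
        final Predicate<UUID> isCaughtUp = id -> lagPerClient.getOrDefault(id, Long.MAX_VALUE) == 0L;

        System.out.println("caught up? " + isCaughtUp.test(client)); // true
    }
}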

cadonna (Contributor):

That sounds great! Thanks!

final UUID client,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
final Map<TaskId, List<UUID>> tasksToClientByLag) {
cadonna (Contributor) commented Feb 21, 2022:

Would it be possible to use a SortedSet instead of a List here? Would make it clearer that client IDs should only appear once in this list.

tim-patterson (Contributor Author):

Sure, I've pushed up a commit that does this.
My only concern is that these SortedSets now end up holding a reference to clientStates rather than just being a plain old list/set (see the sketch below).
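To make the concern concrete (using a stand-in lag map rather than the real clientStates), a SortedSet ordered by lag needs a comparator that closes over the state it sorts by, so the set keeps that reference alive for its whole lifetime:

import java.util.Comparator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical sketch: the comparator captures lagPerClient, so the TreeSet holds a
// reference to it; this mirrors the concern about the sets referencing clientStates.
public class SortedByLagSketch {

    public static void main(final String[] args) {
        final UUID clientA = UUID.randomUUID();
        final UUID clientB = UUID.randomUUID();
        final Map<UUID, Long> lagPerClient = Map.of(clientA, 100L, clientB, 5L);

        final SortedSet<UUID> clientsByLag = new TreeSet<>(
            Comparator.comparingLong((UUID id) -> lagPerClient.get(id))
                .thenComparing(Comparator.<UUID>naturalOrder()) // tie-break so equal lags stay distinct
        );
        clientsByLag.addAll(lagPerClient.keySet());

        System.out.println("least-lagged client: " + clientsByLag.first()); // clientB
    }
}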

@cadonna (Contributor) commented Feb 22, 2022

I ran the Streams system tests on this branch twice yesterday and got the same failures both times.

See http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2022-02-21--001.system-test-kafka-branch-builder--1645479435--tim-patterson--KAFKA-13600-2--7b1e8797de/report.html

It might also be an issue with the tests themselves, but it requires investigation.

To run the system tests locally follow these instructions:
https://github.com/apache/kafka/blob/trunk/tests/README.md

@tim-patterson (Contributor Author)

Thanks @cadonna.
Those tests (well, at least one of them) seem to fail on trunk too.

I've done some digging.
The tests broke at 34208e (22nd Jan) due to a typo,
but once the typo was fixed in 44fcba (9th Feb), the real test failures started.

By doing a git bisect while checking out the dockerfile with the typo fixed, it looks like
e6db0c KAFKA-13598: enable idempotence producer by default and validate the configs
is the commit breaking these tests.

@cadonna (Contributor) commented Feb 23, 2022

Thanks a lot for the investigation!
Before I wrote my last comment I had a quick look at other system test runs on trunk and thought they were passing, but apparently my look was too quick and I missed something.

@showuon Are you aware of any issues with your PR mentioned above?

@cadonna (Contributor) commented Feb 23, 2022

The most recent system test run on trunk I could find on Jenkins was one on February 9th. This run does not contain the system test failures I mentioned above.

@tim-patterson Can you consistently reproduce the failures locally on trunk?

@tim-patterson (Contributor Author) commented Feb 23, 2022

The most recent system test run on trunk I could find on Jenkins was one on February 9th. This run does not contain the system test failures I mentioned above.

@tim-patterson Can you consistently reproduce the failures locally on trunk?

Yes, I seem to be able to. They're all timeout failures, so maybe the extra latency of waiting for acks=all is enough to tip it over a timeout locally... Is there any way to see what commit that system test run from trunk actually ran on?

So here's a trunk commit from yesterday

tpatterson@Tims-MacBook-Pro-2 kafka % git status
On branch trunk
Your branch is up to date with 'origin/trunk'.

nothing to commit, working tree clean
tpatterson@Tims-MacBook-Pro-2 kafka % git log --oneline | head -n2
a5bb45c11a KAFKA-13511: Add support for different unix precisions in TimestampConverter SMT (#11575)
576496a1ca MINOR: Improve Connect docs (#11642)

tpatterson@Tims-MacBook-Pro-2 kafka % tests/docker/ducker-ak down && ./gradlew clean && REBUILD="t" TC_PATHS="tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance" _DUCKTAPE_OPTIONS='--parameters '\''{"upgrade_from_version":"0.10.0.1"}'\' bash tests/docker/run_tests.sh

...

================================================================================
SESSION REPORT (ALL TESTS)
ducktape version: 0.8.1
session_id:       2022-02-23--001
run time:         2 minutes 21.028 seconds
tests run:        1
passed:           0
failed:           1
ignored:          0
================================================================================
test_id:    kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test.StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance.upgrade_from_version=0.10.0.1
status:     FAIL
run time:   2 minutes 20.921 seconds


    TimeoutError("Never saw 'Processed [0-9]* records so far' message ducker@ducker07")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 133, in run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 190, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 93, in test_upgrade_to_cooperative_rebalance
    verify_running(processor, self.processing_message)
  File "/opt/kafka-dev/tests/kafkatest/tests/streams/utils/util.py", line 19, in verify_running
    monitor.wait_until(message,
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/remoteaccount.py", line 707, in wait_until
    return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % (self.offset + 1, self.log, pattern),
  File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 41, in wait_until
    raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
ducktape.errors.TimeoutError: Never saw 'Processed [0-9]* records so far' message ducker@ducker07

@tim-patterson (Contributor Author)

I wonder if this is the fix: https://issues.apache.org/jira/browse/KAFKA-13673
(Not sure which tests in particular it's trying to fix.)

@showuon (Contributor) commented Feb 24, 2022

@tim-patterson @cadonna , yes, we are aware of the failed system tests because someone reported last week: #11769. And yes, we have a PR (#11788) ready to fix it. Thanks.

@cadonna (Contributor) commented Feb 24, 2022

Thank you both for clearing this up! I also ran the system tests on trunk and got the same failures.
http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2022-02-23--001.system-test-kafka-branch-builder--1645642736--apache--trunk--6f09c3f88b/report.html

@tim-patterson (Contributor Author)

Hi @cadonna,
Sorry, day-job stuff got in the way for a bit.
I've merged in trunk and the Streams integration tests all run locally for me now.

I also split up that big method with a few helpers, WDYT?

@cadonna (Contributor) left a comment

@tim-patterson Thank you for the updates!

Here is my feedback!

// caught up client.
final boolean moved = tryToSwapStandbyAndActiveOnCaughtUpClient(clientStates, caughtUpClientsByTaskLoad, movement) ||
    tryToMoveActiveToCaughtUpClientAndTryToWarmUp(clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement) ||
    tryToMoveActiveToMostCaughtUpClient(tasksToClientByLag, clientStates, warmups, remainingWarmupReplicas, caughtUpClientsByTaskLoad, movement);
cadonna (Contributor):

Could you add tests to TaskMovementTest for the tryToMoveActiveToMostCaughtUpClient() case that you added?

@tim-patterson (Contributor Author)

@cadonna Hey Bruno, how's this?

@cadonna (Contributor) left a comment

Thanks @tim-patterson !

LGTM!

I have Streams system tests and benchmarks running on this PR. Once they are done and they do not show any issue, I will merge the PR.

@cadonna (Contributor) commented Mar 28, 2022

Streams system tests are green. Going to merge.

@cadonna cadonna merged commit 110bcca into apache:trunk Mar 28, 2022
cadonna pushed a commit that referenced this pull request Mar 28, 2022
… caught up clients exist (#11760)

The task assignor is modified to consider the Streams client with the most caught-up states if no Streams client exists that is caught up, i.e., no client whose state lag is less than the acceptable recovery lag.

Adds a unit test for task assignment in the case where no caught-up nodes exist.
Existing unit and integration tests verify that no other behaviour has changed.

Co-authored-by: Bruno Cadonna <cadonna@apache.org>

Reviewer: Bruno Cadonna <cadonna@apache.org>
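As a simplified, hypothetical sketch of the fallback behaviour described in the commit message (stand-in types and a plain lag map, not the actual assignor code): prefer a caught-up client, and only if none exists fall back to the client with the smallest lag.

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the fallback: a client is "caught up" if its lag is within the
// acceptable recovery lag; otherwise pick the most caught-up (least-lagged) client.
public class FallbackSketch {

    static UUID chooseClient(final Map<UUID, Long> lagPerClient, final long acceptableRecoveryLag) {
        final Optional<UUID> caughtUp = lagPerClient.entrySet().stream()
            .filter(entry -> entry.getValue() <= acceptableRecoveryLag)
            .map(Map.Entry::getKey)
            .findFirst();
        // Fall back to the least-lagged client if no caught-up client exists.
        return caughtUp.orElseGet(() -> lagPerClient.entrySet().stream()
            .min(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException("no clients")));
    }

    public static void main(final String[] args) {
        final UUID a = UUID.randomUUID();
        final UUID b = UUID.randomUUID();
        // Neither client is caught up (both lags exceed 100), so the least-lagged client b wins.
        System.out.println(chooseClient(Map.of(a, 10_000L, b, 500L), 100L).equals(b)); // true
    }
}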
@cadonna (Contributor) commented Mar 28, 2022

Cherry-picked to 3.2
