Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8179: Part 2, ConsumerCoordinator Algorithm #6778

Merged
merged 41 commits into from Jun 23, 2019

Conversation

guozhangwang
Copy link
Contributor

@guozhangwang guozhangwang commented May 20, 2019

  1. In ConsumerCoordinator, select the protocol as the common protocol from all configured assignor instances' supported protocols with the highest number.

1.b. In onJoinPrepare: only call onPartitionRevoked with EAGER.
1.a. In onJoinComplete: call onPartitionAssigned with EAGER; call onPartitionRevoked following onPartitionAssigned with COOPERATIVE, and then request re-join if the error indicates so.
1.c. In performAssignment: update the user's assignor returned assignments by excluding all partitions that are still owned by some other members.

  1. I've refactored the Subscription / Assignment such that: assigned partitions, error codes, and group instance id are not-final anymore, instead they can be updated. For the last one, it is directly related to the logic of this PR but I felt it is more convienent to go with other fields.

  2. Testing: primarily in ConsumerCoordinatorTest, make it parameterized with protocol, and add necessary scenarios for COOPERATIVE protocol.

I intentionally omitted the documentation change since there are some behavioral updates that needs to be finalized in later PRs, and hence I will also only add the docs in later PRs.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…pache#6511)

As described in KIP-443 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics). We want to remove the aggressive overrides of segment.ms and segment.index.bytes for repartition topics. The remaining segment.bytes should still be effective in bounding its footprint.

Reviewers: Bill Bejeck <bbejeck@gmail.com>
@guozhangwang guozhangwang changed the title KAFKA-8179: Part 2, ConsumerCoordinator Algorithm [WIP] KAFKA-8179: Part 2, ConsumerCoordinator Algorithm May 21, 2019
@guozhangwang
Copy link
Contributor Author

retest this please

// 2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
// id number indicates how advanced is the protocol).
// we know there are at least one assignor in the list, no need to double check for NPE
List<RebalanceProtocol> supportedProtocols = new ArrayList<>(assignors.get(0).supportedProtocols());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use java doc style comment /** here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In AK we usually only use /** for function head javadoc.

// 1. only consider protocols that are supported by all the assignors. If there is no common protocols supported
// across all the assignors, throw an exception.
// 2. if there are multiple protocols that are commonly supported, select the one with the highest id (i.e. the
// id number indicates how advanced is the protocol).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/is the protocol/the protocol is


switch (protocol) {
case EAGER:
if (!ownedPartitions.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a possible state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not possible state, and hence I'm logging a WARN. I can also just throw an exception actually.

@@ -141,7 +141,7 @@ private static String toString(final byte[] b, int off, int len) {
}

/**
* A byte array comparator based on lexicograpic ordering.
* A byte array comparator based on lexicographic ordering.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice typo fix

}

if (supportedProtocols.isEmpty()) {
throw new IllegalArgumentException("Specified assignors do not have commonly supported rebalance protocol");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to include assignor value here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

@@ -434,6 +435,10 @@ public static void isValidClusterId(String clusterId) {
return list;
}

public static <T> Set<T> toSet(Collection<T> collection) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we could use this for a lot of refactoring :)

log.info("Setting newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));

switch (protocol) {
case EAGER:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slight readability improvement: could we factor this into a method, maybe #assignPartitions? We could also then have just #revokePartitions and the cooperative case could call both -- this way it's immediately clear what each protocol does/how they differ

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to update it in the way you suggested, but it turns out a bit tricky since for some cases we do want to update the owned-partitions state if COOPERATIVE is used, while for some others we do not rely on the protocol to decide if the owned partitions would be revoked or not..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree you'd have to factor out the logic of what exactly is being revoked/assigned into the switch statement and then call assign/revoke on the appropriate set of partitions. Might be worth doing if we add another protocol but for now it's not that important

@guozhangwang
Copy link
Contributor Author

@ableegoldman @mjsax @hachikuji @abbccdda ready for another review. Also updating the description on the top to summarize the change here.

@@ -221,7 +221,7 @@ public void send(ClientRequest request, long now) {
builder.latestAllowedVersion());
AbstractRequest abstractRequest = request.requestBuilder().build(version);
if (!futureResp.requestMatcher.matches(abstractRequest))
throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest);
throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest + " with prepared response " + futureResp.responseBody);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break up long line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ableegoldman Sorry I missed these comments. Will address in the pt.3 PR.

Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few nits, but overall Part 2 LGTM

@@ -176,6 +196,25 @@ public void teardown() {
this.coordinator.close(time.timer(0));
}

@Test
public void testSelectRebalanceProtcol() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public void testSelectRebalanceProtcol() {
public void testSelectRebalanceProtocol() {

}

@Test
public void testMetadataRefreshDuringRebalance() {
final String consumerId = "leader";
final List<TopicPartition> owned = Collections.emptyList();
final List<TopicPartition> oldAssigned = Arrays.asList(t1p);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra line

coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(client.hasPendingResponses());
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: all on same line (or all separate)?


subscriptions.subscribe(singleton(topic1), rebalanceListener);
try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors,
true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

@@ -2294,7 +2399,7 @@ private JoinGroupResponse joinGroupLeaderResponse(int generationId,
.setProtocolName(partitionAssignor.name())
.setLeader(memberId)
.setMemberId(memberId)
.setMembers(Collections.emptyList())
.setMembers(metadata)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a piggy-backed fix: we should encode the metadata for leaders.

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a couple of minor nits but LGTM.

switch (protocol) {
case EAGER:
if (!ownedPartitions.isEmpty()) {
log.info("Coordinator has owned partitions {} that are not revoked with {} protocol, " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this slip? Seems it should be a WARN as you said above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally make it an exception since it should not happen, but later in unit test I realized it could normally happen if we have a wakeup exception during rebalance listener call, and therefore I change it to INFO


// assume all higher versions can be parsed as V1
default:
return buildSubscriptionV1(buffer, groupInstanceId);
return deserializeSubscriptionV1(buffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Although not strictly part of the PR, this part isn't covered in the unit test. Is it worth mocking out future version /different protocol to make sure this works as intended? Same for serializeSubscription

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate a bit more? I thought future-proof is covered in deserializeFutureSubscription/AssignmentVersion`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I missed the tests deserializeFutureSubscription/AssignmentVersion and was looking at the coverage of de/serializeAssignment, and the default branch isn't hit, but the individual methods themselves are covered.

@guozhangwang guozhangwang merged commit e5c4ebd into apache:trunk Jun 23, 2019

protocol = supportedProtocols.get(supportedProtocols.size() - 1);
} else {
protocol = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this break in the switch below? Am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a check at the beginning of ConsumerCoordinator.poll:

            if (protocol == null) {
                throw new IllegalStateException("User confingure ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG to empty " +
                    "while trying to subscribe for group protocol to auto assign partitions");
            }

Basically if user does not call subscribe() it is okay to not set the protocol at all; otherwise we will throw error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. We also have a check in subscribe to ensure that the set of assignors is not empty. There might be a way to remove the redundant checking.

By the way, there's a typo above: confingure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we can indeed remove it:

  1. if no assignors specified, then subscribe will throw.
  2. if assignors specified but no common supported protocol, then constructor will throw.

Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), subscriptions);

switch (protocol) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since we're not doing anything in the EAGER case, couldn't we simplify this:

if (protocol == COOPERATIVE)
  adjustAssignment(ownedPartitions, assignments)

Similarly in onJoinPrepare

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack here; For onJoinPrepare though in the pt.3 this is modified a bit since with EAGER we may still revoke.

case EAGER:
if (!ownedPartitions.isEmpty()) {
log.info("Coordinator has owned partitions {} that are not revoked with {} protocol, " +
"it is likely client is woken up before a previous pending rebalance completes its callback", ownedPartitions, protocol);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this log message is more confusing than useful. Maybe it would be better to log a message below in the Wakeup/Interrupt exception handler?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.


}

private void assignAndRevoke(final ConsumerRebalanceListener listener,
Copy link
Contributor

@hachikuji hachikuji Jun 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to have some high-level documentation somewhere in the code about the cooperative strategy and how it works. Perhaps this already exists? At a minimum, we should explain in the ConsumerRebalanceListener docs how the two strategies affect the API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah my plan is to add the javadoc change in pt.3, and will do another doc PR for web-docs / upgrade-path; the rationale is that some of the behavior changes are yet to be agreed on in pt.3 so I'd like to defer that to later.

Copy link
Contributor

@hachikuji hachikuji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few comments.

break;

case COOPERATIVE:
adjustAssignment(ownedPartitions, assignments);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A brief description of the purpose of this adjustment would be helpful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

@@ -186,6 +182,10 @@ public ByteBuffer userData() {
return userData;
}

public void setGroupInstanceId(Optional<String> groupInstanceId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turning a final field into a mutable field seems like a step backwards... Can you explain why this is needed?

* By default it should always work with {@link RebalanceProtocol#EAGER}.
*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}

/**
* Return the version of the assignor which indicate how the user metadata encodings
* Return the version of the assignor which indicates how the user metadata encodings
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I mentioned this before as well, but can we elaborate on the use of this version? This is very vague.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The version of the assignor would be used at the broker-side to select the leader as the one with the highest version, expecting it be able to decode assignor's older subscription user metadata.

I will update the javadoc as well after the broker's logic has been updated accordingly.

return error;
}

public void updatePartitions(List<TopicPartition> partitions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these methods have to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch, it needs to be default-access.

ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
groupAssignment.put(assignmentEntry.getKey(), buffer);
}

return groupAssignment;
}

private void adjustAssignment(final Map<TopicPartition, String> ownedPartitions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have discussed this in the past, but I'm not too thrilled with the consumer mucking around with assignments from the plugin. Couldn't we just raise an error if the assignment doesn't respect ownership?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't we just raise an error if the assignment doesn't respect ownership?

Hmm, I think that's doable. But on the other hand we are requiring users to be aware how to leverage on owned-partitions list and respect it with COOPERATIVE.

I see pros and cons of this approach: not silently mucking around user assignor returned data, but pushing the responsibility of implementing COOPERATIVE-supported assignors to users themselves. @ableegoldman wdyt?

private final ByteBuffer userData;
private final ConsumerProtocol.Errors error;
private ConsumerProtocol.AssignmentError error;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the assignor need to be aware of this error? If it is just for internal use, can we leave it out of the public API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, ack.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants