
KAFKA-2555: Infinite recursive function call when call commitSync in … #221

Closed
wants to merge 4 commits

Conversation

becketqin
Contributor

@hachikuji @ewencp I found this problem when adding the new consumer to mirror maker, which commits offsets in the rebalance callback. It is not clear to me why we are triggering a rebalance for commitSync() and fetchCommittedOffset(). Can you help review to see if I missed something?

Regarding commitSync(): after each poll() the partitions will either be assigned to a consumer or already revoked. As long as the user relies on the internal offset map, the offset map will always be valid, i.e. it will only contain the assigned partitions when commitSync() is called. Hence there is no need to trigger a rebalance in commitSync().
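
For context, a minimal sketch of the mirror-maker style usage that hits this problem, assuming the released ConsumerRebalanceListener API (the topic name and config values here are illustrative): commitSync() with no arguments commits the internal offset map from inside onPartitionsRevoked(), and if commitSync() can itself trigger a rebalance, the callback is re-entered and recurses indefinitely.

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CommitInRebalanceCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mirror-maker-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mirror-source-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Commit the internal offset map before giving up the partitions.
                // If commitSync() itself triggers a rebalance, this callback is
                // re-entered and the call recurses indefinitely (KAFKA-2555).
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Nothing to do for this sketch.
            }
        });

        while (true) {
            consumer.poll(100);  // normal consume loop; rebalances happen inside poll()
        }
    }
}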

The same guarantee also applies to fetchCommittedOffset(); isn't the only requirement there to ensure we know the coordinator?

Another related issue is that today the IllegalGenerationIdException is a bit confusing. When we receive an IllegalGenerationIdException from a heartbeat, we need to use that same generation ID to commit offsets and the coordinator will take it. So the generation ID was not really illegal. I will file a ticket for this issue.

@asfbot

asfbot commented Sep 18, 2015

kafka-trunk-git-pr #443 FAILURE
Looks like there's a problem with this pull request

@hachikuji
Contributor

@becketqin I thought about this before, but the implications from removing the partition assignment check were not very clear to me. I think there are basically three cases to consider:

  1. The current generation is still active.
  2. There is a rebalance in progress.
  3. A rebalance was previously completed (and the member was kicked out).

For (1) and (2), this change appears to work fine. The coordinator should accept the commits and we will have a chance to rebalance the next time poll() is invoked. However, for (3), I'm not sure we'll recover correctly since the commits will always return either UNKNOWN_CONSUMER or ILLEGAL_GENERATION, which are both designated retriable. The loop will spin forever since it no longer has a way to refresh the generation.
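
To make the concern concrete, the synchronous commit path retries whenever the error is marked retriable, roughly like the sketch below (a simplification in the spirit of the coordinator code, not the literal trunk code; the helper names are assumed):

// Simplified shape of a synchronous offset commit retry loop (illustrative only).
while (true) {
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    client.poll(future);

    if (future.succeeded())
        return;

    if (!future.isRetriable())
        throw future.exception();

    // If ILLEGAL_GENERATION / UNKNOWN_CONSUMER stay "retriable", we back off and
    // retry forever, because nothing in this loop can refresh the generation.
    Utils.sleep(retryBackoffMs);
}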

One of the problems is that the current "retriable" notion is pretty limited. In fact, those errors are only retriable in the context of a JoinGroup. For everything else, they are not retriable. My suggestion would probably be to make them non-retriable, and then to add special checks in the join group loop to check for them. This would mean that the illegal generation errors in commitSync would get propagated to the user, which is probably a lot better than trying to force through the old commit on a new generation. What do you think?

Also, I agree that use of illegal generation is a little confusing. I would rather reserve that error for when the generation is officially superseded. Instead of using illegal generation in the heartbeat response to indicate a need to rebalance, maybe we can have a REJOIN_NEEDED error or something like that.

@becketqin
Contributor Author

@hachikuji Thanks for the explanation. Case (3) is actually a failure case: the consumer has already been kicked out of the group. Letting an offset commit trigger a rebalance and proceed might cause some issues. For example:

  1. Consumer 1 is consuming from offset 100 of partition 1. Consumer 2 is consuming from offset 100 of partition 2.
  2. Consumer 2 somehow was kicked out of the group.
  3. Consumer 1 now consumes from offset 200 of partition 1 and offset 200 of partition 2.
  4. Consumer 2 commits offset 100 for partition 2.
  5. In this case, if we let consumer 2 proceed, the offset of partition 2 will rewind from 200 back to 100.

Personally I prefer to have the rebalance triggered by IllegalGenerationID only happen in heartbeat. In other cases, that is actually an error the user needs to be aware of; throwing an exception is probably better than swallowing the error.

Completely agree about the ambiguity of IllegalGenerationIdException. I have created ticket KAFKA-2557 and Onur has a PR for it.

@hachikuji
Contributor

@becketqin I think we agree that commitSync should not rebalance. However, in order to handle case (3), we need to make IllegalGenerationException and UnknownConsumerIdException not extend RetriableException. Otherwise, commitSync will be stuck in the loop (since it will keep retrying) and will never propagate the error to the user. Does that make sense?

@becketqin
Contributor Author

@hachikuji Yes, that makes sense. Good catch about the infinite while loop.
So we are going to make only REBALANCE_IN_PROGRESS retriable and ILLEGAL_GENERATION non-retriable. I am going to wait for that change to get checked in. Sounds good?
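
For reference, retriability in the Java client is signaled through the exception hierarchy, so the change would look roughly like the sketch below (the class placement follows this conversation and is an assumption, not necessarily what ends up in trunk; both base classes live in org.apache.kafka.common.errors):

// Non-retriable: commitSync() propagates this to the user instead of looping.
public class IllegalGenerationException extends ApiException {
    public IllegalGenerationException(String message) {
        super(message);
    }
}

// Retriable: the commit can simply be retried once the rebalance completes.
public class RebalanceInProgressException extends RetriableException {
    public RebalanceInProgressException(String message) {
        super(message);
    }
}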

@hachikuji
Contributor

@becketqin Looks like you might need a rebase (sorry!), but the REBALANCE_IN_PROGRESS patch was merged, so you should be good to go. I think @guozhangwang can help check in the fix when it's ready, since he was also running into this problem.

@becketqin
Contributor Author

@hachikuji I will need to change both UnknownConsumerException and IllegalGenerationIdException to non-retriable.

There are two scenarios:

  1. A consumer that has been kicked out of the group wants to commit offsets. In this case, we should throw an exception rather than retry.
  2. A coordinator failover happened (in the currently checked-in code it is as if all the consumers were kicked out of the group). In this case, we need to retry.

Today a consumer is not able to distinguish between those two scenarios. But I think moving forward, we should persist the group metadata for coordinator failover. During a failover the coordinator should either throw a GroupMetadataLoadingInProgressException or hold the offset commit request until the failover finishes.

If we follow that path, then making UnknownConsumerIdException and IllegalGenerationIdException non-retriable is reasonable. Today, making both of them non-retriable means that we expose the coordinator failover to the users in (2), so the user needs to rejoin the group and retry the commit with the correct offsets. It is a little bit ugly but not too big a deal. Do you have any concerns about this?
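
For illustration, handling scenario (2) on the user side would look roughly like this (the exception names follow this discussion and currentOffsets() is a hypothetical helper; this is what the proposal implies users must do, not a recommended pattern):

// Illustrative only: the commit fails because the coordinator failed over and
// the group metadata was not persisted.
try {
    consumer.commitSync(offsets);
} catch (UnknownConsumerIdException | IllegalGenerationException e) {
    // Our membership is gone: rejoin the group via poll(), then commit only
    // the offsets for partitions we still own after the rebalance.
    consumer.poll(0);
    consumer.commitSync(currentOffsets(consumer.assignment()));  // hypothetical helper
}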

@hachikuji
Contributor

@becketqin That's a good point about the coordinator failure case. Unless we have persistence, the new coordinator will have no choice but to reject the commits, and the only thing the client can do is propagate the error to the user. This pretty much guarantees that broker failures (or even clean broker shutdowns) will generally lead to duplicate consumption, which is unfortunate. Do you guys think this problem is serious enough to push for persistence in the initial release? Either way, it seems like making IllegalGeneration and UnknownConsumer non-retriable is the right thing to do since rebalancing in commit is not really an option.

@becketqin
Contributor Author

@hachikuji I don't have a strong opinion on whether we should include it in the initial release or not. It is ugly but not a disaster; users need to handle duplicates anyway. But we have to educate users on what to do when they receive those two exceptions.

@becketqin
Contributor Author

@guozhangwang Could you help take a look at this patch? It seems both of us are blocked on this problem now...

@asfbot

asfbot commented Sep 24, 2015

kafka-trunk-git-pr #531 FAILURE
Looks like there's a problem with this pull request

@hachikuji
Contributor

@becketqin Since we made those exceptions non-retriable, I think we also have to fix Coordinator.reassignPartitions() so that we don't propagate UnknownConsumerException when doing the actual rebalance. It's ugly, but something like this would probably work:

// Inside the rejoin loop of Coordinator.reassignPartitions():
RequestFuture<Void> future = sendJoinGroupRequest();
client.poll(future);

if (future.failed()) {
    if (future.exception() instanceof UnknownConsumerException)
        continue;                        // don't propagate; just retry the join
    else if (future.isRetriable())
        Utils.sleep(retryBackoffMs);     // back off, then the loop retries the join
    else
        throw future.exception();
}

@becketqin
Contributor Author

@hachikuji Good catch. Yes, it is a bit ugly to call them out and retry, but I think this is the right thing to do. Retriable means always retry, while the correct handling for those exceptions depends on the context, so we cannot put them into the retriable category.

@asfbot

asfbot commented Sep 24, 2015

kafka-trunk-git-pr #540 SUCCESS
This pull request looks good

@@ -197,7 +198,10 @@ private void reassignPartitions() {
         client.poll(future);
 
         if (future.failed()) {
-            if (!future.isRetriable())
+            if (future.exception() instanceof UnknownConsumerIdException
+                    || future.exception() instanceof IllegalGenerationException)
Contributor


Nit: I don't think illegal generation is possible in JoinGroup.

@asfbot

asfbot commented Sep 25, 2015

kafka-trunk-git-pr #541 FAILURE
Looks like there's a problem with this pull request

@hachikuji
Contributor

LGTM

@guozhangwang
Contributor

Thanks for the discussion @hachikuji @becketqin, this is very helpful. A few thoughts:

  1. We can probably solve the problem of the same exception having different semantics for different requests / responses by introducing new error codes, but we could also add the request API code into the RequestFuture and wrap the logic for determining isRetriable() inside RequestFuture, instead of relying on the exception type alone (see the sketch after this list).
  2. If there is a coordinator failover, the new coordinator will go through the following phases:

a) It does not yet know that this group belongs to itself:

Hence it returns NOT_COORDINATOR_FOR_CONSUMER, which is retriable.

b) It has been notified by the controller and gone through becomeLeader, but the group has not been created yet:

Hence it just blindly commits the offsets, since it thinks this consumer does not use Kafka for group membership.

c) The group is finally created, but the consumer id is unknown or the generation id is outdated:

Hence it will return UNKNOWN_CONSUMER or ILLEGAL_GENERATION, which is now non-retriable.

To me the current case b) is really bad: it will accept any commits, and this phase only ends when someone sends a JoinGroup; phase c) will now possibly result in consumption duplicates. So I feel persisting the metadata to ZK is necessary, because it relates to correctness and not only to duplicates: in b) the coordinator would not blindly commit the offsets but would return GroupMetadataLoadingInProgress, and in c) it would possibly have discovered the consumer already.
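
Regarding point (1), a rough sketch of letting the future know which API it belongs to, so retriability can depend on the (api, error) pair rather than on the exception class alone (illustrative only; this is not the existing RequestFuture in clients, and the class and field names are made up):

import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.protocol.ApiKeys;

public class ApiAwareFuture<T> {
    private final ApiKeys api;          // which request this future belongs to
    private RuntimeException exception;

    public ApiAwareFuture(ApiKeys api) {
        this.api = api;
    }

    public void raise(RuntimeException e) {
        this.exception = e;
    }

    public RuntimeException exception() {
        return exception;
    }

    // An illegal generation is only worth retrying as part of a JoinGroup; for
    // everything else it should surface to the caller.
    public boolean isRetriable() {
        if (exception instanceof IllegalGenerationException)
            return api == ApiKeys.JOIN_GROUP;
        return exception instanceof RetriableException;
    }
}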

@hachikuji
Contributor

@guozhangwang Thanks for the comments. It seems like the problem in phase b) is that the coordinator blindly commits the offsets. Does it have to? Couldn't we have the coordinator check whether the generation is -1 (the default generation) in order to tell whether the consumer is using group membership? If the generation is greater than 0, then we could just return ILLEGAL_GENERATION. This obviously wouldn't prevent duplicates, but at least it would prevent this weird state where commits from old generations are accepted.
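
A rough sketch of that check on the coordinator side (the real coordinator is Scala; the method and parameter names here are illustrative, not the actual coordinator code):

import org.apache.kafka.common.protocol.Errors;

// Illustrative only: after a failover without persisted group metadata, accept
// commits from consumers that manage partitions themselves (generation -1, the
// default), but reject commits that claim a Kafka-managed generation we know
// nothing about.
short validateCommit(int generationId, boolean groupIsKnown) {
    if (!groupIsKnown) {
        if (generationId < 0)
            return Errors.NONE.code();              // simple offset storage, no membership
        return Errors.ILLEGAL_GENERATION.code();    // unknown generation after failover
    }
    // ... normal member id / generation checks for a known group ...
    return Errors.NONE.code();
}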

That being said, I'm starting to think that persistence ought to be part of the first release. If not, then we should at least consider whether there's a way to prevent duplicates on clean broker shutdown, since that is probably the most common case where the coordinator would need to fail-over.

@guozhangwang
Contributor

@hachikuji Good point, I think we can do that.

@guozhangwang
Contributor

Had some offline discussion with @hachikuji and @junrao. I think we can check in this patch as-is, and there are a few follow-up items:

  1. Shutting down a consumer should commit offsets to avoid duplicates. @hachikuji will work on that.
  2. I will work on KAFKA-2017 to make it into the first release; there are some thoughts about what minimum metadata needs to be persisted to ZK and in what path / format. I will also try to incorporate the generation id check for non-Kafka membership management in the same ticket.
  3. @hachikuji will create another ticket for adding the request code into the request future.

@asfgit asfgit closed this in a07fbd0 Sep 25, 2015
@guozhangwang
Contributor

LGTM.
