This repository was archived by the owner on Mar 24, 2021. It is now read-only.

Conversation


@fortime fortime commented Jul 27, 2015

To avoid multiple consumers consuming the same partition, I added a stricter check on the zookeeper connection. I consider a zookeeper connection to be reconnected only once a rebalancing has finished.
BTW, if a consumer's id is missing in zookeeper, its registry of owned partitions should be missing too. In that case, we should not call self._remove_partitions().

@fortime fortime mentioned this pull request Jul 27, 2015
Contributor

Is it safe/necessary to write to _zookeeper.state here? Won't the client manage that correctly by itself?

Author

If the zookeeper client is passed in from outside, it may already have been started, so self._zookeeper_state_changed would not be called. And if we don't set self._zookeeper_connected here, consume() will raise an exception.

Contributor

Ah right, thanks, I see the point now. So then, wouldn't it be ok to just do

self._zookeeper_connected = self._zookeeper.state

so that we don't risk clobbering the KazooClient's internal state by writing CONNECTED ourselves, and still make sure we initialise self._zookeeper_connected correctly?
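(For illustration, a minimal sketch of that initialisation, assuming a plain KazooClient; comparing against KazooState.CONNECTED rather than storing the raw state string is an added variant to keep the flag boolean:)

from kazoo.client import KazooClient, KazooState

# Hypothetical host string; reading the client's state avoids overwriting
# KazooClient's own bookkeeping.
zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

zookeeper_connected = (zk.state == KazooState.CONNECTED)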

@yungchin
Contributor

You make an important observation that calling _remove_partitions may be unsafe in cases where the consumer potentially no longer owns the partitions. I wonder if it's possible to make the method safer? Could we store the ZNode versions and pass those along with the delete request, or something like that?
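(A rough sketch of that idea, relying on kazoo's compare-and-delete semantics via the version argument; the helper name and path handling are hypothetical:)

from kazoo.exceptions import BadVersionError, NoNodeError

def remove_partition_znode_if_unchanged(zk, path):
    # Hypothetical helper: delete a partition znode only if it still has the
    # version we last read, so we never delete a node that another consumer
    # has since (re)claimed.
    try:
        _data, stat = zk.get(path)
        # zookeeper rejects the delete with BadVersionError if the node
        # changed between the get() and the delete().
        zk.delete(path, version=stat.version)
    except (NoNodeError, BadVersionError):
        # Already gone, or no longer the node we saw: leave it alone.
        pass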

Contributor

See my comment on the pr-discussion thread - if _remove_partitions were safe (in the "only if we own them" sense), we could call it here. If it turns out that's not possible, the current way is best of course. Maybe in that case the comment could be expanded to "all ephemeral nodes for this should be missing, therefore we must not call _remove_partitions".

Author

Just forget my comment. My poor English~:P
I will add it.

@yungchin
Contributor

This looks good to me, with the caveat that I have a more limited understanding of ZK than @emmett9001. I should also note that I don't fully grasp the use of self._setting_watches - I'm not alleging it is incorrect, I just can't manage to get my head around it quite yet, and it feels like it may be an overloaded flag now.

@kbourgoin
Contributor

I'm new to this, but I'm not seeing the need for this PR. There's already a check on zookeeper to make sure it's connected, so I don't see the need for storing that state elsewhere, where it could be vulnerable to race conditions.

Furthermore, when the zk connection dies there's a timeout before the ephemeral nodes are removed. It can't be safely assumed that if ZK hits a networking hiccup, all registrations have been removed and need to be re-established. Kazoo tends to reconnect by itself automatically except in very serious network partition cases, so automatically calling rebalance on what could be a transitory issue seems like a very bad idea.

@fortime
Author

fortime commented Jul 28, 2015

@yungchin I'm new to zookeeper, but I think the probability of losing the ephemeral nodes between L445-L464 is very low. It may be too complicated to make _remove_partitions safer.

@fortime
Author

fortime commented Jul 28, 2015

@yungchin I think there is no need to do a rebalance when the connection to zookeeper is not available, or while the consumer hasn't recovered from a zookeeper failure. So I set self._setting_watches when the zookeeper state changes.

@fortime
Author

fortime commented Jul 28, 2015

@kbourgoin The original check of the zookeeper connection is not sufficient.
Here is a scenario:
The consumer is consuming a message that takes it several minutes. In the meantime, the zookeeper connection has been down long enough for all of its ephemeral nodes to go missing. Before the consumer fetches the next message, the connection comes back. The consumer may then consume a partition it no longer owns.
I'm new to zookeeper and kazoo. Thanks for the comment. I think I should check that the consumer_id is still there before I do a rebalancing. BTW, is it atomic when zookeeper deletes all the ephemeral nodes of an expired session?
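(A rough sketch of that check, assuming zk is a started KazooClient; the path below follows the usual Kafka 0.8 zookeeper layout and is an assumption:)

# Hypothetical consumer id node for this consumer instance.
consumer_id_path = "/consumers/mygroup/ids/consumer-1234"

if zk.exists(consumer_id_path) is None:
    # Our ephemeral id node is gone, so the session must have expired at some
    # point: re-register (and re-claim partitions) before rebalancing instead
    # of trusting the locally cached partition set.
    pass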

@fortime
Author

fortime commented Jul 28, 2015

@kbourgoin I think it is necessary to do a rebalance, because _zookeeper_state_changed() is a copy of ChildrenWatch._session_watcher().

if func is not None:
    self._used = True
    if allow_session_lost:
        # _session_watcher is registered here as a connection-state listener
        self._client.add_listener(self._session_watcher)
    self._get_children()

It sounds like the session has been lost by the time _session_watcher() is called, so I think all the ephemeral nodes have been deleted.

@emmettbutler
Contributor

I don't see the need for these changes. We have _check_held_partitions, which runs periodically to ensure that the consumer doesn't own partitions it shouldn't. The case this pull request is meant to solve sounds rare enough that I don't think it warrants these changes.

@fortime
Author

fortime commented Jul 29, 2015

@emmett9001 So it is acceptable that multiple consumers consume the same partition for up to 120 seconds? And if the set of consumers doesn't change, they will do that forever. rebalance() will always raise an exception if _add_self is never called after a zookeeper failure.

@yungchin
Contributor

@kbourgoin @emmett9001 I'd like to explore this a bit further if that's ok (it also gives me an opportunity to get better acquainted with kazoo/zookeeper!). It's not entirely clear to me from the kazoo docs whether a watch that would have been expected to trigger while the connection was suspended/lost will still be triggered once the connection comes back. From what @fortime says, I gather he's finding that those triggers don't work, is that right?

I understand why we need to have _check_held_partitions, but I also feel that to some extent that may paper over some issues (my reading of the method is something like "let's check if this situation that in theory should never occur occurs anyway, and then make it go away with rebalancing" - ideally, if both pykafka and kazoo (and zookeeper) behave exactly as promised, it would always return without rebalancing, right?).

I know what I'm saying is very hand-wavy. It would be a lot more concrete if it was possible to design a TestCase that shows the precise issues. It's a pain to write probably, if it has to simulate network interruptions.

Contributor

There may be a problem if we treat these two states the same way: a suspended connection may still have ownership of its ephemeral nodes. So for the LOST state, what you're doing in L450 above is valid (because the nodes on zookeeper will have gone), but for the SUSPENDED state, doing this might leave the nodes on zookeeper owned by this instance, that is, the instance would then have unilaterally dropped the partitions.
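(To illustrate the distinction, a sketch of a kazoo state listener that treats the two states differently; the handler bodies are placeholders, not pykafka's actual logic:)

from kazoo.client import KazooClient, KazooState

zk = KazooClient(hosts="127.0.0.1:2181")

def state_listener(state):
    if state == KazooState.LOST:
        # Session expired: all ephemeral nodes (consumer id, partition
        # owners) are gone, so a full re-registration is required.
        pass
    elif state == KazooState.SUSPENDED:
        # Connection hiccup: the ephemeral nodes may still be held, so don't
        # drop ownership; pausing consumption is the safe option.
        pass
    else:  # KazooState.CONNECTED
        # Reconnected: verify our registration before resuming.
        pass

zk.add_listener(state_listener)
zk.start()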

Contributor

It also occurs to me that there's a case to be made for bubbling up SUSPENDED states to the application layer. What you'd want to happen may be somewhat app dependent: should we continue processing messages, but not commit the offsets, or freeze consumption completely, or continue doing both? The safest thing would be to suspend all consumption completely I suppose - that would be a change from current behaviour.

Author

I haven't dug too much into LOST and SUSPENDED. From the kazoo docs, ephemeral nodes may or may not be lost in the SUSPENDED state.
I don't drop the partitions unilaterally: I check whether the consumer id is still there, and only if it is not do I drop the partitions.

Contributor

Ah, yes, you are checking - my bad, I was being very myopic for a moment.

@fortime
Author

fortime commented Jul 29, 2015

@yungchin From the kazoo source code, I think the watch will not be triggered when the connection comes back. The code is here.

@yungchin
Contributor

@fortime thanks for that pointer. So looking at that, I believe https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/watchers.py#L343-L348 should make sure the watch may be triggered after a connection loss, and then the lines L320-325 that you linked make sure that if our consumer_id is still listed (and nothing else changed), the watch won't trigger after all.
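(For reference, a minimal sketch of how such a children watch is set up; the path is hypothetical. With allow_session_lost=True, the default, the watch is re-established after a session loss, and the prior-children comparison in the lines linked above suppresses the callback if the list is unchanged:)

from kazoo.client import KazooClient
from kazoo.recipe.watchers import ChildrenWatch

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
zk.ensure_path("/consumers/mygroup/ids")  # hypothetical group path

def consumers_changed(consumer_ids):
    # Called with the current list of consumer ids whenever membership
    # changes (and again after a session loss, unless the list looks the
    # same as before the loss).
    print("current consumers:", consumer_ids)

ChildrenWatch(zk, "/consumers/mygroup/ids", consumers_changed)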

So ultimately, I think I agree that this pullreq may not solve the problem you're seeing. Having said that, though - what's the behaviour you've found, and that this addresses? Does it fit in a TestCase?

@kbourgoin
Contributor

@fortime is it possible to create a test case to illustrate this issue? I'm also not seeing the need for the changes, but instead of talking about it, it might be better if we had a concrete example of it happening.

@fortime
Author

fortime commented Jul 29, 2015

@yungchin I had missed the second if clause. I have seen multiple consumers own the same partition. Maybe I didn't wait more than 2 minutes, and the frequency of zookeeper failures was too high; I haven't seen them recover from that. I observed the following behaviour during my tests:

  1. The consumer deletes partition znodes that it no longer owns.
  2. The consumer keeps consuming messages while a rebalance is in progress. That is expected behaviour, but it can cause up to 30 seconds' worth of messages to be consumed by multiple consumers.
     Maybe it is the high frequency of zookeeper failures that causes other weird concurrency problems.

I think you should fix the first one at least, and stop the internal consumer before the set of owned partitions changes.

@kbourgoin I will review the tests I have made and post the procedure here.

@fortime
Author

fortime commented Jul 29, 2015

There is another issue with this pull request: SimpleConsumer will still consume messages that have already arrived even if it has been stopped.

@yungchin
Contributor

@fortime many thanks for explaining this in detail, that clarifies a lot. Revisiting the code now, I think I can see how both your observations 1) and 2) come about. Would you be able to turn the code you used for testing into a test case (the test suite lives under tests/pykafka), so that there would be a clear basis to measure improvements against?

As for solutions: as per our discussion about kazoo watches above, I don't believe the extra add_listener and state management here is needed as the watches contain that logic too (of course the test case may disprove that hunch). Also, the focus of this fix so far has been on not deleting partitions from zookeeper that we don't own, but I think additionally the reverse may also be a problem: after a disconnect, the consumer may believe it owns a partition already, and will therefore fail to re-register it. That is to say, I suspect the eventual complete fix will have to end up looking somewhat different.

@fortime
Author

fortime commented Jul 30, 2015

@yungchin I wanted to put a test case into it, but it is hard to check whether the test case runs correctly.

Basically, I think the deletion of ephemeral nodes should be atomic, so if one of the ephemeral nodes exists, all of them exist. Checking whether the consumer id is among the participants when rebalancing is enough for both cases.

@yungchin
Contributor

Yes I can see the problem with getting the test case right, it's a difficult failure scenario to set up. In fact, we're lucky that you seem to have a setup with lots of zookeeper connection loss happening, so you've detected all these issues.

It looks like kazoo offers some test suite utilities to simulate things like this: https://github.com/python-zk/kazoo/blob/2.2.1/kazoo/testing/harness.py#L97 - I'd be keen to take a shot at that, but it might take a while before I've a clear plate I'm afraid, so if you're up for it, please go for it!
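(A very rough sketch of what such a test might look like, using kazoo's KazooTestCase; the consumer setup is elided, the harness needs a local zookeeper install configured per kazoo's testing docs, and the session-expiry helper name should be confirmed against the harness file linked above:)

from kazoo.testing import KazooTestCase

class BalancedConsumerSessionLossTest(KazooTestCase):
    # KazooTestCase spins up a zookeeper cluster for the test and exposes a
    # connected client as self.client.
    def test_rebalance_after_session_expiry(self):
        # ... set up a BalancedConsumer against self.client here ...

        # Force a session expiry via the harness utility linked above
        # (assumed helper; check its exact name/signature in harness.py).
        self.expire_session()

        # ... assert that the consumer re-registers itself and that no
        # partition ends up owned by more than one consumer ...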

@yungchin
Contributor

As noted in an earlier comment (#215 (comment)) there are some parts here that are crucial, and some that may not be necessary. And now that the pullreq on which this was based (#206) has been closed, I think I might have to close this one too, but that doesn't mean the changes are dead. I've added a summary of the important observations here to #204, and have also spun out two new tickets.

yungchin added a commit that referenced this pull request Oct 16, 2015
This changes _rebalance() to always use zk as the "source of truth" for
what partitions we hold, rather than self._partitions.  The main reason
to do so is that self._partitions may be out of date, if _rebalance() is
triggered by a zk session expiration.  This was originally reported by
@fortime in #215 - I'm however opting for a slightly more conservative
(and thus also less performant!) solution than was proposed there.

Because we now no longer use self._partitions as a proxy for our
zk-registered partitions, I've redefined it to point to the internal
SimpleConsumer's partitions, which simplifies things a bit.

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
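(A rough illustration of the "zk as source of truth" idea described in this commit message; the path layout and names are assumptions, not pykafka's actual implementation:)

def held_partitions_from_zk(zk, owners_path, consumer_id):
    # Hypothetical helper: list the partition znodes under the owners path
    # and keep those whose data records this consumer as the owner, rather
    # than trusting a locally cached partition set.
    held = []
    for name in zk.get_children(owners_path):
        data, _stat = zk.get("{}/{}".format(owners_path, name))
        if data is not None and data.decode("utf-8") == consumer_id:
            held.append(name)
    return held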