
Long-lived pykafka consumer occasionally freezes. #189

Closed
rduplain opened this issue Jun 22, 2015 · 26 comments

@rduplain
Contributor

Reporting this bug as I heard it through @amontalenti. I don't have any details myself, as I'm new to the project, but I want to open this issue so that others can post what they've seen.

@msukmanowsky

@rduplain are you using the BalancedConsumer or SimpleConsumer? Also, any logs from around the time the consumer freezes?

@rduplain
Contributor Author

I heard about this issue through @amontalenti. The questions you're asking, @msukmanowsky, are exactly the ones I'd like answered to help address the problem. I opened the issue because it didn't look like one already existed; consider this an open call for anyone with details to post them to the thread.

@msukmanowsky

Gotcha! We'll try to contribute what we've seen around the times our BalancedConsumer does this. I think @kbourgoin or @emmett9001 may have some historical logs (though I don't think they tell us much). I'm pretty sure this issue is isolated to the BalancedConsumer and is likely a threading problem; I don't think the SimpleConsumer suffers from any issues here.

@yungchin
Contributor

As noted in comments elsewhere, I mentioned to @kbourgoin that I've been seeing lots of freezes in the test suite, always on tests involving a BalancedConsumer. I need to figure out how to get nose to give me more info, as --nologcapture doesn't seem to do the trick.

@yungchin
Contributor

Ok, just caught log output from a stalled test. The test in question is this one, and the log output is in a gist here. The test was run like so: nosetests --stop --nologcapture --nocapture --logging-config tests/debug-logging.conf tests.pykafka.test_producer:ProducerIntegrationTests > /tmp/nosetests.logging. Entries from 21:55 (L138) onwards were prompted by me hitting ctrl-c. When it doesn't stall, this test finishes in 15 seconds or so.

@emmettbutler
Contributor

Is this a rebalancing loop? Looks like it rebalanced at least ten times in quick succession.

@yungchin
Contributor

Could that be because the topic/partitions have only just been created (in test setup) and so the brokers are still settling?

I've updated the gist with sections of the server logs, starting from 21:52 (so about 20 seconds before the test starts).

@rduplain
Contributor Author

Now that I have the test runner going, I am seeing behavior similar to what @yungchin described:

The test in question is this one, and the log output is in a gist here.

I am running this, as modeled after .travis.yml:

python -m pykafka.test.kafka_instance 3 --download-dir $KAFKA_BIN &
nosetests -v --with-cov --cov pykafka --cover-branches --cov-config  .coveragerc --logging-level=DEBUG

The test suite is stalling at:

test_produce (tests.pykafka.test_producer.ProducerIntegrationTests) ... 

For my environment, I am on Ubuntu 14.04 64-bit with a source-compiled Python 2.7.10, and I haven't yet given any special consideration to my Java installation:

java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-0ubuntu0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)

@emmettbutler
Contributor

Interesting, the tests have never shown this stalling behavior for me.

yungchin added a commit that referenced this issue Jun 27, 2015
As discussed in #189 this test would previously stall randomly, waiting
forever until you'd hit ctrl-c.  The change demonstrates that the
problem is test-suite related: we put a timeout on the consumer, and as
a result we get a test error whenever None (== timed out) is returned
from consume(), instead of the stalling.

In log output, we see one of two things whenever the test errors out.
Either we get the wrong consumer offset straight away:

    DEBUG - pykafka.simpleconsumer - Set offset for partition 2 to 0

(where offset -1 is expected, on a fresh topic), or

    INFO - pykafka.simpleconsumer - Resetting offsets for 1 partitions

(which, by default settings, will jump to OffsetType.LATEST, ie past the
 freshly produced message we want to get).  Neither of these should
occur if the test harness hands us a newly created topic, as we expect.

On a side note, this resolves #179 because we now test with a binary
string, and that works (when it doesn't stall).

Signed-off-by: Yung-Chin Oei <yungchin@yungchin.nl>
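
For reference, here is a minimal standalone sketch of the timeout-based check the commit message describes (the broker address, topic name, and timeout are placeholders, and this is not the actual test code):

    from pykafka import KafkaClient

    # Placeholder broker address and topic name -- a sketch, not the real test.
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b"test.timeout.sketch"]

    # Produce one message, then consume it with a timeout: if the consumer
    # stalls, consume() returns None instead of blocking forever, so the
    # failure is visible rather than a hang.
    with topic.get_sync_producer() as producer:
        producer.produce(b"test message")

    consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
    message = consumer.consume()
    assert message is not None, "consume() timed out instead of returning the message"
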
@yungchin
Contributor

Interesting, the tests have never shown this stalling behavior for me.

It could be related to whether you keep the test cluster up between tests. I usually follow the recipe laid out in .travis.yml, where you start the test cluster in a separate step (see before_script:). It just stalled on Travis, incidentally: https://travis-ci.org/Parsely/pykafka/builds/68558040 (which will probably pass just fine if we restart it).

That is also to say: I think I may have muddied this bug report by suggesting that the test stalling is related to the original report at all. I've just pushed a fix for the test stalling.

@emmettbutler
Contributor

Possibly related to #260

@emmettbutler emmettbutler modified the milestones: 2.0.2, 2.0.1 Oct 19, 2015
@emmettbutler emmettbutler removed this from the 2.0.2 milestone Oct 28, 2015
@aeneaswiener

I am experiencing the same behaviour in a staging environment where I am testing pykafka with three balanced consumers connected to a single broker.

After a while the consumers just stop consuming messages; however, in the log output I can see that they keep rebalancing and committing their current offsets. Here is an excerpt of the debug log, where I have indicated the point after which no more messages are consumed by one particular consumer: https://gist.github.com/aeneaswiener/aa38495c7e7d9ff3d74c

Also, below you can see the progress of the consumer group in light blue, and I have indicated the point where all consumers of the topic have stopped consuming messages.

[Screenshot: Kafka consumer offset monitor showing the consumer group's progress (light blue) flattening at the indicated point]

Finally, looking at the Kafka broker log output after the consumers have stopped, I can see the following error message:

[2015-11-04 14:12:50,792] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2015-11-04 14:12:50,806] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: This operation cannot be completed on a complete request.
        at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
        at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:203)
        at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:226)
        at kafka.network.Processor.write(SocketServer.scala:472)
        at kafka.network.Processor.run(SocketServer.scala:342)
        at java.lang.Thread.run(Thread.java:745)

This keeps recurring until I manually shut down the consumer processes.

@ottomata
Contributor

ottomata commented Nov 4, 2015

Hi,

I'd just like to comment that this exact thing happened to us once after running BalancedConsumers for about a month or two. Restarting the consumers made them continue consuming fine. This occurred a while ago, so I don't have any relevant logs. If it happens again, I'll capture some info and post here.

@aeneaswiener

I can also confirm that restarting the consumers fixes the issue.

For us the issue happens reproducibly after about 10 minutes in our staging environment, where we are consuming messages at a rate of thousands per second.

@ottomata did you also see the above error log message in the Kafka broker? (ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) kafka.common.KafkaException: This operation cannot be completed on a complete request)

@emmettbutler
Contributor

Interesting. We only ever see this behavior in our Travis and local test environments - we run PyKafka consumers in production at Parse.ly without seeing this. It's possible that the issue is invisible to us because we use Storm to manage the automatic restarting of consumers, so maybe we can look into that.
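
For anyone not running under Storm, here is a rough sketch of that kind of restart-on-stall supervision, using consumer_timeout_ms to detect silence (the broker, topic, group, and zookeeper strings are placeholders, and the one-minute threshold is arbitrary):

    from pykafka import KafkaClient

    def run_consumer_forever(process_message):
        """Rebuild the consumer whenever it appears to have stalled."""
        while True:
            client = KafkaClient(hosts="127.0.0.1:9092")       # placeholder broker list
            topic = client.topics[b"my.topic"]                  # placeholder topic
            consumer = topic.get_balanced_consumer(
                consumer_group=b"my-group",                     # placeholder group
                zookeeper_connect="127.0.0.1:2181",             # placeholder zookeeper
                consumer_timeout_ms=60 * 1000)                  # treat a minute of silence as a stall
            try:
                while True:
                    message = consumer.consume()
                    if message is None:      # timed out -- assume the consumer froze
                        break
                    process_message(message)
            finally:
                consumer.stop()              # tear everything down and start fresh
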

@yungchin
Contributor

yungchin commented Nov 5, 2015

@aeneaswiener could you maybe repeat that with additional debug logging - something like

diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py
index ae4ebf8..116444a 100644
--- a/pykafka/simpleconsumer.py
+++ b/pykafka/simpleconsumer.py
@@ -647,6 +647,8 @@ class SimpleConsumer():
                         fetch_req = owned_partition.build_fetch_request(
                             self._fetch_message_max_bytes)
                         partition_reqs[owned_partition] = fetch_req
+            log.debug("Fetching messages for {} partitions".format(
+                len(partition_reqs)))
             if partition_reqs:
                 try:
                     response = broker.fetch_messages(

The thing that strikes me in your logging output is that at some point it mentions fetching for partition 15, but 15 doesn't show up again further down the line, even though you're steadily consuming. Then it alternates between 19 and 18 for a bit, until finally it only fetches for 18. So I wonder if we're somehow leaking partition locks, even though I can't see how just from staring at the code.
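
For anyone repeating the run with that extra logging, a minimal way to capture pykafka's DEBUG output from a long-running consumer process (standard library only; the file path is a placeholder):

    import logging

    # Route all pykafka loggers (pykafka.simpleconsumer, pykafka.balancedconsumer, ...)
    # to a file at DEBUG level, so a freeze can be inspected after the fact.
    logging.basicConfig(
        filename="/tmp/pykafka-debug.log",   # placeholder path
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
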

@jasonrhaas

+1 for this issue. I am also seeing the PyKafka BalancedConsumer stop consuming after a period of time; I've seen this anywhere from 30 minutes to 4 hours after establishing the connection. I'm still seeing this message:

out: 2015-11-06 12:24:04,446 - pykafka.balancedconsumer - INFO - Checking held partitions against ZooKeeper

but the consumer offsets are no longer changing.

[root@kafka01 bin]# ./kafka-consumer-offset-checker.sh -g scrapy-janitor -z zookeeper:2181
Group           Topic                          Pid Offset          logSize         Lag             Owner
scrapy-janitor  crawled_firehose         0   13488372        13492777        4405            none
scrapy-janitor  crawled_firehose         1   13321651        13326058        4407            none
scrapy-janitor  crawled_firehose         2   13753435        13757842        4407            none
[root@kafka01 bin]# ./kafka-consumer-offset-checker.sh -g scrapy-janitor -z zookeeper:2181
Group           Topic                          Pid Offset          logSize         Lag             Owner
scrapy-janitor  crawled_firehose         0   13488372        13492789        4417            none
scrapy-janitor  crawled_firehose         1   13321651        13326066        4415            none
scrapy-janitor  crawled_firehose         2   13753435        13757854        4419            none

Here is the traceback that I saw come across right before the consumer stopped consuming:
https://gist.github.com/jasonrhaas/8ee82d30ac3b1183c1ca

@yungchin
Contributor

yungchin commented Nov 6, 2015

@jasonrhaas if I'm reading that traceback correctly, I believe what's happening is this: we get a zookeeper notification, presumably because one of the brokers has become unresponsive (maybe intermittently), and it happens to be the broker that was the coordinator for our consumer group's offsets. So when we try to write our offsets (which we want to do before restarting the consumer), we hit a socket timeout. (About that, can I ask: are you on Linux, or some BSD? Just curious, because I thought sockets had no timeout by default.)

@emmett9001 is actually working on handling such errors in #327 right now (though so far we only expected pykafka.SocketDisconnectedError, not socket.timeout), which I suspect should resolve this particular problem.

@emmett9001 in addition, I'm wondering if BalancedConsumer._brokers_changed should maybe call self._cluster.update straight out of the gate, before beginning the consumer rebalance?
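
To make that last suggestion concrete, here is a purely hypothetical sketch of the proposed ordering (the signature and body are illustrative, not pykafka's actual implementation):

    # Hypothetical sketch only -- not the real BalancedConsumer code.
    def _brokers_changed(self, brokers):
        """kazoo watch callback, fired when the set of registered brokers changes."""
        if not self._running:
            return
        # Proposed: refresh cluster metadata before rebalancing, so the
        # rebalance (and the offset commit it triggers) doesn't target a
        # broker that has just disappeared.
        self._cluster.update()
        self._rebalance()
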

@jasonrhaas

@yungchin Zookeeper, Kafka, and the consumer client running on Storm are all on Linux (CentOS 6.5). The fact that I'm seeing error messages from Kazoo points towards losing a zookeeper connection, although I don't seem to have this problem with any of the other consumers I have running kafka-python. Will continue to investigate.

@yungchin
Contributor

yungchin commented Nov 8, 2015

Thanks @jasonrhaas. Yes, the error is raised on a thread spawned by kazoo, but I believe the bug is in pykafka code. The traceback you posted comes out of a callback that we register with kazoo so that we are notified of broker availability changes (and I suspect that in your case it triggered because a broker became unresponsive for long enough that zookeeper noticed). The bug in pykafka would then be that, even though we receive this notification, we still try to write offsets to the unresponsive broker.

(In surmising all this, I am going by the logging excerpt you posted, which only has tracebacks containing _brokers_changed. If this was not a broker connectivity issue but a zookeeper connectivity issue, I would expect all our kazoo watches to trigger, and you would see the same sort of traceback appear in logs in quick succession, with _brokers_changed, _consumers_changed, and _topics_changed.)

@emmett9001 has just merged #331 to master, which doesn't fix the problem yet, but improves the situation by surfacing this exception from the callback to the main thread, so now you'd see it when you call consume().

I'll also make a note on #327 about the fact that you encounter socket.timeout rather than pykafka.SocketDisconnectedError. Thanks.
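
In practical terms, here is a hedged sketch of what surfacing the error on consume() means for calling code (the exception types shown are the ones discussed in this thread and may vary by failure mode):

    import socket

    from pykafka.exceptions import SocketDisconnectedError

    def consume_one(consumer):
        """Consume a single message, handling errors surfaced on the main thread."""
        try:
            return consumer.consume()
        except (SocketDisconnectedError, socket.timeout):
            # Post-#331, errors raised inside the zookeeper-watch callbacks are
            # re-raised here instead of dying silently on a background thread,
            # so the caller can stop and rebuild the consumer.
            consumer.stop()
            raise
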

@jasonrhaas

So running this again, and tailing the logs the whole time, the cause of death now seems to be this:

[in-slave05.nj.istresearch.com] out: 2015-11-10 11:53:12,021 - pykafka.simpleconsumer - INFO - Autocommitting consumer offset for consumer group scrapy-janitor2 and topic memex.crawled_firehose
[in-slave05.nj.istresearch.com] out: 2015-11-10 11:53:22,486 - pykafka.simpleconsumer - ERROR - Exception encountered in worker thread:
[in-slave05.nj.istresearch.com] out:   File "/tmp/virtualenv/virtualenv_root/scrapy-email/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 301, in autocommitter
[in-slave05.nj.istresearch.com] out:     self._auto_commit()
[in-slave05.nj.istresearch.com] out:   File "/tmp/virtualenv/virtualenv_root/scrapy-email/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 377, in _auto_commit
[in-slave05.nj.istresearch.com] out:     self.commit_offsets()
[in-slave05.nj.istresearch.com] out:   File "/tmp/virtualenv/virtualenv_root/scrapy-email/lib/python2.7/site-packages/pykafka/simpleconsumer.py", line 397, in commit_offsets
[in-slave05.nj.istresearch.com] out:     self._consumer_group, 1, b'pykafka', reqs)
[in-slave05.nj.istresearch.com] out:   File "/tmp/virtualenv/virtualenv_root/scrapy-email/lib/python2.7/site-packages/pykafka/broker.py", line 333, in commit_consumer_group_offsets
[in-slave05.nj.istresearch.com] out:     return self._offsets_channel_req_handler.request(req).get(OffsetCommitResponse)
[in-slave05.nj.istresearch.com] out:   File "/tmp/virtualenv/virtualenv_root/scrapy-email/lib/python2.7/site-packages/pykafka/handlers.py", line 59, in get
[in-slave05.nj.istresearch.com] out:     raise self.error

It runs smoothly for about 15-20 minutes, and then this happens. The topic I'm consuming has 3 partitions, and I'm using a BalancedConsumer with a streamparse parallelism hint (:p) of 2 to read the topic. So if I look at the lag, it might look like this:

 Partition   Lag      Latest Offset    Current Offset
-----------  ------  ---------------  ----------------
     0       3          13834709          13834706
     1       3          13668011          13668008
     2       34,829     14066509          14031680

Note: I was running a Kafkacat consumer on the same topic at the same time to make sure there weren't any issues on the producer/topic side. My Kafkacat consumer kept going after the PyKafka consumer stopped consuming messages.

@rduplain
Contributor Author

There are three different values of self.error throughout the logs over time:

error: [Errno 104] Connection reset by peer
error: [Errno 32] Broken pipe
timeout: timed out

The value in @jasonrhaas's most recent comment is error: [Errno 32] Broken pipe.

@rduplain
Contributor Author

It looks like we need to catch IOError in addition to the current checks in the connection code.
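
For illustration, a hypothetical helper (not pykafka's actual connection module): on Python 2.6+ both socket.error and socket.timeout are IOError subclasses, so one broader handler around the socket I/O would cover all three errors listed above:

    import socket

    def send_request(sock, payload):
        """Hypothetical helper: send bytes and flag any connection failure."""
        try:
            sock.sendall(payload)
            return True
        except IOError:
            # Covers "Connection reset by peer" (ECONNRESET), "Broken pipe"
            # (EPIPE), and "timed out" (socket.timeout) in one handler.
            sock.close()
            return False
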

@emmettbutler
Contributor

#341 has been merged and I'll be putting it into a 2.0.3 release shortly. If that does prove to fix @jasonrhaas' issue, I propose that we close this issue and use other (possibly new) tickets for similar issues, since this one has been nebulous since the start.

@ottomata
Contributor

I'd like to add that this may be the same problem that I saw. We've been doing some rolling broker restarts this week, and I regularly see SocketDisconnectedError followed by stopped consumption (or stopped offset commits? Can't tell).

https://gist.github.com/ottomata/c3a6c47685cdd75bfc3f

@emmettbutler
Contributor

I'm pretty sure this issue encompasses several that have since been solved, and it's hard to pick apart the remaining problems. I'm closing this issue in favor of updated ones, specifically #347. If anyone objects and feels this issue should stay open, feel free to respond/reopen.
