
High CPU usage in KafkaConsumer.poll() when subscribed to many topics with no new messages (possibly SSL related) #1315

Closed
rmechler opened this issue Dec 6, 2017 · 25 comments

@rmechler commented Dec 6, 2017

I'm experiencing high CPU usage when sitting idle in poll() (i.e., waiting for a timeout when there are no new messages on the broker). It gets worse the more topics I am subscribed to (I have CPU pegged at 100% with 40 topics). Note that I am using 1.3.4 with mostly default configs, and I reproduced this in the current master as well.

There seem to be a couple of things at play here. One is that poll() will issue fetch requests in a tight loop. The other, the one that really seems to be killing CPU, is that when a fetch response is received, the low-level poll() gets into a relatively tight loop as the payload buffer fills, adding a relatively small number of bytes at a time. This explains the effect of adding more topics: the fetch responses are bigger, so more time is spent in this tight loop. Here's some debug output based on a couple of probes I put in the code:

In conn.py: _recv()

        if staged_bytes != self._next_payload_bytes:
            print("staged: {}   payload: {}".format(staged_bytes, self._next_payload_bytes))
            return None

In consumer/group.py: _poll_once()

        print("fetch!")
        # Send any new fetches (won't resend pending fetches)
        self._fetcher.send_fetches()

So, for one topic I get output like this while blocked in poll():

fetch!
staged: 4   payload: 104
fetch!
staged: 12   payload: 104
fetch!
staged: 50   payload: 104
fetch!
staged: 68   payload: 104
fetch!
staged: 86   payload: 104
fetch!
fetch!
staged: 4   payload: 104

For 2 topics:

fetch!
staged: 4   payload: 179
fetch!
staged: 12   payload: 179
fetch!
staged: 51   payload: 179
fetch!
staged: 69   payload: 179
fetch!
staged: 87   payload: 179
fetch!
staged: 105   payload: 179
fetch!
staged: 143   payload: 179
fetch!
staged: 161   payload: 179
fetch!
fetch!
staged: 4   payload: 197
fetch!

For 40 topics:

fetch!
staged: 2867   payload: 3835
fetch!
staged: 2885   payload: 3835
fetch!
staged: 2939   payload: 3835
fetch!
staged: 2957   payload: 3835
fetch!
staged: 2975   payload: 3835
staged: 4   payload: 3799
fetch!
staged: 12   payload: 3799
fetch!
staged: 58   payload: 3799
fetch!
staged: 76   payload: 3799
fetch!
staged: 94   payload: 3799
fetch!
staged: 112   payload: 3799
fetch!
staged: 154   payload: 3799
fetch!
... and many, many more

So it gets stuck spinning in this loop, and CPU goes to 100%.

I tried mitigating this using consumer fetch config:

    fetch_min_bytes=1000000,
    fetch_max_wait_ms=2000,

but that did nothing.

The only thing that gets the CPU down is to do a non-blocking poll() instead of using a timeout, and then do a short sleep when there are no result records (my application can tolerate that latency). It looks like poll() used to support something like this, i.e., there was a sleep parameter that caused a sleep for the remainder of the timeout period if there were no records on the first fetch. It looks like that was removed in 237bd73, though I'm not sure why.

So... like I said, I can work around the continuous fetching with my own sleep. It would be good to understand the real problem, which is the tight _recv() loop, and whether anything can be done about it.
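
For illustration, a minimal sketch of that workaround against the public KafkaConsumer API (the topic name and bootstrap server below are placeholders; the point is the poll-then-sleep shape):

    import time
    from kafka import KafkaConsumer

    # Placeholder topic/broker names.
    consumer = KafkaConsumer('example-topic', bootstrap_servers='localhost:9092')

    while True:
        records = consumer.poll(timeout_ms=0)  # non-blocking poll
        if not records:
            time.sleep(1)  # back off briefly instead of spinning inside poll()
            continue
        for tp, messages in records.items():
            for message in messages:
                print(tp.topic, tp.partition, message.offset)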

@rmechler (Author) commented Dec 6, 2017

> So it gets stuck spinning in this loop, and CPU goes to 100%.

Just to clarify: it doesn't actually get stuck, it just spends a lot of time in the tight loop. The application performs correctly; it's just the CPU usage that is the problem (it negatively impacts other applications).

@dpkp (Owner) commented Dec 6, 2017

What broker version are you using? Perhaps related: https://issues.apache.org/jira/browse/KAFKA-1563

I don't think there is a way to disable TCP_NODELAY on the Kafka broker, but I suspect that would help if the issue is that the broker is sending lots of very small packets.

Otherwise, I'll think about how to improve performance in this scenario.

@dpkp (Owner) commented Dec 6, 2017

One alternative approach here might be to wrap each sock.recv() call in a configurable timeout (perhaps socket_recv_timeout_ms), which could be used to tune throughput vs. latency.
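
To sketch the idea (illustrative only: socket_recv_timeout_ms is just the proposed knob, recv_with_deadline is a hypothetical helper, and this is plain-socket code rather than kafka-python's conn.py):

    import select
    import time

    def recv_with_deadline(sock, nbytes, timeout_ms):
        # Keep draining the socket until the requested payload is complete or
        # the deadline expires, instead of returning to the main selector loop
        # after every partial read.
        deadline = time.time() + timeout_ms / 1000.0
        chunks, remaining = [], nbytes
        while remaining > 0:
            wait = deadline - time.time()
            if wait <= 0:
                break
            readable, _, _ = select.select([sock], [], [], wait)
            if not readable:
                break  # deadline reached with nothing more to read
            data = sock.recv(remaining)
            if not data:
                break  # peer closed the connection
            chunks.append(data)
            remaining -= len(data)
        return b''.join(chunks)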

@rmechler (Author) commented Dec 7, 2017

Our broker version is 0.11.0, so it seems unlikely to be KAFKA-1563, but we can look into whether it might be that or something else on the server side.

@dpkp (Owner) commented Dec 8, 2017

Thanks. I can reproduce this, but only if the poll timeout_ms is 0. Have you tried setting a larger timeout? consumer.poll(timeout_ms=100) seems to work fine.

Perhaps the much simpler solution here is to change the default timeout_ms to something like 100ms?

@dpkp dpkp changed the title High CPU usage when sitting idle in poll() High CPU usage when sitting idle in poll() with timeout_ms=0 Dec 8, 2017
@rmechler (Author)

Sorry for the delayed response. For us, the issue actually occurs when we have a non-zero timeout: it's when it sits idle in a poll() call with no messages coming in. The tight loop with small reads probably happens with a 0 timeout too (I'll retry that as soon as I get a chance), but that's actually how we mitigate the issue right now: we do a poll() with timeout_ms=0, and if there are no messages we do a sleep(1). This brings the CPU way down, but probably only because it spreads the work out into smaller bursts.

In trying to reproduce this with a timeout, did you try subscribing to several topics? That magnifies the issue and makes it easier to notice.

@rmechler (Author)

Tested with timeout_ms=0; I see the same behaviour as I do with a bigger timeout.

@dpkp (Owner) commented Dec 11, 2017

How many total partitions are you assigned? And what is the total leader count across these partitions?

@rmechler (Author)

16 partitions per topic, and up to 40 topics. I tested with 1, 2, 10 and 40 topics; same behaviour in all cases, but it gets worse with the number of partitions (because the fetch response size goes up). Total leader count is 6 (i.e., I have 6 brokers and the leaders are spread among those brokers).

@rmechler (Author)

Also, I traced with tcpdump, and it looks like the broker is not sending small packets; rather, the client is breaking the data up into small reads. Here's a sample of the trace:

20:27:53.715512 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [P.], seq 4077:4250, ack 6836, win 20279, options [nop,nop,TS val 301262803 ecr 1666870086], length 173
20:27:53.743320 IP 10.200.156.69.9093 > 10.60.2.53.35331: Flags [.], seq 7285:10057, ack 3774, win 22077, options [nop,nop,TS val 386447275 ecr 1666870094], length 2772
20:27:53.743331 IP 10.60.2.53.35331 > 10.200.156.69.9093: Flags [.], ack 10057, win 347, options [nop,nop,TS val 1666870102 ecr 386447275], length 0
20:27:53.743537 IP 10.200.156.69.9093 > 10.60.2.53.35331: Flags [P.], seq 10057:11909, ack 3774, win 22077, options [nop,nop,TS val 386447275 ecr 1666870094], length 1852
20:27:53.743549 IP 10.60.2.53.35331 > 10.200.156.69.9093: Flags [.], ack 11909, win 354, options [nop,nop,TS val 1666870102 ecr 386447275], length 0
20:27:53.745193 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [.], seq 4250:7022, ack 6836, win 20279, options [nop,nop,TS val 301262810 ecr 1666870095], length 2772
20:27:53.745204 IP 10.60.2.53.24990 > 10.200.156.101.9093: Flags [.], ack 7022, win 119, options [nop,nop,TS val 1666870102 ecr 301262810], length 0
20:27:53.745211 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [.], seq 7022:8408, ack 6836, win 20279, options [nop,nop,TS val 301262810 ecr 1666870095], length 1386
20:27:53.745215 IP 10.60.2.53.24990 > 10.200.156.101.9093: Flags [.], ack 8408, win 124, options [nop,nop,TS val 1666870102 ecr 301262810], length 0
20:27:53.745283 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [.], seq 8408:15338, ack 6836, win 20279, options [nop,nop,TS val 301262810 ecr 1666870095], length 6930
20:27:53.745304 IP 10.60.2.53.24990 > 10.200.156.101.9093: Flags [.], ack 15338, win 151, options [nop,nop,TS val 1666870102 ecr 301262810], length 0
20:27:53.745374 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [.], seq 15338:20882, ack 6836, win 20279, options [nop,nop,TS val 301262811 ecr 1666870095], length 5544
20:27:53.745389 IP 10.60.2.53.24990 > 10.200.156.101.9093: Flags [.], ack 20882, win 173, options [nop,nop,TS val 1666870102 ecr 301262811], length 0
20:27:53.745402 IP 10.200.156.101.9093 > 10.60.2.53.24990: Flags [P.], seq 20882:22296, ack 6836, win 20279, options [nop,nop,TS val 301262811 ecr 1666870095], length 1414
20:27:53.745411 IP 10.60.2.53.24990 > 10.200.156.101.9093: Flags [.], ack 22296, win 179, options [nop,nop,TS val 1666870102 ecr 301262811], length 0

While network traffic was blocked at that point, the client continued with its slow reads:

calling poll
fetch!
staged: 487   payload: 10589
staged: 489   payload: 7367
staged: 1913   payload: 14124
staged: 408   payload: 5629
staged: 149   payload: 7090
staged: 157   payload: 15697
returned from poll
no messages
time.sleep(1)
calling poll
fetch!
staged: 505   payload: 10589
staged: 507   payload: 7367
staged: 1931   payload: 14124
staged: 426   payload: 5629
staged: 167   payload: 7090
staged: 175   payload: 15697
returned from poll
no messages
time.sleep(1)
calling poll
fetch!
staged: 523   payload: 10589
staged: 525   payload: 7367
staged: 1949   payload: 14124
staged: 444   payload: 5629
staged: 185   payload: 7090
staged: 1163   payload: 15697
returned from poll
no messages
time.sleep(1)

In this case I am in a loop where I call poll(timeout_ms=0) and then sleep for 1 second if there are no messages. So it seems to be taking multiple poll() calls (over multiple seconds) to assemble the fetch responses. This was with 40 topics in order to highlight the effect.

@rmechler (Author)

BTW, I don't think I mentioned that I am using SSL.

@dpkp dpkp changed the title High CPU usage when sitting idle in poll() with timeout_ms=0 High CPU usage in KafkaConsumer.poll() when subscribed to many topics with no new messages (possibly SSL related) Dec 12, 2017
@dpkp (Owner) commented Dec 12, 2017

Are you able to profile the process while it is in this state? Have you used vmprof?

@rmechler (Author)

It looks like the problem is with recv'ing from the SSL-wrapped socket. It almost always returns a small number of bytes (most of the time 18). We tried the same test without SSL and don't see the same behaviour. I put the following probe in the 1.3.4 code:

bytes_to_read = self._next_payload_bytes - staged_bytes
data = self._sock.recv(bytes_to_read)
print("bytes_to_read: {:<10} bytes actually read: {}".format(bytes_to_read, len(data)))
# We expect socket.recv to raise an exception if there is not

Here's some sample output with SSL enabled:

bytes_to_read: 2161       bytes actually read: 18
bytes_to_read: 3544       bytes actually read: 18
fetch!
bytes_to_read: 2143       bytes actually read: 18
bytes_to_read: 3526       bytes actually read: 33
fetch!
bytes_to_read: 2125       bytes actually read: 38
bytes_to_read: 3493       bytes actually read: 18
fetch!
bytes_to_read: 2087       bytes actually read: 18
bytes_to_read: 3475       bytes actually read: 18
fetch!
bytes_to_read: 2069       bytes actually read: 18
bytes_to_read: 3457       bytes actually read: 18
fetch!
bytes_to_read: 2051       bytes actually read: 18
bytes_to_read: 3439       bytes actually read: 45
fetch!
bytes_to_read: 2033       bytes actually read: 41
bytes_to_read: 3394       bytes actually read: 18
fetch!
bytes_to_read: 1992       bytes actually read: 18
bytes_to_read: 3376       bytes actually read: 18
fetch!
bytes_to_read: 1974       bytes actually read: 18
bytes_to_read: 3358       bytes actually read: 18
fetch!
bytes_to_read: 1956       bytes actually read: 18
bytes_to_read: 3340       bytes actually read: 42

Here's a sample with a plaintext connection:

bytes_to_read: 3817       bytes actually read: 1564
fetch!
bytes_to_read: 2253       bytes actually read: 2253
fetch!
bytes_to_read: 3691       bytes actually read: 736
fetch!
bytes_to_read: 2955       bytes actually read: 2955
fetch!
bytes_to_read: 3943       bytes actually read: 1585
fetch!
bytes_to_read: 2358       bytes actually read: 2358
fetch!
bytes_to_read: 3835       bytes actually read: 3835
fetch!
bytes_to_read: 3745       bytes actually read: 3469

We have yet to figure out why the SSL socket is behaving the way it is.

I did try putting the self._sock.recv(bytes_to_read) call in a loop and only breaking out when staged_bytes == self._next_payload_bytes. (I'm not sure it is safe to do that, but it was an experiment.) I still got the same small reads, but there was a big improvement in CPU usage (from 100% down to 10%).

@rmechler (Author)

Note that when using SSL, recv() almost always returns a small number of bytes; it's not just a state that the connection gets into.

I tried a similar probe in current master:

data = self._sock.recv(SOCK_CHUNK_BYTES)
print("bytes read: {}".format(len(data)))
# We expect socket.recv to raise an exception if there is not

with the following sample result:

bytes read: 30
bytes read: 48
bytes read: 30
bytes read: 30
bytes read: 30
bytes read: 39
bytes read: 30
bytes read: 30
bytes read: 30
bytes read: 42

I would have expected the CPU usage to be better in this case, since it reads up to 4096 bytes in a loop, but it was still at 100%.

@rmechler (Author)

I misread the code for _recv() in master: it doesn't keep looping until it gets 4096 bytes, it only loops again if it reads the full 4096, which makes sense and also explains why the CPU is still high.
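
Roughly, the loop shape described here looks like the following (a paraphrased sketch with error handling omitted, not the literal kafka-python source; read_available is a made-up name):

    SOCK_CHUNK_BYTES = 4096

    def read_available(sock):
        # Only issue another recv() if the previous one filled a whole chunk,
        # so a stream of small SSL records costs one select() wakeup each.
        chunks = []
        while True:
            data = sock.recv(SOCK_CHUNK_BYTES)
            if not data:
                break  # connection closed
            chunks.append(data)
            if len(data) < SOCK_CHUNK_BYTES:
                break  # partial chunk: assume the buffer is drained for now
        return chunks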

So I'm at a loss as to why I'm getting these small reads. I've tried to reproduce this with a simple client and server app running on the same machines as my Kafka client app and Kafka broker. The server responds to client requests with a 16 KB response, and the client selects using the default selector from selectors34 and does a non-blocking recv()... and the recv() returns the full 16 KB every time. It's a bit of a head-scratcher.

@rmechler (Author)

OK, I think I know what's going on now. My test server was sending 16 KB in a single write. When I make it respond with multiple small writes of 50 bytes each, the client recv() returns 50 bytes at a time. Probably what is happening here (and I am speculating a bit, as I am not intimately familiar with the SSL protocol) is that each write from the server is encoded as a separate SSL record, and on the client side recv() on an SSL socket returns one record at a time.

It looks like the Kafka broker is sending the FetchResponse as a bunch of small writes. In fact, it looks like it is doing a write for each topic name and a write for each (empty) message set. That explains the pattern of 18-byte reads (the message sets) and somewhat larger reads (the topics). The reason I see 30-byte reads when using the master code is that it uses the newer protocol; if I specify api_version=(0, 10), it goes back to 18 bytes.

So, I'm not sure there is much that can be done about getting the small chunks of data from recv(). Do you think it might be possible to safely assemble the response in a tighter loop, without going back through a select() call every time, in order to make it more efficient?
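
For illustration, one way to do that with the stdlib ssl module (a minimal sketch, not kafka-python code; drain_ssl_records is a hypothetical helper): on a non-blocking SSL socket, recv() returns at most one decrypted TLS record per call, so after select() reports the socket readable you can keep calling recv() until the SSL layer says it needs more ciphertext, consuming many small records per wakeup.

    import ssl

    def drain_ssl_records(ssl_sock, chunk_bytes=4096):
        # Read repeatedly so one select() wakeup consumes every TLS record
        # that is already buffered, instead of one record per wakeup.
        chunks = []
        while True:
            try:
                data = ssl_sock.recv(chunk_bytes)
            except ssl.SSLWantReadError:
                break  # nothing more buffered; go back to select()
            if not data:
                break  # connection closed
            chunks.append(data)
        return b''.join(chunks)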

@dpkp (Owner) commented Dec 13, 2017

Very interesting. Exceptional investigation, @rmechler!

@rmechler (Author)

Thanks. It's when my colleague pointed out that my test server should do multiple small writes that things started to click. :)

So it seems like we have a degenerate case: SSL + lots of topics and partitions + relatively low traffic with idle periods. We can address this to a fair degree on our end by (a) consolidating some of our topics and reducing the number of partitions in some cases, and (b) reducing the frequency of fetches during idle time using fetch_min_bytes and fetch_max_wait_ms.

Nevertheless, it seems there is some efficiency to be gained in kafka-python by trying to read as many records as possible before doing another poll. Are you open to that idea?
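
For reference, the fetch-throttling mitigation from (b) looks like this on the consumer side (the values are the ones tried earlier in this thread, not recommendations; the broker address and topic names are placeholders, and per the results reported below it mainly helps once the read-loop change from the PR is in place):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        bootstrap_servers='broker:9093',  # placeholder
        security_protocol='SSL',          # this thread's setup uses SSL
        fetch_min_bytes=1000000,   # broker waits for at least this much data...
        fetch_max_wait_ms=2000,    # ...or this long, before answering a fetch
    )
    consumer.subscribe(topics=['topic-a', 'topic-b'])  # placeholder topics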

@dpkp (Owner) commented Dec 13, 2017 via email

@dpkp (Owner) commented Dec 27, 2017

I put up a PR that may help in this situation. I would love it if you were able to test it in your setup and see whether you get any improvement!

@rmechler (Author)

Cool, I'll definitely test it out. I'm on holiday without access to my environment at the moment, though, so I probably won't be able to try it out until Sunday.

@rmechler (Author) commented Jan 4, 2018

Finally got a chance to test, sorry for the delay. Looks good. I ran a test with ~40 topics that previously sent the CPU to 100%; with the patch it now goes to ~15%. That's without setting fetch_min_bytes / fetch_max_wait_ms. If I set fetch_min_bytes=1000000 / fetch_max_wait_ms=2000, CPU goes to < 5%. If I set fetch_max_wait_ms=5000, CPU goes to < 3% (without the patch, this config still sits at 25% CPU).

Note that I tested by applying the patch to a699f6a, because I couldn't get current HEAD to work properly; I kept getting an exception:

Traceback (most recent call last):
  File "./client_staging.py", line 53, in <module>
    results = consumer.poll(timeout_ms=10000, max_records=1)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 601, in poll
    records = self._poll_once(remaining, max_records)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 621, in _poll_once
    self._coordinator.poll()
  File "/usr/local/lib/python2.7/dist-packages/kafka/coordinator/consumer.py", line 271, in poll
    self.ensure_active_group()
  File "/usr/local/lib/python2.7/dist-packages/kafka/coordinator/base.py", line 401, in ensure_active_group
    if future.failed():
AttributeError: 'NoneType' object has no attribute 'failed'

The problem happens here:

                # after beginning the rebalance in the call to poll below.
                # This ensures that we do not mistakenly attempt to rejoin
                # before the pending rebalance has completed.
                if self.join_future is None:
                    self.state = MemberState.REBALANCING
                    self.join_future = self._send_join_group_request()

                    # handle join completion in the callback so that the
                    # callback will be invoked even if the consumer is woken up
                    # before finishing the rebalance
                    self.join_future.add_callback(self._handle_join_success)

                    # we handle failures below after the request finishes.
                    # If the join completes after having been woken up, the
                    # exception is ignored and we will rejoin
                    self.join_future.add_errback(self._handle_join_failure)

                future = self.join_future
                self._client.poll(future=future)

                if future.failed():
                    exception = future.exception

self.join_future.add_errback(self._handle_join_failure) actually ends up calling the errback immediately, because the future already has an exception set (NodeNotReadyError: 4), and that results in self.join_future being set to None, so future.failed() blows up.

@dpkp (Owner) commented Jan 4, 2018

Great. I'm going to close this. I've filed the NoneType exception as a separate issue. Thanks for testing!!

@dpkp dpkp closed this as completed Jan 4, 2018
@dpkp (Owner) commented Jan 4, 2018

Forgot that the PR hasn't landed yet. Will close when merged.

@dpkp dpkp reopened this Jan 4, 2018
@rmechler (Author) commented Jan 4, 2018

Thanks, I probably should have opened a separate ticket myself. And thanks for getting a fix in for the CPU issue!

@dpkp dpkp closed this as completed Jan 11, 2018