Producer memory leak with delivery.report.only.error #84

Closed
fillest opened this issue Nov 24, 2016 · 13 comments

fillest commented Nov 24, 2016

confluent-kafka==0.9.2, librdkafka 0.9.2, Python 2.7.6, Ubuntu 14.04

I run a web app in 16 gunicorn workers (processes). Each has a producer, which works continually until the process is restarted (normally by the deploy system). This instance serves ~100-250 req/s (some portion of which produce to Kafka) and has ~3.8 GB of memory. After switching to confluent-kafka and running it in production for some time, I'm observing worker RSS memory growing and growing (this was not the case before, with kafka-python). Here is a screenshot of the memory monitoring graph:
[screenshot: memory monitoring graph]
The code is roughly like this (so I poll(0) after each produce):

import atexit
import cPickle
import logging

import confluent_kafka

logger = logging.getLogger(__name__)

def error_cb (err):
    logger.error('kafka producer error_cb: %s: %s', err.name(), err.str())

def on_delivery (err, msg):
    # delivery.report.only.error is enabled, so this should only fire for failures
    assert err
    logger.error('kafka producer on_delivery: %s', err.name())

_producer = confluent_kafka.Producer(**{
    'client.id': 'qwe_producer',
    'bootstrap.servers': bootstrap_servers,  # 3 brokers, defined elsewhere in the app config
    'log.connection.close': False,
    'api.version.request': True,

    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 4000000,
    'queue.buffering.max.ms': 1000,
    'message.send.max.retries': 9000,
    'batch.num.messages': 10000,
    'delivery.report.only.error': True,

    'default.topic.config': {
        'request.required.acks': -1,
    },
    'error_cb': error_cb,
    'on_delivery': on_delivery,
})
atexit.register(_producer.flush)

def kafka_async_send (topic, value):
    _producer.produce(topic, cPickle.dumps(value, 2))
    _producer.poll(0)  # serve queued delivery/error callbacks

def some_request_handler (request):
    #...
    kafka_async_send('foo', ('some', 'stuff'))
    #...

What should I do now? Do you have any suggestions on how to debug it? (Preferably right in production - it is kind of a testing production, so some overhead is currently affordable.)

fillest changed the title from "Memory leak" to "Producer memory leak" on Nov 24, 2016

ewencp commented Nov 25, 2016

@fillest This gets tricky because the issue could be in Python or C, or even due to some bad interaction between the two. You can find a few strategies for starting to pin down heap issues on the Python side here: http://chase-seibert.github.io/blog/2013/08/03/diagnosing-memory-leaks-python.html They'll probably require some code changes and a redeploy.

Probably the most interesting thing to do initially would be to use Heapy to get a diff over a substantial amount of time (e.g. at least a few minutes) and then find out what in that diff is eating the most memory. If the diff looks too small to account for the difference, that would suggest we're failing to clean something up on the C side.
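
For reference, a heap diff along those lines might look roughly like this (a minimal sketch assuming the guppy package, which provides Heapy, in a Python 2 environment; where exactly you set the baseline and how long you wait are up to you):

# Heapy diff sketch: set a baseline, serve traffic for a while, then inspect what grew
from guppy import hpy

hp = hpy()
hp.setrelheap()   # baseline: only objects allocated after this point are counted

# ... let the worker handle production traffic for a few minutes ...

print hp.heap()   # objects allocated since the baseline, grouped by type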

Of course the more relevant stats you can get for us, the better.


edenhill commented Nov 26, 2016

Could you monitor len(_producer) to see if messages are stuck in the underlying C library?


fillest commented Nov 27, 2016

I've added a thread printing len(_producer) every 5 minutes and left it running for an evening and a night. Nope, the numbers look normal; they fluctuate within a small range.
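
The monitoring thread is roughly of this shape (a minimal sketch; the interval and logging are illustrative):

import logging
import threading
import time

logger = logging.getLogger(__name__)

def monitor_producer_queue (producer, interval=300):
    # len(producer) reports the number of messages and requests still
    # waiting in the underlying librdkafka queues
    while True:
        logger.info('producer queue length: %d', len(producer))
        time.sleep(interval)

t = threading.Thread(target=monitor_producer_queue, args=(_producer,))
t.daemon = True
t.start()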

I will try to debug it with the memory tools (thanks for the link) later (sometime in December, I think) because I don't have enough time right now, unfortunately. As a band-aid, I'm turning on auto-restarting of workers after every N requests for now.
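
(For reference, gunicorn supports this kind of worker recycling out of the box via the max_requests setting; a minimal gunicorn.conf.py sketch, with illustrative numbers:)

# gunicorn.conf.py - recycle each worker after roughly N requests
workers = 16
max_requests = 10000          # restart a worker after it has handled this many requests
max_requests_jitter = 500     # add jitter so the workers don't all restart at once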

@edenhill

Thank you for taking the time to investigate this.

@edenhill

Any progress on this @fillest?

@mayurjpatel

I ran into this leak today as well. I created a test producer program that sends 20k messages to a Kafka topic and waits for them to be flushed. Running the test program under a debug Python build and valgrind helped me pinpoint the leak.

Version of confluent-kafka-python: 0.9.1.2

==14139== 479,928 bytes in 19,997 blocks are definitely lost in loss record 517 of 518
==14139==    at 0x4C277E3: calloc (vg_replace_malloc.c:623)
==14139==    by 0x7A83A1E: Producer_msgstate_new (Producer.c:67)
==14139==    by 0x7A84496: Producer_produce (Producer.c:281)
==14139==    by 0x4891FB: PyCFunction_Call (methodobject.c:85)
==14139==    by 0x529760: do_call (ceval.c:4236)
==14139==    by 0x528B3A: call_function (ceval.c:4044)
==14139==    by 0x523732: PyEval_EvalFrameEx (ceval.c:2666)
==14139==    by 0x528E14: fast_function (ceval.c:4107)
==14139==    by 0x528B1E: call_function (ceval.c:4042)
==14139==    by 0x523732: PyEval_EvalFrameEx (ceval.c:2666)
==14139==    by 0x526120: PyEval_EvalCodeEx (ceval.c:3253)
==14139==    by 0x51C18D: PyEval_EvalCode (ceval.c:667)

It looks like the msgstate object allocated in the Producer_produce call is never freed in the success case. Calling Producer_msgstate_destroy just before returning seems to eliminate the leak.

msgstate is a struct of 3 pointers. On a 64-bit machine, that translates to a leak of 24 bytes per successful produce. This matches the valgrind report of 479,928 bytes lost over 19,997 blocks (479,928/19,997 == 24).
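
For reference, the test program was roughly of this shape (a minimal sketch; the broker address, topic name, and message count are illustrative), run with a debug Python build under something like valgrind --leak-check=full:

# repro sketch: produce 20k messages with delivery.report.only.error enabled,
# then flush and exit; the leaked msgstates show up in the valgrind report
import confluent_kafka

def on_delivery (err, msg):
    # only called for failures when delivery.report.only.error is True
    print('delivery failed: %s' % err)

p = confluent_kafka.Producer(**{
    'bootstrap.servers': 'localhost:9092',
    'delivery.report.only.error': True,
    'on_delivery': on_delivery,
})

for i in range(20000):
    p.produce('test-topic', 'message %d' % i)
    p.poll(0)   # serve any pending delivery callbacks

p.flush()       # block until all outstanding messages are delivered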

@edenhill

Ah, I think I know what the problem is:
'delivery.report.only.error': True,

Since a msgstate is allocated for each produce(), it needs to be freed in the delivery report handler; but setting delivery.report.only.error to True means the delivery report handler isn't called for successfully delivered messages, so, just as you say, those msgstates will leak.

There is no obvious fix for this, so I must advise against using delivery.report.only.error; have your application check the error status in the delivery report handler instead.
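
In other words, something along these lines (a minimal sketch of the suggested workaround; the logger is illustrative):

import logging

logger = logging.getLogger(__name__)

# leave delivery.report.only.error at its default (False) and filter for
# errors inside the delivery report handler; the per-message state is then
# freed for every message, successful or not
def on_delivery (err, msg):
    if err is not None:
        logger.error('delivery failed for %s [%d]: %s',
                     msg.topic(), msg.partition(), err)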

edenhill changed the title from "Producer memory leak" to "Producer memory leak with delivery.report.only.error" on Feb 15, 2017
edenhill added the "bug" label and removed the "wait info" label on Feb 15, 2017
@mayurjpatel

You are right, I am indeed setting delivery.report.only.error to True in my producer configs. Who decides to skip the callback in the success case? Is it done at the librdkafka layer or at the confluent-kafka-python layer?

I kind of like having that flag because it makes the callback implementation in Python easy. Does throughput drop if the delivery report handler is called for every message instead of only for failed messages?

@edenhill

It is librdkafka skipping the callback, so the Python code really doesn't have any means of freeing its msgstate. There isn't a viable solution for this as it stands, so we'll either have to disallow using delivery.report.only.error from Python or move that functionality into the Python bindings and filter out successful DRs there instead.

I don't think performance is a big concern, but you'll have to try it and see; performance is really hard to talk about in general terms since it depends on so many environment-specific things.


ewencp commented Feb 26, 2017

@edenhill Seems like we don't actually have to disallow it. Rather, we might want to intercept this setting and force our own callback that delegates to the user callback in the case of an error, and in other cases just handles cleaning up the message state.

We might want to avoid that since it's somewhat misleading (we will be executing a callback for every message, regardless of whether there was an error or not). If so, we should explicitly disable this option for Python; but if we think it's still useful even with some small (C callback) overhead, we might want to use the approach I described.
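
Conceptually, the interception would look something like this (an illustrative Python-level sketch of the idea; the actual binding code lives in C, and the names here are hypothetical):

# wrap the user's delivery callback so that every delivery report is received
# (letting the binding release its per-message state) while only errors are
# forwarded to the user
def make_filtering_dr_callback (user_on_delivery):
    def on_delivery (err, msg):
        # per-message state can be cleaned up here for every report
        if err is not None:
            user_on_delivery(err, msg)
    return on_delivery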

@edenhill

@ewencp Spot on! See #140 :)

edenhill added a commit that referenced this issue Feb 28, 2017
Handle delivery.report.only.error in Python (#84)
@edenhill

This is fixed on master, can you give it a try @fillest?

edenhill closed this as completed on Mar 2, 2017
anguenot added a commit to ilanddev/syslogng_kafka that referenced this issue Jul 4, 2017
Bug in 'confluent-kafka': confluentinc/confluent-kafka-python#84

Let's revisit when 0.11 is released.

fillest commented Jul 26, 2018

> This is fixed on master, can you give it a try @fillest?

Sorry, nope, we've switched from Python to Go

oranb mentioned this issue on Feb 20, 2019