Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

producer.flush make celery hang #1098

Closed
starplanet opened this issue May 7, 2017 · 7 comments
Closed

producer.flush make celery hang #1098

starplanet opened this issue May 7, 2017 · 7 comments

Comments

@starplanet
Copy link

@starplanet starplanet commented May 7, 2017

The following is my test code:

from celery import Celery
from kafka import KafkaProducer

app = Celery('test', broker='redis://127.0.0.1:6379/0')

producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])

@app.task
def send_msg():
    # producer = KafkaProducer(bootstrap_servers=['172.16.24.45:9092', '172.16.24.44:9092'])
    for i in range(10):
        producer.send('test', b'this is the %dth test message' % i)
    producer.flush()


if __name__ == '__main__':
        app.start()

I use the following command to start worker:

celery -A app worker -l debug

then I enter python command line to send task:

from app import *
send_msg.delay()

If I use global producer variable, when I call send_msg.delay(), celery worker will hang and wait for producer flush, it will never end. But If I use local producer variable which is commented in above code, celery worker will work well.

I want to use global producer because It will work more efficient than local and not frequently create and close connections with kafka brokers. But how can I fix this problem?

Please help me and thanks.

@dpkp

This comment has been minimized.

Copy link
Owner

@dpkp dpkp commented May 7, 2017

@starplanet

This comment has been minimized.

Copy link
Author

@starplanet starplanet commented May 7, 2017

Thanks for your reply. Can you tell me why? Is there a thread lock problem?

@tvoinarovskyi

This comment has been minimized.

Copy link
Collaborator

@tvoinarovskyi tvoinarovskyi commented May 7, 2017

@starplanet Celery can be run with different workers. You will need to understand those.

  • Default is probably process workers, and kafka-python is not safe to use with multiprocessing.
  • If you use thread workers, most likely your code will work as expected.
  • If you use tasklet or other async workers, it will work, but not as expected.

I would recommend you to create a producer globaly, but do it in a ''worker init'' function. Look up the startup hooks for celery in docs.

@tvoinarovskyi

This comment has been minimized.

Copy link
Collaborator

@tvoinarovskyi tvoinarovskyi commented May 7, 2017

@starplanet Ok, checked the docs for celery, it does not have the threaded worker mode, it's called solo, and is not what you'd want (cause it's basically a single thread).

As for the worker init, something like this should do the trick: http://docs.celeryproject.org/en/latest/userguide/signals.html?highlight=signals#worker-process-init.
I'm not a celery expert, but the idea is to have 1 Producer per process, as you can't share those with multiprocessing (which is used by celery to spawn the worker pool).

@starplanet

This comment has been minimized.

Copy link
Author

@starplanet starplanet commented May 8, 2017

Thanks a lot, I will try worker-process-init signal and ask for help to celery.

@starplanet starplanet mentioned this issue May 8, 2017
1 of 2 tasks complete
@jeffwidman jeffwidman closed this May 8, 2017
@gzeronet

This comment has been minimized.

Copy link

@gzeronet gzeronet commented Jan 9, 2018

Same issue with multiprocessing, gevent works for me. Thx.

@sharkyze

This comment has been minimized.

Copy link

@sharkyze sharkyze commented Oct 16, 2019

hey @gzeronet we are having the same issue where the KafkaProducer is not working with multiprocessing, could you please share some resources on how you solved this with gevent ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants
You can’t perform that action at this time.