Add a concurrency model with ThreadPoolExecutor #5011
Conversation
I've been meaning to do this myself. I'll review this as soon as possible.
This PR requires some adjustments before we can merge it. Some of them are outlined in the diff itself.
We also require:
- Unit tests to ensure this works correctly.
- A requirements file for Python 2.7 users in `requirements/extra/`, with the appropriate version markers.
- Documentation adjustments.
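As a sketch of what such a requirements file could look like (the file name and version pin are assumptions, not from the PR), a PEP 508 environment marker can restrict the `futures` backport to Python 2, since `concurrent.futures` only entered the standard library in Python 3.2:

```
# requirements/extra/thread.txt (hypothetical path and name)
futures>=3.1.1; python_version < "3.2"
```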
```python
signal_safe = False

def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
```
Zero-argument `super()` calls are only valid with Python 3. Celery 4.x still supports Python 2.7, so we'll need to adjust that.
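To illustrate the portable form the reviewer is asking for (class names here are illustrative, not the PR's actual classes): the explicit-argument `super()` call works on both Python 2.7 and 3.x, while the zero-argument form is Python 3 only.

```python
class BasePool(object):
    """Hypothetical stand-in for the pool base class."""
    def __init__(self, limit=None):
        self.limit = limit

class ThreadPool(BasePool):
    def __init__(self, limit=None):
        # super(ThreadPool, self) is valid on Python 2.7 and 3.x;
        # the bare super() form would raise a SyntaxError on 2.7.
        super(ThreadPool, self).__init__(limit)

pool = ThreadPool(limit=4)
```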
```python
def on_stop(self):
    self.executor.shutdown()
    super().on_stop()
```
Zero-argument `super()` calls are only valid with Python 3. Celery 4.x still supports Python 2.7, so we'll need to adjust that.
```python
def _get_info(self):
    return {
        'max-concurrency': self.limit,
        'threads': len(self.executor._threads)
    }
```
I'm not a big fan of using private APIs. Can we change this to something more sensible?
The actual number of threads in the executor is not accessible through a public API, so I can do one of two things: either modify concurrent/futures/thread.py to create one (a modification to a core Python library), or remove the actual thread count from the concurrency model's info (i.e. remove line 39 of the diff).
The former is not certain to be accepted by the PSF. I already have a pull request pending for Python 3.8, so I can try anyway?
Seems like you are correct. There's no public API for that.
I think a TODO comment to change that once it lands is sufficient.
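Until such an API lands, one public-API workaround (not the PR's code; a sketch assuming Python 3.7+, where `ThreadPoolExecutor` gained the `initializer` argument) is to count worker threads ourselves as they start, instead of reading the private `_threads` set:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class CountingThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical wrapper that tracks how many worker threads have
    started, using only the public `initializer` hook (Python 3.7+)."""

    def __init__(self, max_workers=None):
        self._worker_count = 0
        self._count_lock = threading.Lock()
        super().__init__(max_workers=max_workers,
                         initializer=self._on_worker_start)

    def _on_worker_start(self):
        # Runs once in each worker thread as it starts.
        with self._count_lock:
            self._worker_count += 1

    @property
    def thread_count(self):
        with self._count_lock:
            return self._worker_count
```

Workers are spawned lazily, so `thread_count` reflects threads that have actually started, which is also what `len(executor._threads)` reports.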
Thank you for the review, I will apply your recommendations in the next week.
@whuji Did you get a chance to work on this?
Not yet. I am doing it right now.
Hello,
I don't know if it would be useful to someone else, but I needed to implement a concurrency model based on threads (and not processes) in Celery, because I wanted to pass Future objects between tasks and some coroutines, and those are not picklable.
Of course it does not scale out and it has limitations (because of the GIL), but it can be useful for people who want to share memory between tasks and another thread (for example the asyncio event loop) without blocking, as the 'solo' concurrency model does.
I am open to comments. Maybe this model won't be needed anymore in Celery 5, as it could be replaced by an asyncio loop, which is not possible in Celery 4.
And I would be happy to help on a massive asyncio refactoring for Celery 5.
Regards
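To illustrate the use case described above (a standalone sketch, not code from this PR): with a thread-based pool, worker threads can share an asyncio event loop running in another thread and hand coroutines to it via `asyncio.run_coroutine_threadsafe`, which returns a `concurrent.futures.Future` without any pickling.

```python
import asyncio
import threading

# Run an asyncio event loop in a dedicated background thread.
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()

async def double(x):
    await asyncio.sleep(0.01)
    return x * 2

# From any worker thread: schedule the coroutine on the shared loop
# and get back a thread-safe concurrent.futures.Future.
future = asyncio.run_coroutine_threadsafe(double(21), loop)
print(future.result(timeout=5))  # prints 42

loop.call_soon_threadsafe(loop.stop)
```

This only works because the tasks and the loop share one process's memory; a process-based pool would have to pickle the Future, which is exactly what fails.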