-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add requeueing in background #12
base: master
Are you sure you want to change the base?
Conversation
Is it WIP? |
Also please update a README file. Ideally show some usage example there. |
@@ -47,6 +47,5 @@ performed during each such call. | |||
be renamed into `put_nowait`. | |||
- `get_multi` and `put_multi` methods, allowing getting and putting multiple | |||
items from queue with one call | |||
- method for periodical requeueing of not acknowledged tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add at least minimal usage example
|
||
|
||
class Stopped(Exception): | ||
pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a newline
@@ -82,6 +97,9 @@ def _put_pipe(self, task_id, task_payload): | |||
) | |||
|
|||
def put(self, task, method='lua'): | |||
if self._is_stopped: | |||
raise exceptions.Stopped |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an error: you are rising an instance of type type
. (Pylint should be able to find this)
@@ -145,12 +169,18 @@ def _ack_pipe(self, task_id): | |||
) | |||
|
|||
def _ack(self, task_id, method='multi'): | |||
if self._is_stopped: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one is excessive
return await self._redis.evalsha( | ||
self._lua_sha['requeue'], | ||
keys=[self._keys['ack'], self._keys['queue']], | ||
args=[before], | ||
) | ||
|
||
def stop(self): | ||
if self._is_stopped: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that one too (it is OK to call .stop()
multiple times)
|
||
if self._requeue_interval != 0: | ||
self._regular_requeue_task = \ | ||
self._loop.create_task(self._requeue_regularly()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.
Creating a task in constructor doesn't feel right. Especially if we have to call .stop()
afterwords (we did't .start()
or .run()
anything, right?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we call __init__
it is not guaranteed that loop
is activated and redis
has connection (although it wouldn't be a very popular use case).
So we have three options:
- start periodic task only when some other method called,
requeue_interval != 0
and correspondingfuture
isNone
(not really a great idea but will kind of work) - Wrap everything into
__aenter__
and__aexit__
so to use an instance one will have to call create it withasync with
. It is unclear whether we should or should not support calls outsideasync with
block. - Provide
__aenter__
and__aexit__
only for periodic task. This is a little bit more flexible option for user as periodic task can be stopped and started multiple times. But it is kind of harder to implement: We need to provide separate class (asynchronous context manager which would implement those magic methods) for this particular case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably the easiest way for now is to provide run_periodic()
method that will have requeue_interval
argument. Corresponding stop_periodic()
will have obvious semantics. Your more general .stop()
method will have to go to a different pull request (or to /dev/null
)
No description provided.