feat(composition): add support for group completion callbacks #150

Merged: 6 commits merged on Feb 2, 2020
64 changes: 63 additions & 1 deletion dramatiq/composition.py
@@ -16,8 +16,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.

 import time
+from uuid import uuid4

 from .broker import get_broker
+from .rate_limits import Barrier
 from .results import ResultMissing


@@ -173,6 +175,7 @@ class group:
     def __init__(self, children, *, broker=None):
         self.children = list(children)
         self.broker = broker or get_broker()
+        self.completion_callbacks = []

     def __len__(self):
         """Returns the size of the group.
@@ -182,6 +185,21 @@ def __len__(self):
     def __str__(self):  # pragma: no cover
         return "group([%s])" % ", ".join(str(c) for c in self.children)

+    def add_completion_callback(self, message):
+        """Adds a completion callback to run once every job in this
+        group has completed. Each group may have multiple completion
+        callbacks.

+        Warning:
+          This functionality is dependent upon the GroupCallbacks
+          middleware. If that's not set up correctly, then calling
+          run after adding a callback will raise a RuntimeError.

+        Parameters:
+          message(Message)
+        """
+        self.completion_callbacks.append(message.asdict())

     @property
     def completed(self):
         """Returns True when all the jobs in the group have been
@@ -227,7 +245,51 @@ def run(self, *, delay=None):
           delay(int): The minimum amount of time, in milliseconds,
             each message in the group should be delayed by.
         """
-        for child in self.children:
+        if self.completion_callbacks:
+            from .middleware.group_callbacks import GROUP_CALLBACK_BARRIER_TTL, GroupCallbacks

+            rate_limiter_backend = None
+            for middleware in self.broker.middleware:
+                if isinstance(middleware, GroupCallbacks):
+                    rate_limiter_backend = middleware.rate_limiter_backend
+                    break
+            else:
+                raise RuntimeError(
+                    "GroupCallbacks middleware not found! Did you forget "
+                    "to set it up? It is required if you want to use "
+                    "group callbacks."
+                )

+            # Generate a new completion uuid on every run so that if a
+            # group is re-run, the barriers are all separate.
+            # Re-using a barrier's name is an unsafe operation.
+            completion_uuid = str(uuid4())
+            completion_barrier = Barrier(rate_limiter_backend, completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
+            completion_barrier.create(len(self.children))
Contributor

While writing #144, I concluded that although the first barrier in a chain of groups should be created immediately, callback groups need to have their barriers created by the middleware. I recognize that your implementation doesn't yet support groups as callbacks, but I do think you'll want to.

This is what I created the groups_starts option for, along with the matching GroupStart data class. The class is an implementation detail and can take whatever form it needs, as long as it can signal (1) which group to start and (2) how many tasks are in that group.

To support directly nested groups (which still raise NotImplementedError in your current implementation, though it is written in a way that suggests you intend to support them), you'll need to allow for starting multiple groups at once.

Given the other options you're adding in this PR, I suggest calling this group_completion_starts and giving it a shape like this:

message.options['group_completion_starts'] = [
    {'uuid': 'the-first-uuid', 'count': 25},
    {'uuid': 'the-second-uuid', 'count': 50},
]
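
As a rough illustration of how middleware might consume the option proposed above, the sketch below creates one barrier per entry in the list. It is not part of this PR: `InMemoryBarrier` and `start_group_barriers` are hypothetical names, and a plain dict stands in for a real rate limiter backend.

```python
# Hypothetical sketch: nothing here is dramatiq API. The registry is a plain
# dict standing in for a rate limiter backend.

class InMemoryBarrier:
    """Counts down from `parties`; wait() reports True for the last arrival."""

    def __init__(self, registry, key):
        self.registry = registry
        self.key = key

    def create(self, parties):
        # Refuse to re-create an existing barrier, since re-using a name is unsafe.
        if self.key in self.registry:
            return False
        self.registry[self.key] = parties
        return True

    def wait(self):
        self.registry[self.key] -= 1
        return self.registry[self.key] <= 0


def start_group_barriers(registry, options):
    """Create one barrier per entry in the proposed group_completion_starts list."""
    barriers = {}
    for start in options.get("group_completion_starts", []):
        barrier = InMemoryBarrier(registry, start["uuid"])
        barrier.create(start["count"])
        barriers[start["uuid"]] = barrier
    return barriers


registry = {}
options = {
    "group_completion_starts": [
        {"uuid": "the-first-uuid", "count": 25},
        {"uuid": "the-second-uuid", "count": 50},
    ],
}
barriers = start_group_barriers(registry, options)
```

Carrying the count alongside the uuid is what would let the middleware, rather than the caller of `run`, size each barrier at the moment the group actually starts.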


+            children = []
+            for child in self.children:
+                if isinstance(child, group):
+                    raise NotImplementedError

+                elif isinstance(child, pipeline):
+                    pipeline_children = child.messages[:]
+                    pipeline_children[-1] = pipeline_children[-1].copy(options={
+                        "group_completion_uuid": completion_uuid,
+                        "group_completion_callbacks": self.completion_callbacks,
+                    })

+                    children.append(pipeline(pipeline_children, broker=child.broker))

+                else:
+                    children.append(child.copy(options={
+                        "group_completion_uuid": completion_uuid,
Contributor

@ryanhiebert ryanhiebert Dec 25, 2018

Groups of groups, unlike pipelines of pipelines, cannot be reduced, because there will be a functional distinction once results are considered. I'm sure that is an eventual goal of this feature, though I would leave it to a later iteration.

To that end, you'll need to make this property take many values, so I'd recommend renaming it to group_completion_uuids and making it a list instead of a string. Even if you decide to punt on the group-of-groups feature for now and leave it raising NotImplementedError, I'd encourage changing the name now so that we don't have to write compatibility code to handle multiple names in the future.

+                        "group_completion_callbacks": self.completion_callbacks,
+                    }))
+        else:
+            children = self.children

+        for child in children:
             if isinstance(child, (group, pipeline)):
                 child.run(delay=delay)
             else:
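
In the `run` changes above, the barrier is sized with `len(self.children)`, and for a pipeline child only the last message receives the completion options, so a pipeline of any length counts as a single party. A minimal sketch of that tagging step follows, using simplified stand-ins: messages are plain dicts, pipelines are lists, and `tag_children` and `msg` are hypothetical helpers, not dramatiq functions.

```python
# Simplified stand-ins: a message is a dict with an "options" dict, and a
# pipeline is a list of such messages. These are not dramatiq types.

def tag_children(children, completion_uuid, callbacks):
    """Return a new list in which each plain message, and the last message
    of each pipeline, carries the group completion options."""
    extra = {
        "group_completion_uuid": completion_uuid,
        "group_completion_callbacks": callbacks,
    }
    tagged = []
    for child in children:
        if isinstance(child, list):  # a pipeline
            *head, last = child
            tagged.append(head + [{**last, "options": {**last["options"], **extra}}])
        else:
            tagged.append({**child, "options": {**child["options"], **extra}})
    return tagged


def msg(name):
    return {"actor_name": name, "options": {}}


children = [[msg("step_one"), msg("step_two")], msg("single")]
tagged = tag_children(children, "some-uuid", [msg("finalize")])

# One party per group child, regardless of how long each pipeline is.
barrier_parties = len(children)
```

Because the tagged entries are copies, the original children stay untouched, which matches the diff's use of `copy(options=...)` and lets the same group be run again under a fresh uuid.
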
3 changes: 2 additions & 1 deletion dramatiq/middleware/__init__.py
@@ -18,6 +18,7 @@
 from .age_limit import AgeLimit
 from .callbacks import Callbacks
 from .current_message import CurrentMessage
+from .group_callbacks import GroupCallbacks
 from .middleware import Middleware, MiddlewareError, SkipMessage
 from .pipelines import Pipelines
 from .prometheus import Prometheus
@@ -34,7 +35,7 @@
     "Interrupt", "raise_thread_exception",

     # Middlewares
-    "AgeLimit", "Callbacks", "CurrentMessage", "Pipelines", "Retries",
+    "AgeLimit", "Callbacks", "CurrentMessage", "GroupCallbacks", "Pipelines", "Retries",
     "Shutdown", "ShutdownNotifications", "TimeLimit", "TimeLimitExceeded",
     "Prometheus",
 ]
40 changes: 40 additions & 0 deletions dramatiq/middleware/group_callbacks.py
@@ -0,0 +1,40 @@
# This file is a part of Dramatiq.
#
# Copyright (C) 2017,2018 CLEARTYPE SRL <bogdan@cleartype.io>
#
# Dramatiq is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# Dramatiq is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

+import os

+from ..rate_limits import Barrier
+from .middleware import Middleware

+GROUP_CALLBACK_BARRIER_TTL = int(os.getenv("dramatiq_group_callback_barrier_ttl", "86400000"))


+class GroupCallbacks(Middleware):
+    def __init__(self, rate_limiter_backend):
+        self.rate_limiter_backend = rate_limiter_backend

+    def after_process_message(self, broker, message, *, result=None, exception=None):
+        from ..message import Message

+        if exception is None:
+            group_completion_uuid = message.options.get("group_completion_uuid")
+            group_completion_callbacks = message.options.get("group_completion_callbacks")
+            if group_completion_uuid and group_completion_callbacks:
+                barrier = Barrier(self.rate_limiter_backend, group_completion_uuid, ttl=GROUP_CALLBACK_BARRIER_TTL)
+                if barrier.wait(block=False):
+                    for message in group_completion_callbacks:
+                        broker.enqueue(Message(**message))
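
The control flow of the middleware above can be followed in isolation with a small in-memory sketch. Everything in it is a stand-in chosen for illustration: `FakeMessage`, `CountdownBarrier` and the list used as a queue are hypothetical and do not reproduce dramatiq's `Message`, `Barrier` or broker.

```python
from collections import namedtuple

# Stand-ins for illustration only; the names below are hypothetical and do
# not reproduce dramatiq's Message, Barrier or broker.
FakeMessage = namedtuple("FakeMessage", ["actor_name", "args", "options"])


class CountdownBarrier:
    def __init__(self, parties):
        self.remaining = parties

    def wait(self):
        # Non-blocking: record one arrival, report whether it was the last.
        self.remaining -= 1
        return self.remaining <= 0


def after_process_message(queue, barriers, message, exception=None):
    """Same shape as the middleware hook: enqueue callbacks on the final success."""
    if exception is not None:
        return
    uuid = message.options.get("group_completion_uuid")
    callbacks = message.options.get("group_completion_callbacks")
    if uuid and callbacks and barriers[uuid].wait():
        for callback in callbacks:
            # Callbacks travel as plain dicts and are rebuilt on arrival.
            queue.append(FakeMessage(**callback))


callback = FakeMessage("finalize", (42,), {})._asdict()
options = {"group_completion_uuid": "g1", "group_completion_callbacks": [callback]}
barriers = {"g1": CountdownBarrier(3)}
queue = []
for _ in range(3):
    after_process_message(queue, barriers, FakeMessage("do_nothing", (), options))
```

As in the diff, a message that finishes with an exception never reaches the barrier, so in this sketch the callbacks are only enqueued once every child has completed successfully.
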
100 changes: 99 additions & 1 deletion tests/test_composition.py
@@ -1,10 +1,11 @@
 import time
-from threading import Condition
+from threading import Condition, Event

 import pytest

 import dramatiq
 from dramatiq import group, middleware, pipeline
+from dramatiq.middleware import GroupCallbacks
 from dramatiq.results import Results, ResultTimeout


@@ -380,3 +381,100 @@ def add(x, y):
     with pytest.raises(ResultTimeout):
         for _ in pipe.get_results(block=True, timeout=300):
             pass


+def test_groups_can_have_completion_callbacks(stub_broker, stub_worker, rate_limiter_backend):
+    # Given that I have a rate limiter backend
+    # And I've added the GroupCallbacks middleware to my broker
+    stub_broker.add_middleware(GroupCallbacks(rate_limiter_backend))

+    do_nothing_times = []
+    finalize_times = []
+    finalized = Event()

+    @dramatiq.actor
+    def do_nothing():
+        do_nothing_times.append(time.monotonic())

+    @dramatiq.actor
+    def finalize(n):
+        assert n == 42
+        finalize_times.append(time.monotonic())
+        finalized.set()

+    # When I group together some messages with a completion callback
+    g = group(do_nothing.message() for n in range(5))
+    g.add_completion_callback(finalize.message(42))
+    g.run()

+    # And wait for the callback to be called
+    finalized.wait(timeout=30)

+    # Then all the messages in the group should run
+    assert len(do_nothing_times) == 5

+    # And the callback
+    assert len(finalize_times) == 1

+    # And the callback should run after all the messages
+    assert sorted(do_nothing_times)[-1] <= finalize_times[0]


+def test_groups_with_completion_callbacks_fail_unless_group_callbacks_is_set_up(stub_broker, stub_worker):
+    # Given that I haven't set up GroupCallbacks
+    @dramatiq.actor
+    def do_nothing():
+        pass

+    @dramatiq.actor
+    def finalize(n):
+        pass

+    # When I group together some messages with a completion callback
+    g = group(do_nothing.message() for n in range(5))
+    g.add_completion_callback(finalize.message(42))

+    # And run the group
+    # Then a RuntimeError should be raised
+    with pytest.raises(RuntimeError):
+        g.run()


+def test_groups_of_pipelines_can_have_completion_callbacks(stub_broker, stub_worker, rate_limiter_backend):
+    # Given that I have a rate limiter backend
+    # And I've added the GroupCallbacks middleware to my broker
+    stub_broker.add_middleware(GroupCallbacks(rate_limiter_backend))

+    do_nothing_times = []
+    finalize_times = []
+    finalized = Event()

+    @dramatiq.actor
+    def do_nothing(_):
+        do_nothing_times.append(time.monotonic())

+    @dramatiq.actor
+    def finalize(n):
+        assert n == 42
+        finalize_times.append(time.monotonic())
+        finalized.set()

+    # When I group together some messages with a completion callback
+    g = group([
+        do_nothing.message(1) | do_nothing.message(),
+        do_nothing.message(1)
+    ])
+    g.add_completion_callback(finalize.message(42))
+    g.run()

+    # And wait for the callback to be called
+    finalized.wait(timeout=30)

+    # Then all the messages in the group should run
+    assert len(do_nothing_times) == 3

+    # And the callback
+    assert len(finalize_times) == 1

+    # And the callback should run after all the messages
+    assert sorted(do_nothing_times)[-1] <= finalize_times[0]
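
The tests above call `finalized.wait(timeout=30)` without checking its result. `Event.wait` returns `False` when the timeout elapses instead of raising, so a callback that never fires is only caught by the later length assertions. A minimal standard-library sketch of asserting on the return value directly follows; `fire_callback` is a hypothetical stand-in for the worker thread running `finalize`.

```python
import threading

finalized = threading.Event()


def fire_callback():
    # Hypothetical stand-in for the worker thread invoking the callback actor.
    finalized.set()


worker = threading.Thread(target=fire_callback)
worker.start()

# wait() returns True if the event was set before the timeout, else False.
callback_ran = finalized.wait(timeout=5)
worker.join()

# An event that nobody sets makes wait() return False once the timeout passes.
never_set = threading.Event()
timed_out_result = never_set.wait(timeout=0.01)
```

Asserting on `callback_ran` would make a timeout fail at the wait itself, which may give a clearer failure message than a mismatched list length.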