Skip to content

Commit

Permalink
Fix backward compability making metrics explicit.
Browse files Browse the repository at this point in the history
  • Loading branch information
debonzi committed Nov 9, 2019
1 parent 641ffe7 commit 5674994
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 13 deletions.
33 changes: 22 additions & 11 deletions crossover/__init__.py
Expand Up @@ -36,13 +36,13 @@ class CrossoverRouter(celery.Task):

def run(self, *args, **kwargs):
metrics = CrossoverMetrics.load(kwargs)
metrics.set_remote_time()
if metrics:
metrics.set_remote_time()
metrics_subscribe.call_subscribers(metrics)

app = celery.current_app if celery.VERSION < self.CELERY_4_VERSION else self.app
task_name = kwargs.pop('task_name')

metrics_subscribe.call_subscribers(metrics)

logger.debug('Got Crossover task: {}'.format(task_name))
_task = app.tasks.get(task_name)
if not _task:
Expand Down Expand Up @@ -73,15 +73,16 @@ def register_router(celery_app):
_register_celery_4(celery_app=celery_app, queue=queue)


def _build_callback(task):
def _build_callback(task, bind_metrics):
if not hasattr(task.app.conf, 'broker_url'): # Celery 3
broker_url = task.app.conf.BROKER_URL
else: # Celery 4
broker_url = task.app.conf.broker_url

return {
'broker': broker_url,
'task': task.name
'task': task.name,
'bind_metrics': bind_metrics,
}


Expand Down Expand Up @@ -111,10 +112,15 @@ class CallBack(object):
def __init__(self, callback_data):
self.requester = None
if callback_data:
self.requester = _Requester(callback_data.get('broker'), callback_data.get('task'))
self.requester = _Requester(
callback_data.get('broker'),
callback_data.get('task'),
)
self.bind_metrics = callback_data.get('bind_metrics', False)

def __call__(self, *args, **kwargs):
if self.requester:
kwargs.update({'bind_metrics': self.bind_metrics})
self.requester(*args, **kwargs)


Expand All @@ -124,14 +130,19 @@ def __init__(self, broker, remote_task_name, task=CROSSOVER_ROUTER_NAME, queue=C
self.remote_task_name = remote_task_name

def __call__(self, *args, **kwargs):
metrics = CrossoverMetrics(task_name=self.remote_task_name)
metrics.set_origin_time()
bind_metrics = kwargs.pop('bind_metrics', False)

kwargs['task_name'] = self.remote_task_name
if 'callback' in kwargs:
kwargs['callback'] = _build_callback(kwargs['callback'])

kwargs['metrics'] = metrics.dump()
kwargs['callback'] = _build_callback(
task=kwargs['callback'],
bind_metrics=bind_metrics,
)

if bind_metrics:
metrics = CrossoverMetrics(task_name=self.remote_task_name)
metrics.set_origin_time()
kwargs['metrics'] = metrics.dump()

requests.post(self.url, json=kwargs)

Expand Down
6 changes: 6 additions & 0 deletions examples/project_2/project.py
Expand Up @@ -28,3 +28,9 @@ def plus_callback(result):
def times_callback(result):
logger.info('Got Multiplication callback = {0}'.format(result))
redis.db.set('times_callback', result)


@crossover.metrics_subscribe()
def metrics_subscriber(metrics):
redis.db.set('callback_queue_time', metrics.dispatch_queue_time)
redis.db.set('callback_task_name', metrics.task_name)
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -38,7 +38,7 @@ def long_desc_img_replacer(long_desc):


setup(name='celery-crossover',
version='1.1.12',
version='1.1.13',
description='Celery Crossover aims to make it really easy to execute tasks in another service.',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down
51 changes: 50 additions & 1 deletion tests/test_crossover.py
Expand Up @@ -24,11 +24,33 @@ def test_auto_callback(worker_1, worker_2, p1_client, redis):
p1_client.plus(x=340, y=210, callback=plus_callback)
assert redis.get('plus_callback') == b'550'


def test_auto_callback_metrics(worker_1, worker_2, p1_client, redis):
"""
P2 call P1 `plus` task with P2 `plus_callback` as P1 `plus` callback.
p1 `plus` has @crossover.callback(auto_callback=True) decorator.
P2 p1_client.plus -> P1
P2 plus_callback <- P1
P2 stores the value received by plus_callback into redis
so we can check its value
"""
from examples.project_2.project import plus_callback

p1_client.plus(x=340, y=210, callback=plus_callback, bind_metrics=True)
assert redis.get('plus_callback') == b'550'

dispatch_queue_time = redis.get('dispatch_queue_time')
assert dispatch_queue_time is not None
assert isinstance(float(dispatch_queue_time), float)
assert redis.get('task_name') == b'plus'

callback_queue_time = redis.get('callback_queue_time')
assert callback_queue_time is not None
assert isinstance(float(callback_queue_time), float)
assert redis.get('callback_task_name') == b'plus_callback'


def test_callback_meta(worker_1, worker_2, p1_client, redis):
"""
Expand All @@ -48,10 +70,37 @@ def test_callback_meta(worker_1, worker_2, p1_client, redis):
"""
from examples.project_2.project import times_callback

p1_client.times(x=340, y=210, callback=times_callback)
p1_client.times(x=340, y=210, callback=times_callback, bind_metrics=True)
assert redis.get('times_callback') == b'71400'


def test_callback_meta_metrics(worker_1, worker_2, p1_client, redis):
"""
P2 call P1 `times` task with P2 `times_callback` as P1 `times` callback.
P1 `times` has @crossover.callback(bind_callback_meta=True) decorator and
will get `callback_meta` as first argument.
P1 `times` will call P1 `calculate_times` task passing `callback_meta`.
P1 `calculate_times` will instantiate a crossover.CallBack object and
send the result to P2 `times_callback`
P2 p1_client.plus -> P1 (plus) -> P1 (calculate_times)
P2 times_callback <----------------- P1
P2 stores the value received by times_callback into redis
so we can check its value
"""
from examples.project_2.project import times_callback

p1_client.times(x=340, y=210, callback=times_callback, bind_metrics=True)
assert redis.get('times_callback') == b'71400'

dispatch_queue_time = redis.get('dispatch_queue_time')
assert dispatch_queue_time is not None
assert isinstance(float(dispatch_queue_time), float)
assert redis.get('task_name') == b'times'

callback_queue_time = redis.get('callback_queue_time')
assert callback_queue_time is not None
assert isinstance(float(callback_queue_time), float)
assert redis.get('callback_task_name') == b'times_callback'

0 comments on commit 5674994

Please sign in to comment.