diff --git a/celery/worker/consumer/events.py b/celery/worker/consumer/events.py index d7b9f003930..ee7bcecb890 100644 --- a/celery/worker/consumer/events.py +++ b/celery/worker/consumer/events.py @@ -29,6 +29,7 @@ def __init__(self, c, not without_gossip or not without_heartbeat ) + self.enabled = self.send_events c.event_dispatcher = None super(Events, self).__init__(c, **kwargs) diff --git a/celery/worker/strategy.py b/celery/worker/strategy.py index 1abad835542..6eee1235a0f 100644 --- a/celery/worker/strategy.py +++ b/celery/worker/strategy.py @@ -116,7 +116,7 @@ def default(task, app, consumer, # (optimized to avoid calling request.send_event) eventer = consumer.event_dispatcher events = eventer and eventer.enabled - send_event = eventer.send + send_event = eventer and eventer.send task_sends_events = events and task.send_events call_at = consumer.timer.call_at diff --git a/t/unit/bin/test_worker.py b/t/unit/bin/test_worker.py index 03978d0c7db..fe992f7fe5e 100644 --- a/t/unit/bin/test_worker.py +++ b/t/unit/bin/test_worker.py @@ -70,6 +70,7 @@ def test_run_from_argv_basic(self): def run(*args, **kwargs): pass + x.run = run x.run_from_argv('celery', []) x.maybe_detach.assert_called() @@ -210,10 +211,10 @@ def test_init_queues(self): assert 'celery' not in app.amqp.queues.consume_from c.task_create_missing_queues = False - del(app.amqp.queues) + del (app.amqp.queues) with pytest.raises(ImproperlyConfigured): self.Worker(app=self.app).setup_queues(['image']) - del(app.amqp.queues) + del (app.amqp.queues) c.task_create_missing_queues = True worker = self.Worker(app=self.app) worker.setup_queues(['image']) @@ -374,6 +375,20 @@ def on_worker_ready(**kwargs): self.Worker(app=self.app).on_consumer_ready(object()) assert worker_ready_sent[0] + def test_disable_task_events(self): + worker = self.Worker(app=self.app, task_events=False, + without_gossip=True, + without_heartbeat=True) + consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps + assert not any(True for step in consumer_steps + if step.alias == 'Events') + + def test_enable_task_events(self): + worker = self.Worker(app=self.app, task_events=True) + consumer_steps = worker.blueprint.steps['celery.worker.components.Consumer'].obj.steps + assert any(True for step in consumer_steps + if step.alias == 'Events') + @mock.stdouts class test_funs: @@ -422,7 +437,6 @@ def test_main(self): @mock.stdouts class test_signal_handlers: - class _Worker(object): hostname = 'foo' stopped = False