diff --git a/celery/tests/worker/test_loops.py b/celery/tests/worker/test_loops.py index 00c5d960f16..00043a03d19 100644 --- a/celery/tests/worker/test_loops.py +++ b/celery/tests/worker/test_loops.py @@ -119,6 +119,10 @@ def add(x, y): return x + y self.add = add + def test_drain_after_consume(self): + x, _ = get_task_callback(self.app) + x.connection.drain_events.assert_called_with() + def test_setup_heartbeat(self): x = X(self.app, heartbeat=10) x.hub.call_repeatedly = Mock(name='x.hub.call_repeatedly()') diff --git a/celery/worker/loops.py b/celery/worker/loops.py index 08bba315608..6ba3be258cd 100644 --- a/celery/worker/loops.py +++ b/celery/worker/loops.py @@ -47,6 +47,11 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos, if not obj.restart_count and not obj.pool.did_start_ok(): raise WorkerLostError('Could not start worker processes') + # consumer.consume() may have prefetched up to our + # limit - drain an event so we are in a clean state + # prior to starting our event loop. + connection.drain_events() + # FIXME: Use loop.run_forever # Tried and works, but no time to test properly before release. hub.propagate_errors = errors