Unexpected task delay w/ Redis + acks_late #4378

Open
jsjohns opened this Issue Nov 10, 2017 · 6 comments

jsjohns commented Nov 10, 2017

There is a ~2s delay between execution of tasks when Celery is configured to use Redis as the broker and acks_late is enabled.

This behavior disappears when either:

  • task_acks_late is disabled
  • AMQP is used as the transport
  • In kombu.utils.eventio._poll.poll, the line timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 1e3) is changed to timeout = 0 if timeout and timeout < 0 else round((timeout or 0) * 10) (i.e. the poll() timeout is reduced by a factor of 100)
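
For readers who want to see what the third workaround changes numerically, here is a minimal sketch of the quoted conversion. The helper name poll_timeout_ms and its scale parameter are illustrative only and are not part of kombu; only the expression itself is taken from the line quoted above.

```python
def poll_timeout_ms(timeout, scale=1e3):
    """Mirror the expression quoted from kombu.utils.eventio._poll.poll.

    `scale` is a hypothetical knob for this sketch: 1e3 is the original
    factor, 10 is the factor used in the experiment described above.
    """
    return 0 if timeout and timeout < 0 else round((timeout or 0) * scale)


# Compare the original factor with the reduced one for a few inputs.
for seconds in (None, -1, 1, 2):
    original = poll_timeout_ms(seconds)
    reduced = poll_timeout_ms(seconds, scale=10)
    print(seconds, original, reduced)
```

With the original factor, a timeout of 1 becomes 1000; with the reduced factor it becomes 10, so each idle poll() call returns much sooner. This hides the delay but does not explain why the poll has to time out in the first place.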

I expected that the task execution delay would not be significantly influenced by the choice of broker or the value of task_acks_late.

This behavior has been tested on macOS 10.13.1 and Ubuntu 16.04 with Celery 4.1.0 and Redis 4.0.1. It can be reproduced with the following scripts:

request.py:

from celery import group
from worker import noop

tasks = []
for i in range(5):
  tasks.append(noop.s(i))

group(tasks).apply_async()

worker.py:

from celery import Celery
import settings

app = Celery()
app.config_from_object(settings)

@app.task(bind=True)
def noop(self, input):
    return "noop: %s" % input

settings.py:

broker_url = "redis://"
worker_prefetch_multiplier = 1
task_acks_late = True

The output of a run exhibiting the bad behavior follows (with a few debugging statements added). Note the timestamps of the lines beginning with "Received task":

$ celery worker -A worker -l info -c 1 
 
 -------------- celery@io v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-17.2.0-x86_64-i386-64bit 2017-11-10 15:57:38
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x108b71b70
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . worker.noop

[2017-11-10 15:57:38,397: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-11-10 15:57:38,406: INFO/MainProcess] mingle: searching for neighbors
[2017-11-10 15:57:39,427: INFO/MainProcess] mingle: all alone
[2017-11-10 15:57:39,435: INFO/MainProcess] celery@io ready.
[2017-11-10 15:57:39,436: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R, on_result_readable(5)(5)->R, on_readable(15)(15)->R, on_readable(18)(18)->R
[2017-11-10 15:57:40,724: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:40,882: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:40,883: INFO/MainProcess] Received task: worker.noop[950a3898-1eac-42a2-bf4d-e17916b4dbe6]  
[2017-11-10 15:57:40,884: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 15:57:40,884: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:40,885: INFO/ForkPoolWorker-1] Task worker.noop[950a3898-1eac-42a2-bf4d-e17916b4dbe6] succeeded in 0.0005565830001614813s: 'noop: 0'
[2017-11-10 15:57:40,886: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:40,886: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:40,886: WARNING/MainProcess] <- acknowledge()
[2017-11-10 15:57:41,432: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:41,434: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R, on_readable(15)(15)->R
[2017-11-10 15:57:41,435: INFO/MainProcess] Received task: worker.noop[655bacca-03fc-4bc5-93aa-28cc47683705]  
[2017-11-10 15:57:41,435: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 15:57:41,436: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:41,436: INFO/ForkPoolWorker-1] Task worker.noop[655bacca-03fc-4bc5-93aa-28cc47683705] succeeded in 8.905499998945743e-05s: 'noop: 1'
[2017-11-10 15:57:41,437: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:41,437: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:41,437: WARNING/MainProcess] <- acknowledge()
[2017-11-10 15:57:43,434: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:43,436: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R, on_readable(15)(15)->R
[2017-11-10 15:57:43,437: INFO/MainProcess] Received task: worker.noop[4f1036b3-288d-46e3-8465-76101e8667f0]  
[2017-11-10 15:57:43,438: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 15:57:43,438: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:43,438: INFO/ForkPoolWorker-1] Task worker.noop[4f1036b3-288d-46e3-8465-76101e8667f0] succeeded in 0.00012072099980287021s: 'noop: 2'
[2017-11-10 15:57:43,439: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:43,439: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:43,439: WARNING/MainProcess] <- acknowledge()
[2017-11-10 15:57:44,428: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:44,429: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:44,430: INFO/MainProcess] Received task: worker.noop[58c1635d-2460-45e8-bd65-3b693b1234e0]  
[2017-11-10 15:57:44,431: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 15:57:44,431: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:44,431: INFO/ForkPoolWorker-1] Task worker.noop[58c1635d-2460-45e8-bd65-3b693b1234e0] succeeded in 0.00012039100010952097s: 'noop: 3'
[2017-11-10 15:57:44,432: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:44,432: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:44,432: WARNING/MainProcess] <- acknowledge()
[2017-11-10 15:57:44,435: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:44,436: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:44,437: INFO/MainProcess] Received task: worker.noop[0adbc12e-d7aa-46fc-8ac3-f077560ff132]  
[2017-11-10 15:57:44,438: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 15:57:44,438: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:44,438: INFO/ForkPoolWorker-1] Task worker.noop[0adbc12e-d7aa-46fc-8ac3-f077560ff132] succeeded in 9.108599988394417e-05s: 'noop: 4'
[2017-11-10 15:57:44,439: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 15:57:44,439: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:44,439: WARNING/MainProcess] <- acknowledge()
[2017-11-10 15:57:45,440: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:45,441: WARNING/MainProcess] [EVENTS]: on_readable(15)(15)->R
[2017-11-10 15:57:46,471: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:47,446: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:47,447: WARNING/MainProcess] [EVENTS]: on_readable(15)(15)->R
[2017-11-10 15:57:47,794: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:48,398: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:48,816: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 15:57:49,433: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:49,437: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:49,448: WARNING/MainProcess] [EVENTS]:
[2017-11-10 15:57:49,449: WARNING/MainProcess] [EVENTS]: on_readable(15)(15)->R
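
To quantify the delay rather than eyeball it, the gaps between consecutive "Received task" lines can be computed from the log. The following is a small sketch; the function name received_gaps and the SAMPLE constant are made up for this illustration, and the sample lines are copied from the run above.

```python
import re
from datetime import datetime

# Matches the timestamp of worker log lines that announce a received task.
RECEIVED = re.compile(
    r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}): [^\]]+\] Received task:"
)


def received_gaps(lines):
    """Return the seconds elapsed between consecutive 'Received task' lines."""
    stamps = []
    for line in lines:
        match = RECEIVED.match(line)
        if match:
            stamps.append(
                datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            )
    return [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]


SAMPLE = """\
[2017-11-10 15:57:40,883: INFO/MainProcess] Received task: worker.noop[950a3898-1eac-42a2-bf4d-e17916b4dbe6]
[2017-11-10 15:57:40,886: WARNING/MainProcess] -> acknowledge()
[2017-11-10 15:57:41,435: INFO/MainProcess] Received task: worker.noop[655bacca-03fc-4bc5-93aa-28cc47683705]
[2017-11-10 15:57:43,437: INFO/MainProcess] Received task: worker.noop[4f1036b3-288d-46e3-8465-76101e8667f0]
[2017-11-10 15:57:44,430: INFO/MainProcess] Received task: worker.noop[58c1635d-2460-45e8-bd65-3b693b1234e0]
[2017-11-10 15:57:44,437: INFO/MainProcess] Received task: worker.noop[0adbc12e-d7aa-46fc-8ac3-f077560ff132]
"""

print(received_gaps(SAMPLE.splitlines()))
```

For this run the gaps are roughly 0.55 s, 2.0 s, 0.99 s and 0.007 s, even though each task itself finishes in well under a millisecond.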

With the following settings.py (task_acks_late disabled)...

broker_url = "redis://"
worker_prefetch_multiplier = 1
task_acks_late = False

...the behavior is as follows (good):

$ celery worker -A worker -l info -c 1 
 
 -------------- celery@io v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-17.2.0-x86_64-i386-64bit 2017-11-10 16:02:21
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x10f222be0
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . worker.noop

[2017-11-10 16:02:21,817: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-11-10 16:02:21,826: INFO/MainProcess] mingle: searching for neighbors
[2017-11-10 16:02:22,849: INFO/MainProcess] mingle: all alone
[2017-11-10 16:02:22,857: INFO/MainProcess] celery@io ready.
[2017-11-10 16:02:22,858: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R, on_readable(18)(18)->R, on_readable(16)(16)->R
[2017-11-10 16:02:23,010: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:23,829: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:02:23,831: WARNING/MainProcess] [EVENTS]: on_readable(18)(18)->R
[2017-11-10 16:02:24,345: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,471: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,472: INFO/MainProcess] Received task: worker.noop[8fce37e5-2faa-4c09-a213-b6fd4392e6e7]  
[2017-11-10 16:02:24,473: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:02:24,474: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,474: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:02:24,474: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:02:24,475: INFO/ForkPoolWorker-1] Task worker.noop[8fce37e5-2faa-4c09-a213-b6fd4392e6e7] succeeded in 0.0006347779999487102s: 'noop: 0'
[2017-11-10 16:02:24,476: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,476: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,476: INFO/MainProcess] Received task: worker.noop[50aa07bf-59ce-4aa3-83f1-a175be3645d6]  
[2017-11-10 16:02:24,477: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:02:24,477: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,477: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:02:24,477: INFO/ForkPoolWorker-1] Task worker.noop[50aa07bf-59ce-4aa3-83f1-a175be3645d6] succeeded in 5.139399945619516e-05s: 'noop: 1'
[2017-11-10 16:02:24,477: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:02:24,478: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,478: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,479: INFO/MainProcess] Received task: worker.noop[4b15600c-73af-4050-bd70-cf3275837663]  
[2017-11-10 16:02:24,479: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:02:24,479: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,479: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:02:24,479: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:02:24,479: INFO/ForkPoolWorker-1] Task worker.noop[4b15600c-73af-4050-bd70-cf3275837663] succeeded in 7.287799962796271e-05s: 'noop: 2'
[2017-11-10 16:02:24,480: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,480: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,481: INFO/MainProcess] Received task: worker.noop[2af967de-e6e1-4a18-a124-433ec831f6c0]  
[2017-11-10 16:02:24,481: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:02:24,481: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,481: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:02:24,481: INFO/ForkPoolWorker-1] Task worker.noop[2af967de-e6e1-4a18-a124-433ec831f6c0] succeeded in 4.8488000174984336e-05s: 'noop: 3'
[2017-11-10 16:02:24,481: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:02:24,482: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,482: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:24,483: INFO/MainProcess] Received task: worker.noop[fdd33e95-ab15-41cc-ae18-3fd7de2d4c22]  
[2017-11-10 16:02:24,483: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:02:24,483: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:24,483: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:02:24,483: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:02:24,483: INFO/ForkPoolWorker-1] Task worker.noop[fdd33e95-ab15-41cc-ae18-3fd7de2d4c22] succeeded in 5.3837999985262286e-05s: 'noop: 4'
[2017-11-10 16:02:24,484: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:02:25,762: WARNING/MainProcess] [EVENTS]: on_readable(8)(8)->R
[2017-11-10 16:02:25,834: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:02:25,836: WARNING/MainProcess] [EVENTS]: on_readable(18)(18)->R

With the following settings.py (AMQP transport)...

broker_url = "amqp://"
worker_prefetch_multiplier = 1
task_acks_late = True

...the behavior is as follows (also good):

$ celery worker -A worker -l info -c 1 
 
 -------------- celery@io v4.1.0 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-17.2.0-x86_64-i386-64bit 2017-11-10 16:01:05
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x10ca54ba8
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . worker.noop

[2017-11-10 16:01:05,908: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2017-11-10 16:01:05,918: INFO/MainProcess] mingle: searching for neighbors
[2017-11-10 16:01:06,942: INFO/MainProcess] mingle: all alone
[2017-11-10 16:01:06,956: INFO/MainProcess] celery@io ready.
[2017-11-10 16:01:07,061: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:08,959: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:08,961: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:10,961: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:10,963: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:11,948: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:11,958: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:12,968: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:12,970: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,669: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,669: INFO/MainProcess] Received task: worker.noop[61d56c61-3826-4eae-9ed5-deb21dda7873]  
[2017-11-10 16:01:13,670: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:01:13,670: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,671: INFO/ForkPoolWorker-1] Task worker.noop[61d56c61-3826-4eae-9ed5-deb21dda7873] succeeded in 0.0006549120007548481s: 'noop: 0'
[2017-11-10 16:01:13,672: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,672: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:01:13,672: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:01:13,673: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,673: INFO/MainProcess] Received task: worker.noop[78f0e1ca-aaf9-49e9-9e2a-4eb76ebe69d5]  
[2017-11-10 16:01:13,674: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:01:13,674: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,674: INFO/ForkPoolWorker-1] Task worker.noop[78f0e1ca-aaf9-49e9-9e2a-4eb76ebe69d5] succeeded in 6.282100002863444e-05s: 'noop: 1'
[2017-11-10 16:01:13,674: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,674: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:01:13,675: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:01:13,675: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,675: INFO/MainProcess] Received task: worker.noop[da3c8078-b593-45ae-971b-ad24475ef392]  
[2017-11-10 16:01:13,676: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:01:13,676: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,676: INFO/ForkPoolWorker-1] Task worker.noop[da3c8078-b593-45ae-971b-ad24475ef392] succeeded in 8.325599992531352e-05s: 'noop: 2'
[2017-11-10 16:01:13,677: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,677: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:01:13,677: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:01:13,678: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,679: INFO/MainProcess] Received task: worker.noop[24b7b946-5151-4268-869e-4c22a4d4d556]  
[2017-11-10 16:01:13,679: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:01:13,679: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,679: INFO/ForkPoolWorker-1] Task worker.noop[24b7b946-5151-4268-869e-4c22a4d4d556] succeeded in 5.8243000239599496e-05s: 'noop: 3'
[2017-11-10 16:01:13,680: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,680: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:01:13,680: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:01:13,681: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:13,681: INFO/MainProcess] Received task: worker.noop[95f23dd8-0b0b-42b7-9ee0-2a6e40dc3bd4]  
[2017-11-10 16:01:13,681: WARNING/MainProcess] [EVENTS]: schedule_writes(4)->W
[2017-11-10 16:01:13,681: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,681: INFO/ForkPoolWorker-1] Task worker.noop[95f23dd8-0b0b-42b7-9ee0-2a6e40dc3bd4] succeeded in 4.8175999836530536e-05s: 'noop: 4'
[2017-11-10 16:01:13,682: WARNING/MainProcess] [EVENTS]: on_result_readable(5)(5)->R
[2017-11-10 16:01:13,682: WARNING/MainProcess] -> acknowledge()
[2017-11-10 16:01:13,682: WARNING/MainProcess] <- acknowledge()
[2017-11-10 16:01:14,971: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:14,972: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:16,952: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:16,960: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:16,975: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:16,977: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R
[2017-11-10 16:01:18,980: WARNING/MainProcess] [EVENTS]:
[2017-11-10 16:01:18,981: WARNING/MainProcess] [EVENTS]: on_readable(<kombu.transport.pyamqp.Connection object at 0x10cd42748>, <Hub@0x10cc904e0: R:3 W:0>)(8)->R

Report below:

celery -A worker report

software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:3.6.3
            billiard:3.5.0.3 redis:2.10.6
platform -> system:Darwin arch:64bit imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:disabled

broker_url: 'redis://localhost:6379//'
task_acks_late: False
worker_prefetch_multiplier: 1

jackdbernier commented Apr 11, 2018

I've been looking at this one a bit. Any insights about where to look? I'm getting a little lost between celery and kombu. I'd love to be able to pick this up.

auvipy (Member) commented Apr 11, 2018

First check the Redis code in kombu, then Celery.


jackdbernier commented Apr 11, 2018

I think it's somewhere around poll_timeout = fire_timers(propagate=propagate) if scheduled else 1, but I'm sort of going in circles. I'll read more.

Why does it happen only when acks_late=True?
Why use 1 second as the default minimum or fallback?

I'm checking out the kombu repo now. I can definitely reproduce this. We have been using this setting for a while now and just assumed that the beast was slow... Feeling kind of dumb now and hoping to find a decent solution.

georgepsarakis (Member) commented Apr 12, 2018

@jackdbernier this section seems to handle the acknowledgement.

Also, some Celery code paths may differ depending on the acks_late value; for example, see here.

Hope this helps, let me know if you make any progress.


jackdbernier commented Apr 13, 2018

Thanks for the pointers. I've definitely looked at those lines of code before, but I found nothing that screams "I'm polling twice and waiting 1 second". I think the cause is closer to what I pointed at, where the code checks whether it has received a message. I still feel that something is not set properly when the task is ack'ed after execution, which makes poll_timeout = fire_timers(propagate=propagate) if scheduled else 1 (https://github.com/celery/kombu/blob/75695205f6e7af8e7e9178e010debc3871b19106/kombu/asynchronous/hub.py#L293) think it has to wait before the next poll. The other candidate is here: https://github.com/celery/kombu/blob/75695205f6e7af8e7e9178e010debc3871b19106/kombu/transport/virtual/base.py#L198

Is there some documentation about how the processes communicate with each other (MainProcess <-> Worker Processes)?
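
To illustrate the hypothesis discussed above, here is a toy sketch of how a poll timeout turns into visible latency when nothing wakes the loop. It is not kombu or Celery code: wait_for_wakeup is a made-up helper, the pipe stands in for whatever file descriptor the hub watches, and it assumes a POSIX system where select() works on pipes. Whether this is really what happens with acks_late is unconfirmed.

```python
import os
import select
import threading
import time


def wait_for_wakeup(poll_timeout, notify_after=None):
    """Block in select() until the pipe is readable or the timeout expires.

    If `notify_after` is given, a background timer writes to the pipe after
    that many seconds, simulating an event that wakes the loop early.
    Returns (woken_by_event, elapsed_seconds).
    """
    read_fd, write_fd = os.pipe()
    timer = None
    try:
        if notify_after is not None:
            timer = threading.Timer(notify_after, os.write, args=(write_fd, b"x"))
            timer.start()
        start = time.monotonic()
        readable, _, _ = select.select([read_fd], [], [], poll_timeout)
        elapsed = time.monotonic() - start
        return bool(readable), elapsed
    finally:
        if timer is not None:
            timer.join()  # make sure the write has happened before closing
        os.close(read_fd)
        os.close(write_fd)


# No wake-up: the loop sits in select() for the whole timeout.
print(wait_for_wakeup(0.3))
# Wake-up after 50 ms: select() returns as soon as the pipe is readable.
print(wait_for_wakeup(0.3, notify_after=0.05))
```

If the next message only becomes fetchable after the late ack, and nothing makes a watched descriptor readable at that moment, the loop would sit out the full timeout as in the first call. That would be consistent with delays of about one or two seconds, but it remains an assumption to verify against the hub and the Redis transport.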

@auvipy auvipy modified the milestones: v5.0.0, v4.3 May 27, 2018

@auvipy auvipy modified the milestones: v4.3, v5.0.0 Nov 17, 2018


farhaanbukhsh commented Feb 1, 2019

Hey, I want to try to fix this. @auvipy, can you help me narrow it down?
