"assert proc.outqR_fd in hub.readers" AssertionError #1785

Closed
ionelmc opened this Issue Jan 13, 2014 · 20 comments

@ionelmc
Member

ionelmc commented Jan 13, 2014

[2014-01-13 15:06:53,047] pid=33970/MainProcess - ERROR    - celery.worker - Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 270, in start
    blueprint.start(self)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 786, in start
    c.loop(*c.loop_args())
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/worker/loops.py", line 71, in asynloop
    next(loop)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/kombu/async/hub.py", line 288, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/kombu/async/hub.py", line 151, in fire_timers
    entry()
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/ionel/projects/core/.ve/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 504, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError
[2014-01-13 15:06:53,048] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Worker: Closing Hub...
[2014-01-13 15:06:53,048] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Worker: Closing Pool...
[2014-01-13 15:06:53,048] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Worker: Closing Consumer...
[2014-01-13 15:06:53,048] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Worker: Stopping Consumer...
[2014-01-13 15:06:53,048] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Connection...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Events...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Mingle...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Tasks...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Control...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Gossip...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing Heart...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Closing event loop...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Stopping event loop...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Stopping Heart...
[2014-01-13 15:06:53,049] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Stopping Gossip...
[2014-01-13 15:06:53,050] pid=33970/MainProcess - DEBUG    - celery.bootsteps - | Consumer: Stopping Control...

I don't have an isolated test case yet. Does the cause look like something obvious?

@scottccoates

scottccoates commented Jan 17, 2014

I too am getting a similar issue. Which transport are you using?

@ionelmc

Member

ionelmc commented Jan 17, 2014

It was Redis. I will post more details when I get the chance.

@adepue

Contributor

adepue commented Jan 22, 2014

I am getting this issue with rabbitmq/amqp.

@mrkafk

mrkafk commented Jan 28, 2014

I'm getting it too, on the Redis backend. Celery Beat continues and the worker restarts, but it seems to be idle (no output from tasks in the log, unlike when I stop the entire instance, run flushall on Redis and start Celery again, at which point it starts working again).

@ionelmc

Member

ionelmc commented Jan 29, 2014

The problem code is in https://github.com/celery/celery/blob/master/celery/concurrency/asynpool.py#L501

Do you guys use CELERYD_MAX_TASKS_PER_CHILD with a low value?
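
For reference, the setting looks like this in a config module (the value here is only an illustration, not taken from anyone's report):

# celeryconfig.py -- example value only
CELERYD_MAX_TASKS_PER_CHILD = 1  # replace each pool process after it has executed this many tasks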

@mrkafk

mrkafk commented Jan 29, 2014

Yes! In fact I set it to 1, otherwise I had massive memory leaks under heavy load (lots of workers, big tasks, memory not being released to the OS, so usage kept creeping up until the OOM killer came in).

@scottccoates

scottccoates commented Jan 29, 2014

We have it set to 1000.

@adepue

Contributor

adepue commented Jan 29, 2014

Yep it is set to 2500 for us.

@ionelmc

Member

ionelmc commented Jan 30, 2014

So I've been able to replicate this in a somewhat unreliable manner (sometimes it takes a minute, sometimes it takes an hour) with this code:

#!.ve/bin/python -u
import os
import sys
import time
from datetime import datetime
#os.environ['MP_LOG'] = '1'
#os.environ['CELERY_RDBSIG'] = '1'
from logging import getLogger, DEBUG
logger = getLogger(__name__)
import billiard
billiard.log_to_stderr(level=DEBUG)

from celery import Celery
celery = Celery()
celery.conf.update(
    BROKER_URL="redis://localhost/5",
    CELERY_RESULT_BACKEND="redis://localhost/6",
    CELERY_TASK_SERIALIZER="pickle",
    CELERY_RESULT_SERIALIZER="pickle",
    CELERY_EVENT_SERIALIZER="pickle",
    CELERY_ACCEPT_CONTENT=['pickle', 'json', 'msgpack', 'yaml'],
    CELERYD_MAX_TASKS_PER_CHILD=1,
    CELERY_DISABLE_RATE_LIMITS=True,
)

@celery.task
def ok(arg=None):
    logger.warn('Running task code.')
    return 'ok'

if __name__ == '__main__':
    if len(sys.argv) > 1 and sys.argv[1] == 'loop':
        while 1:
            print datetime.now()
            print [r.get(propagate=False) for r in [ok.delay() for i in range(200)]]
            time.sleep(1)
    else:
        celery.start()

My best guess here, after adding some logging in celery and kombu, is that there is some file descriptor management issue. File descriptors are frequently recycled, and my theory is that kombu's hub will remove a closed-and-already-reopened fd (the outqR_fd). That fd will have been assigned to a newly spawned worker process.

E.g.:

  • Process-1 has inqR_fd=100 (in the MainProcess)
  • Process-1 does work and is exiting due to the maxtasksperchild cycle
  • Kombu's Hub still has fd 100 registered (in the MainProcess)
  • MainProcess closes all handles related to Process-1, including fd 100, but the Hub still has a reference to it
  • MainProcess spawns Process-2, which happens to get an outqR_fd with the same number (100)
  • Kombu's Hub unregisters fd 100, but that fd now belongs to a different process and is registered for a different type of operation (read, not write)

This can happen, I think, because the Hub makes no distinction between an fd that was added for reading and one that was added for writing when removing it. See https://github.com/celery/kombu/blob/master/kombu/async/hub.py#L260
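
To illustrate how quickly fd numbers get recycled (a standalone sketch, not Celery or kombu code; it only shows POSIX handing the lowest free numbers back out):

import os

# Create a pipe, remember its fd numbers, then close both ends.
r1, w1 = os.pipe()
os.close(r1)
os.close(w1)

# The next pipe typically gets the exact same numbers back, which is how a
# newly spawned worker can end up with the previous worker's outqR_fd number
# while the hub still holds the stale registration.
r2, w2 = os.pipe()
reused = (r1, w1) == (r2, w2)
print reused  # usually True: POSIX allocates the lowest available fds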

@ask What do you think - does this sound plausible?

@ionelmc

Member

ionelmc commented Jan 30, 2014

I think I have a fix in #1834

Can you give it a try? I've been running it for an hour, no issues so far.

@ionelmc ionelmc closed this in #1834 Jan 31, 2014

@rogerhu

Contributor

rogerhu commented Apr 11, 2014

This bug is still happening for us; we have max tasks per child set to 2500. Has anybody else been seeing it?

14-04-11 18:11:34,263: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/consumer.py", line 801, in start
    c.loop(*c.loop_args())
  File "/usr/local/lib/python2.7/dist-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/usr/local/lib/python2.7/dist-packages/kombu/async/hub.py", line 274, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/usr/local/lib/python2.7/dist-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/usr/local/lib/python2.7/dist-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/concurrency/asynpool.py", line 504, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError
@rogerhu

Contributor

rogerhu commented Apr 20, 2014

We're still seeing the issue with max tasks per child=2500 and here's my hypothesis for what's going on:

  1. on_process_up -> new Celery worker starts up
  2. on_process_down (for an old worker that was using the file descriptor to write) -> the hub's file descriptor is removed (since hub.remove() will delete from hub.readers and hub.writers even though it was used for writing only):
    def _discard(self, fd):
        fd = fileno(fd)
        self.readers.pop(fd, None)
        self.writers.pop(fd, None)
        self.consolidate.discard(fd)
  3. verify_process_alive for the new worker starting up -> the first 2 assert statements check out, the 3rd fails, and the Celery worker dies with an Unrecoverable error.

It seems that the hub should be less aggressive about discarding fds from both readers and writers; it should remove a file descriptor only from the set (readers or writers) it was actually registered in, not from both.

It seems like hub.remove() needs to be a bit more selective. Does rogerhu@f271d30 look like the right track?
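
A minimal sketch of the kind of selective removal being suggested here (a toy class with hypothetical method names, not kombu's actual Hub API):

class SelectiveHub(object):
    """Toy hub: forget an fd only on the side it was registered for."""

    def __init__(self):
        self.readers = {}  # fd -> callback registered for reading
        self.writers = {}  # fd -> callback registered for writing

    def remove_reader(self, fd):
        # Dropping only the read registration leaves intact a writer
        # (e.g. a newly spawned process) that reuses the same fd number.
        self.readers.pop(fd, None)

    def remove_writer(self, fd):
        self.writers.pop(fd, None)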

@rogerhu

Contributor

rogerhu commented May 1, 2014

I was able to trigger the problem twice by adding delays in the WORKER_UP command when a new process is started back up, to try to see if I could identify the race condition. It obviously does not happen too often, but I was able to dump out the file descriptors being used between process up/process down calls:

[2014-05-01 07:41:42,322: ERROR/MainProcess] on process up proc.outqR_fd=32, proc.inqW_fd=31
[2014-05-01 07:41:42,332: ERROR/MainProcess] on process up proc.outqR_fd=20, proc.inqW_fd=19
[2014-05-01 07:41:42,387: ERROR/MainProcess] on process down proc.inqW_fd=11, proc.outqR_fd=12
[2014-05-01 07:41:42,387: ERROR/MainProcess] on process down proc.inqW_fd=14, proc.outqR_fd=15
[2014-05-01 07:41:42,388: ERROR/MainProcess] on process down proc.inqW_fd=7, proc.outqR_fd=8
[2014-05-01 07:41:42,396: ERROR/MainProcess] on process up proc.outqR_fd=12, proc.inqW_fd=11
[2014-05-01 07:41:42,406: ERROR/MainProcess] on process up proc.outqR_fd=8, proc.inqW_fd=7
[2014-05-01 07:41:42,417: ERROR/MainProcess] on process up proc.outqR_fd=15, proc.inqW_fd=14
[2014-05-01 07:41:44,194: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-32, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,204: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-33, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,215: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-34, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,226: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-35, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,236: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-36, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,246: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-37, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,257: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-38, started daemon)> waiting_to_start=set([<Worker(Worker-45, started daemon)>, <Worker(Worker-40, started daemon)>, <Worker(Worker-42, started daemon)>, <Worker(Worker-38, started daemon)>, <Worker(Worker-41, started daemon)>, <Worker(Worker-44, started daemon)>, <Worker(Worker-43, started daemon)>, <Worker(Worker-46, started daemon)>])
[2014-05-01 07:41:44,258: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 274, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError
[2014-05-01 07:41:45,779: ERROR/MainProcess] on process down proc.inqW_fd=34, proc.outqR_fd=35
@rogerhu

Contributor

rogerhu commented May 1, 2014

I'm able to get it a 3rd time now (dumb luck):

[2014-05-01 07:49:45,197: ERROR/MainProcess] on process down <Worker(Worker-9, stopped[155] daemon)> proc.inqW_fd=27, proc.outqR_fd=28
[2014-05-01 07:49:45,205: ERROR/MainProcess] on process up <Worker(Worker-17, started daemon)> proc.outqR_fd=28, proc.inqW_fd=27
[2014-05-01 07:49:45,247: ERROR/Worker-10] Worker up!
[2014-05-01 07:49:45,259: ERROR/Worker-11] Worker up!
[2014-05-01 07:49:45,269: ERROR/MainProcess] on process down <Worker(Worker-10, stopped[155] daemon)> proc.inqW_fd=19, proc.outqR_fd=20
[2014-05-01 07:49:45,271: ERROR/Worker-12] Worker up!
[2014-05-01 07:49:45,278: ERROR/MainProcess] on process up <Worker(Worker-18, started daemon)> proc.outqR_fd=20, proc.inqW_fd=19
[2014-05-01 07:49:45,283: ERROR/Worker-13] Worker up!
[2014-05-01 07:49:45,324: ERROR/MainProcess] on process down <Worker(Worker-12, stopped[155] daemon)> proc.inqW_fd=23, proc.outqR_fd=24
[2014-05-01 07:49:45,324: ERROR/MainProcess] on process down <Worker(Worker-11, stopped[155] daemon)> proc.inqW_fd=15, proc.outqR_fd=16
[2014-05-01 07:49:45,332: ERROR/MainProcess] on process up <Worker(Worker-19, started daemon)> proc.outqR_fd=23, proc.inqW_fd=22
[2014-05-01 07:49:45,344: ERROR/MainProcess] on process up <Worker(Worker-20, started daemon)> proc.outqR_fd=16, proc.inqW_fd=15
[2014-05-01 07:49:45,347: ERROR/Worker-14] Worker up!
[2014-05-01 07:49:45,360: ERROR/Worker-15] Worker up!
[2014-05-01 07:49:45,371: ERROR/Worker-16] Worker up!
[2014-05-01 07:49:47,165: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-9, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,237: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-10, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,248: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-11, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,259: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-12, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,271: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-13, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,337: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-14, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,348: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-15, started daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:47,359: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-16, started daemon)> waiting_to_start=set([<Worker(Worker-17, started daemon)>, <Worker(Worker-20, started daemon)>, <Worker(Worker-18, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 07:49:49,233: ERROR/MainProcess] on process down <Worker(Worker-13, stopped[155] daemon)> proc.inqW_fd=31, proc.outqR_fd=32
[2014-05-01 07:49:49,234: ERROR/MainProcess] on process down <Worker(Worker-14, stopped[155] daemon)> proc.inqW_fd=35, proc.outqR_fd=36
[2014-05-01 07:49:49,243: ERROR/MainProcess] on process up <Worker(Worker-21, started daemon)> proc.outqR_fd=36, proc.inqW_fd=35
[2014-05-01 07:49:49,254: ERROR/MainProcess] on process up <Worker(Worker-22, started daemon)> proc.outqR_fd=31, proc.inqW_fd=30
[2014-05-01 07:49:49,298: ERROR/MainProcess] on process down <Worker(Worker-15, stopped[155] daemon)> proc.inqW_fd=11, proc.outqR_fd=12
[2014-05-01 07:49:49,299: ERROR/MainProcess] on process down <Worker(Worker-16, stopped[155] daemon)> proc.inqW_fd=7, proc.outqR_fd=8
[2014-05-01 07:49:49,307: ERROR/MainProcess] on process up <Worker(Worker-23, started daemon)> proc.outqR_fd=12, proc.inqW_fd=11
[2014-05-01 07:49:49,317: ERROR/MainProcess] on process up <Worker(Worker-24, started daemon)> proc.outqR_fd=8, proc.inqW_fd=7
[2014-05-01 07:49:53,221: ERROR/Worker-17] Worker up!
[2014-05-01 07:49:53,299: ERROR/Worker-18] Worker up!
[2014-05-01 07:49:53,347: ERROR/Worker-19] Worker up!
[2014-05-01 07:49:53,359: ERROR/Worker-20] Worker up!
[2014-05-01 07:49:54,372: ERROR/MainProcess] on process down <Worker(Worker-17, stopped[155] daemon)> proc.inqW_fd=27, proc.outqR_fd=28
[2014-05-01 07:49:54,382: ERROR/MainProcess] on process up <Worker(Worker-25, started daemon)> proc.outqR_fd=28, proc.inqW_fd=27
[2014-05-01 07:49:55,387: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-17, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-24, started daemon)>, <Worker(Worker-23, started daemon)>, <Worker(Worker-19, started daemon)>, <Worker(Worker-22, started daemon)>, <Worker(Worker-21, started daemon)>, <Worker(Worker-25, started daemon)>])
[2014-05-01 07:49:55,388: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-18, started daemon)> waiting_to_start=set([<Worker(Worker-24, started daemon)>, <Worker(Worker-23, started daemon)>, <Worker(Worker-19, started daemon)>, <Worker(Worker-22, started daemon)>, <Worker(Worker-21, started daemon)>, <Worker(Worker-25, started daemon)>])
[2014-05-01 07:49:55,388: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-19, started daemon)> waiting_to_start=set([<Worker(Worker-24, started daemon)>, <Worker(Worker-23, started daemon)>, <Worker(Worker-19, started daemon)>, <Worker(Worker-22, started daemon)>, <Worker(Worker-21, started daemon)>, <Worker(Worker-25, started daemon)>])
[2014-05-01 07:49:55,389: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 274, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError

@rogerhu

Contributor

rogerhu commented May 1, 2014

Got a fourth failure, and it's clearer this time: Worker-29 picks up the same file descriptors that Worker-23 was using. Worker-23 is detected as dead after Worker-29 comes up, causing the hub.readers entry to be removed. When the new process is then verified to be alive, the assertion fails.

It seems like the assumption behind these assertion checks is very problematic, given that the OS will hand file descriptor numbers back out before a worker is detected as lost.

[2014-05-01 08:13:04,910: ERROR/MainProcess] on process up <Worker(Worker-29, started daemon)> proc.outqR_fd=28, proc.inqW_fd=27
[2014-05-01 08:13:04,915: ERROR/MainProcess] on process down <Worker(Worker-23, stopped[155] daemon)> proc.inqW_fd=27, proc.outqR_fd=28
[2014-05-01 08:13:04,923: ERROR/MainProcess] on process up <Worker(Worker-30, started daemon)> proc.outqR_fd=24, proc.inqW_fd=23
[2014-05-01 08:13:06,602: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-17, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-30, started daemon)>, <Worker(Worker-24, started daemon)>, <Worker(Worker-26, started daemon)>, <Worker(Worker-27, started daemon)>, <Worker(Worker-29, started daemon)>, <Worker(Worker-25, started daemon)>, <Worker(Worker-28, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 08:13:07,605: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-18, stopped[155] daemon)> waiting_to_start=set([<Worker(Worker-30, started daemon)>, <Worker(Worker-24, started daemon)>, <Worker(Worker-26, started daemon)>, <Worker(Worker-27, started daemon)>, <Worker(Worker-29, started daemon)>, <Worker(Worker-25, started daemon)>, <Worker(Worker-28, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 08:13:07,606: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-19, started daemon)> waiting_to_start=set([<Worker(Worker-30, started daemon)>, <Worker(Worker-24, started daemon)>, <Worker(Worker-26, started daemon)>, <Worker(Worker-27, started daemon)>, <Worker(Worker-29, started daemon)>, <Worker(Worker-25, started daemon)>, <Worker(Worker-28, started daemon)>, <Worker(Worker-19, started daemon)>])
[2014-05-01 08:13:07,606: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 275, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError
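
Given that the OS hands fd numbers back out before the old worker is reaped, here is a sketch of the kind of ownership check that would avoid discarding a recycled fd (the fd_owners map and the function name are hypothetical, not Celery's actual code):

def discard_process_fds(hub, proc, fd_owners):
    """Drop a process's fds from the hub only if it still owns those numbers.

    fd_owners is a hypothetical {fd: proc} map maintained in on_process_up,
    so fd numbers already recycled by a newer worker stay registered.
    """
    for fd in (proc.inqW_fd, proc.outqR_fd):
        if fd_owners.get(fd) is proc:
            hub.remove(fd)
            fd_owners.pop(fd, None)
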
@rogerhu

This comment has been minimized.

Show comment
Hide comment
@rogerhu

rogerhu May 1, 2014

Contributor

The 2nd scenario I found occurs if a worker picks up file descriptors for read/write opposite to the previous worker. It seems as if hub_remove() is called not within on_process_down (since proc.dead = True) but in the try..finally clause of _write_job(). This explains how the 2nd phenomenon can occur:

[2014-05-01 08:34:14,506: ERROR/MainProcess] on process down <Worker(Worker-77, stopped[155] daemon)> proc.inqW_fd=11, proc.outqR_fd=12 proc.dead=True
[2014-05-01 08:34:14,515: ERROR/MainProcess] on process up <Worker(Worker-85, started daemon)> proc.outqR_fd=12, proc.inqW_fd=11
[2014-05-01 08:34:14,517: ERROR/MainProcess] Removing from hub: 11

[2014-05-01 08:34:17,323: ERROR/MainProcess] Verifying process alive: proc=<Worker(Worker-80, started daemon)> waiting_to_start=set([<Worker(Worker-83, started daemon)>, <Worker(Worker-82, started daemon)>, <Worker(Worker-81, started daemon)>, <Worker(Worker-84, started daemon)>, <Worker(Worker-85, started daemon)>, <Worker(Worker-87, started daemon)>, <Worker(Worker-80, started daemon)>, <Worker(Worker-86, started daemon)>])
[2014-05-01 08:34:17,324: ERROR/MainProcess] Unrecoverable error: AssertionError()
Traceback (most recent call last):
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 275, in create_loop
    poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 136, in fire_timers
    entry()
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/kombu/async/timer.py", line 64, in __call__
    return self.fun(*self.args, **self.kwargs)
  File "/home/rhu/projects/HearsayLabs/.virtualenv/local/lib/python2.7/site-packages/celery/concurrency/asynpool.py", line 509, in verify_process_alive
    assert proc.outqR_fd in hub.readers
AssertionError
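
For what it's worth, here is a minimal, self-contained sketch of that race (not Celery code; the dict-based hub, the Proc class and the fd numbers are only illustrative, and removal dropping the fd from both sides mirrors how kombu's Hub.remove() behaves as far as I can tell):

    # Toy "hub" where removal is keyed by fd number alone and, like
    # kombu's Hub.remove(), drops the fd from both readers and writers.
    readers, writers = {}, {}

    def hub_remove(fd):
        readers.pop(fd, None)
        writers.pop(fd, None)

    class Proc(object):
        def __init__(self, name, inqW_fd, outqR_fd):
            self.name, self.inqW_fd, self.outqR_fd = name, inqW_fd, outqR_fd

    old = Proc('Worker-77', inqW_fd=11, outqR_fd=12)
    new = Proc('Worker-85', inqW_fd=12, outqR_fd=11)  # fds reused, roles swapped

    # on_process_up for the new worker registers its fds first ...
    readers[new.outqR_fd] = new
    writers[new.inqW_fd] = new

    # ... then the late cleanup path (the try..finally of _write_job(),
    # since proc.dead is already True) removes the *old* worker's write fd
    # by number:
    hub_remove(old.inqW_fd)          # "Removing from hub: 11"

    print(new.outqR_fd in readers)   # False -> verify_process_alive() asserts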

@ask

Member

ask commented May 1, 2014

The assertion checks should be there right? AFAICT it would be a disaster if the fd is not registered at that point (eventual deadlock), but a false positive is not really a problem. Does this still happen even after your "selective remove" patch?

@rogerhu

Contributor

rogerhu commented May 1, 2014

There are two problems that I can see from the assertion failures:

  1. Two workers using the same file descriptors but on_process_down() is called after on_process_up(). Since the OS is giving back the file descriptor before we detect it, this suggests a race condition.

The assertion check will fail depending on the order in which on_process_up/on_process_down are called. Would it be safer to keep a reference count per file descriptor and only remove it from the hub once the count drops to zero? (a sketch follows this list)

  2. Two workers swap fd's for read/write. The selective remove patch should conceivably fix the issue.
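
A minimal sketch of the reference-counting idea (purely illustrative; RefCountedRegistry and its method names are hypothetical, not celery or kombu API):

    from collections import defaultdict

    class RefCountedRegistry(object):
        """An fd is only dropped once every registration of it is released."""

        def __init__(self):
            self.readers = {}                 # fd -> callback
            self.refcount = defaultdict(int)  # fd -> number of live registrations

        def add_reader(self, fd, callback):
            self.refcount[fd] += 1
            self.readers[fd] = callback

        def remove(self, fd):
            self.refcount[fd] -= 1
            if self.refcount[fd] <= 0:        # last owner released the fd
                self.refcount.pop(fd, None)
                self.readers.pop(fd, None)    # only now is it unregistered

    registry = RefCountedRegistry()
    registry.add_reader(12, lambda: None)     # old worker's outqR_fd
    registry.add_reader(12, lambda: None)     # same fd reused by the new worker
    registry.remove(12)                       # late cleanup for the old worker
    assert 12 in registry.readers             # new worker's reader survives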

rogerhu added a commit to rogerhu/celery that referenced this issue May 1, 2014

Fix async loop bug causing all jobs to be considered partially written.
If MAX_TASKS_PER_CHILD is set to a low number, the Billiard async loop
attempts to call on_job_process_down(). This call causes the process to be
marked dead, preventing the cleanup work done by on_process_down. Without
that cleanup, certain file descriptors are removed when the process starts
up via on_poll_start.

Related to issue: celery#1785
@rogerhu

Contributor

rogerhu commented May 1, 2014

@ask #2021 appears to help solve both issues. I suspect that the on_poll_start() function, which gets triggered when a new process registers with the event loop, closes file descriptors improperly:

            def on_poll_start():  # noqa
                if outbound:
                    # task messages are waiting to be sent: register every
                    # inqueue fd without a write in progress for WRITE | ERR
                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
                     for fd in diff(active_writes)]
                else:
                    # nothing left to send: unregister those same fds
                    [hub_remove(fd) for fd in diff(active_writes)]
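
To see why that else branch is risky, a stand-alone illustration (assuming, as the names suggest, that diff(active_writes) is the set of all known inqueue fds minus those with a write in progress; the fd numbers are made up):

    # Toy version of the else branch above (not Celery code).
    all_inqueues = {11, 12, 13, 14}     # every inqueue fd the pool knows about
    active_writes = {13}                # only fd 13 has a job write in flight
    diff = all_inqueues.difference

    outbound = []                       # no task messages queued to send
    if not outbound:
        removed = sorted(diff(active_writes))
        print(removed)                  # [11, 12, 14] get unregistered,
                                        # including the fd of a worker that
                                        # only just came up and has no job yet

Since removal also drops the fd from the readers side, a number that has meanwhile been reused as another process's outqR_fd gets taken out as well, which matches the assertion failures above.
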
  1. What is the purpose of on_job_process_down()? Should there simply be a standard cleanup function for when file descriptors are no longer being used? Right now this code path sets proc.dead = True and causes problems, since on_process_down() then assumes the cleanup has already been done:
    def on_job_process_down(self, job, pid_gone):
        """Handler called for each job when the process
        it was assigned to exits."""
        if job._write_to and not job._write_to._is_alive():
            # job was partially written
            self.on_partial_read(job, job._write_to)
  2. I do think it seems safer to implement some type of reference-counting system for the actual removal of file descriptors, for both read and write. This patch seems to prevent on_poll_start() from triggering both issues, but I'm not 100% confident that it solves everything.

FYI - I'm able to replicate the AssertionError somewhat consistently now by restarting the celery_test.py program listed above with 8 concurrent workers, adding 8-second delays inside on_loop_start() of the Worker class, and increasing the worker timeout to 10 seconds.
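
In case it helps others reproduce, a rough sketch of that delay injection, run in the main process before the worker starts. This assumes celery 3.1's asynpool, where the pool child class is asynpool.Worker with an on_loop_start(pid) hook and the startup deadline is the module-level PROC_ALIVE_TIMEOUT constant; treat both names as assumptions about that version, and "worker timeout" here as meaning that constant:

    import time
    from celery.concurrency import asynpool

    # Give the startup check a longer deadline (the "worker timeout").
    asynpool.PROC_ALIVE_TIMEOUT = 10.0

    _orig_on_loop_start = asynpool.Worker.on_loop_start

    def slow_on_loop_start(self, pid):
        time.sleep(8)                        # simulate a slow-starting child
        return _orig_on_loop_start(self, pid)

    # Patch before the pool forks so children inherit the slow handshake.
    asynpool.Worker.on_loop_start = slow_on_loop_start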
