TypeError in main celery process #171

Closed · olliewalsh opened this issue Nov 1, 2012 · 22 comments

@olliewalsh (Contributor)

This occurs quite regularly ~ once a day.

[2012-11-01 08:04:34,107: ERROR/MainProcess] Unrecoverable error: TypeError("'long' object is not iterable",)
Traceback (most recent call last):
  File "c:\myvirtualenv\lib\site-packages\celery\worker\__init__.py", line 347, in start
    component.start()
  File "c:\myvirtualenv\lib\site-packages\celery\worker\consumer.py", line 390, in start
    self.consume_messages()
  File "c:\myvirtualenv\lib\site-packages\celery\worker\consumer.py", line 868, in consume_messages
    self.connection.drain_events(timeout=10.0)
  File "c:\myvirtualenv\lib\site-packages\kombu\connection.py", line 197, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "c:\myvirtualenv\lib\site-packages\kombu\transport\virtual\__init__.py", line 746, in drain_events
    item, channel = get(timeout=timeout)
  File "c:\myvirtualenv\lib\site-packages\kombu\transport\redis.py", line 237, in get
    self.on_poll_empty()
  File "c:\myvirtualenv\lib\site-packages\kombu\transport\redis.py", line 210, in on_poll_empty
    return channel.qos.restore_visible()
  File "c:\myvirtualenv\lib\site-packages\kombu\transport\redis.py", line 102, in restore_visible
    start=start, num=num, withscores=True)
  File "c:\myvirtualenv\lib\site-packages\redis\client.py", line 1176, in zrevrangebyscore
    return self.execute_command(*pieces, **options)
  File "c:\myvirtualenv\lib\site-packages\kombu\transport\redis.py", line 547, in execute_command
    return self.parse_response(conn, command_name, **options)
  File "c:\myvirtualenv\lib\site-packages\redis\client.py", line 372, in parse_response
    return self.response_callbacks[command_name](response, **options)
File "c:\myvirtualenv\lib\site-packages\redis\client.py", line 128, in zset_score_pairs
    it = iter(response)
TypeError: 'long' object is not iterable

Worker (XP SP3 32bit):
2 worker processes.
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_ACKS_LATE = True
Python 2.7.3
celery==3.0.11
celery-with-redis==3.0
kombu==2.4.7
redis==2.7.1

I've not seen this issue on a similar Windows 7 x64 worker, although that worker is not using CELERY_ACKS_LATE and runs a single worker process.

Client debian squeeze x86_64:
Python 2.7.3
celery==3.0.11
celery-with-redis==3.0
kombu==2.4.7
redis==2.6.2

Redis server debian squeeze x86_64:
redis 2.4.13

@olliewalsh (Contributor, Author)

I was viewing #138 at the time and didn't notice that it was an issue in the kombu repo. I meant to submit this to the celery repo.

I've not managed to reproduce this issue when running a single worker process.

@ask (Contributor) commented Nov 2, 2012

Kombu is the right place to report this issue so that's fine.

Would you be able to print out the value it receives? (The value of response in redis\client.py, line 128, in zset_score_pairs.)

@olliewalsh (Contributor, Author)

The response value is 1.
I've caught the exception in parse_response; in this case the exception was raised in a different callback:
line 208, in string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None)

@olliewalsh (Contributor, Author)

I added logging to redis-py to track everything sent/received and on which socket fd to catch this in action:

[2012-11-13 16:13:31,000: WARNING/MainProcess] <socket object, fd=1260, family=2, type=1, protocol=0> send: ['*10', '$5', 'BRPOP', '$47', ...
[2012-11-13 16:13:31,000: WARNING/MainProcess] <socket object, fd=1260, family=2, type=1, protocol=0> send: ['*3', '$7', 'PUBLISH', '$8', 'celeryev', '$624', ...
[2012-11-13 16:13:32,986: WARNING/MainProcess] <socket object, fd=1260, family=2, type=1, protocol=0> read: ['*-1'] (The BRPOP response)
...
Lots of traffic on fd==1168, almost all SELECT & PUBLISH for the heartbeat event
...
[2012-11-13 16:18:34,033: WARNING/MainProcess] <socket object, fd=1260, family=2, type=1, protocol=0> read: [':1'] (The PUBLISH response)
TypeError in _brpop_read

I tracked this down to a race between the main celery thread, where the BRPOP command is sent, and the celery heartbeat thread, where a PUBLISH command is sent. If _in_poll is still False, the heartbeat thread will not use a new client. It should be True while the main thread is blocking on BRPOP, but _in_poll is only set to True after the BRPOP command has already been sent, leaving a window in which both threads use the same connection.
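
A minimal sketch of that window (illustrative only, not the actual kombu code; apart from _in_poll and the client's shared connection, the function and helper names here are made up):

    def brpop_start(channel, keys, timeout):
        # The BRPOP command is written to the shared connection first...
        channel.client.connection.send_command('BRPOP', *(list(keys) + [timeout]))
        # ...and only then is the channel marked as polling. Until this line
        # runs, the heartbeat thread still sees _in_poll == False.
        channel._in_poll = True

    def heartbeat_publish(channel, event):
        if channel._in_poll:
            client = channel._create_client()  # hypothetical: a fresh connection, safe
        else:
            # Shared connection: unsafe if BRPOP was sent a moment ago,
            # because the two replies can now be read by the wrong threads.
            client = channel.client
        client.publish('celeryev', event)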

@olliewalsh (Contributor, Author)

There is still a problem when PUBLISH and ZREVRANGEBYSCORE are interleaved.

ask pushed a commit that referenced this issue Nov 16, 2012
This removes a race condition where _avail_client can return a polling client instead of a new client.

Fixes issue #171
@olliewalsh (Contributor, Author)

I think ZREVRANGEBYSCORE is slightly different.

redis.Redis.execute_command pops a connection from the list of available connections or creates a new connection. AFAIK List.pop is atomic.

KombuRedis overrides this to use a single connection created in __init__, so it's no longer safe to run redis commands from multiple threads.

Actually, now that I think about it, this was the issue with BRPOP.
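
To illustrate the failure mode with the plain redis-py connection API (a sketch, not kombu code; the key and channel names are just examples): replies come back in the order the commands hit the socket, so with one shared connection each thread can end up reading the other thread's reply.

    import redis

    pool = redis.ConnectionPool()
    shared_conn = pool.get_connection('_')  # one connection shared by both threads

    def main_thread_restore_visible():
        shared_conn.send_command('ZREVRANGEBYSCORE', 'unacked_index', '+inf', 0)
        # If the heartbeat thread has already consumed this command's reply,
        # the next reply on the socket is the integer PUBLISH reply, and the
        # zset_score_pairs callback fails with "'long' object is not iterable".
        return shared_conn.read_response()

    def heartbeat_thread_publish():
        shared_conn.send_command('PUBLISH', 'celeryev', 'heartbeat')
        # May actually read the ZREVRANGEBYSCORE reply instead of its own.
        return shared_conn.read_response()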

@ask (Contributor) commented Nov 21, 2012

The author of redis-py wrote the KombuRedis class. I'm not sure why it's there, but I think it was due to changes in the redis-py API.

@olliewalsh (Contributor, Author)

Yeah, I spotted that in 7c362f6.

ask pushed a commit that referenced this issue Nov 21, 2012
This removes a race condition where _avail_client can return a polling client instead of a new client.

Fixes issue #171
@olliewalsh (Contributor, Author)

I've taken the easy option and used rabbitmq as the backend for now. I'll try to take another look at this when I have more time to spare.

@vad commented Dec 6, 2012

kombu+rabbitmq is not working for me, so I tried redis and I have this issue...

@olliewalsh (Contributor, Author)

kombu+rabbitmq is not working for me

It should work; it's the most common configuration. Have you created another issue for this?

i tried redis and i have this issue...

Any details? Package versions, OS version, using celery? With celery events enabled?

@vad commented Dec 7, 2012

Have you created another issue for this?

yes: celery/celery#644

Any details?

Kombu 2.5.3, celery 3.0.12, django-celery 3.0.11 on Debian testing.

With celery events enabled?

yes.

Did you try to remove the client instance in self.conn_or_acquire(client)?

olliewalsh added a commit to olliewalsh/kombu that referenced this issue Dec 7, 2012
Remove KombuRedis.execute_command. It is not thread safe. The base class implementation is thread safe.
@olliewalsh (Contributor, Author)

Did you try to remove the client instance in self.conn_or_acquire(client) ?

No, I was using 2.4.8. conn_or_acquire was added in the 2.5 branch...

I'm almost certain that the problem is this:

1. The main thread sends a ZREVRANGEBYSCORE command.
2. The celery heartbeat thread sends a PUBLISH command (using the same connection).
3. The heartbeat thread reads the next response off the socket, which is actually the ZREVRANGEBYSCORE reply, but does nothing with it.
4. The main thread then reads the following response, the integer PUBLISH reply, which blows up because it's not iterable.

You need to be extremely unlucky for this to occur, and I've not managed to reproduce it on a test system. However, it's easy to emulate what would happen by injecting a publish command:

    class KombuRedis(redis.Redis):  # pragma: no cover
        def __init__(self, *args, **kwargs):
            super(KombuRedis, self).__init__(*args, **kwargs)
            self.connection = self.connection_pool.get_connection('_')
            self.counter = 0

        def execute_command(self, *args, **options):
            conn = self.connection
            command_name = args[0]
            try:
                conn.send_command(*args)
                # Inject a publish sometimes
                if command_name == "ZREVRANGEBYSCORE":
                    self.counter += 1
                    if self.counter % 10:
                        conn.send_command('PUBLISH', 'celeryev', {'foo': 'bar'})
                        self.parse_response(conn, 'PUBLISH')
                return self.parse_response(conn, command_name, **options)
            except redis.ConnectionError:
                conn.disconnect()
                conn.send_command(*args)
                return self.parse_response(conn, command_name, **options)
I think it's safe to remove execute_command completely. The base redis.Redis implementation is thread safe (it takes a connection from the pool for each command and returns it afterwards).
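
For comparison, a simplified paraphrase of what the base redis.Redis.execute_command does (not the verbatim library code): each call checks a connection out of the pool and returns it when finished, so concurrent threads never share a socket.

    def execute_command(self, *args, **options):
        command_name = args[0]
        conn = self.connection_pool.get_connection(command_name)
        try:
            conn.send_command(*args)
            return self.parse_response(conn, command_name, **options)
        finally:
            self.connection_pool.release(conn)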

frol added a commit to frol/kombu that referenced this issue Dec 18, 2012
@dknecht commented Mar 19, 2013

We are also seeing this issue a few times a day. Any plans to commit this fix?

@frol commented Mar 19, 2013

@dknecht FYI, my forks of celery and kombu have been working stably in production for about 3 months without any problems...

@ask (Contributor) commented Mar 19, 2013

Using this fix? I haven't seen any pull request.

@ask (Contributor) commented Mar 19, 2013

Btw, you could also try using the 3.0 branch, as I've removed a thread that was accidentally enabled. The worker should be thread-less when using Redis:

pip install -U https://github.com/celery/celery/zipball/3.0

@ask (Contributor) commented Mar 21, 2013

I want to apply this patch, but does anyone think we need the KombuRedis class at all? It was originally added there by the author of redis-py, but I'm not entirely sure of the purpose. We definitely need Redis.connection to get to the underlying socket, but that's just for the connections used for polling.

@ask (Contributor) commented Mar 21, 2013

where polling here refers to kqueue/select/epoll

ask added a commit that referenced this issue Mar 21, 2013
@ask (Contributor) commented Mar 21, 2013

Patch merged into 2.5 and master

@dknecht commented Mar 21, 2013

Thanks. We will give this a try today.

ask added a commit that referenced this issue Mar 28, 2013
@ask (Contributor) commented Apr 17, 2013

Fixed

@ask closed this as completed Apr 17, 2013