Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dead lock when calling basic.consume fucntion #54

Closed
simomo opened this issue May 8, 2014 · 2 comments
Closed

dead lock when calling basic.consume fucntion #54

simomo opened this issue May 8, 2014 · 2 comments

Comments

@simomo
Copy link

simomo commented May 8, 2014

Hi awestendorf,

The environment:

keywords: gevent, monkey patch, gevent transport, concurrent

from gevent import monkey; monkey.patch_all();
import gevent
from gevent import socket

from haigha.connection import Connection
from haigha.message import Message


sock_opts = {
    (socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
rmq_con = Connection(host="localhost", debug=True, transport='gevent', sock_opts=sock_opts)
rmq_ch = rmq_con.channel(synchronous=True)

def message_pump(conn):
    while conn:
        conn.read_frames()
        gevent.sleep()
gevent.spawn(message_pump, rmq_con)

The blocked line:

Some times the process will block, so I look into it and find the root cause is this line

rmq_ch.basic.consume(queue_name, rmq_msg_handler, consumer_tag=queue_name)

is blocked.

Can you imagine the situation which may cause this? or do you have any ideas about it?
Any comments are welcomed~

Thanks !

@simomo
Copy link
Author

simomo commented May 8, 2014

After I interrupted blocked process, I got this tracebak:

^CTraceback (most recent call last): 
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs) 
  File "tcp_server.py", line 198, in _handle 
    queue_name, exchange_name, fanout_exchange_name, routing_key_p2p, routing_key_p2g = prepare_rmq_things(doctor_id, client_id, token, socket, sock_write_lock)
  File "tcp_server.py", line 127, in prepare_rmq_things
    rmq_ch.basic.consume(queue_name, wrapper_rmq_msg_handler, consumer_tag=queue_name)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/basic_class.py", line 101, in consume
    self.channel.add_synchronous_cb( self._recv_consume_ok )
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 295, in add_synchronous_cb
    self.connection.read_frames()
KeyboardInterrupt
<Greenlet at 0x2664f50: _handle(<socket at 0x27cd9d0 fileno=16 sock=> failed with KeyboardInterrupt

It looks like self.connection.read_frames() blocked.

@simomo
Copy link
Author

simomo commented May 8, 2014

I think I find the root cause, basic.consume will block, if binding same consumer_tag two times.
Because rabbitmq server will thought it as error.

@simomo simomo closed this as completed Dec 2, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant