
connection.channel() blocked #62

Closed
simomo opened this issue Sep 29, 2014 · 18 comments

@simomo commented Sep 29, 2014

Hi,
I'm using haigha with gevent to build a TCP-based realtime chatroom. Each socket handler (one greenlet) has its own channel, and all of these channels are created from a single connection (a singleton).

My issue is that sometimes, when my server is publishing and receiving messages quickly (not actually that quickly; fewer than 100 msgs per second), connection.channel() blocks. Once it blocks it never recovers; the whole connection appears blocked. gevent's StreamServer still works and accepts new sockets, but they all block at this step (creating the channel) until I terminate (Ctrl-C) my server process and restart it.
Sometimes I get the traceback below after pressing Ctrl-C (terminating the process):

^CTraceback (most recent call last):
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "tcp_server.py", line 237, in _handle
    rmq_ch = rmq_con.channel()  # <------ This is the place where it blocks
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 343, in channel
    rval.open()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 177, in open
    self.channel.open()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/classes/channel_class.py", line 48, in open
    self.channel.add_synchronous_cb(self._recv_open_ok)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/channel.py", line 316, in add_synchronous_cb
    self.connection.read_frames()
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/connection.py", line 407, in read_frames
    data = self._transport.read(self._heartbeat)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/haigha/transports/gevent_transport.py", line 72, in read
    self._read_wait.wait(timeout)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/event.py", line 74, in wait
    timer = Timeout.start_new(timeout)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/timeout.py", line 119, in start_new
    timeout = cls(timeout, exception, ref=ref)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/timeout.py", line 90, in __init__
    self.timer = get_hub().loop.timer(seconds or 0.0, ref=ref, priority=priority)
  File "/root/py_envs/staging_tcp_server/local/lib/python2.7/site-packages/gevent/hub.py", line 166, in get_hub
    return _threadlocal.hub
Any ideas?
Really appreciate your help!

@awestendorf (Member)

It appears you have enabled synchronous behavior on the gevent transport, and you have a pending read. If you're using the transport in synchronous mode, you must not have any separate greenlets running the read_frames loop.
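
For illustration, a minimal sketch of what purely synchronous usage looks like under that advice (no background read_frames greenlet; each blocking call reads its own reply frames; the queue/exchange names here are placeholders):

from haigha.connection import Connection
from haigha.message import Message

# synchronous gevent transport: every RPC blocks until its reply frame arrives
conn = Connection(host='localhost', transport='gevent', synchronous=True, password='passwd')
ch = conn.channel()                                # blocks until channel.open-ok
ch.queue.declare('demo_queue', auto_delete=False)  # blocks until declare-ok
ch.basic.publish(Message('hello'), exchange='', routing_key='demo_queue')
# note: no gevent.spawn(message_pump, conn) anywhere -- a pump greenlet would
# consume the reply frames these blocking calls are waiting for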

@simomo (Author) commented Sep 29, 2014

Many thanks for your quick response!

Yes, I'm using the synchronous mode of gevent_transport. Here is my code:
rmq.py

import logging
import datetime

import gevent
from gevent import socket

from haigha.connection import Connection
from haigha.message import Message

from trace_error import full_stack

sock_opts = {
    (socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
# single shared connection (synchronous gevent transport) and shared channel
rmq_con = Connection(host="localhost", debug=2, transport='gevent', sock_opts=sock_opts, synchronous=True, password='passwd')
rmq_ch = rmq_con.channel()

# background greenlet that keeps reading frames so consumer callbacks fire
def message_pump(conn):
    while conn:
        try:
            conn.read_frames()
        except Exception, e:
            trace = full_stack()
            print 'message_pump error ', datetime.datetime.now(), '\n', trace
            #logger.error(trace)

        gevent.sleep()

gevent.spawn(message_pump, rmq_con)

and my server.py:

from gevent import monkey; monkey.patch_all();
import gevent
from gevent.server import StreamServer

from rmq import rmq_con


def _handle(socket, address):
    rmq_ch = rmq_con.channel()  # <---- This line will cause the blocking

    # declare some new queues and exchanges
    ....

    # adding consumers to these queues

    # read data from socket, and publish( channel.basic.publish ) the data to queues


if __name__ == '__main__':
    server = StreamServer(('0.0.0.0', 10545), _handle)
    server.serve_forever()

It seems I don't have any other connection.read_frames() calls in my code. Is there any chance that I called some haigha APIs which in turn call read_frames and caused this issue?

@awestendorf (Member)

Yup, kill off that message pump and I think you'll be good. It's only necessary if you want to operate completely asynchronously with callbacks.

@simomo (Author) commented Sep 29, 2014

It seems I misunderstood something...
If I delete the message pump code, will the consumers still be called when new messages arrive?

@awestendorf (Member)

Ah, right, you need it for consumers. In that case, leave it in, remove it from the connection constructor, and then if you need/want channels to operate in a synchronous manner you can create them individually.

It's possible that you're encountering an edge case in how this can be used that needs to be fixed. The goal was to allow for using gevent in either synchronous or asynchronous mode, and separately to make channels synchronous on top of an asynchronous transport.

@simomo (Author) commented Sep 29, 2014

Sorry, I didn't get your point.
What do you mean by "leave it in, remove it from the connection constructor, and then if you need/want channels to operate in a synchronous manner you can create them individually"?

Did you mean I need to remove the message_pump function from the rmq.py file and create this message pump in each greenlet?

@awestendorf (Member)

Leave the message pump in place, remove synchronous=True from the connection constructor, and then if you need specific channels to operate synchronously try doing that when calling connection.channel.
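
A minimal sketch of that arrangement, reusing message_pump from rmq.py above; the synchronous flag passed to connection.channel() is an assumption based on this advice, as are the queue/exchange names:

import gevent
from haigha.connection import Connection

# asynchronous connection; the pump greenlet reads frames and dispatches callbacks
rmq_con = Connection(host='localhost', transport='gevent', synchronous=False, password='passwd')
gevent.spawn(message_pump, rmq_con)

# ordinary channels stay asynchronous ...
async_ch = rmq_con.channel()

# ... while a setup channel can block until each declare/bind completes
setup_ch = rmq_con.channel(synchronous=True)
setup_ch.queue.declare('demo_queue', auto_delete=False)
setup_ch.exchange.declare('demo_exchange', 'direct')
setup_ch.queue.bind('demo_queue', 'demo_exchange', 'demo_key')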

@simomo (Author) commented Sep 29, 2014

OK, I've removed it.
But I don't understand what "channels operating synchronously" means. Did you mean some APIs can only be called when the channels are in synchronous mode?

@awestendorf (Member)

You can create channels that act synchronously even though the rest of the connection is operating asynchronously. It's up to you, and useful for some kinds of business logic such as a chain of operations that set up your bindings.

@simomo (Author) commented Sep 29, 2014

I use this code to define queues and bindings. Do you mean that if rmq_ch is working in async mode, this code will fail?

    fanout_exchange_name = 'fanout-' + exchange_name
    rmq_ch.queue.declare(queue_name, auto_delete=False)
    logger.debug('declare queue_name over')

    rmq_ch.exchange.declare(exchange_name, 'direct')
    logger.debug('declare exchange over')
    #rmq_ch.exchange.declare(fanout_exchange_name, 'fanout')
    #logger.debug('declare fanout_exchange over')

    rmq_ch.queue.bind(queue_name, exchange_name, routing_key_p2p)
    logger.debug('binding p2p exchange over')

@awestendorf (Member)

They won't fail, but you won't get any data returned for operations that return data, and any retry/fail logic is much more complicated if you use callbacks. The above is a good example of where using a synchronous channel is valuable.
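
For example, a rough sketch of the difference on a declare, reusing rmq_con, rmq_ch and queue_name from the snippets above (the synchronous channel flag, the nowait/cb keywords, the shape of the returned data, and the callback signature are assumptions; AMQP's declare-ok carries the queue name, message count and consumer count):

# on a synchronous channel the declare-ok reply comes back to the caller
sync_ch = rmq_con.channel(synchronous=True)
result = sync_ch.queue.declare(queue_name, auto_delete=False)
# result is expected to look like (queue_name, message_count, consumer_count)

# on an async channel the call returns immediately; pass a callback instead
def on_declare_ok(queue, message_count, consumer_count):
    print 'declared', queue, message_count, consumer_count

rmq_ch.queue.declare(queue_name, auto_delete=False, nowait=False, cb=on_declare_ok)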

@simomo (Author) commented Sep 29, 2014

Hmmm, I don't use rmq_ch.basic.get to fetch data.
All I'm using is:

  • rmq_ch.basic.ack
  • rmq_ch.basic.publish
  • rmq_ch.basic.cancel
  • rmq_ch.basic.consume
  • rmq_ch.close

(rmq_ch is in async mode)

I guess none of them returns data, and currently they all work well :)
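
For reference, a minimal sketch of how those calls typically fit together on an async channel (the consumer receiving a haigha Message object and the delivery_info['delivery_tag'] field are assumptions; the queue name is a placeholder):

from haigha.message import Message

def on_message(msg):
    # msg.body is the payload; ack with the broker-assigned delivery tag
    print 'got', msg.body
    rmq_ch.basic.ack(msg.delivery_info['delivery_tag'])

rmq_ch.basic.consume('demo_queue', on_message, no_ack=False)
rmq_ch.basic.publish(Message('hello'), exchange='', routing_key='demo_queue')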

@awestendorf (Member)

In the example I commented on, queue.declare can return data and exchange.declare can throw an exception.

@simomo (Author) commented Sep 29, 2014

....
I only just learned that queue.declare can return data...
Anyway, its returned data is useless for my code.

And as for the exchange.declare exception, for now I'll just ignore it; it's a trade-off.

I need to keep my channels in async mode to avoid the channel-creation blocking issue.

@simomo (Author) commented Oct 11, 2014

Hi,
I found that sometimes when I use

rmq_con = Connection(host="localhost", debug=2, transport='gevent', sock_opts=sock_opts, synchronous=False, password='')

to initialize a connection, RabbitMQ reports:

=ERROR REPORT==== 11-Oct-2014::20:54:34 ===                                     
exception on TCP connection <0.19482.1189> from 127.0.0.1:34205                 
{handshake_timeout,frame_header}                                                

=INFO REPORT==== 11-Oct-2014::20:54:34 ===                                      
closing TCP connection <0.19482.1189> from 127.0.0.1:34205

Did I do something wrong when declaring the async RabbitMQ connection?

Thanks

@simomo (Author) commented Oct 11, 2014

Found the root cause: the message_pump function is necessary when declaring a new connection.

@awestendorf (Member)

You are correct. There's also a synchronous_connect option.
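
A minimal sketch of that option, reusing sock_opts and message_pump from rmq.py above; that synchronous_connect makes the constructor block until the AMQP handshake completes, while everything afterwards stays asynchronous, is an assumption here:

rmq_con = Connection(host='localhost', debug=2, transport='gevent',
                     sock_opts=sock_opts, synchronous=False,
                     synchronous_connect=True,  # block only for the connection handshake
                     password='passwd')

# the pump is still needed afterwards to drive consumer callbacks
gevent.spawn(message_pump, rmq_con)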

@simomo (Author) commented Dec 2, 2014

Really appreciate your help; this issue never occurred again :)
Many thanks!

simomo closed this as completed Dec 2, 2014