Skip to content

Commit

Permalink
Adds ability of asynchronous transports, such as gevent, to operate s…
Browse files Browse the repository at this point in the history
…ynchronously

Adds ability of channels to act synchronously on top of an asynchronous
transport
Adds ability to enforce a synchronous connection handshake on an
asynchronous transport
Catch `EINTR` in a read loop, raise all other read exceptions to fix #44
Immediately closes a connection and raises ConnectionClosed if there is
a frame error when reading from the broker
Immediately closes a connection and raises ConnectionClosed if user
tries to send a frame that is larger than negotiated frame max
Adds Connection.closed property
  • Loading branch information
awestendorf committed Mar 27, 2014
1 parent 61d415c commit e601940
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 147 deletions.
36 changes: 36 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
sync-and-exception-fixes
========================

Adds `synchronous` keyword argument to `Connection` constructor, which makes
the connection behave as if it were synchronous, no matter the underlying
transport type. Useful for gevent where one does not want to use callbacks.
Applies to all channels, enforces `nowait=False` where applicable.

Adds `synchronous` keyword argument to `Connection.channel`, which makes the
`Channel` object synchronous independent of the underlying transport type.
For example, if a synchronous channel on an asynchronous transport has a
protocol method called with `nowait=False` (where applicable), then the method
will not return until the response frame has been read. Does not enforce
`nowait=False` where applicable.

Adds `synchronous_connect` option to Connection constructor which will enforce
synchronous behavior in `Connection.connect` regardless of the underlying
transport type. Improves features to handle issue #7. Also makes
`Connection.close` synchronous. Defaults to True if transport is synchronous
or `synchronous=True` passed in constructor.
https://github.com/agoragames/haigha/issues/7

Standard and gevent socket transport will raise EnvironmentErrors that aren't
in `(errno.EAGAIN,errno.EWOULDBLOCK,errno.EINTR)`. Fixes notifying read loops
of potential problems and fixes #44
https://github.com/agoragames/haigha/issues/44

Immediately close connection and raise ConnectionClosed if there is a
FrameError in reading frames from the broker in `Connection.read_frames`.

Detect frames that are larger than negotiated frame max in
`Connection.send_frame`, immediately close connection and raise
ConnectionClosed.

Add property `Connection.closed`

0.6.2
=====

Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Beyond that, there is no specific order of features to implement. Generally, the
* Edge cases in frame management
* Improvements to usabililty
* SSL
* Allow nowait when asynchronous transport but Connection put into synchronous mode.

Haigha has been tested exclusively with Python 2.6 and 2.7, but we intend for it to work with the 3.x series as well. Please `report <https://github.com/agoragames/haigha/issues>`_ any issues you may have.

Expand Down
2 changes: 1 addition & 1 deletion development.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-r requirements.txt

chai>=0.2.0
chai>=0.4.7
unittest2
File renamed without changes.
59 changes: 59 additions & 0 deletions examples/synchronous_gevent
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import sys, os
sys.path.append(os.path.abspath("."))
sys.path.append(os.path.abspath(".."))

import logging
import random
import socket
from optparse import OptionParser

from haigha.connection import Connection
from haigha.message import Message

parser = OptionParser(
usage='Usage: synchronous_test [options]'
)
parser.add_option('--user', default='guest', type='string')
parser.add_option('--pass', default='guest', dest='password', type='string')
parser.add_option('--vhost', default='/', type='string')
parser.add_option('--host', default='localhost', type='string')
parser.add_option('--debug', default=0, action='count')

(options,args) = parser.parse_args()

debug = options.debug
level = logging.DEBUG if debug else logging.INFO

# Setup logging
logging.basicConfig(level=level, format="[%(levelname)s %(asctime)s] %(message)s" )
logger = logging.getLogger('haigha')

sock_opts = {
(socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
connection = Connection(logger=logger, debug=debug,
user=options.user, password=options.password,
vhost=options.vhost, host=options.host,
heartbeat=None,
sock_opts=sock_opts,
transport='gevent',
synchronous=True)

ch = connection.channel()
ch.exchange.declare('foo', 'direct')
ch.queue.declare('bar')
ch.queue.bind('bar', 'foo', 'route')
ch.basic.publish(Message('hello world'), 'foo', 'route')
print 'GET:', ch.basic.get('bar')

ch.basic.publish(Message('hello world'), 'foo', 'route')
ch.basic.publish(Message('hello world'), 'foo', 'route')
print 'PURGE:', ch.queue.purge('bar')

ch.basic.publish(Message('hello world'), 'foo', 'route')
ch.basic.publish(Message('hello world'), 'foo', 'route')
print 'DELETED:', ch.queue.delete('bar')

19 changes: 16 additions & 3 deletions haigha/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class InvalidClass(ChannelError): '''The method frame referenced an invalid clas
class InvalidMethod(ChannelError): '''The method frame referenced an invalid method. Non-fatal.'''
class Inactive(ChannelError): '''Tried to send a content frame while the channel was inactive. Non-fatal.'''

def __init__(self, connection, channel_id, class_map):
def __init__(self, connection, channel_id, class_map, **kwargs):
'''
Initialize with a handle to the connection and an id. Caller must
supply a mapping of {class_id:ProtocolClass} which defines what
Expand Down Expand Up @@ -72,6 +72,8 @@ def __init__(self, connection, channel_id, class_map):
}
self._active = True

self._synchronous = kwargs.get('synchronous', False)

@property
def connection(self):
return self._connection
Expand Down Expand Up @@ -103,6 +105,14 @@ def active(self):
'''
return self._active

@property
def synchronous(self):
'''
Return if this channel is acting synchronous, of its own accord or because
the connection is synchronous.
'''
return self._synchronous or self._connection.synchronous

def add_open_listener(self, listener):
'''
Add a listener for open events on this channel. The listener should be
Expand Down Expand Up @@ -206,7 +216,10 @@ def process_frames(self):
self.dispatch( frame )
except ProtocolClass.FrameUnderflow:
return
except Exception:
except (ConnectionClosed,ChannelClosed):
# Immediately raise if connection or channel is closed
raise
except Exception as e:
# Spec says that channel should be closed if there's a framing error.
# Unsure if we can send close if the current exception is transport
# level (e.g. gevent.GreenletExit)
Expand Down Expand Up @@ -263,7 +276,7 @@ def add_synchronous_cb(self, cb):
'''
Add an expectation of a callback to release a synchronous transaction.
'''
if self.connection.synchronous:
if self.connection.synchronous or self._synchronous:

This comment has been minimized.

Copy link
@vitaly-krugl

vitaly-krugl Apr 18, 2014

@awestendorf
Can the new synchronous(self) property-getter be used here instead of a copy of the logic?

This comment has been minimized.

Copy link
@awestendorf

awestendorf Apr 18, 2014

Author Member

It can, but where possible I try to not use indirection through properties on oft-called methods. I hate pre-optimizations like that, but the state of python JITs leaves much to be desired. This method in particular is called a lot, so it should be as fast as possible. In that regard, the synchronous attribute of the connection probably should be cached in the constructor, or better yet a direct binding used for add_synchronous_cb, similar to what I did for the pattern matching code in torus' Schema.

This comment has been minimized.

Copy link
@vitaly-krugl

vitaly-krugl Apr 18, 2014

Yeah, I think that caching it in the constructor or the direct binding would be less error-prone.

wrapper = SyncWrapper(cb)
self._pending_events.append( wrapper )
while wrapper._read:
Expand Down
5 changes: 2 additions & 3 deletions haigha/classes/protocol_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ def name(self):
raise NotImplementedError('must provide a name for %s'%(self))

def allow_nowait(self):
'''Return True if the transport allows nowait, False otherwise.'''
# hack: this is a function to make testing easier
return not self._channel.connection.synchronous
'''Return True if the transport or channel allows nowait, False otherwise.'''
return not self._channel.synchronous

def _cleanup(self):
'''
Expand Down
94 changes: 75 additions & 19 deletions haigha/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ def __init__(self, **kwargs):
else:
self._transport = transport

# Set these after the transport is initialized, so that we can access the
# synchronous property
self._synchronous = kwargs.get('synchronous',False)
self._synchronous_connect = kwargs.get('synchronous_connect',False) or self.synchronous

self._output_frame_buffer = []
self.connect( self._host, self._port )

Expand Down Expand Up @@ -152,6 +157,11 @@ def frames_written(self):
'''Number of frames written in the lifetime of this connection.'''
return self._frames_written

@property
def closed(self):
'''Return the closed state of the connection.'''
return self._closed

@property
def close_info(self):
'''Return dict with information on why this connection is closed. Will
Expand All @@ -165,13 +175,16 @@ def transport(self):

@property
def synchronous(self):
'''True if transport is synchronous, False otherwise.'''
'''
True if transport is synchronous or the connection has been forced into
synchronous mode, False otherwise.
'''
if self._transport==None:
if self._close_info and len(self._close_info['reply_text'])>0:
raise ConnectionClosed("connection is closed: %s : %s"%\
(self._close_info['reply_code'],self._close_info['reply_text']) )
raise ConnectionClosed("connection is closed")
return self.transport.synchronous
return self.transport.synchronous or self._synchronous

def connect(self, host, port):
'''
Expand All @@ -193,12 +206,31 @@ def connect(self, host, port):
'class_id' : 0,
'method_id' : 0
}

self._transport.connect( (host,port) )
self._transport.write( PROTOCOL_HEADER )

while self.synchronous and not self._connected:
self.read_frames()
if self._synchronous_connect:
# Have to queue this callback just after connect, it can't go into the
# constructor because the channel needs to be "always there" for frame
# processing, but the synchronous callback can't be added until after
# the protocol header has been written. This SHOULD be registered before
# the protocol header is written, in the case where the header bytes
# are written, but this thread/greenlet/context does not return until
# after another thread/greenlet/context has read and processed the
# recv_start frame. Without more re-write to add_sync_cb though, it will
# block on reading responses that will never arrive because the protocol
# header isn't written yet. TBD if needs refactoring. Could encapsulate
# entirely here, wherein read_frames exits if protocol header not yet
# written. Like other synchronous behaviors, adding this callback will
# result in a blocking frame read and process loop until _recv_start and
# any subsequent synchronous callbacks have been processed. In the event
# that this is /not/ a synchronous transport, but the caller wants the
# connect to be synchronous so as to ensure that the connection is ready,
# then do a read frame loop here.
self._channels[0].add_synchronous_cb( self._channels[0]._recv_start )
while not self._connected:
self.read_frames()

def disconnect(self):
'''
Expand Down Expand Up @@ -230,8 +262,8 @@ def transport_closed(self, **kwargs):
TODO: document args
"""
msg = 'transport to %s closed : unknown cause'%(self._host)
self.logger.warning( kwargs.get('msg', msg) )
msg = 'unknown cause'
self.logger.warning( 'transport to %s closed : %s'%(self._host, kwargs.get('msg', msg)) )
self._close_info = {
'reply_code' : kwargs.get('reply_code',0),
'reply_text' : kwargs.get('msg', msg),
Expand All @@ -258,12 +290,16 @@ def _next_channel_id(self):
self._channel_counter = 1
return self._channel_counter

def channel(self, channel_id=None):
def channel(self, channel_id=None, synchronous=False):
"""
Fetch a Channel object identified by the numeric channel_id, or
create that object if it doesn't already exist. If channel_id is not
None but no channel exists for that id, will raise InvalidChannel. If
there are already too many channels open, will raise TooManyChannels.
If synchronous=True, then the channel will act synchronous in all cases
where a protocol method supports `nowait=False`, or where there is an
implied callback in the protocol.
"""
if channel_id is None:
# adjust for channel 0
Expand All @@ -280,7 +316,7 @@ def channel(self, channel_id=None):

# Call open() here so that ConnectionChannel doesn't have it called. Could
# also solve this other ways, but it's a HACK regardless.
rval = Channel(self, channel_id, self._class_map)
rval = Channel(self, channel_id, self._class_map, synchronous=synchronous)
self._channels[ channel_id ] = rval
rval.add_close_listener( self._channel_closed )
rval.open()
Expand All @@ -295,7 +331,7 @@ def _channel_closed(self, channel):
except KeyError:
pass

def close(self, reply_code=0, reply_text='', class_id=0, method_id=0):
def close(self, reply_code=0, reply_text='', class_id=0, method_id=0, disconnect=False):
'''
Close this connection.
'''
Expand All @@ -305,7 +341,12 @@ def close(self, reply_code=0, reply_text='', class_id=0, method_id=0):
'class_id' : class_id,
'method_id' : method_id
}
self._channels[0].close()
if disconnect:
self._closed = True
self.disconnect()
self._callback_close()
else:
self._channels[0].close()

def _callback_open(self):
'''
Expand Down Expand Up @@ -348,13 +389,19 @@ def read_frames(self):
reader = Reader( data )
p_channels = set()

for frame in Frame.read_frames( reader ):
if self._debug > 1:
self.logger.debug( "READ: %s", frame )
self._frames_read += 1
ch = self.channel( frame.channel_id )
ch.buffer_frame( frame )
p_channels.add( ch )
try:
for frame in Frame.read_frames( reader ):
if self._debug > 1:
self.logger.debug( "READ: %s", frame )
self._frames_read += 1
ch = self.channel( frame.channel_id )
ch.buffer_frame( frame )
p_channels.add( ch )
except Frame.FrameError as e:
# Frame error in the peer, disconnect
self.close(reply_code=501, reply_text='frame error from %s : %s'%(self._host, str(e)), class_id=0, method_id=0, disconnect=True)
raise ConnectionClosed("connection is closed: %s : %s"%\
(self._close_info['reply_code'],self._close_info['reply_text']) )

self._transport.process_channels( p_channels )

Expand Down Expand Up @@ -399,6 +446,10 @@ def send_frame(self, frame):

buf = bytearray()
frame.write_frame(buf)
if len(buf) > self._frame_max:
self.close(reply_code=501, reply_text='attempted to send frame of %d bytes, frame max %d'%(len(buf), self._frame_max), class_id=0, method_id=0, disconnect=True)
raise ConnectionClosed("connection is closed: %s : %s"%\
(self._close_info['reply_code'],self._close_info['reply_text']) )
self._transport.write( buf )

self._frames_written += 1
Expand Down Expand Up @@ -437,7 +488,8 @@ def dispatch(self, frame):
if frame.class_id==10:
cb = self._method_map.get( frame.method_id )
if cb:
cb( frame )
method = self.clear_synchronous_cb( cb )
method( frame )
else:
raise Channel.InvalidMethod("unsupported method %d on channel %d",
frame.method_id, self.channel_id )
Expand Down Expand Up @@ -484,6 +536,8 @@ def _send_start_ok(self):
args.write_shortstr(self.connection._locale)
self.send_frame( MethodFrame(self.channel_id, 10, 11, args) )

self.add_synchronous_cb( self._recv_tune )

def _recv_tune(self, method_frame):
self.connection._channel_max = method_frame.args.read_short() or self.connection._channel_max
self.connection._frame_max = method_frame.args.read_long() or self.connection._frame_max
Expand Down Expand Up @@ -521,6 +575,7 @@ def _send_open(self):
args.write_bit(True) # insist flag for older amqp, not used in 0.9.1

self.send_frame( MethodFrame(self.channel_id, 10, 40, args) )
self.add_synchronous_cb( self._recv_open_ok )

def _recv_open_ok(self, method_frame):
self.connection._connected = True
Expand All @@ -534,6 +589,7 @@ def _send_close(self):
args.write_short( self.connection._close_info['class_id'] )
args.write_short( self.connection._close_info['method_id'] )
self.send_frame( MethodFrame(self.channel_id, 10, 50, args) )
self.add_synchronous_cb( self._recv_close_ok )

def _recv_close(self, method_frame):
self.connection._close_info = {
Expand Down
Loading

0 comments on commit e601940

Please sign in to comment.