Skip to content

Commit

Permalink
Raise an exception if a channel is closed while processing frames in a
Browse files Browse the repository at this point in the history
synchronous transport. Solves #31
  • Loading branch information
awestendorf committed Jul 15, 2013
1 parent 5985024 commit c03d28c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
10 changes: 10 additions & 0 deletions haigha/channel.py
Expand Up @@ -266,6 +266,16 @@ def add_synchronous_cb(self, cb):
self._pending_events.append( wrapper )
while wrapper._read:
self.connection.read_frames()

# frame processing may have resulted in a closed channel
if self.closed:
if self.close_info and len(self.close_info['reply_text'])>0:
raise ChannelClosed(
"channel %d is closed: %s : %s",
self.channel_id,
self.close_info['reply_code'],
self.close_info['reply_text'] )
raise ChannelClosed()
return wrapper._result
else:
self._pending_events.append( cb )
Expand Down
17 changes: 17 additions & 0 deletions haigha/tests/unit/channel_test.py
Expand Up @@ -363,6 +363,23 @@ def test_add_synchronous_cb_when_transport_synchronous(self):
# in this method
assertEquals( deque([wrapper]), c._pending_events )

def test_add_synchronous_cb_when_transport_synchronous_and_channel_closes(self):
conn = mock()
conn.synchronous = True
c = Channel(conn,None,{})

wrapper = mock()
wrapper._read = True
wrapper._result = 'done'

expect( channel.SyncWrapper ).args( 'foo' ).returns( wrapper )
expect( conn.read_frames )
expect( conn.read_frames ).side_effect(
lambda: setattr(c, '_closed', True) )

with assert_raises( ChannelClosed ):
c.add_synchronous_cb('foo')

def test_clear_synchronous_cb_when_no_pending(self):
c = Channel(None,None,{})
stub( c._flush_pending_events )
Expand Down

3 comments on commit c03d28c

@xjdrew
Copy link
Contributor

@xjdrew xjdrew commented on c03d28c Sep 6, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this patch is not suitable:
in consume function, if my code throw an exception, it will trigger line 1, then trigger this patch's code, throw a ChannelClosed exception; this exception will overlap my exception, and I get no idea what's wrong with my code. It's difficult to debug.

  def process_frames(self):
    '''
    Process the input buffer.
    '''
    while len(self._frame_buffer):
      try:
        # It would make sense to call next_frame, but it's technically faster
        # to repeat the code here.
        frame = self._frame_buffer.popleft()
        self.dispatch( frame )
      except ProtocolClass.FrameUnderflow:
        return
      except Exception:
        # Spec says that channel should be closed if there's a framing error.
        # Unsure if we can send close if the current exception is transport
        # level (e.g. gevent.GreenletExit)
        self.close( 500, "Failed to dispatch %s"%(str(frame)) )                  -------------------->  <line 1>
        raise

@awestendorf
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you submit a PR that does what you're looking for? I never use the synchronous transport but I'd like it to work right, I just don't have any context for the situation. Maybe another issue with a different failure case that I can use as a regression test?

@xjdrew
Copy link
Contributor

@xjdrew xjdrew commented on c03d28c Sep 7, 2013

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please review my pull request and modify rpc_server.py file. you can make a typo in on_request function, for example, "msg -> mgs", you will see!

Please sign in to comment.