Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

None is passed to Client.listen callback instead of brukva.client.Message instance #10

Open
kmike opened this Issue Apr 16, 2011 · 7 comments

Comments

Projects
None yet
3 participants

kmike commented Apr 16, 2011

I got this problem again.

Stack trace from ipdb's 'bt' command:

/Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
     67             except Exception, ex:
     68                 logging.error('Failed to start Flash policy server: %s', ex)
     69 
     70         logging.info('Entering IOLoop...')
---> 71         io_loop.start()

  /Users/kmike/svn/tornado/tornado/ioloop.py(270)start()
    268                 fd, events = self._events.popitem()
    269                 try:
--> 270                     self._handlers[fd](fd, events)
    271                 except (KeyboardInterrupt, SystemExit):
    272                     raise

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/iostream.py(199)_handle_events()
    197         try:
    198             if events & self.io_loop.READ:
--> 199                 self._handle_read()
    200             if not self.socket:
    201                 return

  /Users/kmike/svn/tornado/tornado/iostream.py(251)_handle_read()
    249                 # can't see it; the only way to find out if it's there is to

    250                 # try to read it.

--> 251                 result = self._read_to_buffer()
    252             except Exception:
    253                 self.close()

  /Users/kmike/svn/tornado/tornado/iostream.py(287)_read_to_buffer()
    285         """
    286         try:
--> 287             chunk = self._read_from_socket()
    288         except socket.error, e:
    289             # ssl.SSLError is a subclass of socket.error


  /Users/kmike/svn/tornado/tornado/iostream.py(275)_read_from_socket()
    273                 raise
    274         if not chunk:
--> 275             self.close()
    276             return None
    277         return chunk

  /Users/kmike/svn/tornado/tornado/iostream.py(180)close()
    178             self.socket = None
    179             if self._close_callback:
--> 180                 self._run_callback(self._close_callback)
    181 
    182     def reading(self):

  /Users/kmike/svn/tornado/tornado/iostream.py(230)_run_callback()
    228             # inside our blanket exception handler rather than outside.

    229             with stack_context.NullContext():
--> 230                 callback(*args, **kwargs)
    231         except:
    232             logging.error("Uncaught exception, closing connection.",

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/websocket.py(204)on_connection_close()
    202     def on_connection_close(self):
    203         self.client_terminated = True
--> 204         self.on_close()
    205 
    206     def _not_supported(self, *args, **kwargs):

  /Users/kmike/svn/tornadio/tornadio/persistent.py(84)on_close()
     82     def on_close(self):
     83         try:
---> 84             self.connection.on_close()
     85         finally:
     86             self.connection.is_closed = True

  /Users/kmike/dev/planor/realtime/connection.py(73)on_close()
     71     def on_close(self):
     72         if self.authenticated:
---> 73             self.logout()
     74 
     75     def logout(self):

  /Users/kmike/dev/planor/realtime/connection.py(77)logout()
     75     def logout(self):
     76         self.authenticated = False
---> 77         self.pubsub_client.disconnect()
     78         for handler in self.handlers:
     79             handler.on_logout()

  /Users/kmike/svn/brukva/brukva/client.py(334)disconnect()
    332 
    333     def disconnect(self):
--> 334         self.connection.disconnect()
    335 
    336     def on_connect(self):

  /Users/kmike/svn/brukva/brukva/client.py(141)disconnect()
    139             except socket.error, e:
    140                 pass
--> 141             self._stream = None
    142 
    143     def write(self, data, try_left=None):

  /Users/kmike/svn/brukva/brukva/client.py(873)listen()
    871                     raise response
    872                 result = self.format_reply(cmd_listen, response)
--> 873                 ctx.ret_call(result)
    874 
    875     ### CAS


  /Users/kmike/svn/brukva/brukva/client.py(39)__exit__()
     37 
     38         if self.is_active:
---> 39             self._call_callbacks(value)
     40             return True
     41         else:

  /Users/kmike/svn/brukva/brukva/client.py(29)_call_callbacks()
     27                     cb(value)
     28             else:
---> 29                 self.callbacks(value)
     30 
     31     def __enter__(self):

  /Users/kmike/dev/planor/realtime/connection.py(83)on_pubsub_message()
     79             handler.on_logout()
     80 
     81     def on_pubsub_message(self, message):
     82         for handler in self.handlers:
---> 83             handler.on_pubsub_message(message)

None
> /Users/kmike/dev/planor/realtime/handlers/debug.py(14)on_pubsub_message()
     12         logging.debug(message)
     13         if message is None:
---> 14             import ipdb; ipdb.set_trace()
     15         #logging.debug('channel: %s, body: %s' % (message.channel, message.body))

     16 

kmike commented Apr 16, 2011

Hmm, this may be because I don't unsubscribe before disconnecting.

The bad thing is that the exception raised in the Client.listen callback was printed to stderr and ignored, so I didn't notice it immediately:

Exception AttributeError: "'NoneType' object has no attribute 'channel'" in <generator object listen at 0x102a70aa0> ignored

@evilkost evilkost closed this Apr 26, 2011

kmike commented Apr 26, 2011

I'm still getting None as a message sometimes (with latest master):

  /Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
     67             except Exception, ex:
     68                 logging.error('Failed to start Flash policy server: %s', ex)
     69 
     70         logging.info('Entering IOLoop...')
---> 71         io_loop.start()

  /Users/kmike/svn/tornado/tornado/ioloop.py(220)start()
    218             callbacks = self._callbacks
    219             self._callbacks = []
--> 220             for callback in callbacks:
    221                 self._run_callback(callback)
    222 

  /Users/kmike/svn/brukva/brukva/client.py(914)listen()
    912                         break
    913                 else:
--> 914                     ctx.ret_call(result)
    915 
    916     ### CAS


  /Users/kmike/svn/brukva/brukva/client.py(43)__exit__()
     41 
     42         if self.is_active:
---> 43             self.ret_call(value)
     44             return True
     45         else:

  /Users/kmike/svn/brukva/brukva/client.py(56)ret_call()
     54     def ret_call(self, value):
     55         self.is_active = False
---> 56         self._call_callbacks(self.callbacks, value)
     57         self.is_active = True
     58 

  /Users/kmike/svn/brukva/brukva/client.py(30)_call_callbacks()
     28                     cb(value)
     29             else:
---> 30                 callbacks(value)
     31 
     32     def __enter__(self):

> /Users/kmike/dev/planor/realtime/connection.py(84)on_pubsub_message()
     82         if message is None:
     83             import ipdb; ipdb.set_trace()
---> 84             logging.warning('on_pubsub_message is None')
     85             return
     86

Don't know if it is a bug: pub/sub seems to work otherwise. Can you share some knowledge about what the problem may be?

State (maybe it will help):

ipdb> self.pubsub_client.subscribed
True

ipdb> self.pubsub_client._waiting_callbacks
defaultdict(<type 'list'>, {})

ipdb> self.pubsub_client.queue
[]

kmike commented Apr 26, 2011

Btw,

self.is_active = False
self._call_callbacks(self.callbacks, value)
self.is_active = True

always seems scary (as do ExecutionContext.enable and ExecutionContext.disable), because is_active can become incorrect if an exception occurs in _call_callbacks, and it is not clear how this is handled. That said, I've checked the code from when the 'enable' and 'disable' methods were introduced, and they were applied carefully, with all edge cases handled.

@evilkost evilkost reopened this Apr 26, 2011

Owner

evilkost commented Apr 26, 2011

Sorry, I can't reproduce it. Could you try 85c67d9fe and post the logs for additional info?

@evilkost evilkost closed this Apr 26, 2011

@evilkost evilkost reopened this Apr 26, 2011

kmike commented Apr 26, 2011

Yes, sure. Thanks for moving things forward! It now produces this:

ERROR:brukva.client:None
Traceback (most recent call last):
  File "/Users/kmike/svn/brukva/brukva/client.py", line 901, in listen
    data = yield async(self.connection.readline)()
GeneratorExit
Owner

evilkost commented Apr 26, 2011

I'm confused by (type_=GeneratorExit, value=None) in ExecutionContext.__exit__. And why is None not replaced with an Exception?!:

    if self.error_wrapper:
        value = self.error_wrapper(value)
    else:
        value = value or Exception(
            'Strange exception with None value type: %s; tb: %s' %
            (type_, '\n'.join(traceback.format_tb(tb))
        ))

I can't come up with a test case to reproduce this. Could you extract an example from your project?

[E 111211 10:54:35 client:49] None
Traceback (most recent call last):
File "/usr/local/lib/python2.6/dist-packages/brukva-0.0.1-py2.6.egg/brukva/client.py", line 926, in listen
data = yield async(self.connection.readline)()
GeneratorExit

It seems this issue still holds. Here is a scenario to reproduce it: keep a long-polling request open for a while; if the client goes idle, brukva reports this exception.

class MessageUpdatesHandler(BaseHandler):
    """Long-polling handler used as the reproduction scenario for this issue.

    Each POST creates its own brukva client, subscribes it to the channel
    named by the request's ``listing_id`` argument, and finishes the HTTP
    response from the ``listen`` callback.
    """

    @tornado.web.authenticated
    @tornado.web.asynchronous
    def post(self):
        """Open a brukva connection and start listening on the channel."""
        # Falls back to None when the argument is absent, in which case None
        # is what gets passed to subscribe() below.
        self.listing_id = self.get_argument("listing_id", None)
        # One brukva client per request.
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe(self.listing_id)
        # The request is left open; on_new_messages completes it.
        self.client.listen(self.on_new_messages)

    def on_new_messages(self, messages):
        """Listen callback: send the message body and unsubscribe.

        NOTE(review): this thread reports the listen callback sometimes
        receiving None; ``messages.body`` below would then raise
        AttributeError -- confirm whether a None guard is wanted here.
        """
        # The HTTP client has already gone away, so there is nothing to send.
        # This early return also skips the unsubscribe call below.
        if self.request.connection.stream.closed():
            return
        self.finish(dict(messages=str(messages.body)))
        self.client.unsubscribe(self.listing_id)

    def on_connection_close(self):
        """Tear down the brukva client for this request."""
        # Unsubscribe from the channel first, then drop the connection.
        # NOTE(review): presumably invoked by Tornado when the HTTP client
        # disconnects -- this is the idle-client path described in the report.
        self.client.unsubscribe(self.listing_id)
        self.client.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment