Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

First running version.

  • Loading branch information...
commit 9f45b69abd4966175d8dcc881679b1dbc05a62e8 0 parents
Alexandre Bourget authored
3  .gitignore
... ... @@ -0,0 +1,3 @@
  1 +*~
  2 +*.pyc
  3 +pyramid_socketio.egg-info/
131 README.txt
... ... @@ -0,0 +1,131 @@
  1 +Gevent-based Socket.IO integration for Pyramid (and WSGI frameworks)
  2 +====================================================================
  3 +
  4 +Simple usage:
  5 +
  6 +<pre>
  7 +### somewhere in a Pyramid view:
  8 +from intr.socketio import SocketIOContext, socketio_manage
  9 +
  10 +
  11 +class ConnectIOContext(SocketIOContext):
  12 + """Starting context, which will go one side or the other"""
  13 + def msg_connect(self, msg):
  14 + if msg.get('context') not in contexts:
  15 + self.io.send(dict(type="error", error="unknown_connect_context",
  16 + msg="You asked for a context that doesn't exist"))
  17 + return
  18 + # Waiting for a msg such as: {'type': connect', 'context': 'interest'}
  19 + newctx = self.switch(contexts[msg['context']])
  20 + if hasattr(newctx, 'startup'):
  21 + newctx.startup(msg)
  22 + # Returning a new IOContext switches the WebSocket context, and will
  23 + # call this context's methods for next incoming messages.
  24 + return newctx
  25 +
  26 + def msg_login(self, msg):
  27 + # Do the login, then wait for the next connect
  28 + from intr.bound_models import User, ObjectId
  29 + u = User.find_one({'_id': ObjectId("4d4892a12a16e62df4000000")})
  30 + intrs = u['interests']
  31 + from intr.views.auth import create_session
  32 + create_session(request, u, intrs)
  33 + print "Logged, created session"
  34 +
  35 +
  36 +class InterestIOContext(SocketIOContext):
  37 + def startup(self, connect_msg):
  38 + print "Started the interest context"
  39 + self.intr_id = connect_msg['interest_id']
  40 + # TODO: make sure we don't leak Sessions from MongoDB!
  41 + from intr.models import mdb # can't import globally, because of Pyramid
  42 + self.db = mdb
  43 + self.conn = BrokerConnection("localhost", "guest", "guest", "/")
  44 + self.chan = self.conn.channel()
  45 + self.queue = Queue("session-%s" % self.io.session.session_id,
  46 + exchange=intr_exchange,
  47 + durable=False, exclusive=True,
  48 + auto_delete=True,
  49 + routing_key="interest.%s" % self.intr_id)
  50 +
  51 + self.producer = Producer(self.chan, exchange=intr_exchange,
  52 + serializer="json",
  53 + routing_key="interest.%s" % self.intr_id)
  54 + self.producer.declare()
  55 + self.consumer = Consumer(self.chan, [self.queue])
  56 + self.consumer.declare()
  57 + self.consumer.register_callback(self.consume_queue_message)
  58 + self.spawn(self.queue_recv)
  59 +
  60 + # Do we need this ? Please fix the session instead, have a new one
  61 + # init'd for each incoming msg, or when calling save(), re-create a new
  62 + # SessionObject.
  63 + request = self.request
  64 + self.user = request.session['user']
  65 + self.temporary = request.session['temporary']
  66 + self.user_id = request.session['user_id']
  67 +
  68 + def consume_queue_message(self, body, message):
  69 + """Callback when receiving anew message from Message Queue"""
  70 + # Do something when received :)
  71 + print "Received message from queue:", self.io.session.session_id, body
  72 + self.io.send(body)
  73 +
  74 + def queue_recv(self):
  75 + """Wait for messages from Queue"""
  76 + self.consumer.consume(no_ack=True)
  77 + # consume queue...
  78 + while True:
  79 + gevent.sleep(0)
  80 + self.conn.drain_events()
  81 + if not self.io.connected():
  82 + return
  83 +
  84 + #
  85 + # Socket messages
  86 + #
  87 + def msg_memorize(self, msg):
  88 + # do something
  89 +
  90 + def msg_forget(self, msg):
  91 + pass
  92 +
  93 + def msg_interest(self, msg):
  94 + pass
  95 +
  96 + def msg_change_privacy(self, msg):
  97 + pass
  98 +
  99 + def msg_get_members(self, msg):
  100 + pass
  101 +
  102 +
  103 +
  104 +
  105 +contexts = {'interest': InterestIOContext,
  106 + 'somewhereelse': SocketIOContext,
  107 + }
  108 +
  109 +
  110 +
  111 +#
  112 +# SOCKET.IO implementation
  113 +#
  114 +@view_config(route_name="socket_io")
  115 +def socket_io(request):
  116 + """Deal with the SocketIO protocol, using SocketIOContext objects"""
  117 + # Offload management to the pyramid_socketio module
  118 +
  119 + retval = socketio_manage(ConnectIOContext(request))
  120 + #print "socketio_manage ended"
  121 + return Response(retval)
  122 +
  123 +
  124 +
  125 +#### Inside __init__.py for your Pyramid application:
  126 +def main(..):
  127 + ...
  128 + config.add_static_view('socket.io/lib', 'intr:static')
  129 + config.add_route('socket_io', 'socket.io/*remaining')
  130 + ....
  131 +</pre>
156 pyramid_socketio/__init__.py
... ... @@ -0,0 +1,156 @@
  1 +# -=- encoding: utf-8 -=-
  2 +
  3 +import logging
  4 +import gevent
  5 +
  6 +__all__ = ['SocketIOError', 'SocketIOContext',
  7 + 'socketio_manage']
  8 +
  9 +log = logging.getLogger(__name__)
  10 +
  11 +class SocketIOError(Exception):
  12 + pass
  13 +
  14 +class SocketIOKeyAssertError(SocketIOError):
  15 + pass
  16 +
  17 +class SocketIOContext(object):
  18 + def __init__(self, request):
  19 + """Created by the call to connect() before doing any generic recv()"""
  20 + self.request = request
  21 + self.io = request.environ['socketio']
  22 + self._parent = None
  23 + if not hasattr(request, 'jobs'):
  24 + request.jobs = []
  25 + # Override self.debug if in production mode
  26 + #self.debug = lambda x: None
  27 +
  28 + def debug(self, msg):
  29 + log.debug("%s: %s" % (self.io.session.session_id, msg))
  30 +
  31 + def spawn(self, callable, *args):
  32 + """Spawn a new process in the context of this request.
  33 +
  34 + It will be monitored by the "watcher" method
  35 + """
  36 + self.debug("Spawning greenlet: %s" % callable.__name__)
  37 + new = gevent.spawn(callable, *args)
  38 + self.request.jobs.append(new)
  39 + return new
  40 +
  41 + def kill(self):
  42 + """Kill the current context, pass control to the parent context if
  43 + "return" is True. If this is the last context, close the connection."""
  44 + # Detach objects to dismantle cyclic references
  45 + # (was that going to happen anyway ?)
  46 + request = self.request
  47 + io = self.io
  48 + self.request = None
  49 + self.io = None
  50 + if self._parent:
  51 + parent = self._parent
  52 + self._parent = None
  53 + return parent
  54 + else:
  55 + io.close()
  56 + return
  57 +
  58 + def switch(self, new_context):
  59 + """Switch context, stack up contexts and pass on request, the caller
  60 + must return the value returned by switch().
  61 + """
  62 + self.debug("Switching context: %s" % new_context.__name__)
  63 + newctx = new_context(self.request)
  64 + newctx._parent = self
  65 + return newctx
  66 +
  67 + def error(self, code, msg):
  68 + """Used to quickly generate an error message"""
  69 + self.debug("error: %s, %s" % (code, msg))
  70 + self.io.send(dict(type='error', error=code, msg=msg))
  71 +
  72 + def msg(self, msg_type, **kwargs):
  73 + """Used to quickly generate an error message"""
  74 + self.debug("message: %s, %s" % (msg_type, kwargs))
  75 + self.io.send(dict(type=msg_type, **kwargs))
  76 +
  77 + def assert_keys(self, msg, elements):
  78 + """Make sure the elements are inside the message, otherwise send an
  79 + error message and skip the message.
  80 + """
  81 + if isinstance(elements, (str, unicode)):
  82 + elements = (elements,)
  83 + for el in elements:
  84 + if el not in msg:
  85 + self.error("bad_request", "Msg type '%s' should include all those keys: %s" % (msg['type'], elements))
  86 + raise SocketIOKeyAssertError()
  87 +
  88 + def __call__(self, msg):
  89 + """Parse the message upon reception and dispatch it to the good method.
  90 + """
  91 + msg_type = "msg_" + msg['type']
  92 + if not hasattr(self, msg_type) or \
  93 + not callable(getattr(self, msg_type)):
  94 + self.error("unknown_command", "Command unknown: %s" % msg['type'])
  95 + return
  96 + try:
  97 + self.debug("Calling msg type: %s with obj: %s" % (msg_type, msg))
  98 + return getattr(self, msg_type)(msg)
  99 + except SocketIOKeyAssertError, e:
  100 + return None
  101 +
  102 +
  103 +def watcher(request):
  104 + """Watch if any of the greenlets for a request have died. If so, kill the request and the socket.
  105 + """
  106 + # TODO: add that if any of the request.jobs die, kill them all and exit
  107 + io = request.environ['socketio']
  108 + gevent.sleep(5.0)
  109 + while True:
  110 + gevent.sleep(1.0)
  111 + if not io.connected():
  112 + gevent.killall(request.jobs)
  113 + return
  114 +
  115 +def socketio_recv(context):
  116 + """Manage messages arriving from Socket.IO, dispatch to context handler"""
  117 + io = context.io
  118 + while True:
  119 + for msg in io.recv():
  120 + # Skip invalid messages
  121 + if not isinstance(msg, dict):
  122 + context.error("bad_request",
  123 + "Your message needs to be JSON-formatted")
  124 + elif 'type' not in msg:
  125 + context.error("bad_request",
  126 + "You need a 'type' attribute in your message")
  127 + else:
  128 + # Call msg in context.
  129 + newctx = context(msg)
  130 +
  131 + # Switch context ?
  132 + if newctx:
  133 + context = newctx
  134 +
  135 + if not io.connected():
  136 + return
  137 +
  138 +def socketio_manage(start_context):
  139 + """Main SocketIO management function, call from within your Pyramid view"""
  140 + request = start_context.request
  141 + io = request.environ['socketio']
  142 +
  143 + if not io.connected():
  144 + # probably asked for something else dude!
  145 + return "there's no reason to get here, you won't get any further. have you mapped socket.io/lib to something ?"
  146 +
  147 + start_context.spawn(socketio_recv, start_context)
  148 +
  149 + # Launch the watcher thread
  150 + killall = gevent.spawn(watcher, request)
  151 +
  152 + gevent.joinall(request.jobs + [killall])
  153 +
  154 + start_context.debug("socketio_manage terminated")
  155 +
  156 + return "done"
70 pyramid_socketio/serve.py
... ... @@ -0,0 +1,70 @@
  1 +#!/usr/bin/env python
  2 +import gevent
  3 +from gevent import monkey; monkey.patch_all()
  4 +
  5 +from ConfigParser import ConfigParser
  6 +import logging
  7 +import logging.config
  8 +import socket
  9 +import sys
  10 +import os
  11 +
  12 +from socketio import SocketIOServer
  13 +from paste.deploy import loadapp
  14 +
  15 +
  16 +host = '127.0.0.1'
  17 +port = 6543
  18 +
  19 +def socketio_serve():
  20 + # See http://bitbucket.org/Jeffrey/socketio/src/9bf2cd777808/examples/chat.py
  21 +
  22 + if len(sys.argv) < 2:
  23 + print "ERROR: Please specify .ini file on command line"
  24 + sys.exit(1)
  25 +
  26 + do_reload = sys.argv[1] == '--reload'
  27 +
  28 + # Setup logging...
  29 + cfgfile = sys.argv[2] if do_reload else sys.argv[1]
  30 + logging.config.fileConfig(cfgfile)
  31 + log = logging.getLogger(__name__)
  32 +
  33 + cfg = ConfigParser()
  34 + cfg.readfp(open(cfgfile))
  35 + sec = 'server:main'
  36 + if sec in cfg.sections():
  37 + opts = cfg.options(sec)
  38 + if 'host' in opts:
  39 + host = cfg.get(sec, 'host')
  40 + if 'port' in opts:
  41 + port = cfg.getint(sec, 'port')
  42 +
  43 + def main():
  44 + # Load application and config.
  45 + app = loadapp('config:%s' % cfgfile, relative_to='.')
  46 + server = SocketIOServer((host, port), app,
  47 + resource="socket.io")
  48 +
  49 + try:
  50 + print "Serving on %s:%d (http://127.0.0.1:%d) ..." % (host, port, port)
  51 + server.serve_forever()
  52 + except socket.error, e:
  53 + print "ERROR SERVING WSGI APP: %s" % e
  54 + sys.exit(1)
  55 +
  56 + def reloader():
  57 + from paste import reloader
  58 + reloader.install()
  59 + reloader.watch_file(cfgfile)
  60 + import glob # Restart on "compile_catalog"
  61 + # TODO: make more generic, and more robust
  62 + for lang in glob.glob('*/locale/*/LC_MESSAGES/*.mo'):
  63 + reloader.watch_file(lang)
  64 + for lang in glob.glob('*/i18n/*/LC_MESSAGES/*.mo'):
  65 + reloader.watch_file(lang)
  66 +
  67 + jobs = [gevent.spawn(main)]
  68 + if do_reload:
  69 + jobs.append(gevent.spawn(reloader))
  70 + gevent.joinall(jobs)
11 pyramid_socketio/servereload.py
... ... @@ -0,0 +1,11 @@
  1 +#!/usr/bin/env python
  2 +
  3 +import os
  4 +import sys
  5 +
  6 +def socketio_serve_reload():
  7 + """Spawn a new process and reload when it dies"""
  8 + while True:
  9 + ret = os.system("socketio-serve %s" % (sys.argv[1]))
  10 + if ret != 3:
  11 + break
42 setup.py
... ... @@ -0,0 +1,42 @@
  1 +import os
  2 +import sys
  3 +
  4 +from setuptools import setup, find_packages
  5 +
  6 +here = os.path.abspath(os.path.dirname(__file__))
  7 +#README = open(os.path.join(here, 'README.txt')).read()
  8 +#CHANGES = open(os.path.join(here, 'CHANGES.txt')).read()
  9 +
  10 +requires = [
  11 + 'pyramid',
  12 + 'gevent',
  13 + 'gevent-socketio',
  14 + 'gevent-websocket',
  15 + 'greenlet',
  16 + ]
  17 +
  18 +setup(name='pyramid_socketio',
  19 + version='0.1',
  20 + description='Gevent-based Socket.IO pyramid integration and helpers',
  21 + #long_description=README + '\n\n' + CHANGES,
  22 + classifiers=[
  23 + "Programming Language :: Python",
  24 + "Framework :: Pylons",
  25 + "Framework :: Pyramid",
  26 + "Topic :: Internet :: WWW/HTTP",
  27 + ],
  28 + author='Alexandre Bourget',
  29 + author_email='alex@bourget.cc',
  30 + url='http://blog.abourget.net',
  31 + keywords='web wsgi pylons pyramid websocket python gevent socketio socket.io',
  32 + packages=find_packages(),
  33 + include_package_data=True,
  34 + zip_safe=False,
  35 + install_requires = requires,
  36 + entry_points = """\
  37 + [console_scripts]
  38 + socketio-serve-reload = pyramid_socketio.servereload:socketio_serve_reload
  39 + socketio-serve = pyramid_socketio.serve:socketio_serve
  40 + """,
  41 + )
  42 +

0 comments on commit 9f45b69

Please sign in to comment.
Something went wrong with that request. Please try again.