Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit

  • Loading branch information...
commit 85534ef0f802a3ee0a38287921d83f9fbc446956 0 parents
@guyzmo authored
163 README.md
@@ -0,0 +1,163 @@
+Event Source Library for Python
+===============================
+
+This library implements W3C Draft's on event-source:
+ * http://dev.w3.org/html5/eventsource/
+
+It enables a halfduplex communication from server to client, but initiated
+by the client, through standard HTTP(S) communication.
+
+Dependances
+===========
+
+ - Fairly recent python (tested with 2.7)
+ - Fairly recent tornado (tested with 2.2.1)
+
+Usage
+=====
+
+ 1. Launch the server:
+
+ python event_source/event_source_listener.py -P 8888 -i -k 50000
+
+ 2. Launch the client:
+
+ python event_source/event_source_client.py 69:69:69:69:69:69 -r 5000
+
+ 3. Send requests:
+
+ python event_source/send_request.py 69:69:69:69:69:69 ping "42"
+ python event_source/send_request.py 69:69:69:69:69:69 close
+
+Command Line arguments
+======================
+
+ - event_source_listener:
+
+ usage: event_source/event_source_listener.py [-h] [-H HOST] [-P PORT] [-d]
+ [-j] [-k KEEPALIVE] [-i]
+
+ Event Source Listener
+
+ optional arguments:
+ -h, --help show this help message and exit
+ -H HOST, --host HOST Host to bind on
+ -P PORT, --port PORT Port to bind on
+ -d, --debug enables debug output
+ -j, --json to enable JSON Event
+ -k KEEPALIVE, --keepalive KEEPALIVE
+ Keepalive timeout
+ -i, --id to generate identifiers
+
+ - event_source_client:
+
+ usage: event_source/event_source_client.py [-h] [-H HOST] [-P PORT] [-d]
+ [-r RETRY]
+ token
+
+ Event Source Client
+
+ positional arguments:
+ token Token to be used for connection
+
+ optional arguments:
+ -h, --help show this help message and exit
+ -H HOST, --host HOST Host to connect to
+ -P PORT, --port PORT Port to be used connection
+ -d, --debug enables debug output
+ -r RETRY, --retry RETRY
+ Reconnection timeout
+
+ - send_request:
+
+ usage: event_source/send_request.py [-h] [-H HOST] [-P PORT] [-j]
+ token action [data]
+
+ Generates event for Event Source Library
+
+ positional arguments:
+ token Token to be used for connection
+ action Action to send
+ data Data to be sent
+
+ optional arguments:
+ -h, --help show this help message and exit
+ -H HOST, --host HOST Host to connect to
+ -P PORT, --port PORT Port to be used connection
+ -j, --json Treat data as JSON
+
+
+Integrate
+=========
+
+On the server side, basically all you have to do is to add the following to your code:
+
+ from event_source import event_source_listener
+
+ application = tornado.web.Application([
+ (r"/(.*)/(.*)", event_source_listener.EventSourceHandler,
+ dict(event_class=EVENT,
+ keepalive=KEEPALIVE)),
+ ])
+
+ application.listen(PORT)
+ tornado.ioloop.IOLoop.instance().start()
+
+where:
+ - PORT is an integer for the port to bind to
+ - KEEPALIVE is an integer for the timeout between two keepalive messages (to protect from disconnections)
+ - EVENT is a event_source_listener.Event based class, either one you made or
+ - event_source_listener.StringEvent : Each event gets and resends multiline strings
+ - event_source_listener.StringIdEvent : Each event gets and resends multiline strings, with an unique id for each event
+ - event_source_listener.JSONEvent : Each event gets and resends JSON valid strings
+ - event_source_listener.JSONIdEvent : Each event gets and resends JSON valid string, with an unique id for each event
+
+Extend
+======
+
+To extend the behaviour of the event source library, without breaking event_source
+definition, the Event based classes implements all processing elements that shall
+be done on events.
+
+There is two abstract classes that defines Event:
+ - event_source_listener.Event : defines the constructor of an Event
+ - event_source_listener.EventId : defines an always incrementing id handler
+
+here is an example to create a new Event that takes multiline data and join it in a one
+line string seperated with semi-colons.
+
+ class OneLineEvent(Event):
+ ACTIONS = ["ping",Event.FINISH]
+
+ """Property to enable multiline output of the value"""
+ def get_value(self):
+ # replace carriage returns by semi-colons
+ # this method shall always return a list (even if one value)
+ return [";".join([line for line in self._value.split('\n')])]
+
+ value = property(get_value,set_value)
+
+And now, I want to add basic id support to OneLineEvent, in OneLineIdEvent,
+nothing is easier :
+
+ class OneLineIdEvent(OneLineEvent,IdEvent):
+ id = property(IdEvent.get_value)
+
+Or if I want the id to be a timestamp:
+
+ import time
+ class OneLineTimeStampEvent(OneLineEvent):
+ id = property(lambda s: "%f" % (time.time(),))
+
+You can change the behaviour of a few things in a Event-based class:
+ - Event.LISTEN contains the GET action to open a connection (per default "poll")
+ - Event.FINISH contains the POST action to close a connection (per default "close")
+ - Event.RETRY contains the POST action to define the timeout after reconnecting on network disconnection (per default "0", which means disabled)
+ - in the Event.ACTIONS list, you define what POST actions are allowed, per default, only Event.FINISH is allowed.
+ - Event.content_type contains the "content_type" that will be asked for every form (it is not enforced).
+
+To change the way events are generated, you can directly call EventSourceHandler.buffer_event()
+to create a new event to be sent. But the post action is best, at least while WSGI can't handle
+correctly long polling connections.
+
+EOF
0  event_source/__init__.py
No changes.
139 event_source/event_source_client.py
@@ -0,0 +1,139 @@
+import sys
+import time
+import argparse
+import logging
+log = logging.getLogger('eventsource.client')
+
+from tornado.ioloop import IOLoop
+from tornado.httpclient import AsyncHTTPClient, HTTPRequest
+
+class Event(object):
+ """
+ Defines a received event
+ """
+ def __init__(self):
+ self.name = None
+ self.data = None
+ self.id = None
+
+ def __repr__(self):
+ return "Event<%s,%s,%s>" % (str(self.id), str(self.name), str(self.data.replace('\n','\\n')))
+
+class EventSourceClient(object):
+ def __init__(self,url,action,target,callback=None,retry=-1):
+ self._url = "http://%s/%s/%s" % (url,action,target)
+ AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
+ self.http_client = AsyncHTTPClient()
+ self.http_request = HTTPRequest(url=self._url,
+ method='GET',
+ headers={"content-type":"text/event-stream"},
+ request_timeout=0,
+ streaming_callback=self.handle_stream)
+ if callback is None:
+ self.cb = lambda e: log.info( "received %s" % (e,) )
+ else:
+ self.cb = callback
+ self.retry_timeout = int(retry)
+
+ def poll(self):
+ if self.retry_timeout == 0:
+ self.http_client.fetch(self.http_request, self.handle_request)
+ IOLoop.instance().start()
+ while self.retry_timeout!=0:
+ self.http_client.fetch(self.http_request, self.handle_request)
+ IOLoop.instance().start()
+ time.sleep(self.retry_timeout/1000)
+
+ def end(self):
+ self.retry_timeout=0
+ IOLoop.instance().stop()
+
+ def handle_stream(self,message):
+ event = Event()
+ for line in message.strip('\r\n').split('\r\n'):
+ (field, value) = line.split(":",1)
+ if field == 'event':
+ event.name = value
+ elif field == 'data':
+ value = value.lstrip()
+ if event.data is None:
+ event.data = value
+ else:
+ event.data = "%s\n%s" % (event.data, value)
+ elif field == 'id':
+ event.id = value
+ elif field == 'retry':
+ try:
+ self.retry_timeout = int(value)
+ log.info( "timeout reset: %s" % (value,) )
+ except ValueError:
+ pass
+ elif field == '':
+ log.info( "received comment: %s" % (value,) )
+ else:
+ raise Exception("Unknown field !")
+ if event.name is not None:
+ self.cb(event)
+
+
+ def handle_request(self,response):
+ if response.error:
+ log.error(response.error)
+ else:
+ log.info("disconnection requested")
+ self.retry_timeout=0
+ IOLoop.instance().stop()
+
+def start():
+ parser = argparse.ArgumentParser(prog=sys.argv[0],
+ description="Event Source Client")
+ parser.add_argument("-H",
+ "--host",
+ dest="host",
+ default='127.0.0.1',
+ help='Host to connect to')
+ # PORT ARGUMENT
+ parser.add_argument("-P",
+ "--port",
+ dest="port",
+ default='8888',
+ help='Port to be used connection')
+
+ parser.add_argument("-d",
+ "--debug",
+ dest="debug",
+ action="store_true",
+ help='enables debug output')
+
+ parser.add_argument("-r",
+ "--retry",
+ dest="retry",
+ default='-1',
+ help='Reconnection timeout')
+
+ parser.add_argument(dest="token",
+ help='Token to be used for connection')
+
+ args = parser.parse_args(sys.argv[1:])
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+ else:
+ logging.basicConfig(level=logging.INFO)
+
+ ###
+
+ def log_events(event):
+ log.info( "received %s" % (event,) )
+
+ EventSourceClient(url="%(host)s:%(port)s" % args.__dict__,
+ action="poll",
+ target=args.token,
+ retry=args.retry).poll()
+
+ ###
+
+
+if __name__ == "__main__":
+ start()
+
385 event_source/event_source_listener.py
@@ -0,0 +1,385 @@
+# resources:
+# * http://stackoverflow.com/questions/10665569/websocket-event-source-implementation-to-expose-a-two-way-rpc-to-a-python-dj
+# * http://stackoverflow.com/questions/8812715/using-a-simple-python-generator-as-a-co-routine-in-a-tornado-async-handler
+# * http://dev.w3.org/html5/eventsource/#event-stream-interpretation
+
+import sys
+import time
+import argparse
+import logging
+log = logging.getLogger("eventsource.listener")
+
+import json
+import httplib
+import tornado.web
+import tornado.ioloop
+
+"""Event base"""
+
+class Event(object):
+ """
+ Class that defines an event, its behaviour and the matching actions
+
+ LISTEN is the GET event that will open an event source communication
+ FINISH is the POST event that will end an event source communication started by LISTEN
+ RETRY is the POST event that defines reconnection timeouts for the client
+ ACTIONS contains the list of acceptable POST targets.
+
+ target is the token that matches an event source channel
+ action contains the name of the action (which shall be in ACTIONS)
+ value contains a list of every lines of the value to be parsed
+ """
+ content_type = "text/plain"
+
+ LISTEN = "poll"
+ FINISH = "close"
+ RETRY = "retry"
+ ACTIONS=[FINISH]
+
+ """Property to encapsulate processing on value"""
+ def get_value(self):
+ return self._value
+
+ def set_value(self, v):
+ self._value = v
+
+ value = property(get_value,set_value)
+ id = None
+
+ def __init__(self, target, action, value=None):
+ """
+ Creates a new Event object with
+ @param target a string matching an open channel
+ @param action a string matching an action in the ACTIONS list
+ @param value a value to be embedded
+ """
+ self.target = target
+ self.action = action
+ self.set_value(value)
+
+class EventId(object):
+ cnt = 0
+ def get_id(self):
+ if self.cnt == EventId.cnt:
+ self.cnt = EventId.cnt
+ EventId.cnt+=1
+ return self.cnt
+ id = property(get_id)
+
+""" Reusable events """
+
+class StringEvent(Event):
+ ACTIONS=["ping",Event.FINISH]
+
+ """Property to enable multiline output of the value"""
+ def get_value(self):
+ return [line for line in self._value.split('\n')]
+
+ value = property(get_value,Event.set_value)
+
+class JSONEvent(Event):
+ content_type = "application/json"
+
+ LISTEN = "poll"
+ FINISH = "close"
+ ACTIONS=["ping",FINISH]
+
+ """Property to enable JSON checking of the value"""
+ def get_value(self):
+ return [json.dumps(self._value)]
+
+ def set_value(self, v):
+ self._value = json.loads(v)
+
+ value = property(get_value,set_value)
+
+class StringIdEvent(StringEvent,EventId):
+ ACTIONS=["ping",Event.RETRY,Event.FINISH]
+
+ id = property(EventId.get_id)
+
+class JSONIdEvent(JSONEvent,EventId):
+ content_type = JSONEvent.content_type
+ ACTIONS=["ping",Event.RETRY,Event.FINISH]
+
+ id = property(EventId.get_id)
+
+##
+"""EventSource mechanism"""
+
+class EventSourceHandler(tornado.web.RequestHandler):
+ _connected = {}
+ _events = {}
+
+ def initialize(self, event_class=StringEvent,keepalive=0):
+ """
+ Takes an Event based class to define the event's handling
+ """
+ self._event_class = event_class
+ self._retry = None
+ if keepalive is not 0:
+ self._keepalive = tornado.ioloop.PeriodicCallback(self.push_keepalive, keepalive);
+
+
+ """Tools"""
+
+ @tornado.web.asynchronous
+ def push_keepalive(self):
+ log.debug("push_keepalive()")
+ self.write(": keepalive %s\r\n\r\n" % (unicode(time.time())))
+ self.flush()
+
+ def push(self, event):
+ """
+ For a given event, write event-source outputs on current handler
+ @param event Event based incoming event
+ """
+ log.debug("push(%s,%s,%s)" % (event.id,event.action,event.value))
+ if hasattr(event, "id"):
+ self.write("id: %s\r\n" % (unicode(event.id)))
+ if self._retry is not None:
+ self.write("retry: %s\r\n" % (unicode(self._retry)))
+ self._retry = None
+ self.write("event: %s\r\n" % (unicode(event.action)))
+ for line in event.value:
+ self.write("data: %s\r\n" % (unicode(line),))
+ self.write("\r\n")
+ self.flush()
+
+ def buffer_event(self, target, action, value=None):
+ """
+ creates and store an event for the target
+
+ @param target string identifying current target
+ @param action string matching one of Event.ACTIONS
+ @param value string containing a value
+ """
+ self._events[target].append(self._event_class(target, action, value))
+
+ def is_connected(self, target):
+ """
+ @param target string identifying a given target
+ @return true if target is connected
+ """
+ return target in self._connected.values()
+
+ def set_connected(self, target):
+ """
+ registers target as being connected
+
+ @param target string identifying a given target
+
+ this method will add target to the connected list,
+ and create an empty event buffer
+ """
+ log.debug("set_connected(%s)" % (target,))
+ self._connected[self] = target
+ self._events[target] = []
+
+ def set_disconnected(self):
+ """
+ unregisters current handler as being connected
+
+ this method will remove target from the connected list,
+ and delete the event buffer
+ """
+ try:
+ target = self._connected[self]
+ log.debug("set_disconnected(%s)" % (target,))
+ self._keepalive.stop()
+ del(self._events[target])
+ del(self._connected[self])
+ except Exception, err:
+ log.error("set_disconnected(%s,%s): %s", str(self), target, err)
+
+ def write_error(self, status_code, **kwargs):
+ """
+ overloads the write_error() method of RequestHandler, to
+ support more explicit messages than only the ones from httplib.
+ """
+ if self.settings.get("debug") and "exc_info" in kwargs:
+ # in debug mode, try to send a traceback
+ self.set_header('Content-Type', 'text/plain')
+ for line in traceback.format_exception(*kwargs["exc_info"]):
+ self.write(line)
+ self.finish()
+ else:
+ if 'mesg' in kwargs:
+ self.finish("<html><title>%(code)d: %(message)s</title>"
+ "<body>%(code)d: %(mesg)s</body></html>\n" % {
+ "code": status_code,
+ "message": httplib.responses[status_code],
+ "mesg": kwargs["mesg"],
+ })
+ else:
+ self.finish("<html><title>%(code)d: %(message)s</title>"
+ "<body>%(code)d: %(message)s</body></html>\n" % {
+ "code": status_code,
+ "message": httplib.responses[status_code],
+ })
+
+ """Synchronous actions"""
+
+ def post(self,action,target):
+ """
+ Triggers an event
+
+ @param action string defining the type of event
+ @param target string defining the target handler to send it to
+
+ this method will look for the request body to get post's data.
+ """
+ log.debug("post(%s,%s)" % (target,action))
+ self.set_header("Accept", self._event_class.content_type)
+ if target not in self._connected.values():
+ self.send_error(404,mesg="Target is not connected")
+ elif action not in self._event_class.ACTIONS:
+ self.send_error(404,mesg="Unknown action requested")
+ else:
+ try:
+ self.buffer_event(target,action,self.request.body)
+ except ValueError, ve:
+ self.send_error(400,mesg="JSON data is not properly formatted: <br />%s" % (ve,))
+
+ """Asynchronous actions"""
+
+ def _event_generator(self,target):
+ """
+ parses all events buffered for target and yield them
+ """
+ for ev in self._events[target]:
+ self._events[target].remove(ev)
+ yield ev
+
+ def _event_loop(self):
+ """
+ for target matching current handler, gets and forwards all events
+ until Event.FINISH is reached, and then closes the channel.
+ """
+ if self.is_connected(self.target):
+ for event in self._event_generator(self.target):
+ if self._event_class.RETRY in self._event_class.ACTIONS:
+ if event.action == self._event_class.RETRY:
+ try:
+ self._retry = int(event.value[0])
+ continue
+ except ValueError:
+ log.error("incorrect retry value: %s" % (event.value,))
+ if event.action == self._event_class.FINISH:
+ self.set_disconnected()
+ self.finish()
+ return
+ self.push(event)
+ tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)
+
+ @tornado.web.asynchronous
+ def get(self,action,target):
+ """
+ Opens a new event_source connection and wait for events to come
+ Returns error 423 if the target token already exists
+ Redirects to / if action is not matching Event.LISTEN.
+ """
+ log.debug("get(%s,%s)" % (target, action))
+ if action == self._event_class.LISTEN:
+ self.set_header("Content-Type", "text/event-stream")
+ self.set_header("Cache-Control", "no-cache")
+ self.target = target
+ if self.is_connected(target):
+ self.send_error(423,mesg="Target is already connected")
+ return
+ self.set_connected(target)
+ tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)
+ self._keepalive.start()
+ else:
+ self.redirect("/",permanent=True)
+
+ def on_connection_close(self):
+ """
+ overloads RequestHandler's on_connection_close to disconnect
+ currents handler on client's socket disconnection.
+ """
+ log.debug("on_connection_close()")
+ self.set_disconnected()
+
+###
+
+def start():
+ parser = argparse.ArgumentParser(prog=sys.argv[0],
+ description="Event Source Listener")
+ parser.add_argument("-H",
+ "--host",
+ dest="host",
+ default='0.0.0.0',
+ help='Host to bind on')
+ # PORT ARGUMENT
+ parser.add_argument("-P",
+ "--port",
+ dest="port",
+ default='8888',
+ help='Port to bind on')
+
+ parser.add_argument("-d",
+ "--debug",
+ dest="debug",
+ action="store_true",
+ help='enables debug output')
+
+ parser.add_argument("-j",
+ "--json",
+ dest="json",
+ action="store_true",
+ help='to enable JSON Event')
+
+ parser.add_argument("-k",
+ "--keepalive",
+ dest="keepalive",
+ default="0",
+ help='Keepalive timeout')
+
+ parser.add_argument("-i",
+ "--id",
+ dest="id",
+ action="store_true",
+ help='to generate identifiers')
+
+ args = parser.parse_args(sys.argv[1:])
+
+ if args.debug:
+ logging.basicConfig(level=logging.DEBUG)
+ else:
+ logging.basicConfig(level=logging.INFO)
+
+ if args.json:
+ if args.id:
+ chosen_event = JSONIdEvent
+ else:
+ chosen_event = JSONEvent
+ else:
+ if args.id:
+ chosen_event = StringIdEvent
+ else:
+ chosen_event = StringEvent
+
+ try:
+ args.keepalive = int(args.keepalive)
+ except ValueError:
+ log.error("keepalive takes a numerical value")
+ sys.exit(1)
+
+ ###
+ try:
+ application = tornado.web.Application([
+ (r"/(.*)/(.*)", EventSourceHandler, dict(event_class=chosen_event,keepalive=args.keepalive)),
+ ])
+
+ application.listen(int(args.port))
+ tornado.ioloop.IOLoop.instance().start()
+ except ValueError:
+ log.error("The port '%d' shall be a numerical value." % (args.port,))
+ sys.exit(1)
+
+ ###
+
+if __name__ == "__main__":
+ start()
+
63 event_source/send_request.py
@@ -0,0 +1,63 @@
+import sys
+import argparse
+
+import json
+import urllib2
+
+def send_json(url, data):
+ if isinstance(data,str):
+ data = json.dumps(json.loads(data))
+ else:
+ data = json.dumps(data)
+ req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
+ f = urllib2.urlopen(req)
+ response = f.read()
+ f.close()
+
+def send_string(url, data):
+ f = urllib2.urlopen(url, data)
+ response = f.read()
+ f.close()
+
+def start():
+ parser = argparse.ArgumentParser(prog=sys.argv[0],
+ description="Generates event for Event Source Library")
+
+ parser.add_argument("token",
+ help='Token to be used for connection')
+
+ parser.add_argument("action",
+ help='Action to send')
+
+ parser.add_argument("data",
+ nargs='?',
+ default="",
+ help='Data to be sent')
+
+ parser.add_argument("-H",
+ "--host",
+ dest="host",
+ default='127.0.0.1',
+ help='Host to connect to')
+ # PORT ARGUMENT
+ parser.add_argument("-P",
+ "--port",
+ dest="port",
+ default='8888',
+ help='Port to be used connection')
+
+ parser.add_argument("-j",
+ "--json",
+ dest="json",
+ action="store_true",
+ help='Treat data as JSON')
+
+ args = parser.parse_args(sys.argv[1:])
+
+ if args.json:
+ send_json("http://%(host)s:%(port)s/%(action)s/%(token)s" % args.__dict__, args.data)
+ else:
+ send_string("http://%(host)s:%(port)s/%(action)s/%(token)s" % args.__dict__, args.data)
+
+if __name__ == "__main__":
+ start()
Please sign in to comment.
Something went wrong with that request. Please try again.