Permalink
Browse files

initial commit

  • Loading branch information...
1 parent f955672 commit 20ce1ebc955659763031a0bea6a76d4751e0af2c @Chris2048 committed Jan 7, 2013
Showing with 329 additions and 1 deletion.
  1. +4 −0 .gitignore
  2. +19 −1 README.md
  3. +96 −0 example/app.py
  4. +8 −0 example/gunsse.py
  5. +74 −0 example/static/site.js
  6. +11 −0 example/templates/home.html
  7. +117 −0 sse.py
View
4 .gitignore
@@ -33,3 +33,7 @@ nosetests.xml
.mr.developer.cfg
.project
.pydevproject
+
+# misc
+Cakefile
+*.coffee
View
20 README.md
@@ -1,4 +1,22 @@
flask-sse
=========
-flask-sse
+flask-sse
+
+A small module for sse in flask.
+
+There is a (hopefully) working test/demo in 'example/'
+
+Just run 'gunicorn -c gunsse.py app:app' from that directory,
+Then browse to 'http://localhost:8000'
+
+There should be an alternating 'PING' and 'PONG', and a
+randomly-generated graph should scroll (like on the flot homepage).
+--! The (blue) graphs don't look the same on different tabs.
+
+Browse to '/graph' and a number will be show, and should be plotted in a
+different color on the graph, keep refreshing to plot more.
+This is an example of one thread/request having an effect on the stream,
+and thus other thread/requests.
+--! The (yellow) graphs *do* look the same on different tabs.
+
View
96 example/app.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python2.7
+# -`*- coding: utf-8 -*-
+
+"""
+test for Server-Side events in flask
+
+inspiration from:
+http://www.html5rocks.com/en/tutorials/eventsource/basics/
+https://github.com/niwibe/sse.git
+https://github.com/niwibe/django-sse.git
+https://github.com/jkbr/chat
+"""
+
+from gevent import monkey
+monkey.patch_all()
+
+import flask
+app = flask.Flask(__name__)
+app.debug = True
+app.secret_key = 'asdf'
+
+import redis
+red = redis.Redis()
+
+import os
+import sys
+dir = os.path.dirname(__file__)
+sys.path.append(os.path.abspath(os.path.join(dir, '..')))
+
+from sse import PeriodicStream, RedisSseStream
+import random
+
+
+def numGen():
+ prev = 50
+ while True:
+ vals = [prev]
+ vals.append(prev + random.randrange(-10, 10))
+ vals.append(prev + random.randrange(-20, 20))
+ vals.append(prev + random.randrange(-30, 30))
+ vals.append(prev + random.randrange(-40, 40))
+ prev = vals[-1]
+ if prev > 100:
+ prev = 100
+ if prev < 0:
+ prev = 0
+ for i in vals:
+ yield i
+
+
+numgen = numGen()
+numgen2 = numGen()
+
+
+def ping(subscriber, freq):
+ "Alternate between sending 'PING' and 'PONG' messages"
+
+ if (subscriber.counter / freq) % 2 == 0:
+ subscriber.sse.add_message("PING", event='ping')
+ else:
+ subscriber.sse.add_message("PONG", event='ping')
+
+
+def foo(subscriber, freq):
+ subscriber.sse.add_message(numgen2.next(), event='graph')
+
+
+def data_points(subscriber):
+ subscriber.sse.add_message(red.get('nextval'), event='graph')
+
+
+app.add_url_rule('/stream/periodic',
+ view_func=PeriodicStream.as_view(str('PeriodicStream'),
+ {'foo': (1, foo), 'ping': (10, ping)}))
+
+app.add_url_rule('/stream/redis',
+ view_func=RedisSseStream.as_view(str('RedisSseStream'),
+ {'graphvals': data_points},
+ red.pubsub()))
+
+
+@app.route('/graph')
+def visit():
+ nextval = numgen.next()
+ red.set('nextval', nextval)
+ red.publish('stream', 'graphvals')
+ return '<!doctype html><head><body>%d</body></html>' % nextval
+
+
+@app.route('/')
+def home():
+ return flask.render_template('home.html')
+
+
+if __name__ == '__main__':
+ app.run()
View
8 example/gunsse.py
@@ -0,0 +1,8 @@
+bind = "0.0.0.0:8000"
+workers = 6
+backlog = 2048
+worker_class = "gevent"
+debug = True
+#daemon = True
+pidfile = "/tmp/gunicorn.pid"
+logfile = "/tmp/gunicorn.log"
View
74 example/static/site.js
@@ -0,0 +1,74 @@
+ jQuery( function($) {
+ var data = [];
+ function get_data(yval) {
+ while (data.length != 300) {
+ data.push(0);
+ }
+ data = data.slice(1);
+ data.push(yval);
+ // zip the generated y values with the x values
+ var res = [];
+ for (var i = 0; i < data.length; ++i) {
+ res.push([i, data[i]]); }
+ return res;
+ }
+
+ var data2 = [];
+ function get_data2(yval) {
+ while (data2.length != 300) {
+ data2.push(0);
+ }
+ data2 = data2.slice(1);
+ data2.push(yval);
+ // zip the generated y values with the x values
+ var res = [];
+ for (var i = 0; i < data2.length; ++i) {
+ res.push([i, data2[i]]); }
+ return res;
+ }
+
+ var options = {
+ series: { shadowSize: 0 },
+ lines: { show: true, fill: true },
+ yaxis: { show: false, min: 0, max: 100 },
+ xaxis: { show: false }
+ };
+
+ d1 = get_data2(0);
+ d2 = get_data(0);
+
+ var plot = $.plot( $("#gplot"),
+ [ { data: d1 }, { data: d2 } ], options
+ );
+
+
+ function update_gplot(yval) {
+ d2 = get_data(yval);
+ plot.setData([ { data: d1 }, { data: d2 } ]);
+ plot.draw();
+ }
+ function update_gplot2(yval) {
+ d1 = get_data2(yval);
+ plot.setData([ { data: d1 }, { data: d2 } ]);
+ plot.draw();
+ }
+
+ var source = new EventSource('/stream/periodic');
+ source.addEventListener(
+ 'graph', function(e) {
+ update_gplot(e.data);
+ }, false
+ );
+ source.addEventListener(
+ 'ping', function(e) {
+ $('#data').text(e.data);
+ }, false
+ );
+
+ var source2 = new EventSource('/stream/redis');
+ source2.addEventListener(
+ 'graph', function(e) {
+ update_gplot2(e.data);
+ }, false
+ );
+ } );
View
11 example/templates/home.html
@@ -0,0 +1,11 @@
+<!doctype html>
+<head>
+ <script src="//cdnjs.cloudflare.com/ajax/libs/jquery/1.8.0/jquery.min.js"></script>
+ <script src="//cdnjs.cloudflare.com/ajax/libs/flot/0.7/jquery.flot.min.js"></script>
+ <script type=text/javascript src="{{ url_for('static', filename='site.js') }}"></script>
+</head>
+<body>
+<div id="data">PING</div>
+<div id="gplot" style="width:600px;height:300px;"></div>
+</body>
+</html>
View
117 sse.py
@@ -0,0 +1,117 @@
+
+from __future__ import unicode_literals
+import time
+import sys
+from flask import Response
+from flask.views import View
+
+
+class Sse(object):
+ def __init__(self):
+ self._buffer = {}
+ self._buffer['messages'] = {}
+
+ def set_retry(self, num):
+ self._buffer['retry'] = num
+
+ def set_event_id(self, event_id):
+ self._buffer['id'] = event_id
+
+ def reset_event_id(self):
+ self.set_event_id(None)
+
+ def _parse_text(self, text, encoding='utf-8'):
+ if isinstance(text, (list, tuple, set)):
+ text = ''.join(self._parse_text(i) for i in text)
+
+ if isinstance(text, bytes):
+ text = text.decode(encoding)
+
+ return str(text) + '\n'
+
+ def add_message(self, text, event='message'):
+ """
+ Add message with eventname to the buffer.
+ """
+ event_list = self._buffer['messages'].setdefault(event, [])
+ event_list.append(self._parse_text(text))
+
+ def __str__(self):
+ if sys.version_info[0] >= 3: # Python 3
+ return self.__unicode__()
+ return self.__unicode__().encode('utf8')
+
+ def __unicode__(self):
+ return ''.join(i for i in self)
+
+ def flush(self):
+ """
+ Reset the internal buffer to initial state.
+ """
+ self._buffer.clear()
+ self._buffer['messages'] = {}
+
+ def __iter__(self):
+ if 'retry' in self._buffer:
+ yield "retry: {0}\n\n".format(self._buffer['retry'])
+
+ if 'id' in self._buffer:
+ if self._buffer['id']:
+ yield "id: {0}\n\n".format(self._buffer['id'])
+ else:
+ yield "id\n\n" # Reset event id
+
+ for eventname in self._buffer['messages']:
+ for message in self._buffer['messages'][eventname]:
+ yield "event: {0}\n".format(eventname)
+ yield "data: {0}\n".format(message)
+
+
+class SseStream(View):
+ def get_last_id(self):
+ if "HTTP_LAST_EVENT_ID" in self.request.META:
+ return self.request.META['HTTP_LAST_EVENT_ID']
+ return None
+
+ def _compose_message():
+ raise NotImplementedError
+
+ def _iterator(self):
+ while self._compose_message():
+ for line in self.sse:
+ yield line
+ self.sse.flush()
+
+ def dispatch_request(self):
+ self.sse = Sse()
+ response = Response(self._iterator(), mimetype="text/event-stream")
+ return response
+
+
+class PeriodicStream(SseStream):
+ def __init__(self, functions):
+ self.functions = functions
+ self.counter = 0
+
+ def _compose_message(self):
+ time.sleep(0.1)
+ self.counter += 1
+ if self.counter > 600:
+ self.counter = 0
+ for freq, func in self.functions.values():
+ if self.counter % freq == 0:
+ func(self, freq)
+ return True
+
+
+class RedisSseStream(SseStream):
+ def __init__(self, handlers, pubsub):
+ self.handlers = handlers
+ pubsub.subscribe('stream')
+ self._messages = pubsub.listen()
+
+ def _compose_message(self):
+ message = self._messages.next()['data']
+ if message in self.handlers.keys():
+ self.handlers[message](self)
+ return True

0 comments on commit 20ce1eb

Please sign in to comment.