Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
btubbs committed Dec 10, 2012
0 parents commit 53c41a7
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 0 deletions.
9 changes: 9 additions & 0 deletions .hgignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
syntax: glob
*pyc
*swp
env/*
._*
*egg-info*
tmp/*
.tox/*
__pycache__/*
73 changes: 73 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
=================
Python SSE Client
=================

This is a Python client library for iterating over HTTP Server-Sent Event (SSE)
streams. The SSEClient class accepts a URL on init, and is then an iterator
over messages coming from the server.

Installation
------------

Use pip::

    pip install sseclient

Usage
-----

::

    from sseclient import SSEClient

    messages = SSEClient('http://mysite.com/sse_stream/')
    for msg in messages:
        do_something_useful(msg)

Each message object will have a 'data' attribute, as well as optional 'event',
'id', and 'retry' attributes.

Optional init parameters:

- last_id: If provided, this parameter will be sent to the server to tell it to
return only messages more recent than this ID.

- retry: Number of milliseconds to wait after disconnects before attempting to
reconnect. The server may change this by including a 'retry' line in a
message. Retries are handled automatically by the SSEClient object.

You may also provide any additional keyword arguments supported by the
Requests_ library, such as a 'headers' dict and a (username, password) tuple
for 'auth'.

Development
-----------

Install the test dependencies::

    pip install pytest mock

And run the tests::

    $ py.test
    ================== test session starts ===================
    platform linux2 -- Python 2.7.3 -- pytest-2.3.4
    collected 9 items

    test_sseclient.py .........

    ================ 9 passed in 0.31 seconds ================

There are a couple of TODO items in the code for getting the implementation
completely in line with the finer points of the SSE spec.

Additional Resources
--------------------

- `HTML5Rocks Tutorial`_
- `Official SSE Spec`_

.. _Requests: http://docs.python-requests.org/en/latest/
.. _HTML5Rocks Tutorial: http://www.html5rocks.com/en/tutorials/eventsource/basics/
.. _Official SSE Spec: http://www.whatwg.org/specs/web-apps/current-work/multipage/comms.html#server-sent-events

14 changes: 14 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/usr/bin/python
from setuptools import setup

# Read the long description inside a context manager so the README file
# handle is closed deterministically instead of being left to the garbage
# collector.
with open('README.rst') as readme_file:
    long_description = readme_file.read()

# Package metadata for the single-module ``sseclient`` distribution.
setup(
    name='sseclient',
    version='0.0.1',
    author='Brent Tubbs',
    author_email='brent.tubbs@gmail.com',
    py_modules=['sseclient'],
    install_requires=['requests==0.14.2'],
    description=(
        'Python client library for reading Server Sent Event streams.'),
    long_description=long_description,
)
138 changes: 138 additions & 0 deletions sseclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import re
import time
import warnings

import requests


class SSEClient(object):
    """Iterator over the messages of a Server-Sent Events (SSE) stream.

    The connection is opened as soon as the object is constructed. Each call
    to ``next()`` blocks until one complete message (terminated by a blank
    line) has been read, then returns it as an ``Event``. When the response
    body runs out, the client sleeps for ``retry`` milliseconds and
    reconnects.

    Parameters:
        url: Address of the event stream.
        last_id: Optional event id; when truthy it is sent to the server in
            the ``Last-Event-ID`` header on every (re)connect.
        retry: Milliseconds to wait before reconnecting. Updated whenever a
            message carries its own ``retry`` value.
        **kwargs: Passed through to ``requests.get`` (e.g. ``headers``,
            ``auth``).
    """

    def __init__(self, url, last_id=None, retry=3000, **kwargs):
        self.url = url
        self.last_id = last_id
        self.retry = retry

        # Any extra kwargs will be fed into the requests.get call later.
        self.requests_kwargs = kwargs

        # Work on a private copy of the headers: the dict passed in by the
        # caller must not be modified behind their back (it may be shared
        # with other requests).
        headers = dict(kwargs.get('headers') or {})

        # The SSE spec requires making requests with Cache-Control: no-cache
        headers['Cache-Control'] = 'no-cache'

        # The 'Accept' header is not required, but explicit > implicit
        headers['Accept'] = 'text/event-stream'

        self.requests_kwargs['headers'] = headers

        # Keep data here as it streams in
        self.buf = u''

        self._connect()

    def _connect(self):
        """Open (or re-open) the stream and store the response in ``self.resp``.

        Raises whatever ``raise_for_status()`` raises for an error response.
        """
        if self.last_id:
            self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id
        self.resp = requests.get(self.url, prefetch=False,
                                 **self.requests_kwargs)

        # TODO: Ensure we're handling redirects.  Might also stick the 'origin'
        # attribute on Events like the Javascript spec requires.
        self.resp.raise_for_status()

    def __iter__(self):
        """Yield messages forever, reconnecting as needed."""
        while True:
            yield self.next()

    def next(self):
        """Block until one complete message is buffered and return it.

        Side effects: updates ``self.retry`` when the message specifies a
        retry delay and ``self.last_id`` when it carries a non-empty id.
        """
        # TODO: additionally support CR and CRLF-style newlines.
        while '\n\n' not in self.buf:
            try:
                # A fresh iterator is requested for every read and only its
                # first item is consumed.
                nextchar = next(self.resp.iter_content(decode_unicode=True))
            except StopIteration:
                time.sleep(self.retry / 1000.0)
                self._connect()

                # The SSE spec only supports resuming from a whole message.
                # The loop condition guarantees the buffer holds no complete
                # message here, so whatever is left is a partial one and is
                # thrown away.
                self.buf = u''
            else:
                self.buf += nextchar

        head, sep, tail = self.buf.partition('\n\n')
        self.buf = tail
        msg = Event.parse(head)

        # If the server requests a specific retry delay, we need to honor it.
        # Compare against None so that an explicit "retry: 0" is respected.
        if msg.retry is not None:
            self.retry = msg.retry

        # last_id should only be set if included in the message.  It's not
        # forgotten if a message omits it.
        if msg.id:
            self.last_id = msg.id

        return msg

    # Python 3 spells the iterator protocol method ``__next__``; expose the
    # same implementation under both names so ``next(client)`` works
    # everywhere.
    __next__ = next


class Event(object):
    """One Server-Sent Events message.

    Attributes:
        data: Message payload; multiple ``data`` lines are joined with
            newlines.
        event: Event type, ``'message'`` unless the stream says otherwise.
        id: Event id, or None when the message did not include one.
        retry: Reconnection delay requested by the message, or None.
    """

    # Splits a line into a field name (everything before the first colon)
    # and a value (everything after it, minus one optional leading space).
    sse_line_pattern = re.compile('(?P<name>[^:]*):?( ?(?P<value>.*))?')

    def __init__(self, data='', event='message', id=None, retry=None):
        self.data = data
        self.event = event
        self.id = id
        self.retry = retry

    def dump(self):
        """Serialize the event to SSE wire format, ending with a blank line."""
        lines = []
        if self.id:
            lines.append('id: %s' % self.id)

        # Only include an event line if it's not the default already.
        if self.event != 'message':
            lines.append('event: %s' % self.event)

        if self.retry:
            lines.append('retry: %s' % self.retry)

        lines.extend('data: %s' % d for d in self.data.split('\n'))
        return '\n'.join(lines) + '\n\n'

    @classmethod
    def parse(cls, raw):
        """
        Given a possibly-multiline string representing an SSE message, parse it
        and return a Event object.

        Comment lines (starting with ":"), blank lines and unknown field
        names are ignored. A ``retry`` value that is not an integer is
        ignored with a ``SyntaxWarning`` rather than aborting the parse.
        """
        msg = cls()

        # Collect every data line, including empty ones, and join at the end.
        # Deciding whether to join based on the truthiness of the data
        # accumulated so far would silently drop a leading empty data line.
        data_lines = []

        for line in raw.split('\n'):
            m = cls.sse_line_pattern.match(line)
            if m is None:
                # Malformed line.  Discard but warn.
                warnings.warn('Invalid SSE line: "%s"' % line, SyntaxWarning)
                continue

            name = m.group('name')
            value = m.group('value') or ''
            if name == '':
                # line began with a ":", so is a comment.  Ignore
                continue

            if name == 'data':
                data_lines.append(value)
            elif name == 'event':
                msg.event = value
            elif name == 'id':
                msg.id = value
            elif name == 'retry':
                try:
                    msg.retry = int(value)
                except ValueError:
                    # A bad retry value must not take down the whole stream.
                    warnings.warn('Invalid SSE retry value: "%s"' % value,
                                  SyntaxWarning)

        if data_lines:
            msg.data = '\n'.join(data_lines)

        return msg

    def __str__(self):
        return self.data
138 changes: 138 additions & 0 deletions test_sseclient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from mock import patch

import sseclient
from sseclient import Event as E


# Some tests of parsing a single event string
def test_round_trip_parse():
    """Dumping an Event and parsing the result preserves every field."""
    original = E(
        data='hi there\nsexy developer',
        event='salutation',
        id='abcdefg',
        retry=10000
    )

    restored = E.parse(original.dump())

    # Same checks, same order, expressed as a loop over the field names.
    for field in ('id', 'data', 'retry', 'event'):
        assert getattr(original, field) == getattr(restored, field)


def test_no_colon():
    """A bare field name with no colon parses to an empty value."""
    assert E.parse('data').data == ''


def test_no_space():
    """The space after the colon is optional."""
    assert E.parse('data:hi').data == 'hi'


def test_comment():
    """Lines starting with a colon are comments and do not affect the data."""
    parsed = E.parse(":this is a comment\ndata: this is some data")
    assert parsed.data == 'this is some data'


def test_retry_is_integer():
    """The retry field is converted to an int while parsing."""
    assert E.parse('data: hi\nretry: 4000').retry == 4000


def test_default_event():
    """Messages without an event line get the default 'message' type."""
    assert E.parse('data: blah').event == 'message'


# A couple mocks for Requests
class FakeRequests(object):
    """Stand-in for the ``requests`` module used when patching sseclient.

    Every ``get`` call answers with a new FakeResponse built from the status
    code and content given at construction time.
    """

    def __init__(self, status_code, content):
        self.status_code, self.content = status_code, content

    def get(self, url, *args, **kwargs):
        # The URL and any request options are accepted but ignored.
        response = FakeResponse(self.status_code, self.content)
        return response


class FakeResponse(object):
    """Minimal fake of a streaming response object.

    The content is held as text in ``self.stream`` and handed out one
    character per ``iter_content`` call.
    """

    def __init__(self, status_code, content):
        self.status_code = status_code
        # ``bytes`` is ``str`` on Python 2, so this decodes byte strings on
        # both major versions without referring to the Python-2-only
        # ``unicode`` name.
        if isinstance(content, bytes):
            content = content.decode('utf8')
        self.stream = content

    def iter_content(self, chunk_size=1, *args, **kwargs):
        """Yield the next unread character, or nothing once exhausted."""
        if not self.stream:
            # A plain return ends the generator; raising StopIteration inside
            # a generator body is turned into RuntimeError by PEP 479.
            return
        c = self.stream[0]
        self.stream = self.stream[1:]
        yield c

    def raise_for_status(self):
        """No-op: the fake never reports an HTTP error."""
        pass


def join_events(*events):
    """
    Serialize each given Event with its dump() method and return the
    results concatenated, in order, as a single string.
    """
    serialized = [event.dump() for event in events]
    return ''.join(serialized)


# Tests of parsing a multi event stream
def test_last_id_remembered():
    """The client keeps the last seen id when a later message omits it."""
    content = "data: message 1\nid: abcdef\n\ndata: message 2\n\n"
    fake_requests = FakeRequests(200, content)
    with patch('sseclient.requests', fake_requests):
        client = sseclient.SSEClient('http://blah.com')
        first = next(client)
        second = next(client)

    assert first.id == u'abcdef'
    assert second.id is None
    assert client.last_id == u'abcdef'


def test_retry_remembered():
    """The client keeps the last seen retry delay when a later message omits it."""
    content = "data: message 1\nretry: 5000\n\ndata: message 2\n\n"
    fake_requests = FakeRequests(200, content)
    with patch('sseclient.requests', fake_requests):
        client = sseclient.SSEClient('http://blah.com')
        first = next(client)
        second = next(client)

    assert first.retry == 5000
    assert second.retry is None
    assert client.retry == 5000


def test_multiple_messages():
    """Three consecutive events in one stream are each parsed correctly."""
    events = (
        E(data='message 1', id='first', retry='2000', event='blah'),
        E(data='message 2', id='second', retry='4000', event='blerg'),
        E(data='message 3\nhas two lines', id='third'),
    )
    content = join_events(*events)

    with patch('sseclient.requests', FakeRequests(200, content)):
        client = sseclient.SSEClient('http://blah.com')
        first = next(client)
        second = next(client)
        third = next(client)

    # First message: every field present.
    assert first.data == u'message 1'
    assert first.id == u'first'
    assert first.retry == 2000
    assert first.event == u'blah'

    # Second message: every field present.
    assert second.data == u'message 2'
    assert second.id == u'second'
    assert second.retry == 4000
    assert second.event == u'blerg'

    # Third message: multi-line data.
    assert third.data == u'message 3\nhas two lines'

    # The client remembers the most recent retry and id it was given.
    assert client.retry == second.retry
    assert client.last_id == third.id

0 comments on commit 53c41a7

Please sign in to comment.