Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
updating pubsub reader code; moving client code to pubsubclient folder
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Dec 23, 2010
1 parent d5089c8 commit 1a34ab1
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 131 deletions.
23 changes: 23 additions & 0 deletions pubsub/README.md
@@ -0,0 +1,23 @@
pubsub
======

pubsub (short for publish/subscribe) is a server that brokers new messages
to all connected subscribers at the time a message is received.
http://en.wikipedia.org/wiki/Publish/subscribe


API endpoints:

* /pub
parameter: body

* /sub
request parameter: multipart=(1|0). turns on/off chunked response format (on by default)
long lived connection which will stream back new messages.

* /stats
request parameter: reset=1 (resets the counters since last reset)
response: Active connections, Total connections, Messages received, Messages sent, Kicked clients.

* /clients
response: list of remote clients, their connect time, and their current outbound buffer size.
109 changes: 0 additions & 109 deletions pubsub/Subscription.py

This file was deleted.

20 changes: 13 additions & 7 deletions pubsub/pubsub.c
Expand Up @@ -30,6 +30,7 @@ TAILQ_HEAD(, cli) clients;

uint64_t totalConns = 0;
uint64_t currentConns = 0;
uint64_t kickedClients = 0;
uint64_t msgRecv = 0;
uint64_t msgSent = 0;

Expand All @@ -44,6 +45,7 @@ is_slow(struct cli *client) {
evcon = (struct evhttp_connection *)client->req->evcon;
output_buffer_length = (unsigned long)EVBUFFER_LENGTH(evcon->output_buffer);
if (output_buffer_length > MAX_PENDING_DATA) {
kickedClients+=1;
fprintf(stdout, "%llu >> kicking client with %llu pending data\n", client->connection_id, output_buffer_length);
client->kick_client = KICK_CLIENT;
// clear the clients output buffer
Expand Down Expand Up @@ -119,18 +121,22 @@ stats_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_parse_query(uri, &args);
free(uri);

sprintf(buf, "%d", totalConns);
sprintf(buf, "%llu", totalConns);
evhttp_add_header(req->output_headers, "X-PUBSUB-TOTAL-CONNECTIONS", buf);
sprintf(buf, "%d", currentConns);
sprintf(buf, "%llu", currentConns);
evhttp_add_header(req->output_headers, "X-PUBSUB-ACTIVE-CONNECTIONS", buf);
sprintf(buf, "%d", msgRecv);
sprintf(buf, "%llu", msgRecv);
evhttp_add_header(req->output_headers, "X-PUBSUB-MESSAGES-RECEIVED", buf);
sprintf(buf, "%d", msgSent);
sprintf(buf, "%llu", msgSent);
evhttp_add_header(req->output_headers, "X-PUBSUB-MESSAGES-SENT", buf);
sprintf(buf, "%llu", kickedClients);
evhttp_add_header(req->output_headers, "X-PUBSUB-KICKED-CLIENTS", buf);

evbuffer_add_printf(evb, "Active connections: %llu\nTotal connections: %llu\n"
"Messages received: %llu\nMessages sent: %llu\n",
currentConns, totalConns, msgRecv, msgSent);
evbuffer_add_printf(evb, "Active connections: %llu\n", currentConns);
evbuffer_add_printf(evb, "Total connections: %llu\n", totalConns);
evbuffer_add_printf(evb, "Messages received: %llu\n", msgRecv);
evbuffer_add_printf(evb, "Messages sent: %llu\n", msgSent);
evbuffer_add_printf(evb, "Kicked clients: %llu\n", kickedClients);
reset = (char *)evhttp_find_header(&args, "reset");

if (reset) {
Expand Down
15 changes: 0 additions & 15 deletions pubsub/test-subscriber.py

This file was deleted.

1 change: 1 addition & 0 deletions pubsubclient/pubsubclient_python_tornado/README.md
@@ -0,0 +1 @@
An example pubsub python client built upon the tornado iostream
62 changes: 62 additions & 0 deletions pubsubclient/pubsubclient_python_tornado/pubsub_reader.py
@@ -0,0 +1,62 @@
import tornado.iostream
import tornado.ioloop
import socket
import logging

class HTTPError(Exception):
def __init__(self, code, msg=None):
self.code = code
self.msg = msg
super(HTTPError, self).__init__('%s %s' % (code , msg))

class PubsubReader(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or tornado.ioloop.IOLoop.instance()
self.socket = None

def _callback(self, data):
try:
self.callback(data)
except:
logging.exception('failed in callback')
self.stream.read_until('\n', self._callback)

def callback(self, data):
raise

def close(self):
logging.info('closed')
self.io_loop.stop()

def http_get_line(self):
return "GET /sub?multipart=0 HTTP/1.0\r\n\r\n"

def open(self, host, port=80):
self.host = host
self.port = port
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
logging.info('opening socket to %s:%s' % (host, port))
self.socket.connect((host, port))
self.stream = tornado.iostream.IOStream(self.socket)
self.stream.set_close_callback(self.close)
get_line = self.http_get_line()
logging.info(get_line)
self.stream.write(get_line)
self.stream.read_until("\r\n\r\n", self.on_headers)

def on_headers(self, data):
headers = {}
lines = data.split("\r\n")
logging.info(lines)
status_line = lines[0]
if status_line.count(' ') < 2:
raise HTTPError(599, 'connect error')
status_code = status_line.split(' ', 2)[1]
if status_code != "200":
raise HTTPError(status_code)
for line in lines[1:]:
parts = line.split(":")
if len(parts) == 2:
headers[parts[0].strip()] = parts[1].strip()
self.stream.read_until('\n', self._callback)

14 changes: 14 additions & 0 deletions pubsubclient/pubsubclient_python_tornado/pubsub_reader_client.py
@@ -0,0 +1,14 @@
import logging
import tornado.options
import pubsub_reader

class MyReader(pubsub_reader.PubsubReader):
def callback(self, data):
"""handle each chunk of data from the pubsub server"""
logging.info(data)

if __name__ == "__main__":
tornado.options.parse_command_line()
reader = MyReader();
reader.open('127.0.0.1', 80)
reader.io_loop.start()
File renamed without changes.

0 comments on commit 1a34ab1

Please sign in to comment.