Permalink
Browse files

pubsub server speaking websocket's

  • Loading branch information...
1 parent cb28fe6 commit 6db11578331175f1caeaaa26bdaa3854755f023a @jehiah jehiah committed May 14, 2010
Showing with 59 additions and 4 deletions.
  1. +59 −4 pubsub/pubsub.c
View
@@ -7,8 +7,11 @@
#define BUFSZ 1024
#define BOUNDARY "xXPubSubXx"
+int ps_debug = 0;
+
typedef struct cli {
int multipart;
+ int websocket;
struct evbuffer *buf;
struct evhttp_request *req;
TAILQ_ENTRY(cli) entries;
@@ -100,19 +103,29 @@ void pub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
msgRecv++;
totalConns++;
-
+
TAILQ_FOREACH(client, &clients, entries) {
msgSent++;
evbuffer_drain(client->buf, EVBUFFER_LENGTH(client->buf));
- if (client->multipart) {
+ if (client->websocket) {
+ // set to non-chunked so that send_reply_chunked doesn't add \r\n before/after this block
+ client->req->chunked = 0;
+ // write the frame. a websocket frame is \x00 + msg + \xFF
+ evbuffer_add(client->buf, "\0", 1);
+ evbuffer_add(client->buf, req->input_buffer->buffer, EVBUFFER_LENGTH(req->input_buffer));
+ evbuffer_add(client->buf, "\xFF", 1);
+ }
+ else if (client->multipart) {
+ /* chunked */
evbuffer_add_printf(client->buf,
"content-type: %s\r\ncontent-length: %d\r\n\r\n",
"*/*",
(int)EVBUFFER_LENGTH(req->input_buffer));
evbuffer_add(client->buf, req->input_buffer->buffer, EVBUFFER_LENGTH(req->input_buffer));
evbuffer_add_printf(client->buf, "\r\n--%s\r\n", BOUNDARY);
} else {
+ /* new line terminated */
evbuffer_add(client->buf, req->input_buffer->buffer, EVBUFFER_LENGTH(req->input_buffer));
evbuffer_add_printf(client->buf, "\n");
}
@@ -129,6 +142,10 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
struct cli *client;
struct evkeyvalq args;
char *uri;
+ char *ws_origin;
+ char *ws_upgrade;
+ char *host;
+ char buf[248];
currentConns++;
totalConns++;
@@ -139,7 +156,41 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
argtoi(&args, "multipart", &client->multipart, 1);
client->req = req;
client->buf = evbuffer_new();
- if (client->multipart) {
+
+ // Connection: Upgrade
+ // Upgrade: WebSocket
+ ws_upgrade = (char *) evhttp_find_header(req->input_headers, "Upgrade");
+ ws_origin = (char *) evhttp_find_header(req->input_headers, "Origin");
+ host = (char *) evhttp_find_header(req->input_headers, "Host");
+
+ if (ps_debug && ws_upgrade) {
+ fprintf(stderr, "upgrade header is %s\n", ws_upgrade);
+ fprintf(stderr, "multipart is %d\n", client->multipart);
+ }
+
+ if (ws_upgrade && strstr(ws_upgrade, "WebSocket") != NULL) {
+ if (ps_debug) {
+ fprintf(stderr, "upgrading connection to a websocket\n");
+ }
+ client->websocket = 1;
+ client->req->major = 1;
+ client->req->minor = 1;
+ evhttp_add_header(client->req->output_headers, "Upgrade", "WebSocket");
+ evhttp_add_header(client->req->output_headers, "Connection", "Upgrade");
+ evhttp_add_header(client->req->output_headers, "Server", "simplehttp/pubsub");
+ if (ws_origin) {
+ evhttp_add_header(client->req->output_headers, "WebSocket-Origin", ws_origin);
+ }
+ if (host) {
+ sprintf(buf, "ws://%s%s", host, req->uri);
+ if (ps_debug) {
+ fprintf(stderr, "setting WebSocket-Location to %s\n", buf);
+ }
+ evhttp_add_header(client->req->output_headers, "WebSocket-Location", buf);
+ }
+ // evbuffer_add_printf(client->buf, "\r\n");
+ }
+ else if (client->multipart) {
evhttp_add_header(client->req->output_headers, "content-type",
"multipart/x-mixed-replace; boundary=" BOUNDARY);
evbuffer_add_printf(client->buf, "--%s\r\n", BOUNDARY);
@@ -148,7 +199,11 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
"application/json");
evbuffer_add_printf(client->buf, "\r\n");
}
- evhttp_send_reply_start(client->req, HTTP_OK, "OK");
+ if (client->websocket) {
+ evhttp_send_reply_start(client->req, 101, "Web Socket Protocol Handshake");
+ } else {
+ evhttp_send_reply_start(client->req, HTTP_OK, "OK");
+ }
evhttp_send_reply_chunk(client->req, client->buf);
TAILQ_INSERT_TAIL(&clients, client, entries);
evhttp_connection_set_closecb(req->evcon, on_close, (void *)client);

0 comments on commit 6db1157

Please sign in to comment.