Permalink
Browse files

Added websockets to pubsub & proper multi-messaging & extra pubsub fi…

…ltering

Websockets:
Now clients connecting through websockets can expect:

- Handshaking (we like to be friendly!)
- Proper (text) data-framing
- Data fragmentation to send long messages (in 64bite chuncks)

(as given by RFC 6455) This should be useful for getting data to
visualizations and client scripts.... if only we could get the
filtered_pubsub to also do the same! </foreshadowing>

Multi-message:
Now outgoing messages from pubsub are newline deliniated.  As such,
every message coming out of the pubsub gets it's own send.

Added websockets to pubsub_filtered:

Added a regex global filter to pubsub_filterable and cleaned up some code.

Now pubsub_filterable takes in a --expect-value-regex parameter that extends
the --expect-value parameter but with regex.  Also, I fixed a bug where the
--expect-value and --expect-key were not being tested for.
  • Loading branch information...
1 parent 46a346d commit 4d4c1ba18f07e369edd863a4f5c8d47a96a55940 @mynameisfiber mynameisfiber committed Jan 24, 2012
Showing with 293 additions and 75 deletions.
  1. +1 −1 pubsub/Makefile
  2. +120 −40 pubsub/pubsub.c
  3. +2 −2 pubsub_filtered/Makefile
  4. +170 −32 pubsub_filtered/pubsub_filtered.c
View
@@ -5,7 +5,7 @@ LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/..
LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP)
CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -O2 -g
-LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lsimplehttp -lpcre -lm
+LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lsimplehttp -lpcre -lm -lcrypto
pubsub: pubsub.c
$(CC) $(CFLAGS) -o $@ $< $(LIBS)
View
@@ -6,9 +6,13 @@
#include <simplehttp/simplehttp.h>
#include "http-internal.h"
+#include <openssl/sha.h>
+#include <openssl/evp.h>
+#include <openssl/buffer.h>
+
#define BOUNDARY "xXPubSubXx"
#define MAX_PENDING_DATA 1024*1024*50
-#define VERSION "1.1"
+#define VERSION "1.2"
int ps_debug = 0;
@@ -35,6 +39,27 @@ uint64_t kickedClients = 0;
uint64_t msgRecv = 0;
uint64_t msgSent = 0;
+char *base64(const unsigned char *input, int length)
+{
+ BIO *bmem, *b64;
+ BUF_MEM *bptr;
+
+ b64 = BIO_new(BIO_f_base64());
+ bmem = BIO_new(BIO_s_mem());
+ b64 = BIO_push(b64, bmem);
+ BIO_write(b64, input, length);
+ BIO_flush(b64);
+ BIO_get_mem_ptr(b64, &bptr);
+
+ char *buff = (char *)malloc(bptr->length);
+ memcpy(buff, bptr->data, bptr->length - 1);
+ buff[bptr->length - 1] = 0;
+
+ BIO_free_all(b64);
+
+ return buff;
+}
+
int is_slow(struct cli *client)
{
if (client->kick_client == KICK_CLIENT) {
@@ -164,47 +189,83 @@ void on_close(struct evhttp_connection *evcon, void *ctx)
void pub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
{
- int i = 0;
+ int i = 0, j = 0;
struct cli *client;
-
- msgRecv++;
- totalConns++;
-
- TAILQ_FOREACH(client, &clients, entries) {
- msgSent++;
- evbuffer_drain(client->buf, EVBUFFER_LENGTH(client->buf));
- if (is_slow(client)) {
- if (can_kick(client)) {
- evhttp_connection_free(client->req->evcon);
- continue;
+ struct evkeyvalq args;
+ int message_length = 0;
+ int message_offset = 0;
+ int num_messages = 0;
+ char *current_message;
+
+ evhttp_parse_query(req->uri, &args);
+
+ for (j=0; j<=EVBUFFER_LENGTH(req->input_buffer); j++)
+ {
+ if (j == EVBUFFER_LENGTH(req->input_buffer) || *(EVBUFFER_DATA(req->input_buffer) + j) == '\n')
+ {
+ message_length = j - message_offset ;
+ current_message = EVBUFFER_DATA(req->input_buffer) + message_offset;
+
+ msgRecv++;
+ totalConns++;
+
+ i = 0;
+ TAILQ_FOREACH(client, &clients, entries) {
+ msgSent++;
+ evbuffer_drain(client->buf, EVBUFFER_LENGTH(client->buf));
+ if (is_slow(client)) {
+ if (can_kick(client)) {
+ evhttp_connection_free(client->req->evcon);
+ continue;
+ }
+ continue;
+ }
+ if (client->websocket) {
+ // set to non-chunked so that send_reply_chunked doesn't add \r\n before/after this block
+ client->req->chunked = 0;
+ int ws_m = 0;
+ int ws_message_length = message_length;
+ int ws_frame_size = 64; // Size for data fragmentation for websocket
+ while (ws_m < ws_message_length) {
+ int ws_cur_size = (ws_message_length - ws_m > ws_frame_size ? ws_frame_size : ws_message_length - ws_m);
+
+ int ws_code = 0;
+ if (ws_m == 0) {
+ ws_code += 0x01;
+ }
+ if (ws_m + ws_cur_size >= ws_message_length) {
+ ws_code += 0x80;
+ }
+ evbuffer_add_printf(client->buf, "%c", ws_code);
+
+ evbuffer_add_printf(client->buf, "%c", ws_cur_size);
+ evbuffer_add(client->buf, current_message + ws_m, ws_cur_size);
+ ws_m += ws_cur_size;
+ }
+
+ } else if (client->multipart) {
+ /* chunked */
+ evbuffer_add_printf(client->buf,
+ "content-type: %s\r\ncontent-length: %d\r\n\r\n",
+ "*/*",
+ (int)message_length);
+ evbuffer_add(client->buf, current_message, message_length);
+ evbuffer_add_printf(client->buf, "\r\n--%s\r\n", BOUNDARY);
+ } else {
+ /* new line terminated */
+ evbuffer_add(client->buf, current_message, message_length);
+ evbuffer_add_printf(client->buf, "\n");
+ }
+ evhttp_send_reply_chunk(client->req, client->buf);
+ i++;
}
- continue;
- }
- if (client->websocket) {
- // set to non-chunked so that send_reply_chunked doesn't add \r\n before/after this block
- client->req->chunked = 0;
- // write the frame. a websocket frame is \x00 + msg + \xFF
- evbuffer_add(client->buf, "\0", 1);
- evbuffer_add(client->buf, EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer));
- evbuffer_add(client->buf, "\xFF", 1);
- } else if (client->multipart) {
- /* chunked */
- evbuffer_add_printf(client->buf,
- "content-type: %s\r\ncontent-length: %d\r\n\r\n",
- "*/*",
- (int)EVBUFFER_LENGTH(req->input_buffer));
- evbuffer_add(client->buf, EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer));
- evbuffer_add_printf(client->buf, "\r\n--%s\r\n", BOUNDARY);
- } else {
- /* new line terminated */
- evbuffer_add(client->buf, EVBUFFER_DATA(req->input_buffer), EVBUFFER_LENGTH(req->input_buffer));
- evbuffer_add_printf(client->buf, "\n");
+
+ message_offset = j + 1;
+ num_messages ++;
}
- evhttp_send_reply_chunk(client->req, client->buf);
- i++;
}
- evbuffer_add_printf(evb, "Published to %d clients.\n", i);
+ evbuffer_add_printf(evb, "Published %d messages to %d clients.\n", num_messages, i);
evhttp_send_reply(req, HTTP_OK, "OK", evb);
}
@@ -215,6 +276,8 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
char *uri;
char *ws_origin;
char *ws_upgrade;
+ char *ws_key;
+ char *ws_response;
char *host;
char buf[248];
struct tm *time_struct;
@@ -240,17 +303,20 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
// Upgrade: WebSocket
ws_upgrade = (char *) evhttp_find_header(req->input_headers, "Upgrade");
ws_origin = (char *) evhttp_find_header(req->input_headers, "Origin");
+ ws_key = (char *) evhttp_find_header(req->input_headers, "Sec-WebSocket-Key");
host = (char *) evhttp_find_header(req->input_headers, "Host");
if (ps_debug && ws_upgrade) {
fprintf(stderr, "%llu >> upgrade header is %s\n", client->connection_id, ws_upgrade);
fprintf(stderr, "%llu >> multipart is %d\n", client->connection_id, client->multipart);
}
- if (ws_upgrade && strstr(ws_upgrade, "WebSocket") != NULL) {
+ if (ws_upgrade && strcasestr(ws_upgrade, "WebSocket")) {
if (ps_debug) {
fprintf(stderr, "%llu >> upgrading connection to a websocket\n", client->connection_id);
}
+ client->req->chunked = 0;
+ client->multipart = 0;
client->websocket = 1;
client->req->major = 1;
client->req->minor = 1;
@@ -267,6 +333,17 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
}
evhttp_add_header(client->req->output_headers, "WebSocket-Location", buf);
}
+ if (ws_key != NULL) {
+ char ws_response_tmp[SHA_DIGEST_LENGTH];
+ char ws_buf[128];
+
+ sprintf(ws_buf, "%s258EAFA5-E914-47DA-95CA-C5AB0DC85B11", ws_key);
+ SHA1(ws_buf, strlen(ws_buf), ws_response_tmp);
+
+ ws_response = base64(ws_response_tmp, SHA_DIGEST_LENGTH);
+ evhttp_add_header(client->req->output_headers, "Sec-WebSocket-Accept", ws_response);
+ }
+
// evbuffer_add_printf(client->buf, "\r\n");
} else if (client->multipart) {
evhttp_add_header(client->req->output_headers, "content-type",
@@ -278,11 +355,14 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evbuffer_add_printf(client->buf, "\r\n");
}
if (client->websocket) {
- evhttp_send_reply_start(client->req, 101, "Web Socket Protocol Handshake");
+ evhttp_send_reply_start(client->req, 101, "Switching Protocols");
} else {
evhttp_send_reply_start(client->req, HTTP_OK, "OK");
}
- evhttp_send_reply_chunk(client->req, client->buf);
+ if (!client->websocket) {
+ evhttp_send_reply_chunk(client->req, client->buf);
+ }
+
TAILQ_INSERT_TAIL(&clients, client, entries);
evhttp_connection_set_closecb(req->evcon, on_close, (void *)client);
evhttp_clear_headers(&args);
@@ -2,8 +2,8 @@ LIBEVENT ?= /usr/local
TARGET ?= /usr/local
LIBSIMPLEHTTP ?= /usr/local
-CFLAGS = -I. -I$(LIBSIMPLEHTTP)/include -I.. -I$(LIBEVENT)/include -g
-LIBS = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -L$(LIBEVENT)/lib -levent -lsimplehttp -ljson -lpcre -lm -lpubsubclient
+CFLAGS = -I. -I$(LIBSIMPLEHTTP)/include -I.. -I$(LIBEVENT)/include -g
+LIBS = -L. -L$(LIBSIMPLEHTTP)/lib -L../simplehttp -L$(LIBEVENT)/lib -levent -lsimplehttp -ljson -lpcre -lm -lpubsubclient -lcrypto
pubsub_filtered: pubsub_filtered.c md5.c
$(CC) $(CFLAGS) -o $@ md5.c $< $(LIBS)
Oops, something went wrong.

0 comments on commit 4d4c1ba

Please sign in to comment.