Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
cleanup pubsubclient / introduce ps_to_sq:
Browse files Browse the repository at this point in the history
 * compatibility with non-multipart, chunked, newline terminated source messages
 * refactor async callback usage without group
 * rewrite/move parse_url into simplehttp
 * add simplehttp_encode_uri which correctly handles spaces
 * removed pubsub_to_pubsub (no longer used)
 * removed pubsub2simplequeue.sh (old, deprecated)
 * renamed sub.py to something meaningful
 * update ps_to_file for compatibility with updated pubsubclient
 * updated READMEs
  • Loading branch information
mreiferson committed Jul 13, 2011
1 parent ccc3517 commit de2a5a9
Show file tree
Hide file tree
Showing 21 changed files with 472 additions and 434 deletions.
18 changes: 10 additions & 8 deletions README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ simplehttp is a library built upon libevent that makes high performance http bas


The following daemons are built on simplehttp and included The following daemons are built on simplehttp and included


* pubsub - a daemon that receives data via http POST events and writes that data to all currently connected long-lived http connections * `pubsub` - a daemon that receives data via HTTP POST events and writes to all subscribed long-lived HTTP connections
* pubsub_to_pubsub - a library for piping data from one pubsub stream to another pubsub server * `pubsubclient` - a library for writing clients that read from a pubsub
* simplequeue - an in memory queue with HTTP /get and /post endpoints to push/pop data * `ps_to_sq` - a daemon built on top of pubsubclient to write messages from a source pubsub to destination simplequeue(s)
* simpletokyo - a HTTP /get /post /del /fwmatch /incr interface in front of ttserver * `ps_to_file` - a daemon built on top of pubsubclient to write messages from a source pubsub to time rolled output files
* sortdb - Sorted database server * `simplequeue` - an in memory queue with HTTP /put and /get endpoints to push and pop data
* simplegeo * `simpletokyo` - a HTTP CRUD interface to front tokyo cabinet's ttserver
* simplememdb - an in-memory version of simpletokyo * `sortdb` - sorted database server
* qrencode * `simplegeo`
* `simplememdb` - an in-memory version of simpletokyo
* `qrencode`


simplehttp Install Instructions simplehttp Install Instructions
=============================== ===============================
Expand Down
8 changes: 4 additions & 4 deletions ps_to_file/Makefile
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ TARGET ?= /usr/local
LIBSIMPLEHTTP ?= ../simplehttp LIBSIMPLEHTTP ?= ../simplehttp
LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/.. LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/..
LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP) LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP)
LIBPUBSUBCLIENT ?= ../pubsubclient


CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -g CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -g
LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBPUBSUBCLIENT) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm


all: ps_to_file all: ps_to_file


ps_to_file: ps_to_file.c ps_to_file: ps_to_file.c
$(CC) $(CFLAGS) -o $@ $< $(LIBS) $(CC) $(CFLAGS) -o $@ $^ $(LIBS)


install: install:
/usr/bin/install -d $(TARGET)/bin/ /usr/bin/install -D ps_to_file $(TARGET)/bin/ps_to_file
/usr/bin/install ps_to_file $(TARGET)/bin/


clean: clean:
rm -f *.o *.a ps_to_file rm -f *.o *.a ps_to_file
16 changes: 16 additions & 0 deletions ps_to_file/README.md
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,16 @@
ps_to_file
======

helper application to subscribe to a pubsub and write incoming messages
to time rolled files.

source pubsub should output non-multipart, chunked data where each
message is newline terminated.

Commandline Options:

--pubsub-url=<str> source pubsub url in the form of
http://domain.com:port/path
--filename-format=<str> output filename format (strftime compatible)
/var/log/pubsub.%%Y-%%m-%%d_%%H.log
--version
86 changes: 57 additions & 29 deletions ps_to_file/ps_to_file.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <time.h> #include <time.h>
#include <simplehttp/pubsubclient.h>
#include <simplehttp/simplehttp.h> #include <simplehttp/simplehttp.h>
#include <pubsubclient/pubsubclient.h>


#define DEBUG 0 #ifdef DEBUG
#define VERSION "1.1" #define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif

#define VERSION "1.2"


struct output_metadata { struct output_metadata {
char *filename_format; char *filename_format;
Expand All @@ -17,62 +22,85 @@ struct output_metadata {
FILE *output_file; FILE *output_file;
}; };


void void process_message_cb(char *message, void *cbarg)
process_message_cb(char *source, void *cbarg){ {
if(DEBUG) fprintf(stdout, "processing message\n"); struct output_metadata *data;
if (source == NULL || strlen(source) < 3){return;} time_t timer;

struct tm *time_struct;
struct output_metadata *data = (struct output_metadata *)cbarg;
_DEBUG("process_message_cb()\n");

if (message == NULL || strlen(message) < 3) {
return;
}

data = (struct output_metadata *)cbarg;


time_t timer = time(NULL); timer = time(NULL);
struct tm *time_struct = gmtime(&timer); time_struct = gmtime(&timer);
if (DEBUG) fprintf(stdout, "strftime format %s\n", data->filename_format); _DEBUG("strftime format %s\n", data->filename_format);
strftime(data->temp_filename, 255, data->filename_format, time_struct); strftime(data->temp_filename, 255, data->filename_format, time_struct);
if (DEBUG) fprintf(stdout, "after strftime %s\n", data->temp_filename); _DEBUG("after strftime %s\n", data->temp_filename);
if (strcmp(data->temp_filename, data->current_filename) != 0){ if (strcmp(data->temp_filename, data->current_filename) != 0) {
if (DEBUG) fprintf(stdout, "rolling file\n"); _DEBUG("rolling file\n");
// roll file or open file // roll file or open file
if (data->output_file){ if (data->output_file) {
if(DEBUG) fprintf(stdout, "closing file %s\n", data->current_filename); _DEBUG("closing file %s\n", data->current_filename);
fclose(data->output_file); fclose(data->output_file);
} }
if (DEBUG) fprintf(stdout, "opening file %s\n", data->temp_filename); _DEBUG("opening file %s\n", data->temp_filename);
strcpy(data->current_filename, data->temp_filename); strcpy(data->current_filename, data->temp_filename);
data->output_file = fopen(data->current_filename, "ab"); data->output_file = fopen(data->current_filename, "ab");
} }


fprintf(data->output_file,"%s\n",source); fprintf(data->output_file, "%s\n", message);
} }


int version_cb(int value) { int version_cb(int value)
{
fprintf(stdout, "Version: %s\n", VERSION); fprintf(stdout, "Version: %s\n", VERSION);
return 0; return 0;
} }


int int main(int argc, char **argv)
main(int argc, char **argv)
{ {
char *source_address = "127.0.0.1"; char *pubsub_url;
int source_port = 80; char *address;
int port;
char *path;
char *filename_format = NULL; char *filename_format = NULL;
struct output_metadata *data;


define_simplehttp_options(); define_simplehttp_options();
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION); option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
option_define_str("source_host", OPT_OPTIONAL, "127.0.0.1", &source_address, NULL, NULL); option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
option_define_int("source_port", OPT_OPTIONAL, 80, &source_port, NULL, NULL);
option_define_str("filename_format", OPT_REQUIRED, NULL, &filename_format, NULL, "/var/log/pubsub.%%Y-%%m-%%d_%%H.log"); option_define_str("filename_format", OPT_REQUIRED, NULL, &filename_format, NULL, "/var/log/pubsub.%%Y-%%m-%%d_%%H.log");


if (!option_parse_command_line(argc, argv)){ if (!option_parse_command_line(argc, argv)){
return 1; return 1;
} }


struct output_metadata *data; data = calloc(1, sizeof(struct output_metadata));
data = calloc(1,sizeof(*data));
data->filename_format = filename_format; data->filename_format = filename_format;
data->current_filename[0] = '\0'; data->current_filename[0] = '\0';
data->temp_filename[0] = '\0'; data->temp_filename[0] = '\0';
data->output_file = NULL; data->output_file = NULL;


return pubsub_to_pubsub_main(source_address, source_port, process_message_cb, data); if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
pubsub_to_pubsub_main(address, port, path, process_message_cb, NULL);

if (data->output_file) {
fclose(data->output_file);
}

free(address);
free(path);
} else {
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
}

free(data);
free_options();


return 0;
} }
20 changes: 20 additions & 0 deletions ps_to_sq/Makefile
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,20 @@
LIBEVENT ?= /usr/local
TARGET ?= /usr/local
LIBSIMPLEHTTP ?= ../simplehttp
LIBSIMPLEHTTP_INC ?= $(LIBSIMPLEHTTP)/..
LIBSIMPLEHTTP_LIB ?= $(LIBSIMPLEHTTP)
LIBPUBSUBCLIENT ?= ../pubsubclient

CFLAGS = -I. -I$(LIBSIMPLEHTTP_INC) -I$(LIBEVENT)/include -g -Wall -O2
LIBS = -L. -L$(LIBSIMPLEHTTP_LIB) -L$(LIBPUBSUBCLIENT) -L$(LIBEVENT)/lib -levent -lpubsubclient -lsimplehttp -lm

all: ps_to_sq

ps_to_sq: ps_to_sq.c
$(CC) $(CFLAGS) -o $@ $^ $(LIBS)

install:
/usr/bin/install -D ps_to_sq $(TARGET)/bin/ps_to_sq

clean:
rm -rf *.o ps_to_sq *.dSYM
18 changes: 18 additions & 0 deletions ps_to_sq/README.md
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,18 @@
ps_to_sq
======

helper application to subscribe to a pubsub and write incoming messages
to simplequeue(s).

supports multiple destination simplequeues via round robin.

source pubsub should output non-multipart, chunked data where each
message is newline terminated.

Commandline Options:

--pubsub-url=<str> source pubsub url in the form of
http://domain.com:port/path
--simplequeue-url=<str> destination simplequeue url in the form of
http://domain.com:port/ (multiple)
--version
147 changes: 147 additions & 0 deletions ps_to_sq/ps_to_sq.c
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,147 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <string.h>
#include <time.h>
#include <simplehttp/simplehttp.h>
#include <pubsubclient/pubsubclient.h>
#include <simplehttp/utlist.h>

#ifdef DEBUG
#define _DEBUG(...) fprintf(stdout, __VA_ARGS__)
#else
#define _DEBUG(...) do {;} while (0)
#endif

#define VERSION "0.2"

struct simplequeue_destination {
char *address;
int port;
char *path;
struct simplequeue_destination *next;
};

struct simplequeue_destination *sqs = NULL;
struct simplequeue_destination *cur_sq_dest = NULL;

struct simplequeue_destination *new_simplequeue_destination(char *url)
{
struct simplequeue_destination *sq_dest;
char *address;
int port;
char *path;

sq_dest = malloc(sizeof(struct simplequeue_destination));
simplehttp_parse_url(url, strlen(url), &address, &port, &path);
_DEBUG("url: %s\n", url);
_DEBUG("address: %s\n", address);
_DEBUG("port: %d\n", port);
_DEBUG("path: %s\n", path);
free(path);
sq_dest->address = address;
sq_dest->port = port;
sq_dest->path = strdup("/put?data=");
sq_dest->next = NULL;

return sq_dest;
}

void free_simplequeue_destination(struct simplequeue_destination *sq_dest)
{
if (sq_dest) {
free(sq_dest->address);
free(sq_dest->path);
free(sq_dest);
}
}

void finish_simplequeue_put_cb(struct evhttp_request *req, void *cb_arg)
{
_DEBUG("finish_simplequeue_put_cb()\n");
}

void process_message_cb(char *message, void *cb_arg)
{
char *path;
char *encoded_message;

_DEBUG("process_message_cb()\n");

if (message == NULL || strlen(message) < 3) {
return;
}

if (cur_sq_dest && cur_sq_dest->next) {
cur_sq_dest = cur_sq_dest->next;
} else {
cur_sq_dest = sqs;
}

encoded_message = simplehttp_encode_uri(message);
path = malloc(10 + strlen(encoded_message) + 1); // /put?data= + encoded_message + NULL
strcpy(path, "/put?data=");
strcpy(path + 10, encoded_message);
new_async_request(cur_sq_dest->address, cur_sq_dest->port, path, finish_simplequeue_put_cb, NULL);
free(encoded_message);
free(path);
}

int version_cb(int value)
{
fprintf(stdout, "Version: %s\n", VERSION);
return 0;
}

int simplequeue_url_cb(char *value)
{
struct simplequeue_destination *sq_dest;

sq_dest = new_simplequeue_destination(value);
LL_APPEND(sqs, sq_dest);

return 1;
}

void free_simplequeue_destinations()
{
struct simplequeue_destination *sq_dest, *tmp;

LL_FOREACH_SAFE(sqs, sq_dest, tmp) {
LL_DELETE(sqs, sq_dest);
free_simplequeue_destination(sq_dest);
}
}

int main(int argc, char **argv)
{
char *pubsub_url;
char *address;
int port;
char *path;

option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
option_define_str("simplequeue_url", OPT_REQUIRED, NULL, NULL, simplequeue_url_cb, "(multiple) url(s) of simplequeue(s) to write to");

if (!option_parse_command_line(argc, argv)) {
return 1;
}

init_async_connection_pool(1);

if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
pubsub_to_pubsub_main(address, port, path, process_message_cb, NULL);

free(address);
free(path);
} else {
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
}

free_simplequeue_destinations();
free_async_connection_pool();
free_options();

return 0;
}
1 change: 0 additions & 1 deletion pubsub/README.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -59,4 +59,3 @@ and turn off buffering. Note: this has been tested with Nginx 0.7 series.
proxy_next_upstream off; proxy_next_upstream off;
charset utf-8; charset utf-8;
} }

Loading

0 comments on commit de2a5a9

Please sign in to comment.