Permalink
Browse files

pubsubclient does not exit() to allow for clients to decide whether t…

…o reconnect or bail
  • Loading branch information...
1 parent 7f6f940 commit 95395ab08c65262b6c95274792b51fe07a9dc14c @mreiferson mreiferson committed with jehiah Aug 19, 2011
Showing with 101 additions and 46 deletions.
  1. +4 −1 ps_to_file/README.md
  2. +6 −1 ps_to_file/ps_to_file.c
  3. +4 −3 ps_to_http/README.md
  4. +6 −1 ps_to_http/ps_to_http.c
  5. +74 −39 pubsubclient/pubsubclient.c
  6. +7 −1 pubsubclient/pubsubclient.h
@@ -7,10 +7,13 @@ to time rolled files.
source pubsub should output non-multipart, chunked data where each
message is newline terminated.
-Commandline Options:
+OPTIONS
+-------
+```
--pubsub-url=<str> source pubsub url in the form of
http://domain.com:port/path
--filename-format=<str> output filename format (strftime compatible)
/var/log/pubsub.%%Y-%%m-%%d_%%H.log
--version
+```
@@ -56,6 +56,11 @@ void process_message_cb(char *message, void *cbarg)
fprintf(data->output_file, "%s\n", message);
}
+void error_cb(int status_code, void *cb_arg)
+{
+ event_loopbreak();
+}
+
int version_cb(int value)
{
fprintf(stdout, "Version: %s\n", VERSION);
@@ -87,7 +92,7 @@ int main(int argc, char **argv)
data->output_file = NULL;
if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
- pubsubclient_main(address, port, path, process_message_cb, data);
+ pubsubclient_main(address, port, path, process_message_cb, error_cb, data);
if (data->output_file) {
fclose(data->output_file);
@@ -1,5 +1,5 @@
ps_to_http
-======
+==========
helper application to subscribe to a pubsub and write incoming messages
to simplequeue(s).
@@ -9,9 +9,9 @@ supports multiple destination simplequeues via round robin.
source pubsub should output non-multipart, chunked data where each
message is newline terminated.
-Commandline Options:
-
OPTIONS
+-------
+```
--destination-get-url=<str> (multiple) url(s) to HTTP GET to
This URL must contain a %s for the message data
for a simplequeue use "http://127.0.0.1:8080/put?data=%s"
@@ -21,3 +21,4 @@ OPTIONS
--pubsub-url=<str> url of pubsub to read from
default: http://127.0.0.1:80/sub?multipart=0
--round-robin write round-robin to destination urls
+```
@@ -101,6 +101,11 @@ void process_message_cb(char *message, void *cb_arg)
}
}
+void error_cb(int status_code, void *cb_arg)
+{
+ event_loopbreak();
+}
+
int version_cb(int value)
{
fprintf(stdout, "Version: %s\n", VERSION);
@@ -167,7 +172,7 @@ int main(int argc, char **argv)
init_async_connection_pool(1);
if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
- pubsubclient_main(address, port, path, process_message_cb, NULL);
+ pubsubclient_main(address, port, path, process_message_cb, error_cb, NULL);
free(address);
free(path);
@@ -1,8 +1,8 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <simplehttp/queue.h>
-#include <simplehttp/simplehttp.h>
+#include <event.h>
+#include <evhttp.h>
#include <signal.h>
#include "pubsubclient.h"
#include "stream_request.h"
@@ -20,7 +20,8 @@ static struct evhttp_connection *evhttp_source_connection = NULL;
static struct evhttp_request *evhttp_source_request = NULL;
struct GlobalData {
- void (*cb)(char *data, void *cbarg);
+ void (*message_cb)(char *data, void *cbarg);
+ void (*error_cb)(int status_code, void *cbarg);
void *cbarg;
const char *source_address;
int source_port;
@@ -46,7 +47,7 @@ int pubsubclient_parse_one_message(struct evbuffer *evb, void *arg)
line = evbuffer_readline(evb);
if (line && (line_len = strlen(line))) {
_DEBUG("line (%p): %s (%d)\n", line, line, line_len);
- (*client_data->cb)(line, client_data->cbarg);
+ (*client_data->message_cb)(line, client_data->cbarg);
if (EVBUFFER_LENGTH(evb)) {
has_more = 1;
}
@@ -62,17 +63,30 @@ void pubsubclient_source_readcb(struct bufferevent *bev, void *arg)
void pubsubclient_errorcb(struct bufferevent *bev, void *arg)
{
+ struct GlobalData *client_data = (struct GlobalData *)arg;
+
fprintf(stderr, "ERROR: request failed\n");
- event_loopbreak();
+
+ if (client_data->error_cb) {
+ (*client_data->error_cb)(-1, client_data->cbarg);
+ }
}
void pubsubclient_source_request_done(struct evhttp_request *req, void *arg)
{
+ struct GlobalData *client_data = (struct GlobalData *)arg;
+ int status_code = -1;
+
_DEBUG("pubsubclient_source_request_done()\n");
- if (!req || (req->response_code != HTTP_OK)) {
- fprintf(stderr, "ERROR: request failed\n");
- event_loopbreak();
+ fprintf(stderr, "ERROR: request failed\n");
+
+ if (req) {
+ status_code = req->response_code;
+ }
+
+ if (client_data->error_cb) {
+ (*client_data->error_cb)(status_code, client_data->cbarg);
}
}
@@ -82,10 +96,50 @@ void pubsubclient_source_callback(struct evhttp_request *req, void *arg)
while (pubsubclient_parse_one_message(req->input_buffer, arg)) {}
}
+int pubsubclient_connect()
+{
+ fprintf(stdout, "CONNECTING TO SOURCE http://%s:%d%s\n", data->source_address, data->source_port, data->path);
+ if (chunked) {
+ if (evhttp_source_connection) {
+ evhttp_connection_free(evhttp_source_connection);
+ }
+
+ // use libevent's built in evhttp methods to parse chunked responses
+ evhttp_source_connection = evhttp_connection_new(data->source_address, data->source_port);
+ if (evhttp_source_connection == NULL) {
+ fprintf(stderr, "ERROR: evhttp_connection_new() failed for %s:%d\n", data->source_address, data->source_port);
+ return 0;
+ }
+
+ evhttp_source_request = evhttp_request_new(pubsubclient_source_request_done, data);
+ evhttp_add_header(evhttp_source_request->output_headers, "Host", data->source_address);
+ evhttp_request_set_chunked_cb(evhttp_source_request, pubsubclient_source_callback);
+
+ if (evhttp_make_request(evhttp_source_connection, evhttp_source_request, EVHTTP_REQ_GET, data->path) == -1) {
+ fprintf(stderr, "ERROR: evhttp_make_request() failed for %s\n", data->path);
+ evhttp_connection_free(evhttp_source_connection);
+ return 0;
+ }
+ } else {
+ if (stream_request) {
+ free_stream_request(stream_request);
+ }
+
+ // use our stream_request library to handle non-chunked
+ stream_request = new_stream_request("GET", data->source_address, data->source_port, data->path,
+ NULL, pubsubclient_source_readcb, pubsubclient_errorcb, data);
+ if (!stream_request) {
+ fprintf(stderr, "ERROR: new_stream_request() failed for %s:%d%s\n", data->source_address, data->source_port, data->path);
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
void pubsubclient_autodetect_headercb(struct bufferevent *bev, struct evkeyvalq *headers, void *arg)
{
const char *encoding_header = NULL;
- struct GlobalData *client_data = NULL;
_DEBUG("pubsubclient_autodetect_headercb() headers: %p\n", headers);
@@ -101,59 +155,40 @@ void pubsubclient_autodetect_headercb(struct bufferevent *bev, struct evkeyvalq
_DEBUG("chunked = %d\n", chunked);
- client_data = (struct GlobalData *)arg;
- if (chunked) {
- // use libevent's built in evhttp methods to parse chunked responses
- evhttp_source_connection = evhttp_connection_new(client_data->source_address, client_data->source_port);
- if (evhttp_source_connection == NULL) {
- fprintf(stdout, "FAILED CONNECT TO SOURCE %s:%d\n", client_data->source_address, client_data->source_port);
- exit(1);
- }
-
- evhttp_source_request = evhttp_request_new(pubsubclient_source_request_done, data);
- evhttp_add_header(evhttp_source_request->output_headers, "Host", client_data->source_address);
- evhttp_request_set_chunked_cb(evhttp_source_request, pubsubclient_source_callback);
-
- if (evhttp_make_request(evhttp_source_connection, evhttp_source_request, EVHTTP_REQ_GET, client_data->path) == -1) {
- fprintf(stdout, "FAILED make_request to source\n");
- evhttp_connection_free(evhttp_source_connection);
- exit(1);
- }
- } else {
- // use our stream_request library to handle non-chunked
- stream_request = new_stream_request("GET", client_data->source_address, client_data->source_port, client_data->path,
- NULL, pubsubclient_source_readcb, pubsubclient_errorcb, arg);
- if (!stream_request) {
- fprintf(stdout, "FAILED CONNECT TO SOURCE %s:%d\n", client_data->source_address, client_data->source_port);
- exit(1);
- }
+ if (!pubsubclient_connect(arg)) {
+ event_loopbreak();
}
}
-int pubsubclient_main(const char *source_address, int source_port, const char *path, void (*cb)(char *data, void *arg), void *cbarg)
+int pubsubclient_main(const char *source_address, int source_port, const char *path,
+ void (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg)
{
struct StreamRequest *autodetect_sr = NULL;
signal(SIGINT, pubsubclient_termination_handler);
signal(SIGQUIT, pubsubclient_termination_handler);
signal(SIGTERM, pubsubclient_termination_handler);
+ signal(SIGHUP, pubsubclient_termination_handler);
event_init();
data = calloc(1, sizeof(struct GlobalData));
- data->cb = cb;
+ data->message_cb = message_cb;
+ data->error_cb = error_cb;
data->cbarg = cbarg;
data->source_address = source_address;
data->source_port = source_port;
data->path = path;
// perform a request for headers so we can autodetect whether or not we're
// getting a chunked response
- fprintf(stdout, "CONNECTING TO http://%s:%d%s\n", source_address, source_port, path);
+ fprintf(stdout, "AUTODETECTING ENCODING FOR http://%s:%d%s\n", source_address, source_port, path);
autodetect_sr = new_stream_request("HEAD", source_address, source_port, path,
pubsubclient_autodetect_headercb, NULL, pubsubclient_errorcb, data);
if (!autodetect_sr) {
- fprintf(stdout, "FAILED CONNECT TO SOURCE %s:%d\n", source_address, source_port);
+ fprintf(stderr, "ERROR: new_stream_request() failed for %s:%d%s\n", source_address, source_port, path);
exit(1);
}
@@ -1,9 +1,15 @@
#ifndef __pubsubclient_h
#define __pubsubclient_h
+#include <event.h>
+
struct StreamRequest;
-int pubsubclient_main(const char *source_address, int source_port, const char *path, void (*cb)(char *data, void *arg), void *cbarg);
+int pubsubclient_main(const char *source_address, int source_port, const char *path,
+ void (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg);
+int pubsubclient_connect();
struct StreamRequest *new_stream_request(const char *method, const char *source_address, int source_port, const char *path,
void (*header_cb)(struct bufferevent *bev, struct evkeyvalq *headers, void *arg),

0 comments on commit 95395ab

Please sign in to comment.