Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge pull request #71 from mynameisfiber/master
Browse files Browse the repository at this point in the history
Changed ps_to_http to select random source
  • Loading branch information
mreiferson committed Aug 12, 2012
2 parents 708d167 + d23251f commit 8d69112
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 32 deletions.
2 changes: 2 additions & 0 deletions ps_to_http/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.sw[po]
ps_to_http
2 changes: 1 addition & 1 deletion ps_to_http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ OPTIONS
--destination-post-url=<str> (multiple) url(s) to HTTP POST to
For a pubsub endpoint use "http://127.0.0.1:8080/pub"
--help list usage
--pubsub-url=<str> url of pubsub to read from
--pubsub-url=<str> (multiple) pubsub url(s) to read from
default: http://127.0.0.1:80/sub?multipart=0
--round-robin write round-robin to destination urls
--max-silence Maximum amount of time (in seconds) between messages from
Expand Down
91 changes: 60 additions & 31 deletions ps_to_http/ps_to_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#define _DEBUG(...) do {;} while (0)
#endif

#define VERSION "0.5.3"
#define VERSION "0.5.4"

struct destination_url {
char *address;
Expand All @@ -23,6 +23,15 @@ struct destination_url {
struct destination_url *next;
};

struct pubsub_url {
char *address;
int id;
struct pubsub_url *next;
};

struct pubsub_url *pubsub_urls = NULL;
int num_pubsub_urls = 0;

struct destination_url *destinations = NULL;
struct destination_url *current_destination = NULL;
int round_robin = 0;
Expand All @@ -37,8 +46,12 @@ struct destination_url *new_destination_url(char *url)
int port;
char *path;

if (simplehttp_parse_url(url, strlen(url), &address, &port, &path) == 0) {
fprintf(stderr, "ERROR: invalid destination URL: %s\n", url);
return NULL;
}

sq_dest = malloc(sizeof(struct destination_url));
simplehttp_parse_url(url, strlen(url), &address, &port, &path);
_DEBUG("destination_url: %s\n", url);
_DEBUG("\taddress: %s\n", address);
_DEBUG("\tport: %d\n", port);
Expand Down Expand Up @@ -144,13 +157,38 @@ int destination_get_url_cb(char *value)
return 0;
}

sq_dest = new_destination_url(value);
if ((sq_dest = new_destination_url(value)) == NULL) {
return 0;
}
sq_dest->method = EVHTTP_REQ_GET;
LL_APPEND(destinations, sq_dest);

return 1;
}

int pubsub_url_cb(char *value)
{
struct pubsub_url *ps_url;
ps_url = malloc(sizeof(struct pubsub_url));
ps_url->address = strdup(value);
ps_url->id = num_pubsub_urls;
LL_APPEND(pubsub_urls, ps_url);
num_pubsub_urls++;

return 1;
}

void free_pubsub_urls()
{
struct pubsub_url *ps_url, *tmp;

LL_FOREACH_SAFE(pubsub_urls, ps_url, tmp) {
LL_DELETE(pubsub_urls, ps_url);
free(ps_url->address);
free(ps_url);
}
}

int destination_post_url_cb(char *value)
{
struct destination_url *sq_dest;
Expand All @@ -174,15 +212,16 @@ void free_destination_urls()

int main(int argc, char **argv)
{
char *pubsub_url;
char *secondary_pubsub_url;
struct pubsub_url *current_pubsub_url = NULL;
char *address;
int port;
char *path;

struct timeval cur_time;
int choice;

option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
option_define_str("secondary_pubsub_url", OPT_OPTIONAL, NULL, &secondary_pubsub_url, NULL, "url of pubsub to read from");
option_define_str("pubsub_url", OPT_OPTIONAL, NULL, NULL, pubsub_url_cb, "(multiple) pubsub url(s) to read from (default: http://127.0.0.1:80/sub?multipart=0)");
option_define_bool("round_robin", OPT_OPTIONAL, 0, &round_robin, NULL, "write round-robin to destination urls");
option_define_str("destination_get_url", OPT_OPTIONAL, NULL, NULL, destination_get_url_cb, "(multiple) url(s) to HTTP GET to\n\t\t\t This URL must contain a %s for the message data\n\t\t\t for a simplequeue use \"http://127.0.0.1:8080/put?data=%s\"");
option_define_str("destination_post_url", OPT_OPTIONAL, NULL, NULL, destination_post_url_cb, "(multiple) url(s) to HTTP POST to\n\t\t\t For a pubsub endpoint use \"http://127.0.0.1:8080/pub\"");
Expand All @@ -197,50 +236,40 @@ int main(int argc, char **argv)
}
init_async_connection_pool(1);

if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
if (pubsub_urls == NULL) {
pubsub_url_cb("http://127.0.0.1:80/sub?multipart=0");
}

gettimeofday(&cur_time, NULL);
srand((unsigned)(cur_time.tv_sec * cur_time.tv_usec));
choice = rand() % num_pubsub_urls;
LL_SEARCH_SCALAR(pubsub_urls, current_pubsub_url, id, choice) ;

fprintf(stderr, "Selecting pubsub: %s\n", current_pubsub_url->address);
if (simplehttp_parse_url(current_pubsub_url->address, strlen(current_pubsub_url->address), &address, &port, &path)) {
pubsubclient_init(address, port, path, process_message_cb, error_cb, NULL);

if (option_get_int("max_silence") > 0) {
_DEBUG("Registering timer.\n");
last_message_timestamp = time(NULL);
max_silence_time.tv_sec = option_get_int("max_silence");
evtimer_set(&silence_ev, silence_cb, NULL);
evtimer_add(&silence_ev, &max_silence_time);
}

pubsubclient_run();
pubsubclient_free();

free(address);
free(path);
free(pubsub_url);
} else {
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
}
if (secondary_pubsub_url) {
if (simplehttp_parse_url(secondary_pubsub_url, strlen(secondary_pubsub_url), &address, &port, &path)) {
pubsubclient_init(address, port, path, process_message_cb, error_cb, NULL);

if (option_get_int("max_silence") > 0) {
_DEBUG("Registering timer.\n");
max_silence_time.tv_sec = option_get_int("max_silence");
evtimer_del(&silence_ev);
evtimer_set(&silence_ev, silence_cb, NULL);
evtimer_add(&silence_ev, &max_silence_time);
}

pubsubclient_run();

free(address);
free(path);
free(secondary_pubsub_url);
} else {
fprintf(stderr, "ERROR: failed to parse secondary_pubsub_url\n");
}
}

free_pubsub_urls();
free_destination_urls();
free_async_connection_pool();
free_options();
pubsubclient_free();

return 0;
}

0 comments on commit 8d69112

Please sign in to comment.