Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #40 from mreiferson/cstyle

cstyle repo
  • Loading branch information...
commit 4f4f54af7d14f3df19e16c03906c884decfcfb81 2 parents d95637c + d571968
Pierce Lopez ploxiln authored
30 buffered_socket/buffered_socket.c
View
@@ -19,13 +19,13 @@ static void buffered_socket_writecb(struct bufferevent *bev, void *arg);
static void buffered_socket_errorcb(struct bufferevent *bev, short what, void *arg);
static void buffered_socket_connectcb(int fd, short what, void *arg);
-struct BufferedSocket *new_buffered_socket(const char *address, int port,
- void (*connect_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*close_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*read_callback)(struct BufferedSocket *buffsock, struct evbuffer *evb, void *arg),
- void (*write_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*error_callback)(struct BufferedSocket *buffsock, void *arg),
- void *cbarg)
+struct BufferedSocket *new_buffered_socket(const char *address, int port,
+ void (*connect_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*close_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*read_callback)(struct BufferedSocket *buffsock, struct evbuffer *evb, void *arg),
+ void (*write_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*error_callback)(struct BufferedSocket *buffsock, void *arg),
+ void *cbarg)
{
struct BufferedSocket *buffsock;
@@ -119,7 +119,7 @@ static void buffered_socket_connectcb(int fd, short what, void *arg)
return;
}
- if (getsockopt(buffsock->fd, SOL_SOCKET, SO_ERROR, (void*)&error, &errsz) == -1) {
+ if (getsockopt(buffsock->fd, SOL_SOCKET, SO_ERROR, (void *)&error, &errsz) == -1) {
_DEBUG("%s: getsockopt failed for \"%s:%d\" on %d\n",
__FUNCTION__, buffsock->address, buffsock->port, buffsock->fd);
buffered_socket_close(buffsock);
@@ -133,13 +133,13 @@ static void buffered_socket_connectcb(int fd, short what, void *arg)
return;
}
- _DEBUG("%s: connected to \"%s:%d\" on %d\n",
+ _DEBUG("%s: connected to \"%s:%d\" on %d\n",
__FUNCTION__, buffsock->address, buffsock->port, buffsock->fd);
-
+
buffsock->state = BS_CONNECTED;
- buffsock->bev = bufferevent_new(buffsock->fd,
- buffered_socket_readcb, buffered_socket_writecb, buffered_socket_errorcb,
- (void *)buffsock);
+ buffsock->bev = bufferevent_new(buffsock->fd,
+ buffered_socket_readcb, buffered_socket_writecb, buffered_socket_errorcb,
+ (void *)buffsock);
bufferevent_enable(buffsock->bev, EV_READ);
if (buffsock->connect_callback) {
@@ -149,9 +149,9 @@ static void buffered_socket_connectcb(int fd, short what, void *arg)
void buffered_socket_close(struct BufferedSocket *buffsock)
{
- _DEBUG("%s: closing \"%s:%d\" on %d\n",
+ _DEBUG("%s: closing \"%s:%d\" on %d\n",
__FUNCTION__, buffsock->address, buffsock->port, buffsock->fd);
-
+
buffsock->state = BS_DISCONNECTED;
if (event_initialized(&buffsock->conn_ev)) {
20 buffered_socket/buffered_socket.h
View
@@ -5,8 +5,8 @@
enum BufferedSocketStates {
BS_INIT,
- BS_CONNECTING,
- BS_CONNECTED,
+ BS_CONNECTING,
+ BS_CONNECTED,
BS_DISCONNECTED
};
@@ -20,18 +20,18 @@ struct BufferedSocket {
void (*connect_callback)(struct BufferedSocket *buffsock, void *arg);
void (*close_callback)(struct BufferedSocket *buffsock, void *arg);
void (*read_callback)(struct BufferedSocket *buffsock, struct evbuffer *evb, void *arg);
- void (*write_callback)(struct BufferedSocket *buffsock, void *arg);
+ void (*write_callback)(struct BufferedSocket *buffsock, void *arg);
void (*error_callback)(struct BufferedSocket *buffsock, void *arg);
void *cbarg;
};
-struct BufferedSocket *new_buffered_socket(const char *address, int port,
- void (*connect_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*close_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*read_callback)(struct BufferedSocket *buffsock, struct evbuffer *evb, void *arg),
- void (*write_callback)(struct BufferedSocket *buffsock, void *arg),
- void (*error_callback)(struct BufferedSocket *buffsock, void *arg),
- void *cbarg);
+struct BufferedSocket *new_buffered_socket(const char *address, int port,
+ void (*connect_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*close_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*read_callback)(struct BufferedSocket *buffsock, struct evbuffer *evb, void *arg),
+ void (*write_callback)(struct BufferedSocket *buffsock, void *arg),
+ void (*error_callback)(struct BufferedSocket *buffsock, void *arg),
+ void *cbarg);
void free_buffered_socket(struct BufferedSocket *socket);
int buffered_socket_connect(struct BufferedSocket *buffsock);
void buffered_socket_close(struct BufferedSocket *socket);
4 buffered_socket/demo.c
View
@@ -76,8 +76,8 @@ int main(int argc, char **argv)
event_init();
buffsock = new_buffered_socket("127.0.0.1", 5150,
- connect_cb, close_cb, read_cb, write_cb, error_cb, "hello world\n");
-
+ connect_cb, close_cb, read_cb, write_cb, error_cb, "hello world\n");
+
signal(SIGINT, termination_handler);
signal(SIGQUIT, termination_handler);
signal(SIGTERM, termination_handler);
6 domain_socket/demo.c
View
@@ -59,9 +59,9 @@ int main(int argc, char **argv)
{
event_init();
- if (!(uds = new_domain_socket("/tmp/domain_socket_test",
- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH,
- uds_on_read, uds_on_write, uds_on_error, 64))) {
+ if (!(uds = new_domain_socket("/tmp/domain_socket_test",
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH,
+ uds_on_read, uds_on_write, uds_on_error, 64))) {
fprintf(stdout, "ERROR: new_domain_socket() failed\n");
exit(1);
}
18 domain_socket/domain_socket.c
View
@@ -17,15 +17,15 @@
#endif
static struct DSClient *new_domain_socket_client(struct DomainSocket *uds,
- int client_fd, struct sockaddr *sa, socklen_t salen);
+ int client_fd, struct sockaddr *sa, socklen_t salen);
static void free_domain_socket_client(struct DSClient *client);
static void accept_socket(int fd, short what, void *arg);
-struct DomainSocket *new_domain_socket(const char *path, int access_mask,
- void (*read_callback)(struct DSClient *client),
- void (*write_callback)(struct DSClient *client),
- void (*error_callback)(struct DSClient *client),
- int listen_backlog)
+struct DomainSocket *new_domain_socket(const char *path, int access_mask,
+ void (*read_callback)(struct DSClient *client),
+ void (*write_callback)(struct DSClient *client),
+ void (*error_callback)(struct DSClient *client),
+ int listen_backlog)
{
struct linger ling = {0, 0};
struct sockaddr_un addr;
@@ -50,7 +50,7 @@ struct DomainSocket *new_domain_socket(const char *path, int access_mask,
}
// clean up a previous socket file if we left it around
- if (lstat(path, &tstat) == 0) {
+ if (lstat(path, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode)) {
unlink(path);
}
@@ -110,7 +110,7 @@ void free_domain_socket(struct DomainSocket *uds)
close(uds->fd);
}
- if (lstat(uds->path, &tstat) == 0) {
+ if (lstat(uds->path, &tstat) == 0) {
if (S_ISSOCK(tstat.st_mode)) {
unlink(uds->path);
}
@@ -198,7 +198,7 @@ void domain_socket_client_write(struct DSClient *client, void *data, size_t len)
}
static struct DSClient *new_domain_socket_client(struct DomainSocket *uds,
- int client_fd, struct sockaddr *sa, socklen_t salen)
+ int client_fd, struct sockaddr *sa, socklen_t salen)
{
struct DSClient *client;
10 domain_socket/domain_socket.h
View
@@ -18,11 +18,11 @@ struct DSClient {
struct DomainSocket *uds;
};
-struct DomainSocket *new_domain_socket(const char *path, int access_mask,
- void (*read_callback)(struct DSClient *client),
- void (*write_callback)(struct DSClient *client),
- void (*error_callback)(struct DSClient *client),
- int listen_backlog);
+struct DomainSocket *new_domain_socket(const char *path, int access_mask,
+ void (*read_callback)(struct DSClient *client),
+ void (*write_callback)(struct DSClient *client),
+ void (*error_callback)(struct DSClient *client),
+ int listen_backlog);
void free_domain_socket(struct DomainSocket *uds);
void domain_socket_client_write(struct DSClient *client, void *data, size_t len);
48 host_pool/host_pool.c
View
@@ -19,8 +19,8 @@
* max_retry_interval - the maximum seconds to wait between retries
* reset_on_all_failed - reset all hosts to alive when all are marked as failed.
*/
-struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
- int max_retry_interval, int reset_on_all_failed)
+struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
+ int max_retry_interval, int reset_on_all_failed)
{
struct HostPool *host_pool;
@@ -51,8 +51,8 @@ void free_host_pool(struct HostPool *host_pool)
}
}
-struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
- char *address, int port, char *path)
+struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
+ char *address, int port, char *path)
{
struct HostPoolEndpoint *host_pool_endpoint;
@@ -100,8 +100,8 @@ void host_pool_from_json(struct HostPool *host_pool, json_object *host_pool_endp
}
}
-struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
- enum HostPoolEndpointSelectionMode mode, int64_t state)
+struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
+ enum HostPoolEndpointSelectionMode mode, int64_t state)
{
struct HostPoolEndpoint *endpoint;
int c;
@@ -117,8 +117,8 @@ struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
return endpoint;
}
- if ((host_pool->retry_failed_hosts == -1) ||
- (endpoint->retry_count <= host_pool->retry_failed_hosts)) {
+ if ((host_pool->retry_failed_hosts == -1) ||
+ (endpoint->retry_count <= host_pool->retry_failed_hosts)) {
time(&now);
if (endpoint->next_retry < now) {
endpoint->retry_count++;
@@ -140,8 +140,8 @@ struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
//
// this ensures we always try each endpoint in the host pool
//
- // however, if we were asked to find a random endpoint, randomize once
- // more so that the endpoint following the failed endpoint won't get a
+ // however, if we were asked to find a random endpoint, randomize once
+ // more so that the endpoint following the failed endpoint won't get a
// disproportionate number of additional requests
if (mode == HOST_POOL_RANDOM) {
host_pool_next_endpoint(host_pool, mode, state);
@@ -157,8 +157,8 @@ struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
return NULL;
}
-struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
- enum HostPoolEndpointSelectionMode mode, int64_t state)
+struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
+ enum HostPoolEndpointSelectionMode mode, int64_t state)
{
int index;
@@ -171,17 +171,17 @@ struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
break;
case HOST_POOL_ROUND_ROBIN:
// round-robin through the endpoints for each request
- host_pool->current_endpoint = host_pool->current_endpoint ?
- (host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
- host_pool->endpoints) : host_pool->endpoints;
+ host_pool->current_endpoint = host_pool->current_endpoint ?
+ (host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
+ host_pool->endpoints) : host_pool->endpoints;
break;
case HOST_POOL_SINGLE:
// choose the same endpoint for all requests for this message
if (state != host_pool->checkpoint) {
host_pool->checkpoint = state;
- host_pool->current_endpoint = host_pool->current_endpoint ?
- (host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
- host_pool->endpoints) : host_pool->endpoints;
+ host_pool->current_endpoint = host_pool->current_endpoint ?
+ (host_pool->current_endpoint->hh.next ? host_pool->current_endpoint->hh.next :
+ host_pool->endpoints) : host_pool->endpoints;
}
break;
}
@@ -198,9 +198,9 @@ void host_pool_mark_success(struct HostPool *host_pool, int id)
HASH_FIND_INT(host_pool->endpoints, &id, endpoint);
assert(endpoint != NULL);
- _DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as SUCCESS\n",
- endpoint->id, endpoint->address, endpoint->port, endpoint->path);
-
+ _DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as SUCCESS\n",
+ endpoint->id, endpoint->address, endpoint->port, endpoint->path);
+
endpoint->alive = 1;
}
@@ -212,9 +212,9 @@ void host_pool_mark_failed(struct HostPool *host_pool, int id)
HASH_FIND_INT(host_pool->endpoints, &id, endpoint);
assert(endpoint != NULL);
- _DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as FAILED\n",
- endpoint->id, endpoint->address, endpoint->port, endpoint->path);
-
+ _DEBUG("HOST_POOL: marking endpoint #%d (%s:%d%s) as FAILED\n",
+ endpoint->id, endpoint->address, endpoint->port, endpoint->path);
+
if (endpoint->alive) {
endpoint->alive = 0;
endpoint->retry_count = 0;
16 host_pool/host_pool.h
View
@@ -33,17 +33,17 @@ enum HostPoolEndpointSelectionMode {
HOST_POOL_SINGLE
};
-struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
- int max_retry_interval, int reset_on_all_failed);
+struct HostPool *new_host_pool(int retry_failed_hosts, int retry_interval,
+ int max_retry_interval, int reset_on_all_failed);
void free_host_pool(struct HostPool *host_pool);
-struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
- char *address, int port, char *path);
+struct HostPoolEndpoint *new_host_pool_endpoint(struct HostPool *host_pool,
+ char *address, int port, char *path);
void free_host_pool_endpoint(struct HostPoolEndpoint *host_pool_endpoint);
void host_pool_from_json(struct HostPool *host_pool, json_object *host_pool_endpoint_list);
-struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
- enum HostPoolEndpointSelectionMode mode, int64_t state);
-struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
- enum HostPoolEndpointSelectionMode mode, int64_t state);
+struct HostPoolEndpoint *host_pool_get_endpoint(struct HostPool *host_pool,
+ enum HostPoolEndpointSelectionMode mode, int64_t state);
+struct HostPoolEndpoint *host_pool_next_endpoint(struct HostPool *host_pool,
+ enum HostPoolEndpointSelectionMode mode, int64_t state);
void host_pool_mark_success(struct HostPool *host_pool, int id);
void host_pool_mark_failed(struct HostPool *host_pool, int id);
void host_pool_reset(struct HostPool *host_pool);
74 profiler_stats/profiler_stats.c
View
@@ -237,48 +237,48 @@ struct ProfilerStat *profiler_stats_get_all()
#if _POSIX_TIMERS > 0
- inline void profiler_ts_get(struct timespec *ts)
- {
- clock_gettime(CLOCK_REALTIME, ts);
- }
+inline void profiler_ts_get(struct timespec *ts)
+{
+ clock_gettime(CLOCK_REALTIME, ts);
+}
+
+inline unsigned int profiler_ts_diff(struct timespec start, struct timespec end)
+{
+ struct timespec temp;
- inline unsigned int profiler_ts_diff(struct timespec start, struct timespec end)
- {
- struct timespec temp;
-
- if ((end.tv_nsec - start.tv_nsec) < 0) {
- temp.tv_sec = end.tv_sec - start.tv_sec - 1;
- temp.tv_nsec = 1000000000 + end.tv_nsec - start.tv_nsec;
- } else {
- temp.tv_sec = end.tv_sec - start.tv_sec;
- temp.tv_nsec = end.tv_nsec - start.tv_nsec;
- }
-
- // return usec as int
- return (temp.tv_sec * 1000000) + (temp.tv_nsec / 1000);
+ if ((end.tv_nsec - start.tv_nsec) < 0) {
+ temp.tv_sec = end.tv_sec - start.tv_sec - 1;
+ temp.tv_nsec = 1000000000 + end.tv_nsec - start.tv_nsec;
+ } else {
+ temp.tv_sec = end.tv_sec - start.tv_sec;
+ temp.tv_nsec = end.tv_nsec - start.tv_nsec;
}
+
+ // return usec as int
+ return (temp.tv_sec * 1000000) + (temp.tv_nsec / 1000);
+}
#else
- inline void profiler_ts_get(struct timeval *ts)
- {
- gettimeofday(ts, NULL);
- }
-
- inline unsigned int profiler_ts_diff(struct timeval start, struct timeval end)
- {
- struct timeval temp;
-
- if ((end.tv_usec - start.tv_usec) < 0) {
- temp.tv_sec = end.tv_sec - start.tv_sec - 1;
- temp.tv_usec = 1000000 + end.tv_usec - start.tv_usec;
- } else {
- temp.tv_sec = end.tv_sec - start.tv_sec;
- temp.tv_usec = end.tv_usec - start.tv_usec;
- }
-
- // return usec as int
- return (temp.tv_sec * 1000000) + temp.tv_usec;
+inline void profiler_ts_get(struct timeval *ts)
+{
+ gettimeofday(ts, NULL);
+}
+
+inline unsigned int profiler_ts_diff(struct timeval start, struct timeval end)
+{
+ struct timeval temp;
+
+ if ((end.tv_usec - start.tv_usec) < 0) {
+ temp.tv_sec = end.tv_sec - start.tv_sec - 1;
+ temp.tv_usec = 1000000 + end.tv_usec - start.tv_usec;
+ } else {
+ temp.tv_sec = end.tv_sec - start.tv_sec;
+ temp.tv_usec = end.tv_usec - start.tv_usec;
}
+ // return usec as int
+ return (temp.tv_sec * 1000000) + temp.tv_usec;
+}
+
#endif
16 profiler_stats/profiler_stats.h
View
@@ -9,17 +9,17 @@
#if _POSIX_TIMERS > 0
- typedef struct timespec profiler_ts;
-
- inline void profiler_ts_get(struct timespec *ts);
- inline unsigned int profiler_ts_diff(struct timespec start, struct timespec end);
+typedef struct timespec profiler_ts;
+
+inline void profiler_ts_get(struct timespec *ts);
+inline unsigned int profiler_ts_diff(struct timespec start, struct timespec end);
#else
- typedef struct timeval profiler_ts;
-
- inline void profiler_ts_get(struct timeval *ts);
- inline unsigned int profiler_ts_diff(struct timeval start, struct timeval end);
+typedef struct timeval profiler_ts;
+
+inline void profiler_ts_get(struct timeval *ts);
+inline unsigned int profiler_ts_diff(struct timeval start, struct timeval end);
#endif
57 ps_to_http/ps_to_http.c
View
@@ -30,12 +30,13 @@ time_t last_message_timestamp = 0;
struct timeval max_silence_time = {0, 0};
struct event silence_ev;
-struct destination_url *new_destination_url(char *url) {
+struct destination_url *new_destination_url(char *url)
+{
struct destination_url *sq_dest;
char *address;
int port;
char *path;
-
+
sq_dest = malloc(sizeof(struct destination_url));
simplehttp_parse_url(url, strlen(url), &address, &port, &path);
_DEBUG("destination_url: %s\n", url);
@@ -47,7 +48,7 @@ struct destination_url *new_destination_url(char *url) {
sq_dest->path = path;
sq_dest->next = NULL;
sq_dest->method = EVHTTP_REQ_GET;
-
+
return sq_dest;
}
@@ -75,8 +76,8 @@ void silence_cb(int fd, short what, void *ctx)
_DEBUG("Testing for infinite silence\n");
if ( time(NULL) - last_message_timestamp > max_silence_time.tv_sec ) {
_DEBUG("Things are too quiet... time to quit!\n");
- fprintf(stderr, "Exiting: No messages recieved for %lu seconds (limit: %lu seconds)\n", (time(NULL)-last_message_timestamp), max_silence_time.tv_sec);
- error_cb(-127, (void*)NULL);
+ fprintf(stderr, "Exiting: No messages recieved for %lu seconds (limit: %lu seconds)\n", (time(NULL) - last_message_timestamp), max_silence_time.tv_sec);
+ error_cb(-127, (void *)NULL);
} else {
evtimer_del(&silence_ev);
evtimer_set(&silence_ev, silence_cb, NULL);
@@ -89,17 +90,17 @@ void process_message_cb(char *message, void *cb_arg)
struct evbuffer *evb;
char *encoded_message;
struct destination_url *destination;
-
+
_DEBUG("process_message_cb()\n");
-
+
if (message == NULL || strlen(message) < 3) {
return;
}
-
+
if (option_get_int("max_silence") > 0) {
last_message_timestamp = time(NULL);
}
-
+
if (!current_destination) {
// start loop over again for round-robin
current_destination = destinations;
@@ -110,16 +111,16 @@ void process_message_cb(char *message, void *cb_arg)
encoded_message = simplehttp_encode_uri(message);
evbuffer_add_printf(evb, destination->path, encoded_message);
//_DEBUG("process_message_cb(GET %s)\n", (char *)EVBUFFER_DATA(evb));
- new_async_request(destination->address, destination->port, (char *)EVBUFFER_DATA(evb),
- finish_destination_cb, NULL);
+ new_async_request(destination->address, destination->port, (char *)EVBUFFER_DATA(evb),
+ finish_destination_cb, NULL);
evbuffer_free(evb);
free(encoded_message);
} else {
//_DEBUG("process_message_cb(POST %s:%d%s)\n", destination->address, destination->port, destination->path);
- new_async_request_with_body(EVHTTP_REQ_POST, destination->address, destination->port, destination->path,
- NULL, message, finish_destination_cb, NULL);
+ new_async_request_with_body(EVHTTP_REQ_POST, destination->address, destination->port, destination->path,
+ NULL, message, finish_destination_cb, NULL);
}
-
+
if (round_robin) {
// break and set the next loop to start at the next destination
current_destination = destination->next;
@@ -137,34 +138,34 @@ int version_cb(int value)
int destination_get_url_cb(char *value)
{
struct destination_url *sq_dest;
-
+
if (strstr(value, "%s") == NULL) {
fprintf(stderr, "ERROR: --destination-get-url=\"%s\" must contain a '%%s' for message data\n", value);
return 0;
}
-
+
sq_dest = new_destination_url(value);
sq_dest->method = EVHTTP_REQ_GET;
LL_APPEND(destinations, sq_dest);
-
+
return 1;
}
int destination_post_url_cb(char *value)
{
struct destination_url *sq_dest;
-
+
sq_dest = new_destination_url(value);
sq_dest->method = EVHTTP_REQ_POST;
LL_APPEND(destinations, sq_dest);
-
+
return 1;
}
void free_destination_urls()
{
struct destination_url *destination, *tmp;
-
+
LL_FOREACH_SAFE(destinations, destination, tmp) {
LL_DELETE(destinations, destination);
free_destination_url(destination);
@@ -177,14 +178,14 @@ int main(int argc, char **argv)
char *address;
int port;
char *path;
-
+
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
option_define_str("pubsub_url", OPT_REQUIRED, "http://127.0.0.1:80/sub?multipart=0", &pubsub_url, NULL, "url of pubsub to read from");
option_define_bool("round_robin", OPT_OPTIONAL, 0, &round_robin, NULL, "write round-robin to destination urls");
option_define_str("destination_get_url", OPT_OPTIONAL, NULL, NULL, destination_get_url_cb, "(multiple) url(s) to HTTP GET to\n\t\t\t This URL must contain a %s for the message data\n\t\t\t for a simplequeue use \"http://127.0.0.1:8080/put?data=%s\"");
option_define_str("destination_post_url", OPT_OPTIONAL, NULL, NULL, destination_post_url_cb, "(multiple) url(s) to HTTP POST to\n\t\t\t For a pubsub endpoint use \"http://127.0.0.1:8080/pub\"");
option_define_int("max_silence", OPT_OPTIONAL, -1, NULL, NULL, "Maximum time between pubsub messages before we disconnect and quit");
-
+
if (!option_parse_command_line(argc, argv)) {
return 1;
}
@@ -193,30 +194,30 @@ int main(int argc, char **argv)
return 1;
}
init_async_connection_pool(1);
-
+
if (simplehttp_parse_url(pubsub_url, strlen(pubsub_url), &address, &port, &path)) {
pubsubclient_init(address, port, path, process_message_cb, error_cb, NULL);
-
+
if (option_get_int("max_silence") > 0) {
_DEBUG("Registering timer.\n");
max_silence_time.tv_sec = option_get_int("max_silence");
evtimer_set(&silence_ev, silence_cb, NULL);
evtimer_add(&silence_ev, &max_silence_time);
}
-
+
pubsubclient_run();
pubsubclient_free();
-
+
free(address);
free(path);
} else {
fprintf(stderr, "ERROR: failed to parse pubsub_url\n");
}
-
+
free_destination_urls();
free_async_connection_pool();
free_options();
free(pubsub_url);
-
+
return 0;
}
10 pubsub/pubsub.c
View
@@ -196,13 +196,11 @@ void pub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
int message_offset = 0;
int num_messages = 0;
char *current_message;
-
+
evhttp_parse_query(req->uri, &args);
-
- for (j=0; j<=EVBUFFER_LENGTH(req->input_buffer); j++)
- {
- if (j == EVBUFFER_LENGTH(req->input_buffer) || *(EVBUFFER_DATA(req->input_buffer) + j) == '\n')
- {
+
+ for (j = 0; j <= EVBUFFER_LENGTH(req->input_buffer); j++) {
+ if (j == EVBUFFER_LENGTH(req->input_buffer) || *(EVBUFFER_DATA(req->input_buffer) + j) == '\n') {
message_length = j - message_offset ;
current_message = EVBUFFER_DATA(req->input_buffer) + message_offset;
3  pubsubclient/stream_request.c
View
@@ -22,7 +22,8 @@ struct StreamRequest *new_stream_request(const char *method, const char *source_
void (*header_cb)(struct bufferevent *bev, struct evkeyvalq *headers, void *arg),
void (*read_cb)(struct bufferevent *bev, void *arg),
void (*error_cb)(struct bufferevent *bev, void *arg),
- void *arg) {
+ void *arg)
+{
struct StreamRequest *sr;
int fd;
struct evbuffer *http_request;
28 queuereader/queuereader.c
View
@@ -80,8 +80,8 @@ void queuereader_decrement_backoff()
void queuereader_increment_backoff()
{
// bounded increment of backoff counter
- backoff_counter = ((backoff_counter + 1) > MAX_BACKOFF_COUNTER) ?
- MAX_BACKOFF_COUNTER : (backoff_counter + 1);
+ backoff_counter = ((backoff_counter + 1) > MAX_BACKOFF_COUNTER) ?
+ MAX_BACKOFF_COUNTER : (backoff_counter + 1);
}
void queuereader_finish_message(int return_code)
@@ -113,8 +113,8 @@ void queuereader_finish_message(int return_code)
void queuereader_requeue_message_cb(struct evhttp_request *req, void *cbarg)
{
// TODO: this should limit (and perhaps backoff) failed requeue requests
- queuereader_finish_message((req && req->response_code == 200) ?
- QR_CONT_SOURCE_REQUEST : QR_REQUEUE_WITHOUT_BACKOFF);
+ queuereader_finish_message((req && req->response_code == 200) ?
+ QR_CONT_SOURCE_REQUEST : QR_REQUEUE_WITHOUT_BACKOFF);
}
void queuereader_requeue_message_request()
@@ -126,8 +126,8 @@ void queuereader_requeue_message_request()
evb = evbuffer_new();
encoded_message = simplehttp_encode_uri(message);
evbuffer_add_printf(evb, "/put?data=%s", encoded_message);
- new_async_request((char *)data->source_address, data->source_port,
- (char *)EVBUFFER_DATA(evb), queuereader_requeue_message_cb, (void *)NULL);
+ new_async_request((char *)data->source_address, data->source_port,
+ (char *)EVBUFFER_DATA(evb), queuereader_requeue_message_cb, (void *)NULL);
evbuffer_free(evb);
free(encoded_message);
}
@@ -166,8 +166,8 @@ void queuereader_source_request(int fd, short what, void *cbarg)
evb = evbuffer_new();
evbuffer_add_printf(evb, client_data->path);
- new_async_request((char *)client_data->source_address, client_data->source_port,
- (char *)EVBUFFER_DATA(evb), queuereader_source_cb, cbarg);
+ new_async_request((char *)client_data->source_address, client_data->source_port,
+ (char *)EVBUFFER_DATA(evb), queuereader_source_cb, cbarg);
evbuffer_free(evb);
}
@@ -177,9 +177,9 @@ void queuereader_set_sleeptime_queue_empty_ms(int milliseconds)
}
void queuereader_init(const char *source_address, int source_port, const char *path,
- int (*message_cb)(char *data, void *arg),
- void (*error_cb)(int status_code, void *arg),
- void *cbarg)
+ int (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg)
{
int timeout_seconds = floor((sleeptime_queue_empty_ms * 1000) / 1000000);
int timeout_microseconds = (sleeptime_queue_empty_ms * 1000) - (timeout_seconds * 1000000);
@@ -208,9 +208,9 @@ void queuereader_init(const char *source_address, int source_port, const char *p
}
int queuereader_main(const char *source_address, int source_port, const char *path,
- int (*message_cb)(char *data, void *arg),
- void (*error_cb)(int status_code, void *arg),
- void *cbarg)
+ int (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg)
{
queuereader_init(source_address, source_port, path, message_cb, error_cb, cbarg);
queuereader_run();
12 queuereader/queuereader.h
View
@@ -9,15 +9,15 @@
#define QR_REQUEUE_WITHOUT_BACKOFF 5
int queuereader_main(const char *source_address, int source_port, const char *path,
- int (*message_cb)(char *data, void *arg),
- void (*error_cb)(int status_code, void *arg),
- void *cbarg);
+ int (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg);
void queuereader_run();
void queuereader_free();
void queuereader_init(const char *source_address, int source_port, const char *path,
- int (*message_cb)(char *data, void *arg),
- void (*error_cb)(int status_code, void *arg),
- void *cbarg);
+ int (*message_cb)(char *data, void *arg),
+ void (*error_cb)(int status_code, void *arg),
+ void *cbarg);
void queuereader_finish_message(int return_code);
void queuereader_set_sleeptime_queue_empty_ms(int milliseconds);
12 simplehttp/async_simplehttp.c
View
@@ -75,8 +75,8 @@ struct evhttp_connection *get_connection(char *address, int port, struct Connect
}
struct AsyncCallbackGroup *new_async_callback_group(struct evhttp_request *req,
- void (*finished_cb)(struct evhttp_request *, void *),
- void *finished_cb_arg)
+ void (*finished_cb)(struct evhttp_request *, void *),
+ void *finished_cb_arg)
{
struct AsyncCallbackGroup *callback_group = NULL;
@@ -103,15 +103,15 @@ void free_async_callback_group(struct AsyncCallbackGroup *callback_group)
}
}
-struct AsyncCallback *new_async_request(char *address, int port, char *path,
+struct AsyncCallback *new_async_request(char *address, int port, char *path,
void (*cb)(struct evhttp_request *, void *), void *cb_arg)
{
return new_async_request_with_body(EVHTTP_REQ_GET, address, port, path, NULL, NULL, cb, cb_arg);
}
-struct AsyncCallback *new_async_request_with_body(int request_method, char *address, int port, char *path,
- struct RequestHeader *header_list, char *body,
- void (*cb)(struct evhttp_request *, void *), void *cb_arg)
+struct AsyncCallback *new_async_request_with_body(int request_method, char *address, int port, char *path,
+ struct RequestHeader *header_list, char *body,
+ void (*cb)(struct evhttp_request *, void *), void *cb_arg)
{
static uint64_t counter = 0;
// create new connection to endpoint
3  simplehttp/options.c
View
@@ -256,7 +256,8 @@ char option_get_char(const char *option_name)
return option->default_char;
}
-struct Option *new_option(const char *option_name, int required, const char *help) {
+struct Option *new_option(const char *option_name, int required, const char *help)
+{
struct Option *option;
char *tmp_option_name = strdup(option_name);
if (format_option_name(tmp_option_name)) {
9 simplehttp/request.c
View
@@ -10,7 +10,8 @@
extern int simplehttp_logging;
-struct simplehttp_request *simplehttp_request_new(struct evhttp_request *req, uint64_t id) {
+struct simplehttp_request *simplehttp_request_new(struct evhttp_request *req, uint64_t id)
+{
struct simplehttp_request *s_req;
simplehttp_ts start_ts;
@@ -28,7 +29,8 @@ struct simplehttp_request *simplehttp_request_new(struct evhttp_request *req, ui
return s_req;
}
-struct simplehttp_request *simplehttp_request_get(struct evhttp_request *req) {
+struct simplehttp_request *simplehttp_request_get(struct evhttp_request *req)
+{
struct simplehttp_request *entry;
TAILQ_FOREACH(entry, &simplehttp_reqs, entries) {
@@ -49,7 +51,8 @@ uint64_t simplehttp_request_id(struct evhttp_request *req)
return entry ? entry->id : 0;
}
-struct simplehttp_request *simplehttp_async_check(struct evhttp_request *req) {
+struct simplehttp_request *simplehttp_async_check(struct evhttp_request *req)
+{
struct simplehttp_request *entry;
entry = simplehttp_request_get(req);
8 simplehttp/simplehttp.h
View
@@ -75,11 +75,11 @@ struct AsyncCallbackGroup *new_async_callback_group(struct evhttp_request *req,
/* create a new AsyncCallback. delegation of memory for this callback
will be passed to callback_group */
int new_async_callback(struct AsyncCallbackGroup *callback_group, char *address, int port, char *path, void (*cb)(struct evhttp_request *, void *), void *cb_arg);
-struct AsyncCallback *new_async_request(char *address, int port, char *path,
+struct AsyncCallback *new_async_request(char *address, int port, char *path,
void (*cb)(struct evhttp_request *, void *), void *cb_arg);
-struct AsyncCallback *new_async_request_with_body(int request_method, char *address, int port, char *path,
- struct RequestHeader *header_list, char *body,
- void (*cb)(struct evhttp_request *, void *), void *cb_arg);
+struct AsyncCallback *new_async_request_with_body(int request_method, char *address, int port, char *path,
+ struct RequestHeader *header_list, char *body,
+ void (*cb)(struct evhttp_request *, void *), void *cb_arg);
void free_async_callback_group(struct AsyncCallbackGroup *callback_group);
void init_async_connection_pool(int enable_request_logging);
void free_async_connection_pool();
3  simplehttp/stat.c
View
@@ -40,7 +40,8 @@ void simplehttp_stats_destruct()
free(stats_counts);
}
-struct simplehttp_stats *simplehttp_stats_new() {
+struct simplehttp_stats *simplehttp_stats_new()
+{
struct simplehttp_stats *st;
st = malloc(sizeof(struct simplehttp_stats));
70 simplehttp/util.c
View
@@ -132,39 +132,39 @@ char *simplehttp_encode_uri(const char *uri)
*/
char *simplehttp_strnstr(const char *s, const char *find, size_t slen)
{
- char c, sc;
- size_t len;
-
- // exit if the end of the strung
- if ((c = *find++) != '\0') {
-
- // get the length of the string to find, shortens as we loop
- len = strlen(find);
-
- // compare the passed and find string at the current position. we would
- // have iterated up to and including the first char of the find string in
- // the search string, now compare from there, if no match loop again
- do {
-
- // go until we get the starting char of the find string or until
- // we run out of chars to search either by count or end of string
- do {
- if (slen-- < 1 || (sc = *s++) == '\0') {
- return NULL;
- }
- } while (sc != c);
-
- // no more chars to search then exit
- if (len > slen) {
- return NULL;
- }
-
- } while (strncmp(s, find, len) != 0);
-
- // when find string matches go one position back to be at start index
- s--;
- }
-
- // return pointer to start pos of found string
- return (char *)s;
+ char c, sc;
+ size_t len;
+
+ // exit if the end of the strung
+ if ((c = *find++) != '\0') {
+
+ // get the length of the string to find, shortens as we loop
+ len = strlen(find);
+
+ // compare the passed and find string at the current position. we would
+ // have iterated up to and including the first char of the find string in
+ // the search string, now compare from there, if no match loop again
+ do {
+
+ // go until we get the starting char of the find string or until
+ // we run out of chars to search either by count or end of string
+ do {
+ if (slen-- < 1 || (sc = *s++) == '\0') {
+ return NULL;
+ }
+ } while (sc != c);
+
+ // no more chars to search then exit
+ if (len > slen) {
+ return NULL;
+ }
+
+ } while (strncmp(s, find, len) != 0);
+
+ // when find string matches go one position back to be at start index
+ s--;
+ }
+
+ // return pointer to start pos of found string
+ return (char *)s;
}
36 simpleleveldb/simpleleveldb.c
View
@@ -311,11 +311,11 @@ void fwmatch_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
leveldb_readoptions_t *fw_read_options;
leveldb_iterator_t *fw_iter;
int result_count = 0, result_limit = 0;
-
+
evhttp_parse_query(req->uri, &args);
fw_key = (char *)evhttp_find_header(&args, "key");
result_limit = get_int_argument(&args, "limit", 500);
-
+
jsobj = json_object_new_object();
result_array = json_object_new_array();
tmp_obj = NULL;
@@ -324,19 +324,19 @@ void fwmatch_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
finalize_request(400, "MISSING_ARG_KEY", req, evb, &args, jsobj);
return;
}
-
+
fw_read_options = leveldb_readoptions_create();
fw_snapshot = leveldb_create_snapshot(ldb);
leveldb_readoptions_set_snapshot(fw_read_options, fw_snapshot);
fw_iter = leveldb_create_iterator(ldb, fw_read_options);
-
+
leveldb_iter_seek(fw_iter, fw_key, strlen(fw_key));
-
+
while (leveldb_iter_valid(fw_iter) && (result_limit == 0 || result_count < result_limit)) {
key = leveldb_iter_key(fw_iter, &key_len);
- key_clean = (char*)key;
+ key_clean = (char *)key;
DUPE_N_TERMINATE(key_clean, key_len, tmp);
-
+
// this is the case where we are only fwing keys of this prefix
// so we need to break out of the loop at the last key
if (strlen(fw_key) > key_len || strncmp(key_clean, fw_key, strlen(fw_key)) != 0 ) {
@@ -344,22 +344,22 @@ void fwmatch_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
break;
}
value = leveldb_iter_value(fw_iter, &value_len);
- value_clean = (char*)value;
+ value_clean = (char *)value;
DUPE_N_TERMINATE(value_clean, value_len, tmp);
tmp_obj = json_object_new_object();
json_object_object_add(tmp_obj, key_clean, json_object_new_string(value_clean));
json_object_array_add(result_array, tmp_obj);
-
+
leveldb_iter_next(fw_iter);
result_count ++;
-
+
free(key_clean);
free(value_clean);
}
json_object_object_add(jsobj, "data", result_array);
json_object_object_add(jsobj, "status", json_object_new_string(result_count ? "ok" : "no results"));
-
+
finalize_request(200, NULL, req, evb, &args, jsobj);
leveldb_iter_destroy(fw_iter);
@@ -492,7 +492,7 @@ void list_remove_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
free(orig_value);
orig_value = terminated_value;
}
-
+
if (orig_value) {
new_value = evbuffer_new();
token = strtok(orig_value, ",");
@@ -524,9 +524,9 @@ void list_remove_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evbuffer_add_printf(evb, "%s,%s\n", key, (char *)EVBUFFER_DATA(new_value));
}
}
-
+
evbuffer_free(new_value);
-
+
} else {
if (echo_data) {
if (format == json_format) {
@@ -545,8 +545,8 @@ void list_remove_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
}
-/*
-return a txt format'd csv (key,value\n) reponse based on a forward match of the keys
+/*
+return a txt format'd csv (key,value\n) reponse based on a forward match of the keys
note: this makes a snapshot of the database and may return after other data has been added
*/
void dump_csv_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
@@ -585,7 +585,7 @@ void dump_csv_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_clear_headers(&args);
json_object_put(jsobj);
evhttp_send_reply_start(req, 200, "OK");
-
+
/* run the first dump loop */
do_dump_csv(0, 0, req);
}
@@ -624,7 +624,7 @@ void do_dump_csv(int fd, short what, void *ctx)
break;
}
}
-
+
// leveldb_iter_get_error(dump_iter, &err);
if (send_reply) {
32 simplequeue/simplequeue.c
View
@@ -102,7 +102,8 @@ void stats(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_clear_headers(&args);
}
-struct queue_entry *get_queue_entry() {
+struct queue_entry *get_queue_entry()
+{
struct queue_entry *entry;
entry = TAILQ_FIRST(&queues);
if (entry != NULL) {
@@ -174,7 +175,7 @@ void mget(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_clear_headers(&args);
}
-void put_queue_entry(const char *data, size_t record_size)
+void put_queue_entry(const char *data, size_t record_size)
{
struct queue_entry *entry;
@@ -193,8 +194,8 @@ void put_queue_entry(const char *data, size_t record_size)
if (depth > depth_high_water) {
depth_high_water = depth;
}
- while ((max_depth > 0 && depth > max_depth)
- || (max_bytes > 0 && n_bytes > max_bytes)) {
+ while ((max_depth > 0 && depth > max_depth)
+ || (max_bytes > 0 && n_bytes > max_bytes)) {
overflow_one();
}
}
@@ -209,7 +210,7 @@ void put(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
n_puts++;
// try to get the data from get first, then from post
- evhttp_parse_query(req->uri, &args);
+ evhttp_parse_query(req->uri, &args);
if ((data = evhttp_find_header(&args, "data")) != NULL) {
data_size = strlen(data);
} else if ((data_size = EVBUFFER_LENGTH(req->input_buffer)) > 0) {
@@ -241,11 +242,10 @@ void mput(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
size_t record_size = 0;
// try to get the data from get first, then from post
- evhttp_parse_query(req->uri, &args);
+ evhttp_parse_query(req->uri, &args);
if ((data = evhttp_find_header(&args, "data")) != NULL) {
data_size = strlen(data);
- }
- else if ((data_size = EVBUFFER_LENGTH(req->input_buffer)) > 0) {
+ } else if ((data_size = EVBUFFER_LENGTH(req->input_buffer)) > 0) {
data = (char *)EVBUFFER_DATA(req->input_buffer);
}
@@ -255,32 +255,32 @@ void mput(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
sep = evhttp_find_header(&args, "separator");
if (sep == NULL) {
sep = mput_item_sep;
- }
+ }
sep_size = strlen(sep);
record_start = data;
data_left = data_size;
- // loop through to find the next record but only up to the size of the
+ // loop through to find the next record but only up to the size of the
// post, the request input buffer can hold much more data, we only want
// the post part of it
while ((sep_start = simplehttp_strnstr(record_start, sep, data_left)) != NULL) {
// put each record on the queue
record_size = sep_start - record_start;
- if (record_size > 0) {
+ if (record_size > 0) {
if (record_size > data_left) {
- record_size = data_left;
- }
+ record_size = data_left;
+ }
put_queue_entry(record_start, record_size);
record_start = sep_start + sep_size;
data_left -= (record_size + sep_size);
- n_puts++;
+ n_puts++;
}
}
// any ending record
if (data_left > 0) {
put_queue_entry(record_start, data_left);
- n_puts++;
+ n_puts++;
}
evhttp_send_reply(req, HTTP_OK, "OK", evb);
@@ -365,7 +365,7 @@ int main(int argc, char **argv)
simplehttp_set_cb("/exit*", exit_cb, NULL);
simplehttp_main();
free_options();
-
+
if (overflow_log_fp) {
while (depth) {
overflow_one();
2  sortdb/sortdb.c
View
@@ -379,7 +379,7 @@ int main(int argc, char **argv)
option_define_bool("memory_lock", OPT_OPTIONAL, 0, NULL, NULL, "lock data file pages into memory");
option_define_char("field_separator", OPT_OPTIONAL, '\0', &deliminator, NULL, "field separator (eg: comma, tab, pipe). default: TAB");
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
-
+
if (!option_parse_command_line(argc, argv)) {
return 1;
}
Please sign in to comment.
Something went wrong with that request. Please try again.