Permalink
Browse files

connection stats that include the size of outgoing buffers for connec…

…ted pubsub clients
  • Loading branch information...
jehiah committed Dec 22, 2010
1 parent a8173e2 commit ac22e1f8650f9a0d75f0d795501656ffe61ac137
Showing with 207 additions and 15 deletions.
  1. +161 −0 pubsub/http-internal.h
  2. +46 −15 pubsub/pubsub.c
View
@@ -0,0 +1,161 @@
+/*
+NOTE: this is included copyied from libevent-1.4.13 with the addition
+of a definition for socklen_t so that we can give statistics on the
+client connection outgoing buffer size
+*/
+
+/*
+ * Copyright 2001 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * This header file contains definitions for dealing with HTTP requests
+ * that are internal to libevent. As user of the library, you should not
+ * need to know about these.
+ */
+
+#ifndef _HTTP_H_
+#define _HTTP_H_
+
+#define HTTP_CONNECT_TIMEOUT 45
+#define HTTP_WRITE_TIMEOUT 50
+#define HTTP_READ_TIMEOUT 50
+
+#define HTTP_PREFIX "http://"
+#define HTTP_DEFAULTPORT 80
+#define socklen_t unsigned int
+
+enum message_read_status {
+ ALL_DATA_READ = 1,
+ MORE_DATA_EXPECTED = 0,
+ DATA_CORRUPTED = -1,
+ REQUEST_CANCELED = -2
+};
+
+enum evhttp_connection_error {
+ EVCON_HTTP_TIMEOUT,
+ EVCON_HTTP_EOF,
+ EVCON_HTTP_INVALID_HEADER
+};
+
+struct evbuffer;
+struct addrinfo;
+struct evhttp_request;
+
+/* A stupid connection object - maybe make this a bufferevent later */
+
+enum evhttp_connection_state {
+ EVCON_DISCONNECTED, /**< not currently connected not trying either*/
+ EVCON_CONNECTING, /**< tries to currently connect */
+ EVCON_IDLE, /**< connection is established */
+ EVCON_READING_FIRSTLINE,/**< reading Request-Line (incoming conn) or
+ **< Status-Line (outgoing conn) */
+ EVCON_READING_HEADERS, /**< reading request/response headers */
+ EVCON_READING_BODY, /**< reading request/response body */
+ EVCON_READING_TRAILER, /**< reading request/response chunked trailer */
+ EVCON_WRITING /**< writing request/response headers/body */
+};
+
+struct event_base;
+
+struct evhttp_connection {
+ /* we use tailq only if they were created for an http server */
+ TAILQ_ENTRY(evhttp_connection) (next);
+
+ int fd;
+ struct event ev;
+ struct event close_ev;
+ struct evbuffer *input_buffer;
+ struct evbuffer *output_buffer;
+
+ char *bind_address; /* address to use for binding the src */
+ u_short bind_port; /* local port for binding the src */
+
+ char *address; /* address to connect to */
+ u_short port;
+
+ int flags;
+#define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */
+#define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */
+#define EVHTTP_CON_CLOSEDETECT 0x0004 /* detecting if persistent close */
+
+ int timeout; /* timeout in seconds for events */
+ int retry_cnt; /* retry count */
+ int retry_max; /* maximum number of retries */
+
+ enum evhttp_connection_state state;
+
+ /* for server connections, the http server they are connected with */
+ struct evhttp *http_server;
+
+ TAILQ_HEAD(evcon_requestq, evhttp_request) requests;
+
+ void (*cb)(struct evhttp_connection *, void *);
+ void *cb_arg;
+
+ void (*closecb)(struct evhttp_connection *, void *);
+ void *closecb_arg;
+
+ struct event_base *base;
+};
+
+struct evhttp_cb {
+ TAILQ_ENTRY(evhttp_cb) next;
+
+ char *what;
+
+ void (*cb)(struct evhttp_request *req, void *);
+ void *cbarg;
+};
+
+/* both the http server as well as the rpc system need to queue connections */
+TAILQ_HEAD(evconq, evhttp_connection);
+
+/* each bound socket is stored in one of these */
+struct evhttp_bound_socket {
+ TAILQ_ENTRY(evhttp_bound_socket) (next);
+
+ struct event bind_ev;
+};
+
+struct evhttp {
+ TAILQ_HEAD(boundq, evhttp_bound_socket) sockets;
+
+ TAILQ_HEAD(httpcbq, evhttp_cb) callbacks;
+ struct evconq connections;
+
+ int timeout;
+
+ void (*gencb)(struct evhttp_request *req, void *);
+ void *gencbarg;
+
+ struct event_base *base;
+};
+
+/* resets the connection; can be reused for more requests */
+void evhttp_connection_reset(struct evhttp_connection *);
+
+/* connects if necessary */
+int evhttp_connection_connect(struct evhttp_connection *);
+
+/* notifies the current request that it failed; resets connection */
+void evhttp_connection_fail(struct evhttp_connection *,
+ enum evhttp_connection_error error);
+
+void evhttp_get_request(struct evhttp *, int, struct sockaddr *, socklen_t);
+
+int evhttp_hostportfile(char *, char **, u_short *, char **);
+
+int evhttp_parse_firstline(struct evhttp_request *, struct evbuffer*);
+int evhttp_parse_headers(struct evhttp_request *, struct evbuffer*);
+
+void evhttp_start_read(struct evhttp_connection *);
+void evhttp_make_header(struct evhttp_connection *, struct evhttp_request *);
+
+void evhttp_write_buffer(struct evhttp_connection *,
+ void (*)(struct evhttp_connection *, void *), void *);
+
+/* response sending HTML the data in the buffer */
+void evhttp_response_code(struct evhttp_request *, int, const char *);
+void evhttp_send_page(struct evhttp_request *, struct evbuffer *);
+
+#endif /* _HTTP_H */
View
@@ -1,8 +1,10 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
+#include <time.h>
#include "queue.h"
#include "simplehttp.h"
+#include "http-internal.h"
#define BUFSZ 1024
#define BOUNDARY "xXPubSubXx"
@@ -12,16 +14,18 @@ int ps_debug = 0;
typedef struct cli {
int multipart;
int websocket;
+ uint64_t connection_id;
+ time_t connect_time;
struct evbuffer *buf;
struct evhttp_request *req;
TAILQ_ENTRY(cli) entries;
} cli;
TAILQ_HEAD(, cli) clients;
-uint32_t totalConns = 0;
-uint32_t currentConns = 0;
-uint32_t msgRecv = 0;
-uint32_t msgSent = 0;
+uint64_t totalConns = 0;
+uint64_t currentConns = 0;
+uint64_t msgRecv = 0;
+uint64_t msgSent = 0;
void
@@ -40,9 +44,26 @@ void
clients_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
{
struct cli *client;
+ struct tm *time_struct;
+ char buf[248];
+ unsigned long output_buffer_length;
+ struct evhttp_connection *evcon;
+ if (TAILQ_EMPTY(&clients)) {
+ evbuffer_add_printf(evb, "no /sub connections\n");
+ }
TAILQ_FOREACH(client, &clients, entries) {
- evbuffer_add_printf(evb, "%s:%d\n", client->req->remote_host, client->req->remote_port);
+ evcon = (struct evhttp_connection *)client->req->evcon;
+
+ time_struct = gmtime(&client->connect_time);
+ strftime(buf, 248, "%Y-%m-%d %H:%M:%S", time_struct);
+ output_buffer_length = (unsigned long)EVBUFFER_LENGTH(evcon->output_buffer);
+ evbuffer_add_printf(evb, "%s:%d connected at %s. output buffer size:%lu state:%d\n",
+ client->req->remote_host,
+ client->req->remote_port,
+ buf,
+ output_buffer_length,
+ (int)evcon->state);
}
evhttp_send_reply(req, HTTP_OK, "OK", evb);
@@ -68,14 +89,12 @@ stats_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
sprintf(buf, "%d", msgSent);
evhttp_add_header(req->output_headers, "X-PUBSUB-MESSAGES-SENT", buf);
- evbuffer_add_printf(evb, "Active connections: %d\nTotal connections: %d\n"
- "Messages received: %d\nMessages sent: %d\n",
+ evbuffer_add_printf(evb, "Active connections: %lu\nTotal connections: %lu\n"
+ "Messages received: %lu\nMessages sent: %lu\n",
currentConns, totalConns, msgRecv, msgSent);
reset = (char *)evhttp_find_header(&args, "reset");
if (reset) {
- totalConns = 0;
- currentConns = 0;
msgRecv = 0;
msgSent = 0;
}
@@ -89,10 +108,13 @@ void on_close(struct evhttp_connection *evcon, void *ctx)
struct cli *client = (struct cli *)ctx;
if (client) {
+ fprintf(stdout, "%lu >> close from %s:%d\n", client->connection_id, evcon->address, evcon->port);
currentConns--;
TAILQ_REMOVE(&clients, client, entries);
evbuffer_free(client->buf);
free(client);
+ } else {
+ fprintf(stdout, "[unknown] >> close from %s:%d\n", evcon->address, evcon->port);
}
}
@@ -146,6 +168,7 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
char *ws_upgrade;
char *host;
char buf[248];
+ struct tm *time_struct;
currentConns++;
totalConns++;
@@ -155,22 +178,30 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
client = calloc(1, sizeof(*client));
argtoi(&args, "multipart", &client->multipart, 1);
client->req = req;
+ client->connection_id = totalConns;
+ client->connect_time = time(NULL);
+ time_struct = gmtime(&client->connect_time);
client->buf = evbuffer_new();
+ strftime(buf, 248, "%Y-%m-%d %H:%M:%S", time_struct);
+
+ // print out info about this connection
+ fprintf(stdout, "%lu >> /sub connection from %s:%d %s\n", client->connection_id, req->remote_host, req->remote_port, buf);
+
// Connection: Upgrade
// Upgrade: WebSocket
ws_upgrade = (char *) evhttp_find_header(req->input_headers, "Upgrade");
ws_origin = (char *) evhttp_find_header(req->input_headers, "Origin");
host = (char *) evhttp_find_header(req->input_headers, "Host");
if (ps_debug && ws_upgrade) {
- fprintf(stderr, "upgrade header is %s\n", ws_upgrade);
- fprintf(stderr, "multipart is %d\n", client->multipart);
+ fprintf(stderr, "%lu >> upgrade header is %s\n", client->connection_id, ws_upgrade);
+ fprintf(stderr, "%lu >> multipart is %d\n", client->connection_id, client->multipart);
}
if (ws_upgrade && strstr(ws_upgrade, "WebSocket") != NULL) {
if (ps_debug) {
- fprintf(stderr, "upgrading connection to a websocket\n");
+ fprintf(stderr, "%lu >> upgrading connection to a websocket\n", client->connection_id);
}
client->websocket = 1;
client->req->major = 1;
@@ -184,7 +215,7 @@ void sub_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
if (host) {
sprintf(buf, "ws://%s%s", host, req->uri);
if (ps_debug) {
- fprintf(stderr, "setting WebSocket-Location to %s\n", buf);
+ fprintf(stderr, "%lu >> setting WebSocket-Location to %s\n", client->connection_id, buf);
}
evhttp_add_header(client->req->output_headers, "WebSocket-Location", buf);
}
@@ -217,8 +248,8 @@ main(int argc, char **argv)
simplehttp_init();
simplehttp_set_cb("/pub*", pub_cb, NULL);
simplehttp_set_cb("/sub*", sub_cb, NULL);
- simplehttp_set_cb("/stats*", stats_cb, NULL);
- simplehttp_set_cb("/clients*", clients_cb, NULL);
+ simplehttp_set_cb("/stats", stats_cb, NULL);
+ simplehttp_set_cb("/clients", clients_cb, NULL);
simplehttp_main(argc, argv);
return 0;

0 comments on commit ac22e1f

Please sign in to comment.