Skip to content

simpleleveldb dump_csv robustness against clients fixes #43

Merged
merged 5 commits into from Apr 11, 2012
View
161 simpleleveldb/http-internal.h
@@ -0,0 +1,161 @@
+/*
+NOTE: this is included copyied from libevent-1.4.13 with the addition
+of a definition for socklen_t so that we can give statistics on the
+client connection outgoing buffer size
+*/
+
+/*
+ * Copyright 2001 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * This header file contains definitions for dealing with HTTP requests
+ * that are internal to libevent. As user of the library, you should not
+ * need to know about these.
+ */
+
+#ifndef _HTTP_H_
+#define _HTTP_H_
+
+#define HTTP_CONNECT_TIMEOUT 45
+#define HTTP_WRITE_TIMEOUT 50
+#define HTTP_READ_TIMEOUT 50
+
+#define HTTP_PREFIX "http://"
+#define HTTP_DEFAULTPORT 80
+#define socklen_t unsigned int
+
+enum message_read_status {
+ ALL_DATA_READ = 1,
+ MORE_DATA_EXPECTED = 0,
+ DATA_CORRUPTED = -1,
+ REQUEST_CANCELED = -2
+};
+
+enum evhttp_connection_error {
+ EVCON_HTTP_TIMEOUT,
+ EVCON_HTTP_EOF,
+ EVCON_HTTP_INVALID_HEADER
+};
+
+struct evbuffer;
+struct addrinfo;
+struct evhttp_request;
+
+/* A stupid connection object - maybe make this a bufferevent later */
+
+enum evhttp_connection_state {
+ EVCON_DISCONNECTED, /**< not currently connected not trying either*/
+ EVCON_CONNECTING, /**< tries to currently connect */
+ EVCON_IDLE, /**< connection is established */
+ EVCON_READING_FIRSTLINE,/**< reading Request-Line (incoming conn) or
+ **< Status-Line (outgoing conn) */
+ EVCON_READING_HEADERS, /**< reading request/response headers */
+ EVCON_READING_BODY, /**< reading request/response body */
+ EVCON_READING_TRAILER, /**< reading request/response chunked trailer */
+ EVCON_WRITING /**< writing request/response headers/body */
+};
+
+struct event_base;
+
+struct evhttp_connection {
+ /* we use tailq only if they were created for an http server */
+ TAILQ_ENTRY(evhttp_connection) (next);
+
+ int fd;
+ struct event ev;
+ struct event close_ev;
+ struct evbuffer *input_buffer;
+ struct evbuffer *output_buffer;
+
+ char *bind_address; /* address to use for binding the src */
+ u_short bind_port; /* local port for binding the src */
+
+ char *address; /* address to connect to */
+ u_short port;
+
+ int flags;
+#define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */
+#define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */
+#define EVHTTP_CON_CLOSEDETECT 0x0004 /* detecting if persistent close */
+
+ int timeout; /* timeout in seconds for events */
+ int retry_cnt; /* retry count */
+ int retry_max; /* maximum number of retries */
+
+ enum evhttp_connection_state state;
+
+ /* for server connections, the http server they are connected with */
+ struct evhttp *http_server;
+
+ TAILQ_HEAD(evcon_requestq, evhttp_request) requests;
+
+ void (*cb)(struct evhttp_connection *, void *);
+ void *cb_arg;
+
+ void (*closecb)(struct evhttp_connection *, void *);
+ void *closecb_arg;
+
+ struct event_base *base;
+};
+
+struct evhttp_cb {
+ TAILQ_ENTRY(evhttp_cb) next;
+
+ char *what;
+
+ void (*cb)(struct evhttp_request *req, void *);
+ void *cbarg;
+};
+
+/* both the http server as well as the rpc system need to queue connections */
+TAILQ_HEAD(evconq, evhttp_connection);
+
+/* each bound socket is stored in one of these */
+struct evhttp_bound_socket {
+ TAILQ_ENTRY(evhttp_bound_socket) (next);
+
+ struct event bind_ev;
+};
+
+struct evhttp {
+ TAILQ_HEAD(boundq, evhttp_bound_socket) sockets;
+
+ TAILQ_HEAD(httpcbq, evhttp_cb) callbacks;
+ struct evconq connections;
+
+ int timeout;
+
+ void (*gencb)(struct evhttp_request *req, void *);
+ void *gencbarg;
+
+ struct event_base *base;
+};
+
+/* resets the connection; can be reused for more requests */
+void evhttp_connection_reset(struct evhttp_connection *);
+
+/* connects if necessary */
+int evhttp_connection_connect(struct evhttp_connection *);
+
+/* notifies the current request that it failed; resets connection */
+void evhttp_connection_fail(struct evhttp_connection *,
+ enum evhttp_connection_error error);
+
+void evhttp_get_request(struct evhttp *, int, struct sockaddr *, socklen_t);
+
+int evhttp_hostportfile(char *, char **, u_short *, char **);
+
+int evhttp_parse_firstline(struct evhttp_request *, struct evbuffer*);
+int evhttp_parse_headers(struct evhttp_request *, struct evbuffer*);
+
+void evhttp_start_read(struct evhttp_connection *);
+void evhttp_make_header(struct evhttp_connection *, struct evhttp_request *);
+
+void evhttp_write_buffer(struct evhttp_connection *,
+ void (*)(struct evhttp_connection *, void *), void *);
+
+/* response sending HTML the data in the buffer */
+void evhttp_response_code(struct evhttp_request *, int, const char *);
+void evhttp_send_page(struct evhttp_request *, struct evbuffer *);
+
+#endif /* _HTTP_H */
View
38 simpleleveldb/simpleleveldb.c
@@ -10,12 +10,16 @@
#include <json/json.h>
#include <leveldb/c.h>
+#include <sys/socket.h>
+#include "http-internal.h"
+
#define NAME "simpleleveldb"
-#define VERSION "0.4"
+#define VERSION "0.5"
#define DUMP_CSV_ITERS_CHECK 10
#define DUMP_CSV_MSECS_WORK 10
#define DUMP_CSV_MSECS_SLEEP 100
+#define DUMP_CSV_MAX_BUFFER (8*1024*1024)
void finalize_request(int response_code, char *error, struct evhttp_request *req, struct evbuffer *evb, struct evkeyvalq *args, struct json_object *jsobj);
int db_open();
@@ -32,6 +36,7 @@ void list_remove_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
void dump_csv_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx);
void do_dump_csv(int fd, short what, void *ctx);
void set_dump_csv_timer(struct evhttp_request *req);
+void cleanup_dump_csv_cb(struct evhttp_connection *evcon, void *arg);
leveldb_t *ldb;
leveldb_options_t *ldb_options;
@@ -573,6 +578,7 @@ void dump_csv_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
free(dump_fwmatch_key);
return;
}
+ is_currently_dumping = 1;
/* init the state for dumping data */
dump_read_options = leveldb_readoptions_create();
@@ -589,6 +595,7 @@ void dump_csv_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_clear_headers(&args);
json_object_put(jsobj);
evhttp_send_reply_start(req, 200, "OK");
+ evhttp_connection_set_closecb(req->evcon, cleanup_dump_csv_cb, NULL);
/* run the first dump loop */
do_dump_csv(0, 0, req);
@@ -602,10 +609,20 @@ void do_dump_csv(int fd, short what, void *ctx)
const char *key, *value;
size_t key_len, value_len;
struct timeval time_start, time_now;
+ struct evhttp_connection *evcon;
+ unsigned long output_buffer_length;
gettimeofday(&time_start, NULL);
evb = req->output_buffer;
+ // if backed up, continue later
+ evcon = (struct evhttp_connection *)req->evcon;
+ output_buffer_length = evcon->output_buffer ? (unsigned long)EVBUFFER_LENGTH(evcon->output_buffer) : 0;
+ if (output_buffer_length > DUMP_CSV_MAX_BUFFER) {
+ set_dump_csv_timer(req);
+ return;
+ }
+
while (leveldb_iter_valid(dump_iter)) {
key = leveldb_iter_key(dump_iter, &key_len);
if (dump_fwmatch_key) {
@@ -627,8 +644,8 @@ void do_dump_csv(int fd, short what, void *ctx)
if (c == DUMP_CSV_ITERS_CHECK) {
int64_t usecs;
gettimeofday(&time_now, NULL);
- usecs = ((int64_t)time_now.tv_sec * 1000000 + time_now.tv_usec )
- - ((int64_t)time_start.tv_sec * 1000000 + time_start.tv_usec);
+ usecs = 0 + ((int64_t)time_now .tv_sec * 1000000 + time_now .tv_usec)
@mreiferson
mreiferson added a note Apr 11, 2012

lol is all I really have to say about this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ - ((int64_t)time_start.tv_sec * 1000000 + time_start.tv_usec);
if (usecs > DUMP_CSV_MSECS_WORK * 1000) {
set_timer = 1;
break;
@@ -647,11 +664,7 @@ void do_dump_csv(int fd, short what, void *ctx)
set_dump_csv_timer(req);
} else {
evhttp_send_reply_end(req);
- is_currently_dumping = 0;
- leveldb_iter_destroy(dump_iter);
- leveldb_readoptions_destroy(dump_read_options);
- leveldb_release_snapshot(ldb, dump_snapshot);
- free(dump_fwmatch_key);
+ // cleanup_cump_csv_cb() automatically called
}
}
@@ -664,6 +677,15 @@ void set_dump_csv_timer(struct evhttp_request *req)
evtimer_add(&dump_ev, &tv);
}
+void cleanup_dump_csv_cb(struct evhttp_connection *evcon, void *arg)
+{
+ evtimer_del(&dump_ev);
+ leveldb_iter_destroy(dump_iter);
+ leveldb_readoptions_destroy(dump_read_options);
+ leveldb_release_snapshot(ldb, dump_snapshot);
+ free(dump_fwmatch_key);
+ is_currently_dumping = 0;
+}
void stats_cb(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
Something went wrong with that request. Please try again.