Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Add mget call to simplequeue #9

Merged
merged 1 commit into from Aug 18, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion simplequeue/SimpleQueue.py
Expand Up @@ -44,7 +44,18 @@ def get(self, timeout_ms=500):
except: except:
traceback.print_tb(sys.exc_info()[2]) traceback.print_tb(sys.exc_info()[2])
return False return False


def mget(self, timeout_ms=500, num_items=1, separator=None):
url = "http://%s:%d/mget?items=%d" % (self.address, self.port, num_items)
if separator:
url += "&separator=" + separator

try:
return self.request(url, timeout_ms)
except:
traceback.print_tb(sys.exc_info()[2])
return False

def dump(self, timeout_ms=500): def dump(self, timeout_ms=500):
url = "http://%s:%d/dump" % (self.address, self.port) url = "http://%s:%d/dump" % (self.address, self.port)


Expand Down
75 changes: 69 additions & 6 deletions simplequeue/simplequeue.c
Expand Up @@ -21,6 +21,8 @@ char *overflow_log = NULL;
FILE *overflow_log_fp = NULL; FILE *overflow_log_fp = NULL;
uint64_t max_depth = 0; uint64_t max_depth = 0;
size_t max_bytes = 0; size_t max_bytes = 0;
int max_mget = 0;
char *mget_item_sep = "\n";
uint64_t depth = 0; uint64_t depth = 0;
uint64_t depth_high_water = 0; uint64_t depth_high_water = 0;
uint64_t n_puts = 0; uint64_t n_puts = 0;
Expand Down Expand Up @@ -101,23 +103,80 @@ stats(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
evhttp_clear_headers(&args); evhttp_clear_headers(&args);
} }


struct queue_entry*
get_queue_entry()
{
struct queue_entry *entry;
entry = TAILQ_FIRST(&queues);
if (entry != NULL) {
TAILQ_REMOVE(&queues, entry, entries);
depth--;
}
return entry;
}

void void
get(struct evhttp_request *req, struct evbuffer *evb, void *ctx) get(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
{ {
struct queue_entry *entry; struct queue_entry *entry;

n_gets++; n_gets++;
entry = TAILQ_FIRST(&queues);
entry = get_queue_entry();
if (entry != NULL) { if (entry != NULL) {
evbuffer_add_printf(evb, "%s", entry->data); evbuffer_add_printf(evb, "%s", entry->data);
TAILQ_REMOVE(&queues, entry, entries);
free(entry); free(entry);
depth--;
} }


evhttp_send_reply(req, HTTP_OK, "OK", evb); evhttp_send_reply(req, HTTP_OK, "OK", evb);
} }


void
mget(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
{
struct evkeyvalq args;
const char *items_arg;
const char *separator;
struct queue_entry *entry;
int num_items = 1;
int i = 0;

// parse the number of items to return, defaults to 1
evhttp_parse_query(req->uri, &args);
items_arg = evhttp_find_header(&args, "items");

// if arg, must be > 0, it is constrained to max
if (items_arg != NULL) {
num_items = atoi(items_arg);
if (num_items <= 0) {
evbuffer_add_printf(evb, "%s\n", "number of items must be > 0");
evhttp_send_reply(req, HTTP_BADREQUEST, "ERROR", evb);
evhttp_clear_headers(&args);
return;
}
}
if (max_mget > 0 && num_items > max_mget) {
num_items = max_mget;
}

// allow dynamically setting separator for items, defaults to newline
separator = evhttp_find_header(&args, "separator");
if (separator == NULL) {
separator = mget_item_sep;
}

// get n number of items from the queue to return
for (i = 0; i < num_items && (entry = get_queue_entry()); n_gets++, i++) {
evbuffer_add_printf(evb, "%s", entry->data);
if (i < (num_items - 1)) {
evbuffer_add_printf(evb, "%s", separator);
}
free(entry);
}

evhttp_send_reply(req, HTTP_OK, "OK", evb);
evhttp_clear_headers(&args);
}

void void
put(struct evhttp_request *req, struct evbuffer *evb, void *ctx) put(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
{ {
Expand All @@ -131,7 +190,7 @@ put(struct evhttp_request *req, struct evbuffer *evb, void *ctx)
data = evhttp_find_header(&args, "data"); data = evhttp_find_header(&args, "data");
if (data == NULL) { if (data == NULL) {
evbuffer_add_printf(evb, "%s\n", "missing data"); evbuffer_add_printf(evb, "%s\n", "missing data");
evhttp_send_reply(req, HTTP_BADREQUEST, "OK", evb); evhttp_send_reply(req, HTTP_BADREQUEST, "ERROR", evb);
evhttp_clear_headers(&args); evhttp_clear_headers(&args);
return; return;
} }
Expand Down Expand Up @@ -187,16 +246,19 @@ main(int argc, char **argv)


define_simplehttp_options(); define_simplehttp_options();
option_define_str("overflow_log", OPT_OPTIONAL, NULL, &overflow_log, NULL, "file to write data beyond --max-depth or --max-bytes"); option_define_str("overflow_log", OPT_OPTIONAL, NULL, &overflow_log, NULL, "file to write data beyond --max-depth or --max-bytes");
option_define_str("mget_item_sep", OPT_OPTIONAL, "\n", &mget_item_sep, NULL, "separator between items in mget, defaults to newline");
// float? // float?
option_define_int("max_bytes", OPT_OPTIONAL, 0, NULL, NULL, "memory limit"); option_define_int("max_bytes", OPT_OPTIONAL, 0, NULL, NULL, "memory limit");
option_define_int("max_depth", OPT_OPTIONAL, 0, NULL, NULL, "maximum items in queue"); option_define_int("max_depth", OPT_OPTIONAL, 0, NULL, NULL, "maximum items in queue");
option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION); option_define_bool("version", OPT_OPTIONAL, 0, NULL, version_cb, VERSION);
option_define_int("max_mget", OPT_OPTIONAL, 0, NULL, NULL, "maximum items to return in a single mget");


if (!option_parse_command_line(argc, argv)){ if (!option_parse_command_line(argc, argv)){
return 1; return 1;
} }
max_bytes = (size_t)option_get_int("max_bytes"); max_bytes = (size_t)option_get_int("max_bytes");
max_depth = (uint64_t)option_get_int("max_depth"); max_depth = (uint64_t)option_get_int("max_depth");
max_mget = (int)option_get_int("max_mget");


if (overflow_log) { if (overflow_log) {
overflow_log_fp = fopen(overflow_log, "a"); overflow_log_fp = fopen(overflow_log, "a");
Expand All @@ -213,6 +275,7 @@ main(int argc, char **argv)
signal(SIGHUP, hup_handler); signal(SIGHUP, hup_handler);
simplehttp_set_cb("/put*", put, NULL); simplehttp_set_cb("/put*", put, NULL);
simplehttp_set_cb("/get*", get, NULL); simplehttp_set_cb("/get*", get, NULL);
simplehttp_set_cb("/mget*", mget, NULL);
simplehttp_set_cb("/dump*", dump, NULL); simplehttp_set_cb("/dump*", dump, NULL);
simplehttp_set_cb("/stats*", stats, NULL); simplehttp_set_cb("/stats*", stats, NULL);
simplehttp_main(); simplehttp_main();
Expand Down
26 changes: 26 additions & 0 deletions simplequeue/test_simplequeue.py
Expand Up @@ -26,6 +26,32 @@ def test_put_get(data):
d = c.get() d = c.get()
assert d == data assert d == data


def test_put_mget():
c = SimpleQueue(port=PORT, debug=True)
c.put('test1')
c.put('test2')
items = c.mget(num_items=10).splitlines()
assert len(items) == 2
assert items[0] == 'test1'
assert items[1] == 'test2'
c.put('test3')
c.put('test4')
items = c.mget().splitlines()
assert len(items) == 1
assert items[0] == 'test3'
assert c.get() == 'test4'

def test_mget_badindex():
c = SimpleQueue(port=PORT, debug=True)
msg = c.mget(num_items=-2)
assert msg == 'number of items must be > 0\n'
msg2 = c.mget(num_items=0)
assert msg2 == 'number of items must be > 0\n'
c.put('test1')
items = c.mget().splitlines()
assert len(items) == 1
assert items[0] == 'test1'

def test_order(): def test_order():
# first thing put in, should be first thing out # first thing put in, should be first thing out
c = SimpleQueue(port=PORT, debug=True) c = SimpleQueue(port=PORT, debug=True)
Expand Down