diff --git a/conn.c b/conn.c index db959be2..9c5a60cd 100644 --- a/conn.c +++ b/conn.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "conn.h" #include "net.h" @@ -84,6 +85,7 @@ make_conn(int fd, char start_state, tube use, tube watch) c->state = start_state; c->type = 0; c->cmd_read = 0; + c->pending_timeout = -1; c->in_job = c->out_job = NULL; c->in_job_read = c->out_job_sent = 0; c->prev = c->next = c; /* must be out of a linked list right now */ @@ -146,8 +148,8 @@ has_reserved_job(conn c) int conn_set_evq(conn c, const int events, evh handler) { - int r, margin = 0; - struct timeval tv = {0, 0}; + int r, margin = 0, should_timeout=0; + struct timeval tv = {INT_MAX, 0}; event_set(&c->evq, c->fd, events, handler, c); @@ -155,9 +157,14 @@ conn_set_evq(conn c, const int events, evh handler) if (has_reserved_job(c)) { time_t t = soonest_job(c)->deadline - time(NULL) - margin; tv.tv_sec = t > 0 ? t : 0; + should_timeout = 1; + } + if (c->pending_timeout >= 0) { + tv.tv_sec = min(tv.tv_sec, c->pending_timeout); + should_timeout = 1; } - r = event_add(&c->evq, has_reserved_job(c) ? &tv : NULL); + r = event_add(&c->evq, should_timeout ? &tv : NULL); if (r == -1) return twarn("event_add() err %d", errno), -1; return 0; diff --git a/conn.h b/conn.h index b1af7638..3d7d4c4a 100644 --- a/conn.h +++ b/conn.h @@ -44,6 +44,7 @@ struct conn { char state; char type; struct event evq; + int pending_timeout; /* we cannot share this buffer with the reply line because we might read in * command line data for a subsequent command, and we need to store it diff --git a/doc/protocol.txt b/doc/protocol.txt index 2087d690..e2550287 100644 --- a/doc/protocol.txt +++ b/doc/protocol.txt @@ -194,6 +194,10 @@ A process that wants to consume jobs from the queue uses "reserve", "delete", reserve\r\n +Alternatively, you can specify a timeout as follows: + +reserve-with-timeout \r\n + This will return a newly-reserved job. If no job is available to be reserved, beanstalkd will wait to send a response until one becomes available. Once a job is reserved for the client, the client has limited time to run (TTR) the @@ -201,6 +205,11 @@ job before the job times out. When the job times out, the server will put the job back into the ready queue. Both the TTR and the actual time left can be found in response to the stats-job command. +A timeout value of 0 will cause the server to immediately return either a +response or TIMED_OUT. A positive value of timeout will limit the amount of +time the client will block on the reserve request until a job becomes +available. + During the TTR of a reserved job, the last second is kept by the server as a safety margin, during which the client will not be made to wait for another job. If the client issues a reserve command during the safety margin, or if @@ -212,8 +221,13 @@ DEADLINE_SOON\r\n This gives the client a chance to delete or release its reserved job before the server automatically releases it. -Otherwise, the only response to this command is a successful reservation in -the form of a text line followed by the job body: +TIMED_OUT\r\n + +If a non-negative timeout was specified and the timeout exceeded before a job +became available, the server will respond with TIMED_OUT. + +Otherwise, the only other response to this command is a successful reservation +in the form of a text line followed by the job body: RESERVED \r\n \r\n diff --git a/prot.c b/prot.c index e4a9e16c..65f053dc 100644 --- a/prot.c +++ b/prot.c @@ -50,6 +50,7 @@ size_t job_data_size_limit = ((1 << 16) - 1); #define CMD_PEEK_DELAYED "peek-delayed" #define CMD_PEEK_BURIED "peek-buried" #define CMD_RESERVE "reserve" +#define CMD_RESERVE_TIMEOUT "reserve-with-timeout " #define CMD_DELETE "delete " #define CMD_RELEASE "release " #define CMD_BURY "bury " @@ -71,6 +72,7 @@ size_t job_data_size_limit = ((1 << 16) - 1); #define CMD_PEEK_BURIED_LEN CONSTSTRLEN(CMD_PEEK_BURIED) #define CMD_PEEKJOB_LEN CONSTSTRLEN(CMD_PEEKJOB) #define CMD_RESERVE_LEN CONSTSTRLEN(CMD_RESERVE) +#define CMD_RESERVE_TIMEOUT_LEN CONSTSTRLEN(CMD_RESERVE_TIMEOUT) #define CMD_DELETE_LEN CONSTSTRLEN(CMD_DELETE) #define CMD_RELEASE_LEN CONSTSTRLEN(CMD_RELEASE) #define CMD_BURY_LEN CONSTSTRLEN(CMD_BURY) @@ -89,6 +91,7 @@ size_t job_data_size_limit = ((1 << 16) - 1); #define MSG_NOTFOUND "NOT_FOUND\r\n" #define MSG_RESERVED "RESERVED" #define MSG_DEADLINE_SOON "DEADLINE_SOON\r\n" +#define MSG_TIMED_OUT "TIMED_OUT\r\n" #define MSG_DELETED "DELETED\r\n" #define MSG_RELEASED "RELEASED\r\n" #define MSG_BURIED "BURIED\r\n" @@ -137,7 +140,8 @@ size_t job_data_size_limit = ((1 << 16) - 1); #define OP_STATS_TUBE 17 #define OP_PEEK_READY 18 #define OP_PEEK_DELAYED 19 -#define TOTAL_OPS 20 +#define OP_RESERVE_TIMEOUT 20 +#define TOTAL_OPS 21 #define STATS_FMT "---\n" \ "current-jobs-urgent: %u\n" \ @@ -151,6 +155,7 @@ size_t job_data_size_limit = ((1 << 16) - 1); "cmd-peek-delayed: %llu\n" \ "cmd-peek-buried: %llu\n" \ "cmd-reserve: %llu\n" \ + "cmd-reserve-with-timeout: %llu\n" \ "cmd-delete: %llu\n" \ "cmd-release: %llu\n" \ "cmd-use: %llu\n" \ @@ -704,6 +709,7 @@ which_cmd(conn c) TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY); TEST_CMD(c->cmd, CMD_PEEK_DELAYED, OP_PEEK_DELAYED); TEST_CMD(c->cmd, CMD_PEEK_BURIED, OP_PEEK_BURIED); + TEST_CMD(c->cmd, CMD_RESERVE_TIMEOUT, OP_RESERVE_TIMEOUT); TEST_CMD(c->cmd, CMD_RESERVE, OP_RESERVE); TEST_CMD(c->cmd, CMD_DELETE, OP_DELETE); TEST_CMD(c->cmd, CMD_RELEASE, OP_RELEASE); @@ -809,6 +815,7 @@ fmt_stats(char *buf, size_t size, void *x) op_ct[OP_PEEK_DELAYED], op_ct[OP_PEEK_BURIED], op_ct[OP_RESERVE], + op_ct[OP_RESERVE_TIMEOUT], op_ct[OP_DELETE], op_ct[OP_RELEASE], op_ct[OP_USE], @@ -882,13 +889,16 @@ read_ttr(unsigned int *ttr, const char *buf, char **end) } static void -wait_for_job(conn c) +wait_for_job(conn c, int timeout) { int r; c->state = STATE_WAIT; enqueue_waiting_conn(c); + /* Set the pending timeout to the requested timeout amount */ + c->pending_timeout = timeout; + /* this conn is waiting, but we want to know if they hang up */ r = conn_update_evq(c, EV_READ | EV_PERSIST); if (r == -1) return twarnx("update events failed"), conn_close(c); @@ -1068,7 +1078,7 @@ find_or_make_tube(const char *name) static void dispatch_cmd(conn c) { - int r, i; + int r, i, timeout = -1; unsigned int count; job j; unsigned char type; @@ -1189,9 +1199,13 @@ dispatch_cmd(conn c) reply_job(c, j, MSG_FOUND); break; - case OP_RESERVE: + case OP_RESERVE_TIMEOUT: + errno = 0; + timeout = strtol(c->cmd + CMD_RESERVE_TIMEOUT_LEN, &end_buf, 10); + if (errno) return reply_msg(c, MSG_BAD_FORMAT); + case OP_RESERVE: /* FALLTHROUGH */ /* don't allow trailing garbage */ - if (c->cmd_len != CMD_RESERVE_LEN + 2) { + if (type == OP_RESERVE && c->cmd_len != CMD_RESERVE_LEN + 2) { return reply_msg(c, MSG_BAD_FORMAT); } @@ -1203,7 +1217,7 @@ dispatch_cmd(conn c) } /* try to get a new job for this guy */ - wait_for_job(c); + wait_for_job(c, timeout); process_queue(); break; case OP_DELETE: @@ -1415,6 +1429,10 @@ h_conn_timeout(conn c) if (should_timeout) { dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c)); return reply_msg(remove_waiting_conn(c), MSG_DEADLINE_SOON); + } else if (conn_waiting(c) && c->pending_timeout >= 0) { + dprintf("conn_waiting(%p) = %d\n", c, conn_waiting(c)); + c->pending_timeout=-1; + return reply_msg(remove_waiting_conn(c), MSG_TIMED_OUT); } }