Skip to content

Commit

Permalink
fix response for the command that is too long
Browse files Browse the repository at this point in the history
Before the patch, if a client sent too long command, server would
respond by BAD_FORMAT, followed by UNKNOWN_COMMAND. This resulted
in desync between client and server.

This patch adds another state STATE_WANT_ENDLINE,
whose purpose is to drop command line until the EOL is found.
Client can send as big command as he want and server will be able
to skip it and return only one error message: BAD_FORMAT.

Some STATE_* constants were renamed to improve readability.

Fixes #337
  • Loading branch information
ysmolski committed Sep 9, 2019
1 parent fae6722 commit d18aa23
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 44 deletions.
3 changes: 2 additions & 1 deletion doc/protocol.txt
Expand Up @@ -42,7 +42,8 @@ the server will reply with one of the following error messages:
http://groups.google.com/group/beanstalk-talk.

- "BAD_FORMAT\r\n" The client sent a command line that was not well-formed.
This can happen if the line does not end with \r\n, if non-numeric
This can happen if the line's length exceeds 224 bytes including \r\n,
if the name of a tube exceeds 200 bytes, if non-numeric
characters occur where an integer is expected, if the wrong number of
arguments are present, or if the command line is mal-formed in any other
way.
Expand Down
109 changes: 68 additions & 41 deletions prot.c
Expand Up @@ -96,13 +96,14 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
#define MSG_JOB_TOO_BIG "JOB_TOO_BIG\r\n"

// Connection can be in one of these states:
#define STATE_WANTCOMMAND 0 // conn expects a command from the client
#define STATE_WANTDATA 1 // conn expects a job data
#define STATE_SENDJOB 2 // conn sends job to the client
#define STATE_SENDWORD 3 // conn sends a line reply
#define STATE_WAIT 4 // client awaits for the job reservation
#define STATE_BITBUCKET 5 // conn discards content
#define STATE_CLOSE 6 // conn should be closed
#define STATE_WANT_COMMAND 0 // conn expects a command from the client
#define STATE_WANT_DATA 1 // conn expects a job data
#define STATE_SEND_JOB 2 // conn sends job to the client
#define STATE_SEND_WORD 3 // conn sends a line reply
#define STATE_WAIT 4 // client awaits for the job reservation
#define STATE_BITBUCKET 5 // conn discards content
#define STATE_CLOSE 6 // conn should be closed
#define STATE_WANT_ENDLINE 7 // skip until the end of a line

#define OP_UNKNOWN 0
#define OP_PUT 1
Expand Down Expand Up @@ -326,7 +327,7 @@ epollq_apply()
}

#define reply_msg(c, m) \
reply((c), (m), CONSTSTRLEN(m), STATE_SENDWORD)
reply((c), (m), CONSTSTRLEN(m), STATE_SEND_WORD)

#define reply_serr(c, e) \
(twarnx("server error: %s", (e)), reply_msg((c), (e)))
Expand Down Expand Up @@ -380,7 +381,7 @@ reply_job(Conn *c, Job *j, const char *msg)
{
c->out_job = j;
c->out_job_sent = 0;
reply_line(c, STATE_SENDJOB, "%s %"PRIu64" %u\r\n",
reply_line(c, STATE_SEND_JOB, "%s %"PRIu64" %u\r\n",
msg, j->r.id, j->r.body_size - 2);
}

Expand Down Expand Up @@ -839,7 +840,7 @@ _skip(Conn *c, int64 n, char *msg, int msglen)
fill_extra_data(c);

if (c->in_job_read == 0) {
reply(c, msg, msglen, STATE_SENDWORD);
reply(c, msg, msglen, STATE_SEND_WORD);
return;
}

Expand Down Expand Up @@ -898,13 +899,13 @@ enqueue_incoming_job(Conn *c)
j->tube->stat.total_jobs_ct++;

if (r == 1) {
reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j->r.id);
reply_line(c, STATE_SEND_WORD, MSG_INSERTED_FMT, j->r.id);
return;
}

/* out of memory trying to grow the queue, so it gets buried */
bury_job(c->srv, j, 0);
reply_line(c, STATE_SENDWORD, MSG_BURIED_FMT, j->r.id);
reply_line(c, STATE_SEND_WORD, MSG_BURIED_FMT, j->r.id);
}

static uint
Expand Down Expand Up @@ -1124,7 +1125,7 @@ do_stats(Conn *c, fmt_fn fmt, void *data)
}

c->out_job_sent = 0;
reply_line(c, STATE_SENDJOB, "OK %d\r\n", r - 2);
reply_line(c, STATE_SEND_JOB, "OK %d\r\n", r - 2);
}

static void
Expand Down Expand Up @@ -1161,7 +1162,7 @@ do_list_tubes(Conn *c, Ms *l)
buf[1] = '\n';

c->out_job_sent = 0;
reply_line(c, STATE_SENDJOB, "OK %zu\r\n", resp_z - 2);
reply_line(c, STATE_SEND_JOB, "OK %zu\r\n", resp_z - 2);
}

static int
Expand Down Expand Up @@ -1236,7 +1237,7 @@ maybe_enqueue_incoming_job(Conn *c)
}

/* otherwise we have incomplete data, so just keep waiting */
c->state = STATE_WANTDATA;
c->state = STATE_WANT_DATA;
}

/* j can be NULL */
Expand Down Expand Up @@ -1284,6 +1285,7 @@ dispatch_cmd(Conn *c)

/* NUL-terminate this string so we can use strtol and friends */
c->cmd[c->cmd_len - 2] = '\0';
printf("%zu %s\n", c->cmd_len, c->cmd);

/* check for possible maliciousness */
if (strlen(c->cmd) != c->cmd_len - 2) {
Expand Down Expand Up @@ -1563,7 +1565,7 @@ dispatch_cmd(Conn *c)
op_ct[type]++;

i = kick_jobs(c->srv, c->use, count);
reply_line(c, STATE_SENDWORD, "KICKED %u\r\n", i);
reply_line(c, STATE_SEND_WORD, "KICKED %u\r\n", i);
return;

case OP_KICKJOB:
Expand Down Expand Up @@ -1669,7 +1671,7 @@ dispatch_cmd(Conn *c)
return;
}
op_ct[type]++;
reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
reply_line(c, STATE_SEND_WORD, "USING %s\r\n", c->use->name);
return;

case OP_LIST_TUBES_WATCHED:
Expand Down Expand Up @@ -1701,7 +1703,7 @@ dispatch_cmd(Conn *c)
TUBE_ASSIGN(t, NULL);
c->use->using_ct++;

reply_line(c, STATE_SENDWORD, "USING %s\r\n", c->use->name);
reply_line(c, STATE_SEND_WORD, "USING %s\r\n", c->use->name);
return;

case OP_WATCH:
Expand All @@ -1726,7 +1728,7 @@ dispatch_cmd(Conn *c)
reply_serr(c, MSG_OUT_OF_MEMORY);
return;
}
reply_line(c, STATE_SENDWORD, "WATCHING %zu\r\n", c->watch.len);
reply_line(c, STATE_SEND_WORD, "WATCHING %zu\r\n", c->watch.len);
return;

case OP_IGNORE:
Expand All @@ -1753,7 +1755,7 @@ dispatch_cmd(Conn *c)
if (t)
ms_remove(&c->watch, t); /* may free t if refcount => 0 */
t = NULL;
reply_line(c, STATE_SENDWORD, "WATCHING %zu\r\n", c->watch.len);
reply_line(c, STATE_SEND_WORD, "WATCHING %zu\r\n", c->watch.len);
return;

case OP_QUIT:
Expand Down Expand Up @@ -1789,7 +1791,7 @@ dispatch_cmd(Conn *c)
t->pause = delay;
t->stat.pause_ct++;

reply_line(c, STATE_SENDWORD, "PAUSED\r\n");
reply_line(c, STATE_SEND_WORD, "PAUSED\r\n");
return;

default:
Expand Down Expand Up @@ -1865,7 +1867,7 @@ conn_want_command(Conn *c)
c->out_job = NULL;

c->reply_sent = 0; /* now that we're done, reset this */
c->state = STATE_WANTCOMMAND;
c->state = STATE_WANT_COMMAND;
}

static void
Expand All @@ -1877,7 +1879,7 @@ conn_process_io(Conn *c)
struct iovec iov[2];

switch (c->state) {
case STATE_WANTCOMMAND:
case STATE_WANT_COMMAND:
r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
if (r == -1) {
check_err(c, "read()");
Expand All @@ -1888,28 +1890,53 @@ conn_process_io(Conn *c)
return;
}

c->cmd_read += r; /* we got some bytes */
c->cmd_read += r;
c->cmd_len = scan_line_end(c->cmd, c->cmd_read);
if (c->cmd_len) {
// We found complete command line. Bail out to h_conn.
return;
}

// c->cmd_read > LINE_BUF_SIZE can't happen

if (c->cmd_read == LINE_BUF_SIZE) {
// Command line too long.
// Put connection into special state that discards
// the command line until the end line is found.
c->cmd_read = 0; // discard the input so far
c->state = STATE_WANT_ENDLINE;
}
// We have an incomplete line, so just keep waiting.
return;

c->cmd_len = scan_line_end(c->cmd, c->cmd_read); /* find the EOL */
case STATE_WANT_ENDLINE:
r = read(c->sock.fd, c->cmd + c->cmd_read, LINE_BUF_SIZE - c->cmd_read);
if (r == -1) {
check_err(c, "read()");
return;
}
if (r == 0) {
c->state = STATE_CLOSE;
return;
}

/* yay, complete command line */
c->cmd_read += r;
c->cmd_len = scan_line_end(c->cmd, c->cmd_read);
if (c->cmd_len) {
dispatch_cmd(c);
// Found the EOL. Reply and reuse whatever was read afer the EOL.
reply_msg(c, MSG_BAD_FORMAT);
fill_extra_data(c);
return;
}

/* c->cmd_read > LINE_BUF_SIZE can't happen */
// c->cmd_read > LINE_BUF_SIZE can't happen

/* command line too long? */
if (c->cmd_read == LINE_BUF_SIZE) {
c->cmd_read = 0; /* discard the input so far */
reply_msg(c, MSG_BAD_FORMAT);
return;
// Keep discarding the input since no EOL was found.
c->cmd_read = 0;
}
return;

/* otherwise we have an incomplete line, so just keep waiting */
break;
case STATE_BITBUCKET: {
/* Invert the meaning of in_job_read while throwing away data -- it
* counts the bytes that remain to be thrown away. */
Expand All @@ -1930,11 +1957,11 @@ conn_process_io(Conn *c)
/* (c->in_job_read < 0) can't happen */

if (c->in_job_read == 0) {
reply(c, c->reply, c->reply_len, STATE_SENDWORD);
reply(c, c->reply, c->reply_len, STATE_SEND_WORD);
}
return;
}
case STATE_WANTDATA:
case STATE_WANT_DATA:
j = c->in_job;

r = read(c->sock.fd, j->body + c->in_job_read, j->r.body_size -c->in_job_read);
Expand All @@ -1952,8 +1979,8 @@ conn_process_io(Conn *c)
/* (j->in_job_read > j->r.body_size) can't happen */

maybe_enqueue_incoming_job(c);
break;
case STATE_SENDWORD:
return;
case STATE_SEND_WORD:
r= write(c->sock.fd, c->reply + c->reply_sent, c->reply_len - c->reply_sent);
if (r == -1) {
check_err(c, "write()");
Expand All @@ -1975,7 +2002,7 @@ conn_process_io(Conn *c)

/* otherwise we sent an incomplete reply, so just keep waiting */
break;
case STATE_SENDJOB:
case STATE_SEND_JOB:
j = c->out_job;

iov[0].iov_base = (void *)(c->reply + c->reply_sent);
Expand Down Expand Up @@ -2024,7 +2051,7 @@ conn_process_io(Conn *c)
}
}

#define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANTCOMMAND))
#define want_command(c) ((c)->sock.fd && ((c)->state == STATE_WANT_COMMAND))
#define cmd_data_ready(c) (want_command(c) && (c)->cmd_read)

static void
Expand Down Expand Up @@ -2159,7 +2186,7 @@ h_accept(const int fd, const short which, Server *s)
return;
}

Conn *c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
Conn *c = make_conn(cfd, STATE_WANT_COMMAND, default_tube, default_tube);
if (!c) {
twarnx("make_conn() failed");
close(cfd);
Expand Down
9 changes: 7 additions & 2 deletions testserv.c
Expand Up @@ -380,10 +380,15 @@ cttest_too_long_commandline()
int port = SERVER();
int fd = mustdiallocal(port);
int i;
for (i = 0; i < 5; i++)
mustsend(fd, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
for (i = 0; i < 10; i++)
mustsend(fd, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); // 50 bytes
mustsend(fd, "\r\n");
ckresp(fd, "BAD_FORMAT\r\n");
// Issue another command and check that reponse is not "UNKNOWN_COMMAND"
// as described in issue #337
mustsend(fd, "put 0 0 1 1\r\n");
mustsend(fd, "A\r\n");
ckresp(fd, "INSERTED 1\r\n");
}

void
Expand Down

0 comments on commit d18aa23

Please sign in to comment.