Skip to content

Commit

Permalink
Fix live-comm: merge TCP socket write-write sequence in a single write
Browse files Browse the repository at this point in the history
The live protocol implementation is often sending content
on TCP sockets in two separate writes. One to send a command header,
and the second one sending the command's payload. This was presumably
done under the assumption that it would not result in two separate
TCP packets being sent on the network (or that it would not matter).

Delayed ACK-induced delays were observed [1] on the second write of the
"write header, write payload" sequence and result in problematic
latency build-ups for live clients connected to moderately/highly
active sessions.

Fundamentaly, this problem arises due to the combination of Nagle's
algorithm and the delayed ACK mechanism which make write-write-read
sequences on TCP sockets problematic as near-constant latency is
expected when clients can keep-up with the event production rate.

In such a write-write-read sequence, the second write is held up until
the first write is acknowledged (TCP ACK). The solution implemented
by this patch bundles the writes into a single one [2].

[1] https://github.com/tbricks/wireshark-lttng-plugin
Basic Wireshark dissector for lttng-live by Anto Smyk from Itiviti
[2] https://lists.freebsd.org/pipermail/freebsd-net/2006-January/009527.html

Reported-by: Anton Smyk <anton.smyk@itiviti.com>
Signed-off-by: Jonathan Rajotte <jonathan.rajotte-julien@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
  • Loading branch information
PSRCode authored and jgalar committed Jul 27, 2017
1 parent a0439ff commit de417d0
Showing 1 changed file with 72 additions and 55 deletions.
127 changes: 72 additions & 55 deletions formats/lttng-live/lttng-live-comm.c
Expand Up @@ -157,6 +157,8 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
{
struct lttng_viewer_cmd cmd;
struct lttng_viewer_connect connect;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(connect);
char cmd_buf[cmd_buf_len];
int ret;
ssize_t ret_len;

Expand All @@ -174,19 +176,20 @@ int lttng_live_establish_connection(struct lttng_live_ctx *ctx)
connect.minor = htobe32(LTTNG_LIVE_MINOR);
connect.type = htobe32(LTTNG_VIEWER_CLIENT_COMMAND);

ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
perror("[error] Error sending cmd");
goto error;
}
assert(ret_len == sizeof(cmd));
/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &connect, sizeof(connect));

ret_len = lttng_live_send(ctx->control_sock, &connect, sizeof(connect));
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending version");
perror("[error] Error sending cmd for establishing session");
goto error;
}
assert(ret_len == sizeof(connect));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, &connect, sizeof(connect));
if (ret_len == 0) {
Expand Down Expand Up @@ -423,6 +426,8 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
struct lttng_viewer_attach_session_request rq;
struct lttng_viewer_attach_session_response rp;
struct lttng_viewer_stream stream;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
int ret, i;
ssize_t ret_len;

Expand All @@ -441,19 +446,20 @@ int lttng_live_attach_session(struct lttng_live_ctx *ctx, uint64_t id)
// rq.seek = htobe32(LTTNG_VIEWER_SEEK_BEGINNING);
rq.seek = htobe32(LTTNG_VIEWER_SEEK_LAST);

ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
perror("[error] Error sending cmd");
goto error;
}
assert(ret_len == sizeof(cmd));
/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending attach request");
perror("[error] Error sending attach command and request");
goto error;
}
assert(ret_len == sizeof(rq));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
Expand Down Expand Up @@ -641,6 +647,8 @@ int get_data_packet(struct lttng_live_ctx *ctx,
struct lttng_viewer_cmd cmd;
struct lttng_viewer_get_packet rq;
struct lttng_viewer_trace_packet rp;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];
ssize_t ret_len;
int ret;

Expand All @@ -659,19 +667,20 @@ int get_data_packet(struct lttng_live_ctx *ctx,
rq.offset = htobe64(offset);
rq.len = htobe32(len);

ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
perror("[error] Error sending cmd");
goto error;
}
assert(ret_len == sizeof(cmd));
/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending get_data_packet request");
perror("[error] Error sending get_data_packet cmd and request");
goto error;
}
assert(ret_len == sizeof(rq));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
Expand Down Expand Up @@ -795,6 +804,8 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx,
struct lttng_viewer_metadata_packet rp;
char *data = NULL;
ssize_t ret_len;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];

if (lttng_live_should_quit()) {
ret = -1;
Expand All @@ -806,19 +817,20 @@ int get_one_metadata_packet(struct lttng_live_ctx *ctx,
cmd.data_size = htobe64((uint64_t) sizeof(rq));
cmd.cmd_version = htobe32(0);

ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
perror("[error] Error sending cmd");
goto error;
}
assert(ret_len == sizeof(cmd));
/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending get_metadata request");
perror("[error] Error sending get_metadata cmd and request");
goto error;
}
assert(ret_len == sizeof(rq));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
Expand Down Expand Up @@ -976,6 +988,8 @@ int get_next_index(struct lttng_live_ctx *ctx,
int ret;
ssize_t ret_len;
struct lttng_viewer_index *rp = &viewer_stream->current_index;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];

cmd.cmd = htobe32(LTTNG_VIEWER_GET_NEXT_INDEX);
cmd.data_size = htobe64((uint64_t) sizeof(rq));
Expand All @@ -984,24 +998,24 @@ int get_next_index(struct lttng_live_ctx *ctx,
memset(&rq, 0, sizeof(rq));
rq.stream_id = htobe64(viewer_stream->id);

/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));
retry:
if (lttng_live_should_quit()) {
ret = -1;
goto end;
}
ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
ret_len = lttng_live_send(ctx->control_sock, &cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending cmd");
perror("[error] Error sending get_next_index cmd and request");
goto error;
}
assert(ret_len == sizeof(cmd));

ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
if (ret_len < 0) {
perror("[error] Error sending get_next_index request");
goto error;
}
assert(ret_len == sizeof(rq));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, rp, sizeof(*rp));
if (ret_len == 0) {
Expand Down Expand Up @@ -1515,6 +1529,8 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
int ret, i, nb_streams = 0;
ssize_t ret_len;
uint32_t stream_count;
const size_t cmd_buf_len = sizeof(cmd) + sizeof(rq);
char cmd_buf[cmd_buf_len];

if (lttng_live_should_quit()) {
ret = -1;
Expand All @@ -1528,19 +1544,20 @@ int lttng_live_get_new_streams(struct lttng_live_ctx *ctx, uint64_t id)
memset(&rq, 0, sizeof(rq));
rq.session_id = htobe64(id);

ret_len = lttng_live_send(ctx->control_sock, &cmd, sizeof(cmd));
if (ret_len < 0) {
perror("[error] Error sending cmd");
goto error;
}
assert(ret_len == sizeof(cmd));
/*
* Merge the cmd and connection request to prevent a write-write
* sequence on the TCP socket. Otherwise, a delayed ACK will prevent the
* second write to be performed quickly in presence of Nagle's algorithm.
*/
memcpy(cmd_buf, &cmd, sizeof(cmd));
memcpy(cmd_buf + sizeof(cmd), &rq, sizeof(rq));

ret_len = lttng_live_send(ctx->control_sock, &rq, sizeof(rq));
ret_len = lttng_live_send(ctx->control_sock, cmd_buf, cmd_buf_len);
if (ret_len < 0) {
perror("[error] Error sending get_new_streams request");
perror("[error] Error sending get_new_streams cmd and request");
goto error;
}
assert(ret_len == sizeof(rq));
assert(ret_len == cmd_buf_len);

ret_len = lttng_live_recv(ctx->control_sock, &rp, sizeof(rp));
if (ret_len == 0) {
Expand Down

0 comments on commit de417d0

Please sign in to comment.