Skip to content

Commit

Permalink
http2: polish things around POST
Browse files Browse the repository at this point in the history
- added test cases for various code paths
- fixed handling of blocked write when stream had
  been closed inbetween attempts
- re-enabled DEBUGASSERT on send with smaller data size

- in debug builds, environment variables can be set to simulate a slow
  network when sending data. cf-socket.c and vquic.c support
  * CURL_DBG_SOCK_WBLOCK: percentage of send() calls that should be
    answered with a EAGAIN. TCP/UNIX sockets.
    This is chosen randomly.
  * CURL_DBG_SOCK_WPARTIAL: percentage of data that shall be written
    to the network. TCP/UNIX sockets.
    Example: 80 means a send with 1000 bytes would only send 800
    This is applied to every send.
  * CURL_DBG_QUIC_WBLOCK: percentage of send() calls that should be
    answered with EAGAIN. QUIC only.
    This is chosen randomly.

Closes #11756
  • Loading branch information
icing authored and bagder committed Sep 4, 2023
1 parent c9260cf commit 331b89a
Show file tree
Hide file tree
Showing 23 changed files with 897 additions and 164 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ngtcp2-linux.yml
Expand Up @@ -267,3 +267,11 @@ jobs:
name: 'run pytest'
env:
TFLAGS: "${{ matrix.build.tflags }}"

- run: pytest
name: 'run pytest with slowed network'
env:
# 33% of sends are EAGAINed
CURL_DBG_SOCK_WBLOCK: 33
# only 80% of data > 10 bytes is send
CURL_DBG_SOCK_WPARTIAL: 80
248 changes: 183 additions & 65 deletions lib/cf-h2-proxy.c

Large diffs are not rendered by default.

46 changes: 45 additions & 1 deletion lib/cf-socket.c
Expand Up @@ -71,6 +71,7 @@
#include "warnless.h"
#include "conncache.h"
#include "multihandle.h"
#include "rand.h"
#include "share.h"
#include "version_win32.h"

Expand Down Expand Up @@ -777,6 +778,10 @@ struct cf_socket_ctx {
struct curltime connected_at; /* when socket connected/got first byte */
struct curltime first_byte_at; /* when first byte was recvd */
int error; /* errno of last failure or 0 */
#ifdef DEBUGBUILD
int wblock_percent; /* percent of writes doing EAGAIN */
int wpartial_percent; /* percent of bytes written in send */
#endif
BIT(got_first_byte); /* if first byte was received */
BIT(accepted); /* socket was accepted, not connected */
BIT(active);
Expand All @@ -792,6 +797,22 @@ static void cf_socket_ctx_init(struct cf_socket_ctx *ctx,
ctx->transport = transport;
Curl_sock_assign_addr(&ctx->addr, ai, transport);
Curl_bufq_init(&ctx->recvbuf, NW_RECV_CHUNK_SIZE, NW_RECV_CHUNKS);
#ifdef DEBUGBUILD
{
char *p = getenv("CURL_DBG_SOCK_WBLOCK");
if(p) {
long l = strtol(p, NULL, 10);
if(l >= 0 && l <= 100)
ctx->wblock_percent = (int)l;
}
p = getenv("CURL_DBG_SOCK_WPARTIAL");
if(p) {
long l = strtol(p, NULL, 10);
if(l >= 0 && l <= 100)
ctx->wpartial_percent = (int)l;
}
}
#endif
}

struct reader_ctx {
Expand Down Expand Up @@ -1253,11 +1274,34 @@ static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data,
struct cf_socket_ctx *ctx = cf->ctx;
curl_socket_t fdsave;
ssize_t nwritten;
size_t orig_len = len;

*err = CURLE_OK;
fdsave = cf->conn->sock[cf->sockindex];
cf->conn->sock[cf->sockindex] = ctx->sock;

#ifdef DEBUGBUILD
/* simulate network blocking/partial writes */
if(ctx->wblock_percent > 0) {
unsigned char c;
Curl_rand(data, &c, 1);
if(c >= ((100-ctx->wblock_percent)*256/100)) {
CURL_TRC_CF(data, cf, "send(len=%zu) SIMULATE EWOULDBLOCK", orig_len);
*err = CURLE_AGAIN;
nwritten = -1;
cf->conn->sock[cf->sockindex] = fdsave;
return nwritten;
}
}
if(cf->cft != &Curl_cft_udp && ctx->wpartial_percent > 0 && len > 8) {
len = len * ctx->wpartial_percent / 100;
if(!len)
len = 1;
CURL_TRC_CF(data, cf, "send(len=%zu) SIMULATE partial write of %zu bytes",
orig_len, len);
}
#endif

#if defined(MSG_FASTOPEN) && !defined(TCP_FASTOPEN_CONNECT) /* Linux */
if(cf->conn->bits.tcp_fastopen) {
nwritten = sendto(ctx->sock, buf, len, MSG_FASTOPEN,
Expand Down Expand Up @@ -1297,7 +1341,7 @@ static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}

CURL_TRC_CF(data, cf, "send(len=%zu) -> %d, err=%d",
len, (int)nwritten, *err);
orig_len, (int)nwritten, *err);
cf->conn->sock[cf->sockindex] = fdsave;
return nwritten;
}
Expand Down
76 changes: 53 additions & 23 deletions lib/http2.c
Expand Up @@ -187,6 +187,7 @@ struct stream_ctx {
int status_code; /* HTTP response status code */
uint32_t error; /* stream error code */
uint32_t local_window_size; /* the local recv window size */
bool resp_hds_complete; /* we have a complete, final response */
bool closed; /* TRUE on stream close */
bool reset; /* TRUE on stream reset */
bool close_handled; /* TRUE if stream closure is handled by libcurl */
Expand Down Expand Up @@ -1044,6 +1045,9 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
if(result)
return result;

if(stream->status_code / 100 != 1) {
stream->resp_hds_complete = TRUE;
}
drain_stream(cf, data, stream);
break;
case NGHTTP2_PUSH_PROMISE:
Expand All @@ -1064,7 +1068,9 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
break;
case NGHTTP2_RST_STREAM:
stream->closed = TRUE;
stream->reset = TRUE;
if(frame->rst_stream.error_code) {
stream->reset = TRUE;
}
stream->send_closed = TRUE;
data->req.keepon &= ~KEEP_SEND_HOLD;
drain_stream(cf, data, stream);
Expand Down Expand Up @@ -1109,8 +1115,9 @@ static int fr_print(const nghttp2_frame *frame, char *buffer, size_t blen)
}
case NGHTTP2_RST_STREAM: {
return msnprintf(buffer, blen,
"FRAME[RST_STREAM, len=%d, flags=%d]",
(int)frame->hd.length, frame->hd.flags);
"FRAME[RST_STREAM, len=%d, flags=%d, error=%u]",
(int)frame->hd.length, frame->hd.flags,
frame->rst_stream.error_code);
}
case NGHTTP2_SETTINGS: {
if(frame->hd.flags & NGHTTP2_FLAG_ACK) {
Expand Down Expand Up @@ -1166,7 +1173,7 @@ static int on_frame_send(nghttp2_session *session, const nghttp2_frame *frame,
if(data && Curl_trc_cf_is_verbose(cf, data)) {
char buffer[256];
int len;
len = fr_print(frame, buffer, (sizeof(buffer)/sizeof(buffer[0]))-1);
len = fr_print(frame, buffer, sizeof(buffer)-1);
buffer[len] = 0;
CURL_TRC_CF(data, cf, "[%d] -> %s", frame->hd.stream_id, buffer);
}
Expand All @@ -1187,7 +1194,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
if(Curl_trc_cf_is_verbose(cf, data)) {
char buffer[256];
int len;
len = fr_print(frame, buffer, (sizeof(buffer)/sizeof(buffer[0]))-1);
len = fr_print(frame, buffer, sizeof(buffer)-1);
buffer[len] = 0;
CURL_TRC_CF(data, cf, "[%d] <- %s",frame->hd.stream_id, buffer);
}
Expand Down Expand Up @@ -1975,7 +1982,14 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,

out:
result = h2_progress_egress(cf, data);
if(result && result != CURLE_AGAIN) {
if(result == CURLE_AGAIN) {
/* pending data to send, need to be called again. Ideally, we'd
* monitor the socket for POLLOUT, but we might not be in SENDING
* transfer state any longer and are unable to make this happen.
*/
drain_stream(cf, data, stream);
}
else if(result) {
*err = result;
nread = -1;
}
Expand Down Expand Up @@ -2151,27 +2165,17 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
int rv;
ssize_t nwritten;
CURLcode result;
int blocked = 0;
int blocked = 0, was_blocked = 0;

CF_DATA_SAVE(save, cf, data);

if(stream && stream->id != -1) {
if(stream->close_handled) {
infof(data, "stream %u closed", stream->id);
*err = CURLE_HTTP2_STREAM;
nwritten = -1;
goto out;
}
else if(stream->closed) {
nwritten = http2_handle_stream_close(cf, data, stream, err);
goto out;
}
else if(stream->upload_blocked_len) {
if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
/* TODO: this assertion triggers in OSSFuzz runs and it is not
* clear why. Disable for now to let OSSFuzz continue its tests.
DEBUGASSERT(len >= stream->upload_blocked_len); */
* clear why. Disable for now to let OSSFuzz continue its tests. */
DEBUGASSERT(len >= stream->upload_blocked_len);
if(len < stream->upload_blocked_len) {
/* Did we get called again with a smaller `len`? This should not
* happen. We are not prepared to handle that. */
Expand All @@ -2182,6 +2186,25 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
nwritten = (ssize_t)stream->upload_blocked_len;
stream->upload_blocked_len = 0;
was_blocked = 1;
}
else if(stream->closed) {
if(stream->resp_hds_complete) {
/* Server decided to close the stream after having sent us a findl
* response. This is valid if it is not interested in the request
* body. This happens on 30x or 40x responses.
* We silently discard the data sent, since this is not a transport
* error situation. */
CURL_TRC_CF(data, cf, "[%d] discarding data"
"on closed stream with response", stream->id);
*err = CURLE_OK;
nwritten = (ssize_t)len;
goto out;
}
infof(data, "stream %u closed", stream->id);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
else {
/* If stream_id != -1, we have dispatched request HEADERS and
Expand Down Expand Up @@ -2218,8 +2241,10 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
result = h2_progress_egress(cf, data);
/* if the stream has been closed in egress handling (nghttp2 does that
* when it does not like the headers, for example */
if(stream && stream->closed) {
nwritten = http2_handle_stream_close(cf, data, stream, err);
if(stream && stream->closed && !was_blocked) {
infof(data, "stream %u closed", stream->id);
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
else if(result == CURLE_AGAIN) {
Expand Down Expand Up @@ -2367,15 +2392,20 @@ static CURLcode cf_h2_connect(struct Curl_cfilter *cf,
if(result)
goto out;

/* Send out our SETTINGS and ACKs and such. If that blocks, we
* have it buffered and can count this filter as being connected */
result = h2_progress_egress(cf, data);
if(result)
if(result == CURLE_AGAIN)
result = CURLE_OK;
else if(result)
goto out;

*done = TRUE;
cf->connected = TRUE;
result = CURLE_OK;

out:
CURL_TRC_CF(data, cf, "cf_connect() -> %d, %d, ", result, *done);
CF_DATA_RESTORE(cf, save);
return result;
}
Expand Down
6 changes: 6 additions & 0 deletions lib/multi.c
Expand Up @@ -667,8 +667,14 @@ static CURLcode multi_done(struct Curl_easy *data,
struct connectdata *conn = data->conn;
unsigned int i;

#if defined(DEBUGBUILD) && !defined(CURL_DISABLE_VERBOSE_STRINGS)
DEBUGF(infof(data, "multi_done[%s]: status: %d prem: %d done: %d",
multi_statename[data->mstate],
(int)status, (int)premature, data->state.done));
#else
DEBUGF(infof(data, "multi_done: status: %d prem: %d done: %d",
(int)status, (int)premature, data->state.done));
#endif

if(data->state.done)
/* Stop if multi_done() has already been called */
Expand Down
6 changes: 3 additions & 3 deletions lib/transfer.c
Expand Up @@ -431,6 +431,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
curl_off_t max_recv = data->set.max_recv_speed?
data->set.max_recv_speed : CURL_OFF_T_MAX;
char *buf = data->state.buffer;
bool data_eof_handled = FALSE;
DEBUGASSERT(buf);

*done = FALSE;
Expand All @@ -448,8 +449,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
to ensure that http2_handle_stream_close is called when we read all
incoming bytes for a particular stream. */
bool is_http3 = Curl_conn_is_http3(data, conn, FIRSTSOCKET);
bool data_eof_handled = is_http3
|| Curl_conn_is_http2(data, conn, FIRSTSOCKET);
data_eof_handled = is_http3 || Curl_conn_is_http2(data, conn, FIRSTSOCKET);

if(!data_eof_handled && k->size != -1 && !k->header) {
/* make sure we don't read too much */
Expand Down Expand Up @@ -761,7 +761,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
}

if(((k->keepon & (KEEP_RECV|KEEP_SEND)) == KEEP_SEND) &&
conn->bits.close) {
(conn->bits.close || data_eof_handled)) {
/* When we've read the entire thing and the close bit is set, the server
may now close the connection. If there's now any kind of sending going
on from our side, we need to stop that immediately. */
Expand Down
24 changes: 22 additions & 2 deletions lib/vquic/curl_ngtcp2.c
Expand Up @@ -1106,6 +1106,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
else {
CURL_TRC_CF(data, cf, "[%" PRId64 "] CLOSED", stream->id);
}
data->req.keepon &= ~KEEP_SEND_HOLD;
h3_drain_stream(cf, data);
return 0;
}
Expand Down Expand Up @@ -1785,6 +1786,18 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
stream->upload_blocked_len = 0;
}
else if(stream->closed) {
if(stream->resp_hds_complete) {
/* Server decided to close the stream after having sent us a final
* response. This is valid if it is not interested in the request
* body. This happens on 30x or 40x responses.
* We silently discard the data sent, since this is not a transport
* error situation. */
CURL_TRC_CF(data, cf, "[%" PRId64 "] discarding data"
"on closed stream with response", stream->id);
*err = CURLE_OK;
sent = (ssize_t)len;
goto out;
}
*err = CURLE_HTTP3;
sent = -1;
goto out;
Expand Down Expand Up @@ -2245,9 +2258,16 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
}
break;
}
case CF_CTRL_DATA_IDLE:
result = check_and_set_expiry(cf, data, NULL);
case CF_CTRL_DATA_IDLE: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
CURL_TRC_CF(data, cf, "data idle");
if(stream && !stream->closed) {
result = check_and_set_expiry(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "data idle, check_and_set_expiry -> %d", result);
}
break;
}
default:
break;
}
Expand Down

0 comments on commit 331b89a

Please sign in to comment.