Skip to content

Commit

Permalink
HTTP/2 and HTTP/2 upload handling fixes
Browse files Browse the repository at this point in the history
- fixes curl#11242 where 100% CPU on uploads was reported
- fixes possible stalls on last part of a request body when
  that information could not be fully send on the connection
  due to an EAGAIN
- applies the same EGAIN handling to HTTP/2 proxying
  • Loading branch information
icing committed Jun 19, 2023
1 parent 355f414 commit e6e84e9
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 187 deletions.
1 change: 1 addition & 0 deletions lib/bufq.c
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ ssize_t Curl_bufq_write_pass(struct bufq *q,
*err = CURLE_AGAIN;
return -1;
}
*err = CURLE_OK;
return nwritten;
}

Expand Down
215 changes: 131 additions & 84 deletions lib/cf-h2-proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,17 @@
#include "curl_memory.h"
#include "memdebug.h"

#define H2_NW_CHUNK_SIZE (128*1024)
#define PROXY_H2_NW_RECV_CHUNKS 1
#define H2_CHUNK_SIZE (16*1024)

#define PROXY_HTTP2_HUGE_WINDOW_SIZE (100 * 1024 * 1024)
#define H2_TUNNEL_WINDOW_SIZE (10 * 1024 * 1024)

#define PROXY_H2_NW_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE)
#define PROXY_H2_NW_SEND_CHUNKS 1

#define PROXY_HTTP2_HUGE_WINDOW_SIZE (32 * 1024 * 1024) /* 32 MB */
#define H2_TUNNEL_RECV_CHUNKS (H2_TUNNEL_WINDOW_SIZE / H2_CHUNK_SIZE)
#define H2_TUNNEL_SEND_CHUNKS ((128 * 1024) / H2_CHUNK_SIZE)

#define H2_TUNNEL_WINDOW_SIZE (1024 * 1024)
#define H2_TUNNEL_CHUNK_SIZE (32 * 1024)
#define H2_TUNNEL_RECV_CHUNKS \
(H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE)
#define H2_TUNNEL_SEND_CHUNKS \
(H2_TUNNEL_WINDOW_SIZE / H2_TUNNEL_CHUNK_SIZE)

typedef enum {
H2_TUNNEL_INIT, /* init/default/no tunnel state */
Expand All @@ -72,10 +71,11 @@ struct tunnel_stream {
char *authority;
int32_t stream_id;
uint32_t error;
size_t upload_blocked_len;
h2_tunnel_state state;
bool has_final_response;
bool closed;
bool reset;
BIT(has_final_response);
BIT(closed);
BIT(reset);
};

static CURLcode tunnel_stream_init(struct Curl_cfilter *cf,
Expand All @@ -87,9 +87,9 @@ static CURLcode tunnel_stream_init(struct Curl_cfilter *cf,

ts->state = H2_TUNNEL_INIT;
ts->stream_id = -1;
Curl_bufq_init2(&ts->recvbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS,
Curl_bufq_init2(&ts->recvbuf, H2_CHUNK_SIZE, H2_TUNNEL_RECV_CHUNKS,
BUFQ_OPT_SOFT_LIMIT);
Curl_bufq_init(&ts->sendbuf, H2_TUNNEL_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS);
Curl_bufq_init(&ts->sendbuf, H2_CHUNK_SIZE, H2_TUNNEL_SEND_CHUNKS);

if(cf->conn->bits.conn_to_host)
hostname = cf->conn->conn_to_host.name;
Expand Down Expand Up @@ -191,6 +191,7 @@ struct cf_h2_proxy_ctx {
int32_t last_stream_id;
BIT(conn_closed);
BIT(goaway);
BIT(nw_out_blocked);
};

/* How to access `call_data` from a cf_h2 filter */
Expand Down Expand Up @@ -302,8 +303,8 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf,
DEBUGASSERT(!ctx->h2);
memset(&ctx->tunnel, 0, sizeof(ctx->tunnel));

Curl_bufq_init(&ctx->inbufq, H2_NW_CHUNK_SIZE, PROXY_H2_NW_RECV_CHUNKS);
Curl_bufq_init(&ctx->outbufq, H2_NW_CHUNK_SIZE, PROXY_H2_NW_SEND_CHUNKS);
Curl_bufq_init(&ctx->inbufq, H2_CHUNK_SIZE, PROXY_H2_NW_RECV_CHUNKS);
Curl_bufq_init(&ctx->outbufq, H2_CHUNK_SIZE, PROXY_H2_NW_SEND_CHUNKS);

if(tunnel_stream_init(cf, &ctx->tunnel))
goto out;
Expand Down Expand Up @@ -368,28 +369,35 @@ static CURLcode cf_h2_proxy_ctx_init(struct Curl_cfilter *cf,
return result;
}

static int should_close_session(struct cf_h2_proxy_ctx *ctx)
{
return !nghttp2_session_want_read(ctx->h2) &&
!nghttp2_session_want_write(ctx->h2);
}

static CURLcode proxy_h2_nw_out_flush(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
size_t buflen = Curl_bufq_len(&ctx->outbufq);
ssize_t nwritten;
CURLcode result;

(void)data;
if(!buflen)
if(Curl_bufq_is_empty(&ctx->outbufq))
return CURLE_OK;

DEBUGF(LOG_CF(data, cf, "h2 conn flush %zu bytes", buflen));
nwritten = Curl_bufq_pass(&ctx->outbufq, proxy_h2_nw_out_writer, cf,
&result);
if(nwritten < 0) {
if(result == CURLE_AGAIN) {
DEBUGF(LOG_CF(data, cf, "flush nw send buffer(%zu) -> EAGAIN",
Curl_bufq_len(&ctx->outbufq)));
ctx->nw_out_blocked = 1;
}
return result;
}
if((size_t)nwritten < buflen) {
return CURLE_AGAIN;
}
return CURLE_OK;
DEBUGF(LOG_CF(data, cf, "nw send buffer flushed"));
return Curl_bufq_is_empty(&ctx->outbufq)? CURLE_OK: CURLE_AGAIN;
}

/*
Expand Down Expand Up @@ -488,19 +496,16 @@ static CURLcode proxy_h2_progress_ingress(struct Curl_cfilter *cf,
return CURLE_OK;
}

/*
* Check if there's been an update in the priority /
* dependency settings and if so it submits a PRIORITY frame with the updated
* info.
* Flush any out data pending in the network buffer.
*/
static CURLcode proxy_h2_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
int rv = 0;

rv = nghttp2_session_send(ctx->h2);
ctx->nw_out_blocked = 0;
while(!rv && !ctx->nw_out_blocked && nghttp2_session_want_write(ctx->h2))
rv = nghttp2_session_send(ctx->h2);

if(nghttp2_is_fatal(rv)) {
DEBUGF(LOG_CF(data, cf, "nghttp2_session_send error (%s)%d",
nghttp2_strerror(rv), rv));
Expand Down Expand Up @@ -972,7 +977,7 @@ static CURLcode H2_CONNECT(struct Curl_cfilter *cf,
result = proxy_h2_progress_ingress(cf, data);
if(!result)
result = proxy_h2_progress_egress(cf, data);
if(result) {
if(result && result != CURLE_AGAIN) {
h2_tunnel_go_state(cf, ts, H2_TUNNEL_FAILED, data);
break;
}
Expand Down Expand Up @@ -1219,7 +1224,7 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf,
}

result = proxy_h2_progress_egress(cf, data);
if(result) {
if(result && result != CURLE_AGAIN) {
*err = result;
nread = -1;
}
Expand All @@ -1233,89 +1238,131 @@ static ssize_t cf_h2_proxy_recv(struct Curl_cfilter *cf,

static ssize_t cf_h2_proxy_send(struct Curl_cfilter *cf,
struct Curl_easy *data,
const void *mem, size_t len, CURLcode *err)
const void *buf, size_t len, CURLcode *err)
{
struct cf_h2_proxy_ctx *ctx = cf->ctx;
struct cf_call_data save;
ssize_t nwritten = -1;
const unsigned char *buf = mem;
size_t start_len = len;
int rv;
ssize_t nwritten;
CURLcode result;
int blocked = 0;

if(ctx->tunnel.state != H2_TUNNEL_ESTABLISHED) {
*err = CURLE_SEND_ERROR;
return -1;
}
CF_DATA_SAVE(save, cf, data);

while(len) {
if(ctx->tunnel.closed) {
nwritten = -1;
*err = CURLE_SEND_ERROR;
goto out;
}
else if(ctx->tunnel.upload_blocked_len) {
/* the data in `buf` has alread been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
DEBUGASSERT(len >= ctx->tunnel.upload_blocked_len);
if(len < ctx->tunnel.upload_blocked_len) {
/* Did we get called again with a smaller `len`? This should not
* happend. We are not prepared to handle that. */
failf(data, "HTTP/2 proxy, send again with decreased length");
*err = CURLE_HTTP2;
nwritten = -1;
goto out;
}
nwritten = (ssize_t)ctx->tunnel.upload_blocked_len;
ctx->tunnel.upload_blocked_len = 0;
}
else {
nwritten = Curl_bufq_write(&ctx->tunnel.sendbuf, buf, len, err);
if(nwritten <= 0) {
if(*err && *err != CURLE_AGAIN) {
DEBUGF(LOG_CF(data, cf, "error adding data to tunnel sendbuf: %d",
*err));
nwritten = -1;
if(nwritten < 0) {
if(*err != CURLE_AGAIN)
goto out;
}
/* blocked */
nwritten = 0;
}
else {
DEBUGASSERT((size_t)nwritten <= len);
buf += (size_t)nwritten;
len -= (size_t)nwritten;
}
}

/* resume the tunnel stream and let the h2 session send, which
* triggers reading from tunnel.sendbuf */
if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* req body data is buffered, resume the potentially suspended stream */
rv = nghttp2_session_resume_data(ctx->h2, ctx->tunnel.stream_id);
if(nghttp2_is_fatal(rv)) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
goto out;
}
*err = proxy_h2_progress_egress(cf, data);
if(*err) {
nwritten = -1;
goto out;
}

if(!nwritten && Curl_bufq_is_full(&ctx->tunnel.sendbuf)) {
size_t rwin;
/* we could not add to the buffer and after session processing,
* it is still full. */
rwin = nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id);
DEBUGF(LOG_CF(data, cf, "cf_send: tunnel win %u/%zu",
nghttp2_session_get_remote_window_size(ctx->h2), rwin));
if(rwin == 0) {
/* We cannot upload more as the stream's remote window size
* is 0. We need to receive WIN_UPDATEs before we can continue.
*/
data->req.keepon |= KEEP_SEND_HOLD;
DEBUGF(LOG_CF(data, cf, "pausing send as remote flow "
"window is exhausted"));
}
break;
}
}

nwritten = start_len - len;
if(nwritten > 0) {
*err = CURLE_OK;
/* Call the nghttp2 send loop and flush to write ALL buffered data,
* headers and/or request body completely out to the network */
result = proxy_h2_progress_egress(cf, data);
if(result == CURLE_AGAIN) {
blocked = 1;
}
else if(ctx->tunnel.closed) {
else if(result) {
*err = result;
nwritten = -1;
*err = CURLE_SEND_ERROR;
goto out;
}
else {
nwritten = -1;
else if(!Curl_bufq_is_empty(&ctx->tunnel.sendbuf)) {
/* although we wrote everything that nghttp2 wants to send now,
* there is data left in our stream send buffer unwritten. This may
* be due to the stream's HTTP/2 flow window being exhausted. */
blocked = 1;
}

if(blocked) {
/* Unable to send all data, due to connection blocked or H2 window
* exhaustion. Data is left in our stream buffer, or nghttp2's internal
* frame buffer or our network out buffer. */
size_t rwin = nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id);
if(rwin == 0) {
/* H2 flow window exhaustion.
* FIXME: there is no way to HOLD all transfers that use this
* proxy connection AND to UNHOLD all of them again when the
* window increases.
* We *could* iterate over all data on this conn maybe? */
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] remote flow "
"window is exhausted", ctx->tunnel.stream_id));
}

/* Whatever the cause, we need to return CURL_EAGAIN for this call.
* We have unwritten state that needs us being invoked again and EAGAIN
* is the only way to ensure that. */
ctx->tunnel.upload_blocked_len = nwritten;
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) BLOCK: win %u/%zu "
"blocked_len=%zu",
ctx->tunnel.stream_id, len,
nghttp2_session_get_remote_window_size(ctx->h2), rwin,
nwritten));
*err = CURLE_AGAIN;
nwritten = -1;
goto out;
}
else if(should_close_session(ctx)) {
/* nghttp2 thinks this session is done. If the stream has not been
* closed, this is an error state for out transfer */
if(ctx->tunnel.closed) {
*err = CURLE_SEND_ERROR;
nwritten = -1;
}
else {
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
*err = CURLE_HTTP2;
nwritten = -1;
}
}

out:
DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) -> %zd, %d ",
start_len, nwritten, *err));
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] cf_send(len=%zu) -> %zd, %d, "
"h2 windows %d-%d (stream-conn), "
"buffers %zu-%zu (stream-conn)",
ctx->tunnel.stream_id, len, nwritten, *err,
nghttp2_session_get_stream_remote_window_size(
ctx->h2, ctx->tunnel.stream_id),
nghttp2_session_get_remote_window_size(ctx->h2),
Curl_bufq_len(&ctx->tunnel.sendbuf),
Curl_bufq_len(&ctx->outbufq)));
CF_DATA_RESTORE(cf, save);
return nwritten;
}
Expand Down
Loading

0 comments on commit e6e84e9

Please sign in to comment.