Fixing busy loop on large uploads for HTTP/2.
- refs curl#10449 with improvements in cf_h2_send() and nwritten return code
  handling. Cherry picked and improved after review by @jay
- setting KEEP_SEND_PAUSE flags when exhausting remote HTTP/2 window
  size of a stream.
- clearing KEEP_SEND_PAUSE when receiving HTTP/2 window updates
  on a paused stream.
icing committed Feb 27, 2023
1 parent 2af16ad commit 97633e8
Showing 5 changed files with 199 additions and 37 deletions.
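
Before the file-by-file diff, here is a minimal, self-contained sketch of the flow-control pattern the commit message describes: instead of busy-looping when nothing can be written, the sender sets a pause flag once the stream's remote window is exhausted and clears it again when a WINDOW_UPDATE arrives, at which point a zero-byte send result simply means "wait". The KEEP_SEND/KEEP_SEND_PAUSE flag names mirror the diff below; the struct, helper names and window bookkeeping are simplified stand-ins, not curl's actual internals.

/* Minimal stand-in for the pause/unpause pattern in this commit; only the
   flag names are taken from the diff, the rest is simplified example code. */
#include <stdio.h>

#define KEEP_SEND       (1 << 0)   /* we still have data to send */
#define KEEP_SEND_PAUSE (1 << 1)   /* sending is paused, waiting for the peer */

struct stream {
  int keepon;          /* send-state flags, like data->req.keepon in curl */
  int remote_window;   /* bytes the peer still allows us to send */
};

/* Send up to `len` bytes; pause instead of spinning once the window is gone. */
static int stream_send(struct stream *s, int len)
{
  int n = len < s->remote_window ? len : s->remote_window;
  s->remote_window -= n;
  if(s->remote_window == 0 && !(s->keepon & KEEP_SEND_PAUSE)) {
    s->keepon |= KEEP_SEND_PAUSE;
    printf("remote window exhausted, pausing send\n");
  }
  return n;            /* 0 means: nothing sent, wait for a WINDOW_UPDATE */
}

/* A WINDOW_UPDATE from the peer re-opens the window and unpauses sending. */
static void on_window_update(struct stream *s, int increment)
{
  s->remote_window += increment;
  if((s->keepon & KEEP_SEND_PAUSE) && (s->keepon & KEEP_SEND)) {
    s->keepon &= ~KEEP_SEND_PAUSE;
    printf("window update received, unpausing send\n");
  }
}

int main(void)
{
  struct stream s = { KEEP_SEND, 65535 };
  while(stream_send(&s, 16384) > 0)
    ;                                  /* drain the initial window */
  on_window_update(&s, 65535);         /* peer grants more credit */
  printf("window after update: %d bytes\n", s.remote_window);
  return 0;
}

In curl itself the unpausing additionally drains the stream and calls Curl_expire(data, 0, EXPIRE_RUN_NOW), as the on_frame_recv() hunk below shows, so the transfer loop runs again right away.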
65 changes: 30 additions & 35 deletions lib/http2.c
@@ -942,6 +942,17 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv RST", stream_id));
stream->reset = TRUE;
break;
case NGHTTP2_WINDOW_UPDATE:
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv WINDOW_UPDATE", stream_id));
if(data_s->req.keepon & KEEP_SEND_PAUSE
&& data_s->req.keepon & KEEP_SEND) {
data_s->req.keepon &= ~KEEP_SEND_PAUSE;
drain_this(cf, data_s);
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] unpausing after win update",
stream_id));
}
break;
default:
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv frame %x",
stream_id, frame->hd.type));
@@ -1006,18 +1017,6 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
return NGHTTP2_ERR_PAUSE;
}

#if 0
/* pause execution of nghttp2 if we received data for another handle
in order to process them first. */
if(CF_DATA_CURRENT(cf) != data_s) {
ctx->pause_stream_id = stream_id;
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] not call_data -> NGHTTP2_ERR_PAUSE",
stream_id));
drain_this(cf, data_s);
return NGHTTP2_ERR_PAUSE;
}
#endif

return 0;
}

@@ -1763,7 +1762,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}

DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: win %u/%u",
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
stream->stream_id,
nghttp2_session_get_local_window_size(ctx->h2),
nghttp2_session_get_stream_local_window_size(ctx->h2,
@@ -1979,7 +1978,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
ssize_t nwritten;

CF_DATA_SAVE(save, cf, data);
DEBUGF(LOG_CF(data, cf, "send len=%zu", len));
DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));

if(stream->stream_id != -1) {
if(stream->close_handled) {
@@ -2009,16 +2008,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
goto out;
}

/* The callback for stream input updates `upload_len` with what it
* has provided to nghttp2. If it has *not* been called, `upload_len`
* is unchanged and we should report a block on send. */
if(stream->upload_len == len) {
*err = CURLE_AGAIN;
nwritten = -1;
goto out;
}

nwritten = len - stream->upload_len;
nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
stream->upload_mem = NULL;
stream->upload_len = 0;

@@ -2037,19 +2027,24 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
nghttp2_session_resume_data(ctx->h2, stream->stream_id);
}

#ifdef DEBUG_HTTP2
if(!nwritten) {
infof(data, "http2_send: easy %p (stream %u) win %u/%u",
data, stream->stream_id,
nghttp2_session_get_remote_window_size(ctx->h2),
nghttp2_session_get_stream_remote_window_size(ctx->h2,
stream->stream_id)
);

size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
stream->stream_id);
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%zu",
stream->stream_id,
nghttp2_session_get_remote_window_size(ctx->h2), rwin));
if(rwin == 0) {
/* We cannot upload more as the stream's remote window size
* is 0. We need to receive WIN_UPDATEs before we can continue.
*/
data->req.keepon |= KEEP_SEND_PAUSE;
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pausing send as remote flow "
"window is exhausted", stream->stream_id));
}
}
infof(data, "http2_send returns %zd for stream %u", nwritten,
stream->stream_id);
#endif
DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
stream->stream_id, nwritten));

/* handled writing BODY for open stream. */
goto out;
}
38 changes: 38 additions & 0 deletions tests/tests-httpd/test_07_upload.py
@@ -181,3 +181,41 @@ def test_07_21_upload_parallel_large(self, env: Env, httpd, nghttpx, repeat, pro
respdata = open(curl.response_file(i)).readlines()
assert respdata == indata

# PUT 100k
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
fdata = os.path.join(env.gen_dir, 'data-100k')
count = 1
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]'
r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
extra_args=['--parallel'])
assert r.exit_code == 0, f'{r}'
r.check_stats(count=count, exp_status=200)
exp_data = [f'{os.path.getsize(fdata)}']
r.check_stats(count=count, exp_status=200)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
assert respdata == exp_data

# PUT 10m
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
fdata = os.path.join(env.gen_dir, 'data-10m')
count = 1
curl = CurlClient(env=env)
url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]&chunk_delay=10ms'
r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
extra_args=['--parallel'])
assert r.exit_code == 0, f'{r}'
r.check_stats(count=count, exp_status=200)
exp_data = [f'{os.path.getsize(fdata)}']
r.check_stats(count=count, exp_status=200)
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
assert respdata == exp_data

27 changes: 25 additions & 2 deletions tests/tests-httpd/testenv/curl.py
@@ -270,6 +270,29 @@ def http_upload(self, urls: List[str], data: str,
with_stats=with_stats,
with_headers=with_headers)

def http_put(self, urls: List[str], data=None, fdata=None,
alpn_proto: Optional[str] = None,
with_stats: bool = True,
with_headers: bool = False,
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if fdata is not None:
extra_args.extend(['-T', fdata])
elif data is not None:
extra_args.extend(['-T', '-'])
extra_args.extend([
'-o', 'download_#1.data',
])
if with_stats:
extra_args.extend([
'-w', '%{json}\\n'
])
return self._raw(urls, intext=data,
alpn_proto=alpn_proto, options=extra_args,
with_stats=with_stats,
with_headers=with_headers)

def response_file(self, idx: int):
return os.path.join(self._run_dir, f'download_{idx}.data')

@@ -303,7 +326,7 @@ def _run(self, args, intext='', with_stats: bool = False):
duration=datetime.now() - start,
with_stats=with_stats)

def _raw(self, urls, timeout=10, options=None, insecure=False,
def _raw(self, urls, intext='', timeout=10, options=None, insecure=False,
alpn_proto: Optional[str] = None,
force_resolve=True,
with_stats=False,
@@ -312,7 +335,7 @@ def _raw(self, urls, timeout=10, options=None, insecure=False,
urls=urls, timeout=timeout, options=options, insecure=insecure,
alpn_proto=alpn_proto, force_resolve=force_resolve,
with_headers=with_headers)
r = self._run(args, with_stats=with_stats)
r = self._run(args, intext=intext, with_stats=with_stats)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
if r.json:
3 changes: 3 additions & 0 deletions tests/tests-httpd/testenv/httpd.py
@@ -323,6 +323,9 @@ def _curltest_conf(self) -> List[str]:
f' <Location /curltest/echo>',
f' SetHandler curltest-echo',
f' </Location>',
f' <Location /curltest/put>',
f' SetHandler curltest-put',
f' </Location>',
f' <Location /curltest/tweak>',
f' SetHandler curltest-tweak',
f' </Location>',
103 changes: 103 additions & 0 deletions tests/tests-httpd/testenv/mod_curltest/mod_curltest.c
@@ -35,6 +35,7 @@

static void curltest_hooks(apr_pool_t *pool);
static int curltest_echo_handler(request_rec *r);
static int curltest_put_handler(request_rec *r);
static int curltest_tweak_handler(request_rec *r);

AP_DECLARE_MODULE(curltest) = {
@@ -81,6 +82,7 @@ static void curltest_hooks(apr_pool_t *pool)

/* curl test handlers */
ap_hook_handler(curltest_echo_handler, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_handler(curltest_put_handler, NULL, NULL, APR_HOOK_MIDDLE);
ap_hook_handler(curltest_tweak_handler, NULL, NULL, APR_HOOK_MIDDLE);
}

@@ -409,3 +411,104 @@ static int curltest_tweak_handler(request_rec *r)
}
return AP_FILTER_ERROR;
}

static int curltest_put_handler(request_rec *r)
{
conn_rec *c = r->connection;
apr_bucket_brigade *bb;
apr_bucket *b;
apr_status_t rv;
char buffer[16*1024];
const char *ct;
apr_off_t rbody_len = 0;
const char *request_id = "none";
apr_time_t chunk_delay = 0;
apr_array_header_t *args = NULL;
long l;
int i;

if(strcmp(r->handler, "curltest-put")) {
return DECLINED;
}
if(r->method_number != M_PUT) {
return DECLINED;
}

if(r->args) {
args = apr_cstr_split(r->args, "&", 1, r->pool);
for(i = 0; i < args->nelts; ++i) {
char *s, *val, *arg = APR_ARRAY_IDX(args, i, char*);
s = strchr(arg, '=');
if(s) {
*s = '\0';
val = s + 1;
if(!strcmp("id", arg)) {
/* just an id for repeated requests with curl's url globbing */
request_id = val;
continue;
}
else if(!strcmp("chunk_delay", arg)) {
rv = duration_parse(&chunk_delay, val, "s");
if(APR_SUCCESS == rv) {
continue;
}
}
}
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "query parameter not "
"understood: '%s' in %s",
arg, r->args);
ap_die(HTTP_BAD_REQUEST, r);
return OK;
}
}

ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: processing");
r->status = 200;
r->clength = -1;
r->chunked = 1;
apr_table_unset(r->headers_out, "Content-Length");
/* Discourage content-encodings */
apr_table_unset(r->headers_out, "Content-Encoding");
apr_table_setn(r->subprocess_env, "no-brotli", "1");
apr_table_setn(r->subprocess_env, "no-gzip", "1");

ct = apr_table_get(r->headers_in, "content-type");
ap_set_content_type(r, ct? ct : "text/plain");

bb = apr_brigade_create(r->pool, c->bucket_alloc);
/* copy any request body into the response */
if((rv = ap_setup_client_block(r, REQUEST_CHUNKED_DECHUNK))) goto cleanup;
if(ap_should_client_block(r)) {
while(0 < (l = ap_get_client_block(r, &buffer[0], sizeof(buffer)))) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
"put_handler: read %ld bytes from request body", l);
if(chunk_delay) {
apr_sleep(chunk_delay);
}
rbody_len += l;
}
}
/* we are done */
rv = apr_brigade_printf(bb, NULL, NULL, "%"APR_OFF_T_FMT, rbody_len);
if(APR_SUCCESS != rv) goto cleanup;
b = apr_bucket_eos_create(c->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(bb, b);
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: request read");

rv = ap_pass_brigade(r->output_filters, bb);

cleanup:
if(rv == APR_SUCCESS
|| r->status != HTTP_OK
|| c->aborted) {
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler: done");
return OK;
}
else {
/* no way to know what type of error occurred */
ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler failed");
return AP_FILTER_ERROR;
}
return DECLINED;
}
