Skip to content

Commit

Permalink
fix(proxy-wasm) resume content phase after request_body read EAGAIN
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed Jun 10, 2024
1 parent 64b5d7b commit da0bb17
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 31 deletions.
8 changes: 7 additions & 1 deletion src/common/ngx_wasm_subsystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@
(env)->state = NGX_WASM_STATE_YIELD; \
ngx_wasm_set_resume_handler(env)

#define ngx_wasm_continuing(env) \
((env)->state == NGX_WASM_STATE_CONTINUE)

#define ngx_wasm_yielding(env) \
(env)->state == NGX_WASM_STATE_YIELD
((env)->state == NGX_WASM_STATE_YIELD)

#define ngx_wasm_errored(env) \
((env)->state == NGX_WASM_STATE_ERROR)

#define ngx_wasm_bad_subsystem(env) \
ngx_wasm_log_error(NGX_LOG_WASM_NYI, env->connection->log, 0, \
Expand Down
6 changes: 5 additions & 1 deletion src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ struct ngx_http_wasm_req_ctx_s {
unsigned req_keepalive:1; /* r->keepalive copy */
unsigned reset_resp_shims:1;
unsigned entered_content_phase:1; /* entered content handler */
unsigned exited_content_phase:1; /* executed content handler at least once */
unsigned content_executed:1; /* executed content handler at least once */
unsigned in_req_body_handler:1; /* content invoked from read_request_body handler */
unsigned req_body_waited:1; /* read_request_body yielded at least once */
unsigned req_body_received:1; /* read_request_body finished */
unsigned entered_header_filter:1; /* entered header_filter handler */
unsigned entered_body_filter:1; /* entered body_filter handler */
unsigned entered_log_phase:1; /* entered log phase */
Expand Down Expand Up @@ -120,6 +123,7 @@ ngx_int_t ngx_http_wasm_stash_local_response(ngx_http_wasm_req_ctx_t *rctx,
void ngx_http_wasm_discard_local_response(ngx_http_wasm_req_ctx_t *rctx);
ngx_int_t ngx_http_wasm_flush_local_response(ngx_http_wasm_req_ctx_t *rctx);
ngx_int_t ngx_http_wasm_produce_resp_headers(ngx_http_wasm_req_ctx_t *rctx);
ngx_int_t ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx);


/* directives */
Expand Down
35 changes: 20 additions & 15 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ static ngx_int_t ngx_http_wasm_rewrite_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_wasm_preaccess_handler(ngx_http_request_t *r);
#endif
static ngx_int_t ngx_http_wasm_access_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx);
static ngx_int_t ngx_http_wasm_content_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_wasm_log_handler(ngx_http_request_t *r);
static ngx_int_t ngx_http_wasm_check_finalize(ngx_http_wasm_req_ctx_t *rctx,
Expand Down Expand Up @@ -935,7 +934,7 @@ ngx_http_wasm_access_handler(ngx_http_request_t *r)
}


static ngx_int_t
ngx_int_t
ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)
{
ngx_int_t rc;
Expand Down Expand Up @@ -977,10 +976,10 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)

goto finalize;
case NGX_DECLINED:
if (rctx->exited_content_phase) {
if (rctx->content_executed && !rctx->req_body_received) {
/* Content phase already ran, no stashed response.
* Do not resume ops again and run the orig content
* handler instead */
* handler instead. */
goto orig;
}

Expand All @@ -991,23 +990,26 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)
}

rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_CONTENT_PHASE);
if (rctx->env.state == NGX_WASM_STATE_ERROR) {
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
}

dd("content ops resume rc: %ld", rc);
rc = ngx_http_wasm_check_finalize(rctx, rc);
if (rc == NGX_ERROR
|| rc == NGX_AGAIN
|| rc == NGX_DONE
|| rc >= NGX_HTTP_SPECIAL_RESPONSE)
{
if (rc == NGX_AGAIN && r == r->main) {
r->main->count++;
dd("r->main->count++: %d", r->main->count);
if (rc == NGX_AGAIN) {
if (r == r->main
&& !rctx->req_body_waited
&& !rctx->in_req_body_handler)
{
r->main->count++;
dd("r->main->count++: %d", r->main->count);
}

rc = NGX_DONE;
}

dd("skip to finalize (rc: %ld)", rc);
goto finalize;
}

Expand Down Expand Up @@ -1055,12 +1057,16 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)

dd("finalize rc: %ld", rc);

rctx->exited_content_phase = 1;
rctx->content_executed = 1;

if (r->main->count == 1
if (rctx->in_wev
&& r->main->count == 1
&& rc != NGX_DECLINED
&& rc <= NGX_OK)
{
/* We must avoid reading from rctx after ngx_http_wasm_check_finalize,
* so we use this variable to determine what is explained in the
* comment below. */
done = 1;
}

Expand All @@ -1072,8 +1078,7 @@ ngx_http_wasm_content(ngx_http_wasm_req_ctx_t *rctx)
* ngx_http_wasm_wev_handler...). We should break them down.
*
* NGX_ABORT: no additional ngx_http_finalize_request in caller (for
* wev handler invocations).
*/
* wev handler invocations). */
rc = NGX_ABORT;
}

Expand Down
4 changes: 3 additions & 1 deletion src/http/ngx_http_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ ngx_http_wasm_read_client_request_body(ngx_http_request_t *r,
r->request_body_in_single_buf = 1;

rc = ngx_http_read_client_request_body(r, post_handler);
dd("read_client_request_body rc: %ld", rc);
if (rc < NGX_HTTP_SPECIAL_RESPONSE
&& rc != NGX_ERROR)
&& rc != NGX_AGAIN)
{
r->main->count--;
dd("r->main->count--: %ld", r->main->count);
}

return rc;
Expand Down
22 changes: 15 additions & 7 deletions src/http/proxy_wasm/ngx_http_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,23 @@ ngx_http_proxy_wasm_on_request_body_handler(ngx_http_request_t *r)
if (len) {
pwctx = (ngx_proxy_wasm_ctx_t *) rctx->data;
pwctx->req_body_len = len;
}

rc = ngx_proxy_wasm_resume(pwctx, pwctx->phase,
NGX_PROXY_WASM_STEP_REQ_BODY);
if (rc == NGX_AGAIN) {
ngx_wasm_yield(&rctx->env);
rctx->req_body_received = 1;

} else if (rc == NGX_ERROR || rc == NGX_HTTP_INTERNAL_SERVER_ERROR) {
ngx_wasm_error(&rctx->env);
}
dd("enter content");
rctx->in_req_body_handler = 1;

(void) ngx_http_wasm_content(rctx);

rctx->in_req_body_handler = 0;
dd("exit content");

if (rctx->req_body_waited
&& !ngx_wasm_yielding(&rctx->env))
{
/* decrement r->count like wev_handler last_finalize */
ngx_http_finalize_request(r, rc);
}

dd("exit (len: %ld)", len);
Expand Down
50 changes: 46 additions & 4 deletions src/wasm/ngx_wasm_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,52 @@ ngx_wasm_op_proxy_wasm_handler(ngx_wasm_op_ctx_t *opctx,
r->headers_in.content_length_n = rctx->req_content_length_n;
}

rc = ngx_http_wasm_read_client_request_body(r,
ngx_http_proxy_wasm_on_request_body_handler);
if (rc == NGX_OK && ngx_wasm_yielding(&rctx->env)) {
rc = NGX_AGAIN;
if (!rctx->req_body_received) {
rc = ngx_http_wasm_read_client_request_body(r,
ngx_http_proxy_wasm_on_request_body_handler);
if (rc == NGX_OK) {
if (ngx_wasm_yielding(&rctx->env)) {
/* yield in request_body content invocation */
rc = NGX_AGAIN;

} else if (ngx_wasm_errored(&rctx->env)) {
/* error in request_body content invocation */
rc = NGX_HTTP_INTERNAL_SERVER_ERROR;

} else if (rctx->req_body_received
&& !rctx->resp_content_chosen)
{
/* request_body content invocation finished normally, no
* content produced */
rc = NGX_DONE;
}

} else if (rc == NGX_AGAIN) {
rctx->req_body_waited = 1;
}

} else if (pwctx->req_body_len) {
rc = ngx_proxy_wasm_resume(pwctx, pwctx->phase,
NGX_PROXY_WASM_STEP_REQ_BODY);
switch (rc) {
case NGX_ERROR:
case NGX_HTTP_INTERNAL_SERVER_ERROR:
ngx_wasm_error(&rctx->env);
break;
case NGX_AGAIN:
ngx_wasm_yield(&rctx->env);
break;
case NGX_DONE:
rc = NGX_OK;
/* fallthrough */
default:
ngx_wa_assert(rc == NGX_OK);
break;
}

} else {
/* received empty body */
rc = NGX_OK;
}

break;
Expand Down
3 changes: 1 addition & 2 deletions t/03-proxy_wasm/006-on_http_next_action.t
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ pausing after "RequestHeaders"
--- request
POST /t
Hello world
--- error_code: 200
--- error_code:
--- response_body
nop
--- error_log
pausing after "RequestBody"
--- no_error_log
Expand Down
66 changes: 66 additions & 0 deletions t/03-proxy_wasm/010-eagain_sanity.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:

BEGIN {
$ENV{MOCKEAGAIN} = 'rw';
$ENV{MOCKEAGAIN_VERBOSE} ||= 0;
$ENV{TEST_NGINX_EVENT_TYPE} = 'poll';
}

use strict;
use lib '.';
use t::TestWasmX;

plan_tests(5);
run_tests();

__DATA__

=== TEST 1: EAGAIN - reading request body
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: on_phases
--- config
location /t {
proxy_wasm on_phases;
echo_sleep 0.3;
echo ok;
}
--- request
POST /t

Hello world
--- response_body
ok
--- grep_error_log eval: qr/#\d+ on_(request|log).*?(?=(, client|\s+while))/
--- grep_error_log_out eval
qr/\A#\d+ on_request_headers, 3 headers, eof: false
#\d+ on_request_body, 11 bytes, eof: true
#\d+ on_log\Z/
--- no_error_log
[error]
[crit]



=== TEST 2: EAGAIN - reading request body in subrequest
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: on_phases
--- config
location /subrequest {
internal;
proxy_wasm on_phases;
echo_sleep 0.3;
echo ok;
}

location /t {
echo_subrequest POST '/subrequest' -b 'Hello world';
}
--- response_body
ok
--- grep_error_log eval: qr/#\d+ on_(request|log).*?(?=(, client|\s+while))/
--- grep_error_log_out eval
qr/\A#\d+ on_request_headers, 3 headers, eof: false
#\d+ on_request_body, 11 bytes, eof: true\Z/
--- no_error_log
[error]
[crit]
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ hello world


=== TEST 12: Lua bridge - proxy_wasm_lua_resolver, IPv6 record
--- SKIP
--- skip_eval: 5: system("cat /sys/module/ipv6/parameters/disable") ne 0 || defined $ENV{GITHUB_ACTIONS}
--- timeout eval: $::ExtTimeout
--- load_nginx_modules: ngx_http_echo_module
Expand Down

0 comments on commit da0bb17

Please sign in to comment.