Skip to content

Commit

Permalink
feat(proxy-wasm) support 'send_local_response' during ResponseHeaders
Browse files Browse the repository at this point in the history
Also support 'Pause' return code from ResponseHeaders.
  • Loading branch information
thibaultcha committed Feb 9, 2024
1 parent e8e9a59 commit c81997e
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 52 deletions.
22 changes: 22 additions & 0 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ action2rc(ngx_proxy_wasm_ctx_t *pwctx,
ngx_proxy_wasm_exec_t *pwexecs;
ngx_proxy_wasm_filter_t *filter;
ngx_proxy_wasm_action_e action;
#ifdef NGX_WASM_HTTP
ngx_wavm_instance_t *instance;
ngx_http_wasm_req_ctx_t *rctx;
#endif

filter = pwexec->filter;
action = pwctx->action;
Expand Down Expand Up @@ -498,6 +502,24 @@ action2rc(ngx_proxy_wasm_ctx_t *pwctx,
case NGX_PROXY_WASM_ACTION_PAUSE:
switch (pwctx->phase->index) {
#ifdef NGX_WASM_HTTP
case NGX_HTTP_WASM_HEADER_FILTER_PHASE:
instance = ngx_proxy_wasm_pwexec2instance(pwexec);
rctx = ngx_http_proxy_wasm_get_rctx(instance);

ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwctx->log, 0,
"proxy_wasm producing local response in "
"\"ResponseHeaders\" step "
"(filter: %l/%l, pwctx: %p)",
pwexec->index + 1, pwctx->nfilters, pwctx);

if (!rctx->resp_chunk_override) {
ngx_proxy_wasm_log_error(NGX_LOG_WARN, pwexec->log, 0,
"\"ResponseHeaders\" returned "
"\"PAUSE\": local response expected "
"but none produced");
}

goto yield;
case NGX_HTTP_WASM_BODY_FILTER_PHASE:
ngx_log_debug3(NGX_LOG_DEBUG_WASM, pwctx->log, 0,
"proxy_wasm buffering response after "
Expand Down
36 changes: 35 additions & 1 deletion src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,16 +1008,19 @@ static ngx_int_t
ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
int32_t status, reason_len, body_len;
int32_t status, reason_len, body_len, cl;
#if (NGX_DEBUG)
int32_t grpc_status;
#endif
u_char *reason, *body;
ngx_int_t rc;
ngx_str_t s;
ngx_array_t headers;
ngx_proxy_wasm_marshalled_map_t map;
ngx_proxy_wasm_exec_t *pwexec;
ngx_http_wasm_req_ctx_t *rctx;
ngx_http_request_t *r;
static ngx_str_t lf = ngx_string("\n");

rctx = ngx_http_proxy_wasm_get_rctx(instance);
pwexec = ngx_proxy_wasm_instance2pwexec(instance);
Expand All @@ -1040,6 +1043,37 @@ ngx_proxy_wasm_hfuncs_send_local_response(ngx_wavm_instance_t *instance,
return ngx_proxy_wasm_result_err(rets);
}

if (rctx->entered_header_filter && !rctx->entered_body_filter) {
r = rctx->r;
cl = body_len;
s.data = body;
s.len = body_len;

rc = ngx_http_wasm_set_resp_body(rctx, &s, 0, s.len);
if (rc != NGX_OK) {
return ngx_proxy_wasm_result_err(rets);
}

if (body_len) {
/* append linefeed */
cl++;

rc = ngx_wasm_chain_append(r->connection->pool, &rctx->resp_chunk,
body_len, &lf, &rctx->free_bufs,
rctx->env.buf_tag, 0);
if (rc != NGX_OK) {
return ngx_proxy_wasm_result_err(rets);
}
}

ngx_http_wasm_set_resp_status(rctx, status, reason, reason_len);
ngx_http_wasm_set_resp_content_length(r, cl);

rctx->resp_chunk_override = 1;

return ngx_proxy_wasm_result_ok(rets);
}

rc = ngx_http_wasm_stash_local_response(rctx, status, reason, reason_len,
&headers, body, body_len);

Expand Down
8 changes: 8 additions & 0 deletions src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,19 @@ struct ngx_http_wasm_req_ctx_s {

/* flags */

unsigned local_resp_flushed:1; /* local response can only be flushed once */
unsigned sock_buffer_reuse:1; /* convenience alias to loc->socket_buffer_reuse */
unsigned req_keepalive:1; /* r->keepalive copy */
unsigned reset_resp_shims:1;
unsigned entered_content_phase:1; /* entered content handler */
unsigned exited_content_phase:1; /* executed content handler at least once */
unsigned entered_header_filter:1; /* entered header_filter handler */
unsigned entered_body_filter:1; /* entered body_filter handler */
unsigned entered_log_phase:1; /* entered log phase */

unsigned in_wev:1; /* in wev_handler */
unsigned resp_content_chosen:1; /* content handler has an output to produce */
unsigned resp_chunk_override:1; /* override response chunk in body_filter handler */
unsigned resp_buffering:1; /* enable response buffering */
unsigned resp_content_sent:1; /* has started sending output (may have yielded) */
unsigned resp_finalized:1; /* finalized connection (ourselves) */
Expand Down Expand Up @@ -116,6 +120,8 @@ typedef struct {
/* http */
ngx_int_t ngx_http_wasm_rctx(ngx_http_request_t *r,
ngx_http_wasm_req_ctx_t **out);
ngx_int_t ngx_http_wasm_set_local_response_body(ngx_http_wasm_req_ctx_t *rctx,
u_char *body, size_t body_len);
ngx_int_t ngx_http_wasm_stash_local_response(ngx_http_wasm_req_ctx_t *rctx,
ngx_int_t status, u_char *reason, size_t reason_len, ngx_array_t *headers,
u_char *body, size_t body_len);
Expand Down Expand Up @@ -154,6 +160,8 @@ ngx_uint_t ngx_http_wasm_count_shim_headers(ngx_http_wasm_req_ctx_t *rctx);
/* payloads */
ngx_int_t ngx_http_wasm_set_req_body(ngx_http_wasm_req_ctx_t *rctx,
ngx_str_t *body, size_t at, size_t max);
void ngx_http_wasm_set_resp_status(ngx_http_wasm_req_ctx_t *rctx,
ngx_int_t status, u_char *reason, size_t reason_len);
ngx_int_t ngx_http_wasm_set_resp_body(ngx_http_wasm_req_ctx_t *rctx,
ngx_str_t *body, size_t at, size_t max);
ngx_int_t ngx_http_wasm_prepend_req_body(ngx_http_wasm_req_ctx_t *rctx,
Expand Down
20 changes: 20 additions & 0 deletions src/http/ngx_http_wasm_filter_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ ngx_http_wasm_header_filter_handler(ngx_http_request_t *r)
goto done;
}
}
#if (NGX_DEBUG)
else if (rc == NGX_AGAIN) {
/**
* Proxy-Wasm idiom for "local response in response headers",
* log potential usage and ignore rc.
*/
ngx_log_debug2(NGX_LOG_DEBUG_WASM, r->connection->log, 0,
"wasm \"header_filter\" ops resume rc: %d "
"(resp_chunk_override: %d)", rc,
rctx->resp_chunk_override);
}
#endif

rc = ngx_http_next_header_filter(r);

Expand Down Expand Up @@ -160,6 +172,14 @@ ngx_http_wasm_body_filter_handler(ngx_http_request_t *r, ngx_chain_t *in)
}
}

if (rctx->resp_chunk_override) {
rctx->resp_chunk_override = 0;

in = rctx->resp_chunk;
}

rctx->entered_body_filter = 1;

ngx_http_wasm_body_filter_resume(rctx, in);

if (rctx->resp_buffering) {
Expand Down
24 changes: 11 additions & 13 deletions src/http/ngx_http_wasm_local_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ ngx_http_wasm_flush_local_response(ngx_http_wasm_req_ctx_t *rctx)
return NGX_DECLINED;
}

if (rctx->local_resp_flushed) {
ngx_wasm_assert(0);
return NGX_ABORT;
}

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"wasm flushing local_response");

Expand All @@ -196,16 +201,10 @@ ngx_http_wasm_flush_local_response(ngx_http_wasm_req_ctx_t *rctx)
return NGX_ERROR;
}

r->headers_out.status = rctx->local_resp_status;

if (r->err_status) {
r->err_status = 0;
}

if (rctx->local_resp_reason.len) {
r->headers_out.status_line.data = rctx->local_resp_reason.data;
r->headers_out.status_line.len = rctx->local_resp_reason.len;
}
ngx_http_wasm_set_resp_status(rctx,
rctx->local_resp_status,
rctx->local_resp_reason.data,
rctx->local_resp_reason.len);

for (i = 0; i < rctx->local_resp_headers.nelts; i++) {
elt = &((ngx_table_elt_t *) rctx->local_resp_headers.elts)[i];
Expand All @@ -231,9 +230,8 @@ ngx_http_wasm_flush_local_response(ngx_http_wasm_req_ctx_t *rctx)
return NGX_ERROR;
}

rc = ngx_http_wasm_send_chain_link(r, rctx->local_resp_body);

rctx->local_resp_status = 0;
rctx->local_resp_flushed = 1;

return rc;
return ngx_http_wasm_send_chain_link(r, rctx->local_resp_body);
}
2 changes: 2 additions & 0 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,8 @@ ngx_http_wasm_log_handler(ngx_http_request_t *r)
return NGX_ERROR;
}

rctx->entered_log_phase = 1;

rc = ngx_wasm_ops_resume(&rctx->opctx, NGX_HTTP_LOG_PHASE);

#if (NGX_HTTP_WASM_DONE_IN_LOG)
Expand Down
44 changes: 40 additions & 4 deletions src/http/ngx_http_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,44 @@ ngx_http_wasm_prepend_req_body(ngx_http_wasm_req_ctx_t *rctx, ngx_str_t *body)
}


void
ngx_http_wasm_set_resp_status(ngx_http_wasm_req_ctx_t *rctx,
ngx_int_t status, u_char *reason, size_t reason_len)
{
ngx_http_request_t *r = rctx->r;

r->headers_out.status = status;

if (r->err_status) {
r->err_status = 0;
}

if (reason) {
r->headers_out.status_line.data = reason;
}

r->headers_out.status_line.len = reason_len;
}


ngx_int_t
ngx_http_wasm_set_resp_body(ngx_http_wasm_req_ctx_t *rctx, ngx_str_t *body,
size_t at, size_t max)
{
ngx_http_request_t *r = rctx->r;

if (rctx->resp_chunk == NULL) {
return NGX_ABORT;
if (rctx->entered_log_phase) {
return NGX_ABORT;
}

rctx->resp_chunk = ngx_wasm_chain_get_free_buf(rctx->pool,
&rctx->free_bufs,
body->len,
rctx->env.buf_tag, 1);
if (rctx->resp_chunk == NULL) {
return NGX_ERROR;
}
}

if (r->header_sent && !r->chunked) {
Expand All @@ -339,7 +369,7 @@ ngx_http_wasm_set_resp_body(ngx_http_wasm_req_ctx_t *rctx, ngx_str_t *body,
body->len = ngx_min(body->len, max);

if (ngx_wasm_chain_append(r->connection->pool, &rctx->resp_chunk, at, body,
&rctx->free_bufs, buf_tag, 0)
&rctx->free_bufs, rctx->env.buf_tag, 0)
!= NGX_OK)
{
return NGX_ERROR;
Expand All @@ -349,8 +379,14 @@ ngx_http_wasm_set_resp_body(ngx_http_wasm_req_ctx_t *rctx, ngx_str_t *body,
&rctx->resp_chunk_eof);

if (!rctx->resp_chunk_len) {
/* discard chunk */
rctx->resp_chunk = NULL;
if (rctx->entered_body_filter) {
/* discard chunk */
rctx->resp_chunk = NULL;

} else {
rctx->resp_chunk->buf->last_buf = 1;
rctx->resp_chunk->buf->last_in_chain = 1;
}
}

return NGX_OK;
Expand Down
2 changes: 2 additions & 0 deletions src/wasm/ngx_wasm_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ngx_wasm_chain_clear(ngx_chain_t *in, size_t offset, unsigned *eof,
pos += len;
}

buf->flush = 0;
buf->last_buf = 0;
buf->last_in_chain = 0;
}
Expand Down Expand Up @@ -273,6 +274,7 @@ ngx_wasm_chain_append(ngx_pool_t *pool, ngx_chain_t **in, size_t at,
nl = cl;
cl = cl->next;
nl->next = *free;

if (*free) {
*free = nl;
}
Expand Down
28 changes: 16 additions & 12 deletions t/03-proxy_wasm/006-on_http_next_action.t
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,28 @@ pausing after "RequestBody"


=== TEST 3: proxy_wasm - on_response_headers -> Pause
NYI
Expects local response set in headers phase.
--- wasm_modules: on_phases
--- config
location /t {
proxy_wasm on_phases 'pause_on=response_headers';
return 200;
}
--- error_code: 500
--- response_body_like: 500 Internal Server Error
--- error_code: 200
--- response_body
--- error_log eval
[
qr/pausing after "ResponseHeaders"/,
qr#\[error\] .*? bad "on_response_headers" return action: "PAUSE"#,
qr#\[info\] .*? filter chain failed resuming: previous error \(invalid return action\)#
qr/"ResponseHeaders" returned "PAUSE": local response expected but none produced/,
]
--- no_error_log
[error]



=== TEST 4: proxy_wasm - on_response_body -> Pause
Triggers response buffering.
--- skip_no_debug: 5
--- skip_no_debug
--- wasm_modules: on_phases
--- config
location /t {
Expand Down Expand Up @@ -158,7 +159,7 @@ pausing after "RequestBody"


=== TEST 7: proxy_wasm - subrequest on_response_headers -> Pause
NYI
Expects local response set in headers phase.
--- timeout_expected: 1
--- abort
--- load_nginx_modules: ngx_http_echo_module
Expand All @@ -181,19 +182,22 @@ NYI
echo_subrequest_async GET /nop;
}
--- error_code: 200
--- response_body_like: 500 Internal Server Error
--- response_body
ok
ok
--- error_log eval
[
qr/pausing after "ResponseHeaders"/,
qr#\[error\] .*? bad "on_response_headers" return action: "PAUSE"#,
qr#\[info\] .*? filter chain failed resuming: previous error \(invalid return action\)#
qr/"ResponseHeaders" returned "PAUSE": local response expected but none produced/,
]
--- no_error_log
[error]



=== TEST 8: proxy_wasm - subrequest on_response_body -> Pause
NYI
--- skip_no_debug: 5
Triggers response buffering.
--- skip_no_debug
--- load_nginx_modules: ngx_http_echo_module
--- wasm_modules: on_phases
--- config
Expand Down
Loading

0 comments on commit c81997e

Please sign in to comment.