Skip to content

Commit

Permalink
feat(*) on_http_response_headers + runloop refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed Apr 13, 2021
1 parent ba670ce commit f6227ca
Show file tree
Hide file tree
Showing 21 changed files with 642 additions and 368 deletions.
4 changes: 2 additions & 2 deletions src/common/proxy_wasm/ngx_proxy_wasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ngx_proxy_wasm_func_lookup(ngx_proxy_wasm_t *pwm, const char *n)
}


static ngx_proxy_wasm_abi_version_t
static ngx_proxy_wasm_abi_version_e
ngx_proxy_wasm_abi_version(ngx_proxy_wasm_t *pwm)
{
size_t i;
Expand Down Expand Up @@ -89,7 +89,7 @@ ngx_proxy_wasm_init(ngx_proxy_wasm_t *pwm)

case NGX_PROXY_WASM_UNKNOWN:
pwm->ecode = NGX_PROXY_WASM_ERR_UNKNOWN_ABI;
ngx_proxy_wasm_log_error(NGX_LOG_EMERG, pwm->log, pwm->ecode, NULL);
ngx_proxy_wasm_log_error(NGX_LOG_WASM_NYI, pwm->log, pwm->ecode, NULL);
return NGX_ERROR;

default:
Expand Down
12 changes: 8 additions & 4 deletions src/common/proxy_wasm/ngx_proxy_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ typedef enum {
NGX_PROXY_WASM_0_2_1 = 2,
NGX_PROXY_WASM_VNEXT = 3,
NGX_PROXY_WASM_UNKNOWN = 4,
} ngx_proxy_wasm_abi_version_t;
} ngx_proxy_wasm_abi_version_e;


typedef enum {
Expand Down Expand Up @@ -133,9 +133,6 @@ struct ngx_proxy_wasm_s {
ngx_log_t *log;
ngx_wavm_module_t *module;
ngx_wavm_linked_module_t *lmodule;
ngx_wavm_instance_t *instance;
ngx_wavm_ctx_t wvctx;
ngx_proxy_wasm_abi_version_t abi_version;

/* dyn config */

Expand All @@ -155,6 +152,10 @@ struct ngx_proxy_wasm_s {
* - tested: 0.1.0
*/

ngx_wavm_ctx_t wvctx;
ngx_wavm_instance_t *instance;
ngx_proxy_wasm_abi_version_e abi_version;

ngx_wavm_funcref_t *proxy_on_memory_allocate;

/* context */
Expand Down Expand Up @@ -213,6 +214,9 @@ struct ngx_proxy_wasm_s {

ngx_wavm_funcref_t *proxy_on_custom_callback;

/* control flow: flags */

unsigned trap:1;
};


Expand Down
113 changes: 27 additions & 86 deletions src/common/proxy_wasm/ngx_proxy_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ngx_proxy_wasm_get_map(ngx_wavm_instance_t *instance,
return &r->headers_in.headers;

default:
ngx_wasm_log_error(NGX_LOG_ALERT, instance->log, 0,
ngx_wasm_log_error(NGX_LOG_WASM_NYI, instance->log, 0,
"NYI - get_map: %d", map_type);
return NULL;

Expand Down Expand Up @@ -135,7 +135,7 @@ ngx_proxy_wasm_hfuncs_proxy_log(ngx_wavm_instance_t *instance,
break;

default:
ngx_wasm_log_error(NGX_LOG_ALERT, instance->log, 0,
ngx_wasm_log_error(NGX_LOG_WASM_NYI, instance->log, 0,
"NYI: unknown log level \"%d\"", log_level);

return ngx_proxy_wasm_result_badarg(rets);
Expand Down Expand Up @@ -274,24 +274,16 @@ static ngx_int_t
ngx_proxy_wasm_hfuncs_send_http_response(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
size_t i;
int32_t status, reason_len, body_len, headers_len
int32_t status, reason_len, body_len, headers_len
#if (NGX_DEBUG)
, grpc_status
, grpc_status
#endif
;
u_char *reason, *body, *marsh_headers, *p;
ngx_http_wasm_req_ctx_t *rctx = instance->ctx->data;
ngx_http_request_t *r = rctx->r;
ngx_int_t rc;
ngx_array_t *headers;
ngx_table_elt_t *elt;
ngx_buf_t *b;
ngx_chain_t *cl = NULL;

if (r->connection->fd == NGX_WASM_BAD_FD || rctx->sent_last) {
return NGX_WAVM_BAD_CTX;
}
;
u_char *reason, *body, *marsh_headers;
ngx_int_t rc;
ngx_array_t *headers = NULL;
ngx_http_wasm_req_ctx_t *rctx = instance->ctx->data;
ngx_http_request_t *r = rctx->r;

status = args[0].of.i32;
reason = ngx_wavm_memory_lift(instance->memory, args[1].of.i32);
Expand All @@ -306,98 +298,47 @@ ngx_proxy_wasm_hfuncs_send_http_response(ngx_wavm_instance_t *instance,
ngx_wasm_assert(grpc_status == -1);
#endif

/* status */

if (status < 100 || status > 999) {
return ngx_proxy_wasm_result_badarg(rets);
}

r->headers_out.status = status;

if (r->err_status) {
r->err_status = 0;
}

/* reason */

if (reason_len) {
reason_len += 5; /* "ddd <reason>\0" */
p = ngx_palloc(r->pool, reason_len);
ngx_snprintf(p, reason_len, "%03ui %s", status, reason);

r->headers_out.status_line.data = p;
r->headers_out.status_line.len = reason_len - 1;
}

/* headers */

if (headers_len) {
/* free (ngx_array_destroy): r->pool */
headers = ngx_proxy_wasm_pairs_unmarshal(r->pool, marsh_headers,
headers_len);
if (headers == NULL) {
return ngx_proxy_wasm_result_err(rets);
}

for (i = 0; i < headers->nelts; i++) {
elt = &((ngx_table_elt_t *) headers->elts)[i];

rc = ngx_http_wasm_set_resp_header(r, elt->key, elt->value, 1);
if (rc != NGX_OK) {
return ngx_proxy_wasm_result_err(rets);
}
}
}

/* body */

if (body_len) {
b = ngx_create_temp_buf(r->pool, body_len + sizeof(LF));
if (b == NULL) {
return ngx_proxy_wasm_result_err(rets);
}
rc = ngx_http_wasm_stash_local_response(rctx, status, reason, reason_len,
headers, body, body_len);
switch (rc) {

b->last = ngx_copy(b->last, body, body_len);
//*b->last++ = LF;
case NGX_OK:
return ngx_proxy_wasm_result_ok(rets);

b->last_buf = 1;
b->last_in_chain = 1;

cl = ngx_alloc_chain_link(r->connection->pool);
if (cl == NULL) {
return ngx_proxy_wasm_result_err(rets);
}
case NGX_ERROR:
return ngx_proxy_wasm_result_err(rets);

cl->buf = b;
cl->next = NULL;
case NGX_DECLINED:
return ngx_proxy_wasm_result_badarg(rets);

if (ngx_http_wasm_set_resp_content_length(r, body_len) != NGX_OK) {
return ngx_proxy_wasm_result_err(rets);
}
}
case NGX_ABORT:
return NGX_WAVM_BAD_USAGE;

rc = ngx_http_wasm_send_chain_link(r, cl);
if (rc == NGX_ERROR) {
return NGX_WAVM_ERROR;
default:
break;

} else if (rc == NGX_AGAIN) {
/* TODO: NYI - NGX_WAVM_AGAIN */
return NGX_AGAIN;
}

/* NGX_OK */
/* unreachable */

rctx->sent_last = 1;

return ngx_proxy_wasm_result_ok(rets);
ngx_wasm_assert(0);
return NGX_WAVM_BUSY;
}


static ngx_int_t
ngx_proxy_wasm_hfuncs_nop(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
ngx_wasm_log_error(NGX_LOG_ALERT, instance->log, 0,
ngx_wasm_log_error(NGX_LOG_WASM_NYI, instance->log, 0,
"NYI: proxy_wasm hfunc");

return NGX_WAVM_BUSY; /* NYI */
Expand Down
30 changes: 26 additions & 4 deletions src/http/ngx_http_wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,45 @@ typedef struct {

/* control flow */

ngx_http_handler_pt r_content_handler;

ngx_int_t local_resp_status;
ngx_str_t local_resp_reason;
ngx_array_t *local_resp_headers;
ngx_chain_t *local_resp_body;
size_t local_resp_body_len;
unsigned local_resp:1;
unsigned sent_last:1;
unsigned finalized:1;
} ngx_http_wasm_req_ctx_t;


void ngx_http_wasm_header_filter_init(void);
/* ops */
ngx_int_t ngx_http_wasm_stash_local_response(ngx_http_wasm_req_ctx_t *rctx,
ngx_int_t status, u_char *reason, size_t reason_len, ngx_array_t *headers,
u_char *body, size_t body_len);
ngx_int_t ngx_http_wasm_flush_local_response(ngx_http_request_t *r,
ngx_http_wasm_req_ctx_t *rctx);


ngx_uint_t ngx_http_wasm_req_headers_count(ngx_http_request_t *r);
ngx_int_t ngx_http_wasm_send_header(ngx_http_request_t *r);
ngx_int_t ngx_http_wasm_send_chain_link(ngx_http_request_t *r, ngx_chain_t *in);
/* utils */
ngx_int_t ngx_http_wasm_set_resp_header(ngx_http_request_t *r, ngx_str_t key,
ngx_str_t value, unsigned override);
ngx_int_t ngx_http_wasm_set_resp_content_length(ngx_http_request_t *r,
off_t cl);
uintptr_t ngx_http_wasm_escape(u_char *dst, u_char *src, size_t size,
ngx_http_wasm_escape_kind kind);
ngx_int_t ngx_http_wasm_send_chain_link(ngx_http_request_t *r, ngx_chain_t *in);
static ngx_inline ngx_uint_t
ngx_http_wasm_req_headers_count(ngx_http_request_t *r)
{
return ngx_wasm_list_nelts(&r->headers_in.headers);
}
static ngx_inline ngx_uint_t
ngx_http_wasm_resp_headers_count(ngx_http_request_t *r)
{
return ngx_wasm_list_nelts(&r->headers_out.headers);
}

extern ngx_wavm_host_def_t ngx_http_wasm_host_interface;

Expand Down
76 changes: 41 additions & 35 deletions src/http/ngx_http_wasm_host.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,58 @@ ngx_int_t
ngx_http_wasm_hfuncs_resp_say(ngx_wavm_instance_t *instance,
wasm_val_t args[], wasm_val_t rets[])
{
size_t len;
u_char *body;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_wasm_req_ctx_t *rctx = instance->ctx->data;
ngx_http_request_t *r = rctx->r;
size_t len;
u_char *body;
ngx_int_t rc;
ngx_buf_t *b;
ngx_chain_t *cl;
ngx_http_wasm_req_ctx_t *rctx = instance->ctx->data;
ngx_http_request_t *r = rctx->r;

body = ngx_wavm_memory_lift(instance->memory, args[0].of.i32);
len = args[1].of.i32;
body = ngx_wavm_memory_lift(instance->memory, args[0].of.i32);
len = args[1].of.i32;

if (r->connection->fd == (ngx_socket_t) -1) {
return NGX_WAVM_BAD_CTX;
}
if (r->connection->fd == (ngx_socket_t) -1) {
return NGX_WAVM_BAD_CTX;
}

b = ngx_create_temp_buf(r->pool, len + sizeof(LF));
if (b == NULL) {
return NGX_WAVM_ERROR;
}
if (rctx->finalized) {
return NGX_WAVM_BAD_USAGE;
}

b->last = ngx_copy(b->last, body, len);
*b->last++ = LF;
b = ngx_create_temp_buf(r->pool, len + sizeof(LF));
if (b == NULL) {
return NGX_WAVM_ERROR;
}

b->last_buf = 1;
b->last_in_chain = 1;
b->last = ngx_copy(b->last, body, len);
*b->last++ = LF;

cl = ngx_alloc_chain_link(r->connection->pool);
if (cl == NULL) {
return NGX_WAVM_ERROR;
}
b->last_buf = 1;
b->last_in_chain = 1;

cl->buf = b;
cl->next = NULL;
cl = ngx_alloc_chain_link(r->connection->pool);
if (cl == NULL) {
return NGX_WAVM_ERROR;
}

rc = ngx_http_wasm_send_chain_link(r, cl);
if (rc == NGX_ERROR) {
return NGX_WAVM_ERROR;
cl->buf = b;
cl->next = NULL;

} else if (rc == NGX_AGAIN) {
/* TODO: NYI - NGX_WAVM_AGAIN */
return NGX_AGAIN;
}
rc = ngx_http_wasm_send_chain_link(r, cl);
if (rc == NGX_ERROR) {
return NGX_WAVM_ERROR;

rctx->sent_last = 1;
} else if (rc == NGX_AGAIN) {
/* TODO: NYI - NGX_WAVM_AGAIN */
return NGX_WAVM_AGAIN;
}

return NGX_WAVM_OK;
ngx_wasm_assert(rc == NGX_DONE || rc == NGX_OK);

rctx->sent_last = 1;

return NGX_WAVM_OK;
}


Expand Down

0 comments on commit f6227ca

Please sign in to comment.