From 21b03f0945c997b30dd12b1bf59d1c3ea45477b6 Mon Sep 17 00:00:00 2001 From: spacewander Date: Mon, 11 Oct 2021 19:01:23 +0800 Subject: [PATCH] feat: run http_request_headers filter --- README.md | 20 +++++++ proxy_wasm_abi.md | 17 ++++++ src/http/ngx_http_wasm_api.c | 25 +++++++-- src/http/ngx_http_wasm_ctx.h | 2 +- src/http/ngx_http_wasm_module.c | 88 ++++++++++++++++++++++--------- src/http/ngx_http_wasm_state.c | 17 +++++- src/http/ngx_http_wasm_state.h | 5 +- src/vm/vm.h | 7 +-- src/vm/wasmtime.c | 11 ++++ t/http_lifecycle.t | 41 ++++++++++++++ t/log.t | 20 +++++++ t/testdata/http_lifecycle/main.go | 11 ++++ t/testdata/log/main.go | 15 ++++++ 13 files changed, 244 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 3402479..28d15e4 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,26 @@ if not ctx then end ``` +### on_http_request_headers + +`syntax: ok, err = proxy_wasm.on_http_request_headers(plugin_ctx)` + +Run the HTTP request headers filter in the plugin of the given plugin ctx. + +```lua +local plugin, err = proxy_wasm.load("t/testdata/plugin_lifecycle/main.go.wasm") +if not plugin then + ngx.log(ngx.ERR, "failed to load wasm ", err) + return +end +local ctx, err = wasm.on_configure(plugin, '{"body":512}') +if not ctx then + ngx.log(ngx.ERR, "failed to create plugin ctx ", err) + return +end +assert(wasm.on_http_request_headers(ctx)) +``` + ## proxy-wasm ABI Implemented proxy-wasm ABI can be found in [proxy_wasm_abi](./proxy_wasm_abi.md). diff --git a/proxy_wasm_abi.md b/proxy_wasm_abi.md index cb7b3ad..e9f5058 100644 --- a/proxy_wasm_abi.md +++ b/proxy_wasm_abi.md @@ -87,6 +87,23 @@ Called when the host environment starts the plugin. Its configuration (`plugin_c might be retrieved using `proxy_get_buffer`. +## HTTP (L7) extensions + +### `proxy_on_request_headers` + +* params: + - `i32 (uint32_t) context_id` + - `i32 (size_t) num_headers` + - `i32 (bool) end_of_stream` +* returns: + - `i32 (proxy_action_t) next_action` + +Called when HTTP request headers are received from the client. TODO: Headers can be retrieved using +`proxy_get_map` and/or `proxy_get_map_value`. + +TODO: pass a correct `num_headers` but not 0. + + # Functions implemented in the host environment All functions implemented in the host environment return `proxy_result_t`, which indicates the diff --git a/src/http/ngx_http_wasm_api.c b/src/http/ngx_http_wasm_api.c index e9ccdb0..678a221 100644 --- a/src/http/ngx_http_wasm_api.c +++ b/src/http/ngx_http_wasm_api.c @@ -47,10 +47,19 @@ proxy_set_effective_context(int32_t id) int32_t proxy_log(int32_t log_level, int32_t addr, int32_t size) { - const u_char *p; - ngx_uint_t host_log_level = NGX_LOG_ERR; + const u_char *p; + ngx_uint_t host_log_level = NGX_LOG_ERR; + ngx_http_request_t *r; + ngx_log_t *log; + + r = ngx_http_wasm_get_req(); + if (r == NULL) { + log = ngx_cycle->log; + } else { + log = r->connection->log; + } - p = ngx_wasm_vm.get_memory(ngx_cycle->log, addr, size); + p = ngx_wasm_vm.get_memory(log, addr, size); if (p == NULL) { return PROXY_RESULT_INVALID_MEMORY_ACCESS; } @@ -81,7 +90,7 @@ proxy_log(int32_t log_level, int32_t addr, int32_t size) break; } - ngx_log_error(host_log_level, ngx_cycle->log, 0, "%*s", size, p); + ngx_log_error(host_log_level, log, 0, "%*s", size, p); return PROXY_RESULT_OK; } @@ -100,8 +109,14 @@ proxy_get_buffer_bytes(int32_t type, int32_t start, int32_t length, u_char *buf; const ngx_str_t *conf; + ngx_http_request_t *r; - log = ngx_cycle->log; + r = ngx_http_wasm_get_req(); + if (r == NULL) { + log = ngx_cycle->log; + } else { + log = r->connection->log; + } switch (type) { case PROXY_BUFFER_TYPE_PLUGIN_CONFIGURATION: diff --git a/src/http/ngx_http_wasm_ctx.h b/src/http/ngx_http_wasm_ctx.h index 0e50521..4c2d80f 100644 --- a/src/http/ngx_http_wasm_ctx.h +++ b/src/http/ngx_http_wasm_ctx.h @@ -35,7 +35,7 @@ typedef struct { typedef struct { - ngx_http_wasm_http_ctx_t *http_ctx; + ngx_array_t *http_ctxs; } ngx_http_wasm_ctx_t; diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index 05bea71..c9b206e 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -20,6 +20,8 @@ static ngx_str_t proxy_on_context_create = ngx_string("proxy_on_context_create") static ngx_str_t proxy_on_configure = ngx_string("proxy_on_configure"); static ngx_str_t proxy_on_done = ngx_string("proxy_on_done"); static ngx_str_t proxy_on_delete = ngx_string("proxy_on_delete"); +static ngx_str_t proxy_on_request_headers = + ngx_string("proxy_on_request_headers"); typedef struct { @@ -348,6 +350,7 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, if (hwp_ctx->state == NULL) { goto free_hwp_ctx; } + hwp_ctx->state->r = NULL; state_conf = (u_char *) (hwp_ctx->state + 1); /* copy conf so we can access it anytime */ @@ -360,6 +363,9 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, rc = ngx_wasm_vm.call(plugin, &proxy_on_configure, true, NGX_WASM_PARAM_I32_I32, ctx_id, size); + + ngx_http_wasm_set_state(NULL); + if (rc <= 0) { ngx_log_error(NGX_LOG_ERR, log, 0, "failed to configure plugin context %d, rc: %d", ctx_id, rc); @@ -431,42 +437,47 @@ static void ngx_http_wasm_cleanup(void *data) { ngx_int_t rc; + ngx_uint_t i; ngx_http_wasm_ctx_t *ctx = data; - ngx_http_wasm_http_ctx_t *http_ctx = ctx->http_ctx; + ngx_array_t *http_ctxs = ctx->http_ctxs; uint32_t ctx_id; + ngx_http_wasm_http_ctx_t *http_ctx; ngx_http_wasm_plugin_ctx_t *hwp_ctx; void *plugin; ngx_log_t *log; log = ngx_cycle->log; - if (http_ctx == NULL) { + if (http_ctxs == NULL) { return; } - ctx_id = http_ctx->id; - hwp_ctx = http_ctx->hwp_ctx; - plugin = hwp_ctx->hw_plugin->plugin; + for (i = 0; i < http_ctxs->nelts; i++) { + http_ctx = ((ngx_http_wasm_http_ctx_t **) http_ctxs->elts)[i]; + ctx_id = http_ctx->id; + hwp_ctx = http_ctx->hwp_ctx; + plugin = hwp_ctx->hw_plugin->plugin; - ngx_queue_remove(&http_ctx->queue); - ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue); + ngx_queue_remove(&http_ctx->queue); + ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue); - rc = ngx_wasm_vm.call(plugin, &proxy_on_done, true, NGX_WASM_PARAM_I32, ctx_id); - if (rc <= 0) { - ngx_log_error(NGX_LOG_ERR, log, 0, "failed to mark context %d as done, rc: %d", - ctx_id, rc); - } + rc = ngx_wasm_vm.call(plugin, &proxy_on_done, true, NGX_WASM_PARAM_I32, ctx_id); + if (rc <= 0) { + ngx_log_error(NGX_LOG_ERR, log, 0, "failed to mark context %d as done, rc: %d", + ctx_id, rc); + } - rc = ngx_wasm_vm.call(plugin, &proxy_on_delete, false, NGX_WASM_PARAM_I32, ctx_id); - if (rc != NGX_OK) { - ngx_log_error(NGX_LOG_ERR, log, 0, "failed to delete context %d, rc: %d", - ctx_id, rc); - } + rc = ngx_wasm_vm.call(plugin, &proxy_on_delete, false, NGX_WASM_PARAM_I32, ctx_id); + if (rc != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, log, 0, "failed to delete context %d, rc: %d", + ctx_id, rc); + } - ngx_log_error(NGX_LOG_INFO, log, 0, "free http context %d", ctx_id); + ngx_log_error(NGX_LOG_INFO, log, 0, "free http context %d", ctx_id); - if (hwp_ctx->done) { - ngx_http_wasm_free_plugin_ctx(hwp_ctx); + if (hwp_ctx->done) { + ngx_http_wasm_free_plugin_ctx(hwp_ctx); + } } } @@ -505,7 +516,10 @@ ngx_http_wasm_get_module_ctx(ngx_http_request_t *r) ngx_http_wasm_http_ctx_t * ngx_http_wasm_fetch_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r) { + ngx_uint_t i; ngx_http_wasm_ctx_t *ctx; + ngx_http_wasm_http_ctx_t *http_ctx; + ngx_http_wasm_http_ctx_t **p; ctx = ngx_http_wasm_get_module_ctx(r); @@ -513,11 +527,30 @@ ngx_http_wasm_fetch_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_reque return NULL; } - if (ctx->http_ctx == NULL) { - ctx->http_ctx = ngx_http_wasm_create_http_ctx(hwp_ctx, r); + if (ctx->http_ctxs == NULL) { + ctx->http_ctxs = ngx_array_create(r->pool, 1, sizeof(ngx_http_wasm_ctx_t *)); + if (ctx->http_ctxs == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no memory"); + return NULL; + } } - return ctx->http_ctx; + p = ctx->http_ctxs->elts; + + for (i = 0; i < ctx->http_ctxs->nelts; i++) { + if (p[i]->hwp_ctx == hwp_ctx) { + return p[i]; + } + } + + http_ctx = ngx_http_wasm_create_http_ctx(hwp_ctx, r); + if (http_ctx == NULL) { + return NULL; + } + + p = ngx_array_push(ctx->http_ctxs); + *p = http_ctx; + return http_ctx; } @@ -536,12 +569,19 @@ ngx_http_wasm_on_http(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r return NGX_DECLINED; } - rc = NGX_OK; + hwp_ctx->state->r = r; + ngx_http_wasm_set_state(hwp_ctx->state); http_ctx = ngx_http_wasm_fetch_http_ctx(hwp_ctx, r); if (http_ctx == NULL) { + ngx_http_wasm_set_state(NULL); return NGX_DECLINED; } + rc = ngx_wasm_vm.call(hwp_ctx->hw_plugin->plugin, + &proxy_on_request_headers, + true, NGX_WASM_PARAM_I32_I32_I32, http_ctx->id, + 0, 1); + ngx_http_wasm_set_state(NULL); return rc; } diff --git a/src/http/ngx_http_wasm_state.c b/src/http/ngx_http_wasm_state.c index 25f896f..e6c53c6 100644 --- a/src/http/ngx_http_wasm_state.c +++ b/src/http/ngx_http_wasm_state.c @@ -1,7 +1,7 @@ #include "ngx_http_wasm_state.h" -static ngx_http_wasm_state_t *cur_state; +static ngx_http_wasm_state_t *cur_state = NULL; void @@ -14,5 +14,20 @@ ngx_http_wasm_set_state(ngx_http_wasm_state_t *state) const ngx_str_t * ngx_http_wasm_get_conf(void) { + if (cur_state == NULL) { + return NULL; + } + return &cur_state->conf; } + + +ngx_http_request_t * +ngx_http_wasm_get_req(void) +{ + if (cur_state == NULL) { + return NULL; + } + + return cur_state->r; +} diff --git a/src/http/ngx_http_wasm_state.h b/src/http/ngx_http_wasm_state.h index fc2d441..9a5e7ae 100644 --- a/src/http/ngx_http_wasm_state.h +++ b/src/http/ngx_http_wasm_state.h @@ -3,15 +3,18 @@ #include +#include typedef struct { - ngx_str_t conf; + ngx_str_t conf; + ngx_http_request_t *r; } ngx_http_wasm_state_t; void ngx_http_wasm_set_state(ngx_http_wasm_state_t *state); const ngx_str_t *ngx_http_wasm_get_conf(void); +ngx_http_request_t *ngx_http_wasm_get_req(void); #endif // NGX_HTTP_WASM_STATE_H diff --git a/src/vm/vm.h b/src/vm/vm.h index fc1c442..0fc7eb5 100644 --- a/src/vm/vm.h +++ b/src/vm/vm.h @@ -6,9 +6,10 @@ #include -#define NGX_WASM_PARAM_VOID 1 -#define NGX_WASM_PARAM_I32 2 -#define NGX_WASM_PARAM_I32_I32 3 +#define NGX_WASM_PARAM_VOID 1 +#define NGX_WASM_PARAM_I32 2 +#define NGX_WASM_PARAM_I32_I32 3 +#define NGX_WASM_PARAM_I32_I32_I32 4 typedef struct { diff --git a/src/vm/wasmtime.c b/src/vm/wasmtime.c index a2e4364..cb19fc2 100644 --- a/src/vm/wasmtime.c +++ b/src/vm/wasmtime.c @@ -19,6 +19,9 @@ static ngx_str_t vm_name = ngx_string("wasmtime"); static wasm_engine_t *vm_engine; static wasmtime_val_t param_int32[1] = {{ .kind = WASMTIME_I32 }}; static wasmtime_val_t param_int32_int32[2] = {{ .kind = WASMTIME_I32 }, { .kind = WASMTIME_I32 }}; +static wasmtime_val_t param_int32_int32_int32[3] = { + { .kind = WASMTIME_I32 }, { .kind = WASMTIME_I32 }, { .kind = WASMTIME_I32 } +}; static ngx_wasm_wasmtime_plugin_t *cur_plugin; @@ -242,6 +245,14 @@ ngx_wasm_wasmtime_call(void *data, ngx_str_t *name, bool has_result, int param_t param_num = 2; break; + case NGX_WASM_PARAM_I32_I32_I32: + params = param_int32_int32_int32; + params[0].of.i32 = va_arg(args, int32_t); + params[1].of.i32 = va_arg(args, int32_t); + params[2].of.i32 = va_arg(args, int32_t); + param_num = 3; + break; + default: ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "unknown param type: %d", param_type); return NGX_ERROR; diff --git a/t/http_lifecycle.t b/t/http_lifecycle.t index 5f8fe85..da19337 100644 --- a/t/http_lifecycle.t +++ b/t/http_lifecycle.t @@ -89,3 +89,44 @@ location /hit { --- grep_error_log eval qr/free http context (1|11)$/ --- grep_error_log_out + + + +=== TEST 4: multi plugin ctx in same req +--- config +location /t { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + local ctx1 = assert(wasm.on_configure(plugin, '{"body":512}')) + local ctx2 = assert(wasm.on_configure(plugin, '{"body":256}')) + assert(wasm.on_http_request_headers(ctx1)) + assert(wasm.on_http_request_headers(ctx2)) + } +} +--- grep_error_log eval +qr/run http ctx \d+ with conf \S+/ +--- grep_error_log_out +run http ctx 3 with conf {"body":512}, +run http ctx 4 with conf {"body":256}, + + + +=== TEST 5: multi plugin in same req +--- config +location /t { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin1 = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + local plugin2 = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + local ctx1 = assert(wasm.on_configure(plugin1, '{"body":512}')) + local ctx2 = assert(wasm.on_configure(plugin2, '{"body":256}')) + assert(wasm.on_http_request_headers(ctx1)) + assert(wasm.on_http_request_headers(ctx2)) + } +} +--- grep_error_log eval +qr/run http ctx \d+ with conf \S+/ +--- grep_error_log_out +run http ctx 2 with conf {"body":512}, +run http ctx 2 with conf {"body":256}, diff --git a/t/log.t b/t/log.t index cbe6bcb..4232a57 100644 --- a/t/log.t +++ b/t/log.t @@ -26,3 +26,23 @@ qr/\[emerg\] \d+#\d+: ouch, something is wrong / --- no_error_log [alert] + + + +=== TEST 2: log in http request +--- config +location /t { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin = assert(wasm.load("t/testdata/log/main.go.wasm")) + local ctx = assert(wasm.on_configure(plugin, '{"body":512}')) + assert(wasm.on_http_request_headers(ctx)) + } +} +--- grep_error_log eval +qr/(create|run) http ctx \d+, client/ +--- grep_error_log_out +create http ctx 2, client +run http ctx 2, client +--- no_error_log +[alert] diff --git a/t/testdata/http_lifecycle/main.go b/t/testdata/http_lifecycle/main.go index ca195b6..560f798 100644 --- a/t/testdata/http_lifecycle/main.go +++ b/t/testdata/http_lifecycle/main.go @@ -37,3 +37,14 @@ type httpLifecycle struct { types.DefaultHttpContext contextID uint32 } + +func (ctx *httpLifecycle) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action { + data, err := proxywasm.GetPluginConfiguration() + if err != nil { + proxywasm.LogCriticalf("error reading plugin configuration: %v", err) + return types.ActionContinue + } + + proxywasm.LogWarnf("run http ctx %d with conf %s", ctx.contextID, string(data)) + return types.ActionContinue +} diff --git a/t/testdata/log/main.go b/t/testdata/log/main.go index e61798e..21bb462 100644 --- a/t/testdata/log/main.go +++ b/t/testdata/log/main.go @@ -31,3 +31,18 @@ func (ctx *pluginLifecycle) OnPluginStart(pluginConfigurationSize int) types.OnP proxywasm.LogTrace("ouch, something is wrong") return types.OnPluginStartStatusOK } + +func (*pluginLifecycle) NewHttpContext(contextID uint32) types.HttpContext { + proxywasm.LogWarnf("create http ctx %d", contextID) + return &httpLifecycle{contextID: contextID} +} + +type httpLifecycle struct { + types.DefaultHttpContext + contextID uint32 +} + +func (ctx *httpLifecycle) OnHttpRequestHeaders(numHeaders int, endOfStream bool) types.Action { + proxywasm.LogWarnf("run http ctx %d", ctx.contextID) + return types.ActionContinue +}