From 4239f193abb354e602ef6106cd2d876cc1a9d48e Mon Sep 17 00:00:00 2001 From: spacewander Date: Sat, 9 Oct 2021 21:06:27 +0800 Subject: [PATCH] feat: create/free HTTP ctx --- .github/workflows/test.yml | 8 +- lib/resty/proxy-wasm.lua | 26 ++++ src/http/ngx_http_wasm_ctx.h | 42 ++++++ src/http/ngx_http_wasm_module.c | 210 +++++++++++++++++++++++++++--- t/http_lifecycle.t | 91 +++++++++++++ t/testdata/http_lifecycle/main.go | 39 ++++++ 6 files changed, 396 insertions(+), 20 deletions(-) create mode 100644 src/http/ngx_http_wasm_ctx.h create mode 100644 t/http_lifecycle.t create mode 100644 t/testdata/http_lifecycle/main.go diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5f33284..0eef201 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,8 +18,10 @@ jobs: - name: Get dependencies run: | - sudo apt install -y cpanminus build-essential libncurses5-dev libreadline-dev libssl-dev perl - wget https://github.com/tinygo-org/tinygo/releases/download/v0.19.0/tinygo_0.19.0_amd64.deb + sudo apt install -y cpanminus build-essential libncurses5-dev libreadline-dev libssl-dev perl luarocks + sudo luarocks install lua-resty-http > build.log 2>&1 || (cat build.log && exit 1) + + wget https://github.com/tinygo-org/tinygo/releases/download/v0.19.0/tinygo_0.19.0_amd64.deb 2>/dev/null sudo dpkg -i tinygo_0.19.0_amd64.deb - name: Before install @@ -35,6 +37,6 @@ jobs: - name: Script run: | - make build.all.testdata + sudo make build.all.testdata export PATH=$OPENRESTY_PREFIX/nginx/sbin:$PATH prove -I. -Itest-nginx/lib -r t/ diff --git a/lib/resty/proxy-wasm.lua b/lib/resty/proxy-wasm.lua index b3e6fdf..c5f1360 100644 --- a/lib/resty/proxy-wasm.lua +++ b/lib/resty/proxy-wasm.lua @@ -1,6 +1,11 @@ local ffi = require("ffi") +local base = require("resty.core.base") local ffi_gc = ffi.gc local C = ffi.C +local get_request = base.get_request + + +base.allows_subsystem("http") ffi.cdef[[ @@ -9,10 +14,12 @@ void *ngx_http_wasm_load_plugin(const char *code, size_t size); void ngx_http_wasm_unload_plugin(void *plugin); void *ngx_http_wasm_on_configure(void *plugin, const char *conf, size_t size); void ngx_http_wasm_delete_plugin_ctx(void *hwp_ctx); +ngx_int_t ngx_http_wasm_on_http(void *hwp_ctx, void *r, int type); ]] local _M = {} +local HTTP_REQUEST_HEADERS = 1 function _M.load(path) @@ -55,4 +62,23 @@ function _M.on_configure(plugin, conf) end +function _M.on_http_request_headers(plugin_ctx) + if type(plugin_ctx) ~= "cdata" then + return nil, "bad plugin ctx" + end + + local r = get_request() + if not r then + return nil, "bad request" + end + + local rc = C.ngx_http_wasm_on_http(plugin_ctx, r, HTTP_REQUEST_HEADERS) + if rc < 0 then + return nil, "failed to run proxy_on_http_request_headers" + end + + return true +end + + return _M diff --git a/src/http/ngx_http_wasm_ctx.h b/src/http/ngx_http_wasm_ctx.h new file mode 100644 index 0000000..0e50521 --- /dev/null +++ b/src/http/ngx_http_wasm_ctx.h @@ -0,0 +1,42 @@ +#ifndef NGX_HTTP_WASM_CTX_H +#define NGX_HTTP_WASM_CTX_H + + +#include +#include "ngx_http_wasm_state.h" + + +typedef struct { + void *plugin; + uint32_t cur_ctx_id; + ngx_queue_t occupied; + ngx_queue_t free; + unsigned done:1; +} ngx_http_wasm_plugin_t; + + +typedef struct { + ngx_queue_t queue; + uint32_t id; + ngx_http_wasm_state_t *state; + ngx_http_wasm_plugin_t *hw_plugin; + ngx_pool_t *pool; + ngx_queue_t occupied; + ngx_queue_t free; + unsigned done:1; +} ngx_http_wasm_plugin_ctx_t; + + +typedef struct { + ngx_queue_t queue; + uint32_t id; + ngx_http_wasm_plugin_ctx_t *hwp_ctx; +} ngx_http_wasm_http_ctx_t; + + +typedef struct { + ngx_http_wasm_http_ctx_t *http_ctx; +} ngx_http_wasm_ctx_t; + + +#endif // NGX_HTTP_WASM_CTX_H diff --git a/src/http/ngx_http_wasm_module.c b/src/http/ngx_http_wasm_module.c index b7925b4..05bea71 100644 --- a/src/http/ngx_http_wasm_module.c +++ b/src/http/ngx_http_wasm_module.c @@ -2,6 +2,7 @@ #include #include #include "ngx_http_wasm_state.h" +#include "ngx_http_wasm_ctx.h" #include "vm/vm.h" @@ -26,22 +27,9 @@ typedef struct { } ngx_http_wasm_main_conf_t; -typedef struct { - void *plugin; - uint32_t cur_ctx_id; - ngx_queue_t occupied; - ngx_queue_t free; - unsigned done:1; -} ngx_http_wasm_plugin_t; - - -typedef struct { - uint32_t id; - ngx_http_wasm_state_t *state; - ngx_http_wasm_plugin_t *hw_plugin; - ngx_pool_t *pool; - ngx_queue_t queue; -} ngx_http_wasm_plugin_ctx_t; +typedef enum { + HTTP_REQUEST_HEADERS = 1, +} ngx_http_wasm_phase_t; static ngx_command_t ngx_http_wasm_cmds[] = { @@ -249,7 +237,7 @@ ngx_http_wasm_unload_plugin(ngx_http_wasm_plugin_t *hw_plugin) void -ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx) +ngx_http_wasm_free_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx) { ngx_int_t rc; uint32_t ctx_id = hwp_ctx->id; @@ -259,6 +247,11 @@ ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx) log = ngx_cycle->log; + if (!ngx_queue_empty(&hwp_ctx->occupied)) { + /* some http ctxs are using it. Do not free */ + return; + } + ngx_queue_remove(&hwp_ctx->queue); ngx_queue_insert_head(&hw_plugin->free, &hwp_ctx->queue); @@ -279,12 +272,22 @@ ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx) hwp_ctx->pool = NULL; } + ngx_log_error(NGX_LOG_INFO, log, 0, "free plugin context %d", ctx_id); + if (hw_plugin->done) { ngx_http_wasm_free_plugin(hw_plugin); } } +void +ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx) +{ + hwp_ctx->done = 1; + ngx_http_wasm_free_plugin_ctx(hwp_ctx); +} + + void * ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, size_t size) { @@ -320,6 +323,8 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, ctx_id = hw_plugin->cur_ctx_id; hwp_ctx->id = ctx_id; hwp_ctx->hw_plugin = hw_plugin; + ngx_queue_init(&hwp_ctx->occupied); + ngx_queue_init(&hwp_ctx->free); } rc = ngx_wasm_vm.call(plugin, &proxy_on_context_create, false, @@ -369,3 +374,174 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, ngx_http_wasm_delete_plugin_ctx(hwp_ctx); return NULL; } + + +ngx_http_wasm_http_ctx_t * +ngx_http_wasm_create_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r) +{ + ngx_int_t rc; + uint32_t ctx_id; + ngx_http_wasm_http_ctx_t *http_ctx; + ngx_http_wasm_plugin_t *hw_plugin = hwp_ctx->hw_plugin; + void *plugin = hw_plugin->plugin; + ngx_log_t *log; + + log = r->connection->log; + + if (!ngx_queue_empty(&hwp_ctx->free)) { + ngx_queue_t *q; + + q = ngx_queue_last(&hwp_ctx->free); + ngx_queue_remove(q); + http_ctx = ngx_queue_data(q, ngx_http_wasm_http_ctx_t, queue); + ctx_id = http_ctx->id; + + } else { + http_ctx = ngx_pcalloc(hwp_ctx->pool, sizeof(ngx_http_wasm_http_ctx_t)); + if (http_ctx == NULL) { + ngx_log_error(NGX_LOG_ERR, log, 0, "no memory"); + return NULL; + } + + hw_plugin->cur_ctx_id++; + ctx_id = hw_plugin->cur_ctx_id; + http_ctx->id = ctx_id; + http_ctx->hwp_ctx = hwp_ctx; + } + + rc = ngx_wasm_vm.call(plugin, &proxy_on_context_create, false, + NGX_WASM_PARAM_I32_I32, ctx_id, hwp_ctx->id); + if (rc != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, log, 0, "failed to create context %d, rc: %d", + ctx_id, rc); + /* reuse the ctx_id */ + ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue); + return NULL; + } + + ngx_queue_insert_head(&hwp_ctx->occupied, &http_ctx->queue); + + ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "create http context %d", ctx_id); + + return http_ctx; +} + + +static void +ngx_http_wasm_cleanup(void *data) +{ + ngx_int_t rc; + ngx_http_wasm_ctx_t *ctx = data; + ngx_http_wasm_http_ctx_t *http_ctx = ctx->http_ctx; + uint32_t ctx_id; + ngx_http_wasm_plugin_ctx_t *hwp_ctx; + void *plugin; + ngx_log_t *log; + + log = ngx_cycle->log; + + if (http_ctx == NULL) { + return; + } + + ctx_id = http_ctx->id; + hwp_ctx = http_ctx->hwp_ctx; + plugin = hwp_ctx->hw_plugin->plugin; + + ngx_queue_remove(&http_ctx->queue); + ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue); + + rc = ngx_wasm_vm.call(plugin, &proxy_on_done, true, NGX_WASM_PARAM_I32, ctx_id); + if (rc <= 0) { + ngx_log_error(NGX_LOG_ERR, log, 0, "failed to mark context %d as done, rc: %d", + ctx_id, rc); + } + + rc = ngx_wasm_vm.call(plugin, &proxy_on_delete, false, NGX_WASM_PARAM_I32, ctx_id); + if (rc != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, log, 0, "failed to delete context %d, rc: %d", + ctx_id, rc); + } + + ngx_log_error(NGX_LOG_INFO, log, 0, "free http context %d", ctx_id); + + if (hwp_ctx->done) { + ngx_http_wasm_free_plugin_ctx(hwp_ctx); + } +} + + +static ngx_http_wasm_ctx_t * +ngx_http_wasm_get_module_ctx(ngx_http_request_t *r) +{ + ngx_http_wasm_ctx_t *ctx; + ngx_pool_cleanup_t *cln; + + ctx = ngx_http_get_module_ctx(r, ngx_http_wasm_module); + + if (ctx == NULL) { + ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_wasm_ctx_t)); + if (ctx == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no memory"); + return NULL; + } + + cln = ngx_pool_cleanup_add(r->pool, 0); + if (cln == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no memory"); + return NULL; + } + + cln->data = ctx; + cln->handler = ngx_http_wasm_cleanup; + + ngx_http_set_ctx(r, ctx, ngx_http_wasm_module); + } + + return ctx; +} + + +ngx_http_wasm_http_ctx_t * +ngx_http_wasm_fetch_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r) +{ + ngx_http_wasm_ctx_t *ctx; + + + ctx = ngx_http_wasm_get_module_ctx(r); + if (ctx == NULL) { + return NULL; + } + + if (ctx->http_ctx == NULL) { + ctx->http_ctx = ngx_http_wasm_create_http_ctx(hwp_ctx, r); + } + + return ctx->http_ctx; +} + + +ngx_int_t +ngx_http_wasm_on_http(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r, + ngx_http_wasm_phase_t type) +{ + ngx_int_t rc; + ngx_log_t *log; + ngx_http_wasm_http_ctx_t *http_ctx; + + log = r->connection->log; + + if (!ngx_http_wasm_vm_inited) { + ngx_log_error(NGX_LOG_ERR, log, 0, "miss wasm_vm configuration"); + return NGX_DECLINED; + } + + rc = NGX_OK; + + http_ctx = ngx_http_wasm_fetch_http_ctx(hwp_ctx, r); + if (http_ctx == NULL) { + return NGX_DECLINED; + } + + return rc; +} diff --git a/t/http_lifecycle.t b/t/http_lifecycle.t new file mode 100644 index 0000000..5f8fe85 --- /dev/null +++ b/t/http_lifecycle.t @@ -0,0 +1,91 @@ +use t::WASM 'no_plan'; + +run_tests(); + +__DATA__ + +=== TEST 1: manage ctx +--- config +location /t { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + local ctx = assert(wasm.on_configure(plugin, '{"body":512}')) + assert(wasm.on_http_request_headers(ctx)) + assert(wasm.on_http_request_headers(ctx)) + } +} +--- grep_error_log eval +qr/(create|free) http context \d/ +--- grep_error_log_out +create http context 2 +free http context 2 + + + +=== TEST 2: ensure plugin ctx is free after http ctx +--- config +location /t { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + do + local ctx = assert(wasm.on_configure(plugin, '{"body":512}')) + assert(wasm.on_http_request_headers(ctx)) + end + collectgarbage() + } +} +--- grep_error_log eval +qr/free (plugin|http) context \d/ +--- grep_error_log_out +free http context 2 +free plugin context 1 + + + +=== TEST 3: multiple http ctx +--- http_config + init_by_lua_block { + local wasm = require("resty.proxy-wasm") + local plugin = assert(wasm.load("t/testdata/http_lifecycle/main.go.wasm")) + package.loaded.ctx = assert(wasm.on_configure(plugin, '{"body":512}')) + } +--- config +location /t { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. "/hit" + + for _ = 1, 2 do + local t = {} + for i = 1, 9 do + local th = assert(ngx.thread.spawn(function(i) + local httpc = http.new() + local res, err = httpc:request_uri(uri..i, {method = "GET"}) + if not res then + ngx.log(ngx.ERR, err) + return + end + end, i)) + table.insert(t, th) + end + for i, th in ipairs(t) do + ngx.thread.wait(th) + end + -- check if the ctx id is reused + end + } +} +location /hit { + content_by_lua_block { + local wasm = require("resty.proxy-wasm") + local ctx = package.loaded.ctx + assert(wasm.on_http_request_headers(ctx)) + ngx.sleep(math.random() / 10) + } +} +--- grep_error_log eval +qr/free http context (1|11)$/ +--- grep_error_log_out diff --git a/t/testdata/http_lifecycle/main.go b/t/testdata/http_lifecycle/main.go new file mode 100644 index 0000000..ca195b6 --- /dev/null +++ b/t/testdata/http_lifecycle/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm" + "github.com/tetratelabs/proxy-wasm-go-sdk/proxywasm/types" +) + +func main() { + proxywasm.SetVMContext(&vmContext{}) +} + +type vmContext struct { + // Embed the default VM context here, + // so that we don't need to reimplement all the methods. + types.DefaultVMContext +} + +// Override types.DefaultVMContext. +func (*vmContext) NewPluginContext(contextID uint32) types.PluginContext { + return &pluginContext{} +} + +type pluginContext struct { + // Embed the default plugin context here, + // so that we don't need to reimplement all the methods. + types.DefaultPluginContext +} + +// Override types.DefaultPluginContext. +func (*pluginContext) NewHttpContext(contextID uint32) types.HttpContext { + return &httpLifecycle{contextID: contextID} +} + +type httpLifecycle struct { + // Embed the default http context here, + // so that we don't need to reimplement all the methods. + types.DefaultHttpContext + contextID uint32 +}