Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ jobs:

- name: Get dependencies
run: |
sudo apt install -y cpanminus build-essential libncurses5-dev libreadline-dev libssl-dev perl
wget https://github.com/tinygo-org/tinygo/releases/download/v0.19.0/tinygo_0.19.0_amd64.deb
sudo apt install -y cpanminus build-essential libncurses5-dev libreadline-dev libssl-dev perl luarocks
sudo luarocks install lua-resty-http > build.log 2>&1 || (cat build.log && exit 1)

wget https://github.com/tinygo-org/tinygo/releases/download/v0.19.0/tinygo_0.19.0_amd64.deb 2>/dev/null
sudo dpkg -i tinygo_0.19.0_amd64.deb

- name: Before install
Expand All @@ -35,6 +37,6 @@ jobs:

- name: Script
run: |
make build.all.testdata
sudo make build.all.testdata
export PATH=$OPENRESTY_PREFIX/nginx/sbin:$PATH
prove -I. -Itest-nginx/lib -r t/
26 changes: 26 additions & 0 deletions lib/resty/proxy-wasm.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
local ffi = require("ffi")
local base = require("resty.core.base")
local ffi_gc = ffi.gc
local C = ffi.C
local get_request = base.get_request


base.allows_subsystem("http")


ffi.cdef[[
Expand All @@ -9,10 +14,12 @@ void *ngx_http_wasm_load_plugin(const char *code, size_t size);
void ngx_http_wasm_unload_plugin(void *plugin);
void *ngx_http_wasm_on_configure(void *plugin, const char *conf, size_t size);
void ngx_http_wasm_delete_plugin_ctx(void *hwp_ctx);
ngx_int_t ngx_http_wasm_on_http(void *hwp_ctx, void *r, int type);
]]


local _M = {}
local HTTP_REQUEST_HEADERS = 1


function _M.load(path)
Expand Down Expand Up @@ -55,4 +62,23 @@ function _M.on_configure(plugin, conf)
end


function _M.on_http_request_headers(plugin_ctx)
if type(plugin_ctx) ~= "cdata" then
return nil, "bad plugin ctx"
end

local r = get_request()
if not r then
return nil, "bad request"
end

local rc = C.ngx_http_wasm_on_http(plugin_ctx, r, HTTP_REQUEST_HEADERS)
if rc < 0 then
return nil, "failed to run proxy_on_http_request_headers"
end

return true
end


return _M
42 changes: 42 additions & 0 deletions src/http/ngx_http_wasm_ctx.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef NGX_HTTP_WASM_CTX_H
#define NGX_HTTP_WASM_CTX_H


#include <ngx_core.h>
#include "ngx_http_wasm_state.h"


typedef struct {
void *plugin;
uint32_t cur_ctx_id;
ngx_queue_t occupied;
ngx_queue_t free;
unsigned done:1;
} ngx_http_wasm_plugin_t;


typedef struct {
ngx_queue_t queue;
uint32_t id;
ngx_http_wasm_state_t *state;
ngx_http_wasm_plugin_t *hw_plugin;
ngx_pool_t *pool;
ngx_queue_t occupied;
ngx_queue_t free;
unsigned done:1;
} ngx_http_wasm_plugin_ctx_t;


typedef struct {
ngx_queue_t queue;
uint32_t id;
ngx_http_wasm_plugin_ctx_t *hwp_ctx;
} ngx_http_wasm_http_ctx_t;


typedef struct {
ngx_http_wasm_http_ctx_t *http_ctx;
} ngx_http_wasm_ctx_t;


#endif // NGX_HTTP_WASM_CTX_H
210 changes: 193 additions & 17 deletions src/http/ngx_http_wasm_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <ngx_core.h>
#include <ngx_http.h>
#include "ngx_http_wasm_state.h"
#include "ngx_http_wasm_ctx.h"
#include "vm/vm.h"


Expand All @@ -26,22 +27,9 @@ typedef struct {
} ngx_http_wasm_main_conf_t;


typedef struct {
void *plugin;
uint32_t cur_ctx_id;
ngx_queue_t occupied;
ngx_queue_t free;
unsigned done:1;
} ngx_http_wasm_plugin_t;


typedef struct {
uint32_t id;
ngx_http_wasm_state_t *state;
ngx_http_wasm_plugin_t *hw_plugin;
ngx_pool_t *pool;
ngx_queue_t queue;
} ngx_http_wasm_plugin_ctx_t;
typedef enum {
HTTP_REQUEST_HEADERS = 1,
} ngx_http_wasm_phase_t;


static ngx_command_t ngx_http_wasm_cmds[] = {
Expand Down Expand Up @@ -249,7 +237,7 @@ ngx_http_wasm_unload_plugin(ngx_http_wasm_plugin_t *hw_plugin)


void
ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx)
ngx_http_wasm_free_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx)
{
ngx_int_t rc;
uint32_t ctx_id = hwp_ctx->id;
Expand All @@ -259,6 +247,11 @@ ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx)

log = ngx_cycle->log;

if (!ngx_queue_empty(&hwp_ctx->occupied)) {
/* some http ctxs are using it. Do not free */
return;
}

ngx_queue_remove(&hwp_ctx->queue);
ngx_queue_insert_head(&hw_plugin->free, &hwp_ctx->queue);

Expand All @@ -279,12 +272,22 @@ ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx)
hwp_ctx->pool = NULL;
}

ngx_log_error(NGX_LOG_INFO, log, 0, "free plugin context %d", ctx_id);

if (hw_plugin->done) {
ngx_http_wasm_free_plugin(hw_plugin);
}
}


void
ngx_http_wasm_delete_plugin_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx)
{
hwp_ctx->done = 1;
ngx_http_wasm_free_plugin_ctx(hwp_ctx);
}


void *
ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf, size_t size)
{
Expand Down Expand Up @@ -320,6 +323,8 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf,
ctx_id = hw_plugin->cur_ctx_id;
hwp_ctx->id = ctx_id;
hwp_ctx->hw_plugin = hw_plugin;
ngx_queue_init(&hwp_ctx->occupied);
ngx_queue_init(&hwp_ctx->free);
}

rc = ngx_wasm_vm.call(plugin, &proxy_on_context_create, false,
Expand Down Expand Up @@ -369,3 +374,174 @@ ngx_http_wasm_on_configure(ngx_http_wasm_plugin_t *hw_plugin, const char *conf,
ngx_http_wasm_delete_plugin_ctx(hwp_ctx);
return NULL;
}


ngx_http_wasm_http_ctx_t *
ngx_http_wasm_create_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r)
{
ngx_int_t rc;
uint32_t ctx_id;
ngx_http_wasm_http_ctx_t *http_ctx;
ngx_http_wasm_plugin_t *hw_plugin = hwp_ctx->hw_plugin;
void *plugin = hw_plugin->plugin;
ngx_log_t *log;

log = r->connection->log;

if (!ngx_queue_empty(&hwp_ctx->free)) {
ngx_queue_t *q;

q = ngx_queue_last(&hwp_ctx->free);
ngx_queue_remove(q);
http_ctx = ngx_queue_data(q, ngx_http_wasm_http_ctx_t, queue);
ctx_id = http_ctx->id;

} else {
http_ctx = ngx_pcalloc(hwp_ctx->pool, sizeof(ngx_http_wasm_http_ctx_t));
if (http_ctx == NULL) {
ngx_log_error(NGX_LOG_ERR, log, 0, "no memory");
return NULL;
}

hw_plugin->cur_ctx_id++;
ctx_id = hw_plugin->cur_ctx_id;
http_ctx->id = ctx_id;
http_ctx->hwp_ctx = hwp_ctx;
}

rc = ngx_wasm_vm.call(plugin, &proxy_on_context_create, false,
NGX_WASM_PARAM_I32_I32, ctx_id, hwp_ctx->id);
if (rc != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, log, 0, "failed to create context %d, rc: %d",
ctx_id, rc);
/* reuse the ctx_id */
ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue);
return NULL;
}

ngx_queue_insert_head(&hwp_ctx->occupied, &http_ctx->queue);

ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "create http context %d", ctx_id);

return http_ctx;
}


static void
ngx_http_wasm_cleanup(void *data)
{
ngx_int_t rc;
ngx_http_wasm_ctx_t *ctx = data;
ngx_http_wasm_http_ctx_t *http_ctx = ctx->http_ctx;
uint32_t ctx_id;
ngx_http_wasm_plugin_ctx_t *hwp_ctx;
void *plugin;
ngx_log_t *log;

log = ngx_cycle->log;

if (http_ctx == NULL) {
return;
}

ctx_id = http_ctx->id;
hwp_ctx = http_ctx->hwp_ctx;
plugin = hwp_ctx->hw_plugin->plugin;

ngx_queue_remove(&http_ctx->queue);
ngx_queue_insert_head(&hwp_ctx->free, &http_ctx->queue);

rc = ngx_wasm_vm.call(plugin, &proxy_on_done, true, NGX_WASM_PARAM_I32, ctx_id);
if (rc <= 0) {
ngx_log_error(NGX_LOG_ERR, log, 0, "failed to mark context %d as done, rc: %d",
ctx_id, rc);
}

rc = ngx_wasm_vm.call(plugin, &proxy_on_delete, false, NGX_WASM_PARAM_I32, ctx_id);
if (rc != NGX_OK) {
ngx_log_error(NGX_LOG_ERR, log, 0, "failed to delete context %d, rc: %d",
ctx_id, rc);
}

ngx_log_error(NGX_LOG_INFO, log, 0, "free http context %d", ctx_id);

if (hwp_ctx->done) {
ngx_http_wasm_free_plugin_ctx(hwp_ctx);
}
}


static ngx_http_wasm_ctx_t *
ngx_http_wasm_get_module_ctx(ngx_http_request_t *r)
{
ngx_http_wasm_ctx_t *ctx;
ngx_pool_cleanup_t *cln;

ctx = ngx_http_get_module_ctx(r, ngx_http_wasm_module);

if (ctx == NULL) {
ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_wasm_ctx_t));
if (ctx == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no memory");
return NULL;
}

cln = ngx_pool_cleanup_add(r->pool, 0);
if (cln == NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no memory");
return NULL;
}

cln->data = ctx;
cln->handler = ngx_http_wasm_cleanup;

ngx_http_set_ctx(r, ctx, ngx_http_wasm_module);
}

return ctx;
}


ngx_http_wasm_http_ctx_t *
ngx_http_wasm_fetch_http_ctx(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r)
{
ngx_http_wasm_ctx_t *ctx;


ctx = ngx_http_wasm_get_module_ctx(r);
if (ctx == NULL) {
return NULL;
}

if (ctx->http_ctx == NULL) {
ctx->http_ctx = ngx_http_wasm_create_http_ctx(hwp_ctx, r);
}

return ctx->http_ctx;
}


ngx_int_t
ngx_http_wasm_on_http(ngx_http_wasm_plugin_ctx_t *hwp_ctx, ngx_http_request_t *r,
ngx_http_wasm_phase_t type)
{
ngx_int_t rc;
ngx_log_t *log;
ngx_http_wasm_http_ctx_t *http_ctx;

log = r->connection->log;

if (!ngx_http_wasm_vm_inited) {
ngx_log_error(NGX_LOG_ERR, log, 0, "miss wasm_vm configuration");
return NGX_DECLINED;
}

rc = NGX_OK;

http_ctx = ngx_http_wasm_fetch_http_ctx(hwp_ctx, r);
if (http_ctx == NULL) {
return NGX_DECLINED;
}

return rc;
}
Loading