From 089d90d7c7ea7ed94a1b088875c4e7edf03da4f8 Mon Sep 17 00:00:00 2001 From: spacewander Date: Tue, 29 Mar 2022 18:08:38 +0800 Subject: [PATCH 1/2] mkdir Signed-off-by: spacewander From 142d5ed14d670d741c08580ff29b9d8da4dbeffc Mon Sep 17 00:00:00 2001 From: spacewander Date: Fri, 1 Apr 2022 16:07:33 +0800 Subject: [PATCH 2/2] feat(xRPC): implement read method Signed-off-by: spacewander --- lib/resty/apisix/stream/xrpc/socket.lua | 104 ++++++- patch/1.19.9/ngx_stream_lua-xrpc.patch | 376 ++++++++++++++++++++++++ t/stream/xrpc/downstream.t | 123 ++++++++ t/stream/xrpc/fuzzing.t | 77 +++++ t/stream/xrpc/upstream.t | 126 +++++++- 5 files changed, 786 insertions(+), 20 deletions(-) create mode 100644 patch/1.19.9/ngx_stream_lua-xrpc.patch create mode 100644 t/stream/xrpc/fuzzing.t diff --git a/lib/resty/apisix/stream/xrpc/socket.lua b/lib/resty/apisix/stream/xrpc/socket.lua index fe17bda..bcf8314 100644 --- a/lib/resty/apisix/stream/xrpc/socket.lua +++ b/lib/resty/apisix/stream/xrpc/socket.lua @@ -1,23 +1,115 @@ -require("table.clone") local base = require("resty.core.base") +local ffi = require("ffi") +local ffi_str = ffi.string +local C = ffi.C +local FFI_AGAIN = base.FFI_AGAIN +local FFI_DONE = base.FFI_DONE +local FFI_ERROR = base.FFI_ERROR +local get_string_buf = base.get_string_buf +local get_size_ptr = base.get_size_ptr +local get_request = base.get_request +local co_yield = coroutine._yield +local tab_clone = require("table.clone") base.allows_subsystem("stream") +ffi.cdef[[ +typedef unsigned char u_char; +typedef struct ngx_stream_lua_socket_tcp_upstream_s + ngx_stream_lua_socket_tcp_upstream_t; + +int +ngx_stream_lua_ffi_socket_tcp_read_buf(ngx_stream_lua_request_t *r, + ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len, + u_char *errbuf, size_t *errbuf_size); + +int +ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r, + ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len, + u_char *errbuf, size_t *errbuf_size); +]] +local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf +local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result + + +local ERR_BUF_SIZE = 256 +local SOCKET_CTX_INDEX = 1 +local resbuf = ffi.new("u_char*[1]") local Downstream = {} local Upstream = {} local downstream_mt local upstream_mt --- need to remove methods which will break the buffer management -local function remove_unwanted_method(sk) + +local function get_tcp_socket(cosocket) + local tcp_socket = cosocket[SOCKET_CTX_INDEX] + if not tcp_socket then + return error("bad tcp socket", 3) + end + + return tcp_socket +end + + +-- read the given length of data to a buffer in C land and return the buffer address +-- return error if the read data is less than given length +local function read_buf(cosocket, len) + if len <= 0 then + error("bad length: length of data should be positive, got " .. len, 2) + end + + if len > 4 * 1024 * 1024 then + error("bad length: length of data too big, got " .. len, 2) + end + + local r = get_request() + if not r then + error("no request found", 2) + end + + local u = get_tcp_socket(cosocket) + local errbuf = get_string_buf(ERR_BUF_SIZE) + local errbuf_size = get_size_ptr() + errbuf_size[0] = ERR_BUF_SIZE + + local rc = socket_tcp_read(r, u, resbuf, len, errbuf, errbuf_size) + if rc == FFI_DONE then + error(ffi_str(errbuf, errbuf_size[0]), 2) + end + + while true do + if rc == FFI_ERROR then + return nil, ffi_str(errbuf, errbuf_size[0]) + end + + if rc >= 0 then + return resbuf[0] + end + + assert(rc == FFI_AGAIN) + + co_yield() + + errbuf = get_string_buf(ERR_BUF_SIZE) + errbuf_size = get_size_ptr() + errbuf_size[0] = ERR_BUF_SIZE + rc = socket_tcp_get_read_result(r, u, resbuf, len, errbuf, errbuf_size) + end +end + + +local function patch_methods(sk) local methods = getmetatable(sk).__index - local copy = table.clone(methods) + local copy = tab_clone(methods) + -- need to remove methods which will break the buffer management copy.receive = nil copy.receiveany = nil copy.receiveuntil = nil + copy.read = read_buf + return {__index = copy} end @@ -25,13 +117,13 @@ end local function set_method_table(sk, is_downstream) if is_downstream then if not downstream_mt then - downstream_mt = remove_unwanted_method(sk) + downstream_mt = patch_methods(sk) end return setmetatable(sk, downstream_mt) end if not upstream_mt then - upstream_mt = remove_unwanted_method(sk) + upstream_mt = patch_methods(sk) end return setmetatable(sk, upstream_mt) end diff --git a/patch/1.19.9/ngx_stream_lua-xrpc.patch b/patch/1.19.9/ngx_stream_lua-xrpc.patch new file mode 100644 index 0000000..529c400 --- /dev/null +++ b/patch/1.19.9/ngx_stream_lua-xrpc.patch @@ -0,0 +1,376 @@ +diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c +index 7fcfb45..601673d 100644 +--- src/ngx_stream_lua_socket_tcp.c ++++ src/ngx_stream_lua_socket_tcp.c +@@ -234,6 +234,41 @@ enum { + } + + ++#define ngx_stream_lua_ffi_socket_check_busy_connecting(r, u, errbuf, \ ++ errbuf_size) \ ++ if ((u)->conn_waiting) { \ ++ *errbuf_size = ngx_snprintf((errbuf), *(errbuf_size), \ ++ "socket busy connecting") \ ++ - (errbuf); \ ++ return NGX_ERROR; \ ++ } ++ ++ ++#define ngx_stream_lua_ffi_socket_check_busy_reading(r, u, errbuf, \ ++ errbuf_size) \ ++ if ((u)->read_waiting) { \ ++ *errbuf_size = ngx_snprintf((errbuf), *(errbuf_size), \ ++ "socket busy reading") \ ++ - (errbuf); \ ++ return NGX_ERROR; \ ++ } ++ ++ ++#define ngx_stream_lua_ffi_socket_check_busy_writing(r, u, errbuf, \ ++ errbuf_size) \ ++ if ((u)->write_waiting) { \ ++ *errbuf_size = ngx_snprintf((errbuf), *(errbuf_size), \ ++ "socket busy writing") \ ++ - (errbuf); \ ++ } \ ++ if ((u)->raw_downstream \ ++ && ((r)->connection->buffered)) \ ++ { \ ++ *errbuf_size = ngx_snprintf((errbuf), *(errbuf_size), \ ++ "socket busy writing") \ ++ - (errbuf); \ ++ } ++ + + static char ngx_stream_lua_raw_req_socket_metatable_key; + static char ngx_stream_lua_tcp_socket_metatable_key; +@@ -6005,6 +6040,329 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer( + } + + ++static int ++ngx_stream_lua_socket_tcp_dummy_retval_handler(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L) ++{ ++ return 0; ++} ++ ++ ++static ngx_int_t ++ngx_stream_lua_ffi_socket_push_res(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_ctx_t *ctx, ngx_stream_lua_socket_tcp_upstream_t *u, ++ u_char **buf, size_t len) ++{ ++ dd("bufs_in: %p, buf_in: %p", u->bufs_in, u->buf_in); ++ ++ ngx_log_debug3(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0, ++ "stream lua tcp socket push res: pos:%p, last:%p, len:%d", ++ u->buffer.pos, u->buffer.last, len); ++ ++ *buf = u->buffer.pos - len; ++ return NGX_OK; ++} ++ ++ ++static void ++ngx_stream_lua_ffi_socket_prepare_error_retvals(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ u_char *p; ++ ++ if (ft_type & NGX_STREAM_LUA_SOCKET_FT_TIMEOUT) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "timeout") - errbuf; ++ ++ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_CLOSED) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf; ++ ++ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_BUFTOOSMALL) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "buffer too small") ++ - errbuf; ++ ++ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_NOMEM) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf; ++ ++ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_CLIENTABORT) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "client aborted") ++ - errbuf; ++ ++ } else { ++ ++ if (u->socket_errno) { ++ p = ngx_strerror(u->socket_errno, errbuf, *errbuf_size); ++ /* for compatibility with LuaSocket */ ++ ngx_strlow(errbuf, errbuf, p - errbuf); ++ ++ } else { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "error") ++ - errbuf; ++ } ++ } ++} ++ ++ ++static void ++ngx_stream_lua_ffi_socket_read_error_retval_handler( ++ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ ngx_uint_t ft_type; ++ ++ if (u->ft_type & NGX_STREAM_LUA_SOCKET_FT_TIMEOUT) { ++ u->no_close = 1; ++ } ++ ++ if (u->read_co_ctx) { ++ u->read_co_ctx->cleanup = NULL; ++ } ++ ++ ft_type = u->ft_type; ++ u->ft_type = 0; ++ ++ if (u->no_close) { ++ u->no_close = 0; ++ ++ } else { ++ ngx_stream_lua_socket_tcp_finalize_read_part(r, u); ++ } ++ ++ ngx_stream_lua_ffi_socket_prepare_error_retvals(r, u, ft_type, ++ errbuf, errbuf_size); ++} ++ ++ ++static ngx_int_t ++ngx_stream_lua_ffi_socket_read_retval_handler(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, u_char **buf, size_t len, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ ngx_stream_lua_ctx_t *ctx; ++ ngx_event_t *ev; ++ ++ ngx_stream_lua_loc_conf_t *llcf; ++ ++ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); ++ ++ if (u->raw_downstream || u->body_downstream) { ++ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); ++ ++ if (llcf->check_client_abort) { ++ ++ r->read_event_handler = ngx_stream_lua_rd_check_broken_connection; ++ ++ ev = r->connection->read; ++ ++ dd("rev active: %d", ev->active); ++ ++ if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && !ev->active) { ++ if (ngx_add_event(ev, NGX_READ_EVENT, 0) != NGX_OK) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, ++ "failed to add event") - errbuf; ++ return NGX_ERROR; ++ } ++ } ++ ++ } else { ++ /* llcf->check_client_abort == 0 */ ++ r->read_event_handler = ngx_stream_lua_block_reading; ++ } ++ } ++ ++ if (u->ft_type) { ++ ngx_stream_lua_ffi_socket_read_error_retval_handler(r, u, errbuf, ++ errbuf_size); ++ return NGX_ERROR; ++ } ++ ++ ngx_stream_lua_ffi_socket_push_res(r, ctx, u, buf, len); ++ return NGX_OK; ++} ++ ++ ++int ++ngx_stream_lua_ffi_socket_tcp_read_buf(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, u_char **buf, size_t len, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ ngx_int_t rc; ++ ngx_stream_lua_loc_conf_t *llcf; ++ ngx_stream_lua_ctx_t *lctx; ++ ngx_stream_lua_co_ctx_t *coctx; ++ ++ if (u == NULL || u->peer.connection == NULL || u->read_closed) { ++ ++ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); ++ ++ if (llcf->log_socket_errors) { ++ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, ++ "stream attempt to receive data on a closed " ++ "socket: u:%p, c:%p, ft:%d eof:%d", ++ u, u ? u->peer.connection : NULL, ++ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0); ++ } ++ ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf; ++ return NGX_ERROR; ++ } ++ ++ if (u->request != r) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "bad request") ++ - errbuf; ++ return NGX_DONE; ++ } ++ ++ ngx_stream_lua_ffi_socket_check_busy_connecting(r, u, errbuf, errbuf_size); ++ ngx_stream_lua_ffi_socket_check_busy_reading(r, u, errbuf, errbuf_size); ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua tcp socket read timeout: %M", u->read_timeout); ++ ++ u->input_filter = ngx_stream_lua_socket_read_chunk; ++ u->length = (size_t) len; ++ u->rest = u->length; ++ u->input_filter_ctx = u; ++ ++ lctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); ++ ++ if (u->bufs_in == NULL) { ++ size_t buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size; ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua tcp socket allocate new new buf of size %uz", ++ buf_len); ++ ++ u->bufs_in = ++ ngx_stream_lua_chain_get_free_buf(r->connection->log, ++ r->pool, ++ &lctx->free_recv_bufs, ++ buf_len); ++ ++ if (u->bufs_in == NULL) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") ++ - errbuf; ++ return NGX_DONE; ++ } ++ ++ u->buf_in = u->bufs_in; ++ u->buffer = *u->buf_in->buf; ++ ++ } else { ++ size_t remain = u->buffer.end - u->buffer.pos; ++ size_t buf_len; ++ ngx_chain_t *cl; ++ ++ if (remain < len) { ++ buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size; ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua tcp socket allocate new new buf of size %uz", ++ buf_len); ++ ++ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, ++ r->pool, ++ &lctx->free_recv_bufs, ++ buf_len); ++ if (cl == NULL) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf; ++ return NGX_DONE; ++ } ++ ++ u->buf_in->next = cl; ++ u->buf_in = cl; ++ ++ cl->buf->last = ngx_copy(cl->buf->last, u->buffer.pos, remain); ++ u->buffer.last = u->buffer.pos; ++ ++ u->buffer = *cl->buf; ++ } ++ } ++ ++ dd("tcp receive: buf_in: %p, bufs_in: %p", u->buf_in, u->bufs_in); ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua tcp socket read timeout: %M", u->read_timeout); ++ ++ if (u->raw_downstream || u->body_downstream) { ++ r->read_event_handler = ngx_stream_lua_req_socket_rev_handler; ++ } ++ ++ u->read_waiting = 0; ++ u->read_co_ctx = NULL; ++ ++ rc = ngx_stream_lua_socket_tcp_read(r, u); ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua socket tcp read, rc: %d", rc); ++ ++ if (rc == NGX_ERROR) { ++ dd("read failed: %d", (int) u->ft_type); ++ rc = ngx_stream_lua_ffi_socket_read_retval_handler(r, u, buf, len, ++ errbuf, errbuf_size); ++ dd("tcp receive retval returned: %d", (int) rc); ++ return rc; ++ } ++ ++ if (rc == NGX_OK) { ++ ++ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "stream lua tcp socket receive done in a single run"); ++ ++ return ngx_stream_lua_ffi_socket_read_retval_handler(r, u, buf, len, ++ errbuf, errbuf_size); ++ } ++ ++ if (rc == NGX_AGAIN) { ++ u->read_event_handler = ngx_stream_lua_socket_read_handler; ++ ++ coctx = lctx->cur_co_ctx; ++ ++ ngx_stream_lua_cleanup_pending_operation(coctx); ++ coctx->cleanup = ngx_stream_lua_coctx_cleanup; ++ coctx->data = u; ++ ++ if (lctx->entered_content_phase) { ++ r->write_event_handler = ngx_stream_lua_content_wev_handler; ++ ++ } else { ++ r->write_event_handler = ngx_stream_lua_core_run_phases; ++ } ++ ++ u->read_co_ctx = coctx; ++ u->read_waiting = 1; ++ u->read_prepare_retvals = ngx_stream_lua_socket_tcp_dummy_retval_handler; ++ ++ dd("setting data to %p, coctx:%p", u, coctx); ++ ++ if (u->raw_downstream || u->body_downstream) { ++ lctx->downstream = u; ++ } ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua socket receive yield, u: %p", u); ++ ++ return NGX_AGAIN; ++ } ++ ++ return ngx_stream_lua_ffi_socket_read_retval_handler(r, u, buf, len, ++ errbuf, errbuf_size); ++} ++ ++ ++int ++ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, u_char **buf, size_t len, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua tcp socket get receive result"); ++ ++ return ngx_stream_lua_ffi_socket_read_retval_handler(r, u, buf, len, ++ errbuf, errbuf_size); ++} ++ ++ + static ngx_int_t + ngx_stream_lua_socket_tcp_conn_op_resume(ngx_stream_lua_request_t *r) + { diff --git a/t/stream/xrpc/downstream.t b/t/stream/xrpc/downstream.t index d366ac5..ada12bb 100644 --- a/t/stream/xrpc/downstream.t +++ b/t/stream/xrpc/downstream.t @@ -17,3 +17,126 @@ __DATA__ --- stream_request --- stream_response chomp world + + + +=== TEST 2: read +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + local p = assert(sk:read(4)) + ngx.say(ffi.string(p, 4)) + local p = assert(sk:read(4)) + ngx.say(ffi.string(p, 3)) + local p = assert(sk:read(1)) + ngx.say(ffi.string(p, 1)) + + local p, err = sk:read(5) + ngx.say(p) + ngx.say(err) + local p, err = sk:read(5) + ngx.say(p) + ngx.say(err) + } +--- error_log +socket read timed out +--- stream_request +hello world +--- stream_response +hell +o w +r +nil +timeout +nil +timeout + + + +=== TEST 3: read with peek +--- stream_server_config + preread_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + sk:peek(4) + local p = assert(sk:read(9)) + ngx.say(ffi.string(p, 9)) + ngx.exit(200) + } + proxy_pass 127.0.0.1:1990; +--- stream_request +hello world +--- stream_response +hello wor + + + +=== TEST 4: read with peek (peeked data > read) +--- stream_server_config + preread_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + sk:peek(11) + local p = assert(sk:read(9)) + ngx.say(ffi.string(p, 9)) + ngx.exit(200) + } + proxy_pass 127.0.0.1:1990; +--- stream_request +hello world +--- stream_response +hello wor + + + +=== TEST 5: read over buffer +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + local len = 9 * 1024 + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + } +--- stream_request eval +"123456789" x 1024 +--- stream_response eval +"123456789" x 1024 + + + +=== TEST 6: read over buffer in the middle +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + local len = 9 * 256 + local p = assert(sk:read(len)) + ngx.say(ffi.string(p, len)) + local p = assert(sk:read(len)) + ngx.say(ffi.string(p, len)) + + local len = 9 * 512 + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + } +--- stream_request eval +"123456789" x 1024 +--- stream_response eval +"123456789" x 256 . +"\n" . +"123456789" x 256 . +"\n" . +"123456789" x 512 +--- grep_error_log eval +qr/stream lua tcp socket allocate new new buf of size \d+/ +--- grep_error_log_out +stream lua tcp socket allocate new new buf of size 4096 +stream lua tcp socket allocate new new buf of size 4096 +stream lua tcp socket allocate new new buf of size 4608 diff --git a/t/stream/xrpc/fuzzing.t b/t/stream/xrpc/fuzzing.t new file mode 100644 index 0000000..d7130ec --- /dev/null +++ b/t/stream/xrpc/fuzzing.t @@ -0,0 +1,77 @@ +use t::APISIX_NGINX 'no_plan'; + +run_tests(); + +__DATA__ + +=== TEST 1: read +--- stream_server_config + lua_socket_buffer_size 128; + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + local total = 9 * 1024 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + end + } +--- stream_request eval +"123456789" x 1024 +--- stream_response eval +"123456789" x 1024 + + + +=== TEST 2: read (upstream) +--- stream_config + lua_socket_buffer_size 128; + server { + listen 1995; + content_by_lua_block { + local s = ("123456789"):rep(1024) + local total = 9 * 1024 + local idx = 1 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local n, err = ngx.print(s:sub(idx, idx + len - 1)) + if not n then + ngx.log(ngx.ERR, err) + return + end + ngx.flush(true) + idx = idx + len + end + } + } +--- stream_server_config + lua_socket_buffer_size 128; + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + sk:settimeout(5) + local total = 9 * 1024 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + end + } +--- stream_request +--- stream_response eval +"123456789" x 1024 diff --git a/t/stream/xrpc/upstream.t b/t/stream/xrpc/upstream.t index 09bc919..6a0b34d 100644 --- a/t/stream/xrpc/upstream.t +++ b/t/stream/xrpc/upstream.t @@ -1,10 +1,11 @@ use t::APISIX_NGINX 'no_plan'; -add_block_preprocessor(sub { - my ($block) = @_; +run_tests(); + +__DATA__ - my $stream_config = $block->stream_config; - $stream_config .= <<'_EOC_'; +=== TEST 1: inherit methods from raw socket +--- stream_config server { listen 1995; content_by_lua_block { @@ -14,16 +15,6 @@ add_block_preprocessor(sub { sk:send(resp) } } -_EOC_ - - $block->set_value("stream_config", $stream_config); -}); - -run_tests(); - -__DATA__ - -=== TEST 1: inherit methods from raw socket --- stream_server_config content_by_lua_block { local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() @@ -41,3 +32,110 @@ __DATA__ qr/get resp data: \w+/ --- grep_error_log_out get resp data: world + + + +=== TEST 2: read +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send("hello world") + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + local p = assert(sk:read(4)) + ngx.say(ffi.string(p, 4)) + local p = assert(sk:read(4)) + ngx.say(ffi.string(p, 3)) + local p = assert(sk:read(1)) + ngx.say(ffi.string(p, 1)) + + local p, err = sk:read(5) + ngx.say(p) + ngx.say(err) + local p, err = sk:read(5) + ngx.say(p) + ngx.say(err) + } +--- error_log +receive data on a closed socket +--- stream_request +--- stream_response +hell +o w +r +nil +closed +nil +closed + + + +=== TEST 3: read over buffer +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send(("123456789"):rep(1024)) + ngx.sleep(0.1) + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + local len = 9 * 1024 + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + } +--- stream_request +--- stream_response eval +"123456789" x 1024 + + + +=== TEST 4: read over buffer in the middle +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send(("123456789"):rep(1024)) + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + sk:settimeout(5) + + local len = 9 * 256 + local p = assert(sk:read(len)) + ngx.say(ffi.string(p, len)) + local p = assert(sk:read(len)) + ngx.say(ffi.string(p, len)) + + local len = 9 * 512 + local p = assert(sk:read(len)) + ngx.print(ffi.string(p, len)) + } +--- stream_request +--- stream_response eval +"123456789" x 256 . +"\n" . +"123456789" x 256 . +"\n" . +"123456789" x 512