From 487f971a8da148dfd6d9030c93aaf124b16f6e45 Mon Sep 17 00:00:00 2001 From: spacewander Date: Wed, 6 Apr 2022 17:28:38 +0800 Subject: [PATCH] feat: add 'move' method Signed-off-by: spacewander --- lib/resty/apisix/stream/xrpc/socket.lua | 65 ++++++ patch/1.19.9/ngx_stream_lua-xrpc.patch | 263 +++++++++++++++++++++++- t/stream/xrpc/downstream.t | 116 +++++++++++ t/stream/xrpc/fuzzing.t | 100 ++++++++- t/stream/xrpc/upstream.t | 125 +++++++++++ 5 files changed, 660 insertions(+), 9 deletions(-) diff --git a/lib/resty/apisix/stream/xrpc/socket.lua b/lib/resty/apisix/stream/xrpc/socket.lua index bcf8314..0dd4066 100644 --- a/lib/resty/apisix/stream/xrpc/socket.lua +++ b/lib/resty/apisix/stream/xrpc/socket.lua @@ -29,9 +29,21 @@ int ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len, u_char *errbuf, size_t *errbuf_size); + +int +ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r, + ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *ds, + u_char *errbuf, size_t *errbuf_size); + +int +ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r, + ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf, + size_t *errbuf_size); ]] local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result +local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket +local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result local ERR_BUF_SIZE = 256 @@ -100,6 +112,58 @@ local function read_buf(cosocket, len) end +-- move the buffers from src cosocket to dst cosocket. The buffers are from previous one or multiple +-- read calls. It is equal to send multiple read buffer in the src cosocket to the dst cosocket. +local function move(dst, src) + local r = get_request() + if not r then + error("no request found", 2) + end + + if src == dst then + error("can't move buffer in the same socket", 2) + end + + if not src then + error("no source socket found", 2) + end + + local dst_sk = get_tcp_socket(dst) + local src_sk = get_tcp_socket(src) + if not src_sk then + error("no source socket found", 2) + end + + local errbuf = get_string_buf(ERR_BUF_SIZE) + local errbuf_size = get_size_ptr() + errbuf_size[0] = ERR_BUF_SIZE + + local rc = socket_tcp_move(r, dst_sk, src_sk, errbuf, errbuf_size) + if rc == FFI_DONE then + error(ffi_str(errbuf, errbuf_size[0]), 2) + end + + while true do + if rc == FFI_ERROR then + return nil, ffi_str(errbuf, errbuf_size[0]) + end + + if rc >= 0 then + return true + end + + assert(rc == FFI_AGAIN) + + co_yield() + + errbuf = get_string_buf(ERR_BUF_SIZE) + errbuf_size = get_size_ptr() + errbuf_size[0] = ERR_BUF_SIZE + rc = socket_tcp_get_move_result(r, dst_sk, errbuf, errbuf_size) + end +end + + local function patch_methods(sk) local methods = getmetatable(sk).__index local copy = tab_clone(methods) @@ -109,6 +173,7 @@ local function patch_methods(sk) copy.receiveuntil = nil copy.read = read_buf + copy.move = move return {__index = copy} end diff --git a/patch/1.19.9/ngx_stream_lua-xrpc.patch b/patch/1.19.9/ngx_stream_lua-xrpc.patch index 529c400..0906f84 100644 --- a/patch/1.19.9/ngx_stream_lua-xrpc.patch +++ b/patch/1.19.9/ngx_stream_lua-xrpc.patch @@ -1,5 +1,5 @@ diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c -index 7fcfb45..601673d 100644 +index 7fcfb45..8fc96cf 100644 --- src/ngx_stream_lua_socket_tcp.c +++ src/ngx_stream_lua_socket_tcp.c @@ -234,6 +234,41 @@ enum { @@ -44,10 +44,39 @@ index 7fcfb45..601673d 100644 static char ngx_stream_lua_raw_req_socket_metatable_key; static char ngx_stream_lua_tcp_socket_metatable_key; -@@ -6005,6 +6040,329 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer( +@@ -6005,6 +6040,576 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer( } ++static void ++ngx_stream_lua_ffi_socket_reset_buf(ngx_stream_lua_ctx_t *ctx, ++ ngx_stream_lua_socket_tcp_upstream_t *u) ++{ ++ ngx_chain_t *cl = u->bufs_in; ++ ngx_chain_t **ll = NULL; ++ ++ if (cl->next) { ++ ll = &cl->next; ++ } ++ ++ if (ll) { ++ *ll = ctx->free_recv_bufs; ++ ctx->free_recv_bufs = u->bufs_in; ++ u->bufs_in = u->buf_in; ++ } ++ ++ if (u->buffer.pos == u->buffer.last) { ++ u->buffer.pos = u->buffer.start; ++ u->buffer.last = u->buffer.pos; ++ } ++ ++ if (u->bufs_in) { ++ u->buf_in->buf->last = u->buffer.pos; ++ u->buf_in->buf->pos = u->buffer.pos; ++ } ++} ++ ++ +static int +ngx_stream_lua_socket_tcp_dummy_retval_handler(ngx_stream_lua_request_t *r, + ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L) @@ -257,11 +286,13 @@ index 7fcfb45..601673d 100644 + u->buffer = *u->buf_in->buf; + + } else { -+ size_t remain = u->buffer.end - u->buffer.pos; ++ size_t remain_space = u->buffer.end - u->buffer.pos; ++ size_t remain_data = u->buffer.last - u->buffer.pos; + size_t buf_len; -+ ngx_chain_t *cl; ++ u_char *pos; ++ ngx_chain_t *cl, *tmp_cl; + -+ if (remain < len) { ++ if (remain_space < len) { + buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, @@ -277,13 +308,17 @@ index 7fcfb45..601673d 100644 + return NGX_DONE; + } + -+ u->buf_in->next = cl; ++ for (tmp_cl = u->bufs_in; tmp_cl->next; tmp_cl = tmp_cl->next) {} ++ tmp_cl->next = cl; + u->buf_in = cl; + -+ cl->buf->last = ngx_copy(cl->buf->last, u->buffer.pos, remain); + u->buffer.last = u->buffer.pos; -+ ++ pos = u->buffer.pos; + u->buffer = *cl->buf; ++ ++ if (remain_data > 0) { ++ u->buffer.last = ngx_copy(u->buffer.last, pos, remain_data); ++ } + } + } + @@ -370,6 +405,218 @@ index 7fcfb45..601673d 100644 + errbuf, errbuf_size); +} + ++ ++static void ++ngx_stream_lua_ffi_socket_write_error_retval_handler( ++ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ ngx_uint_t ft_type; ++ ++ if (u->write_co_ctx) { ++ u->write_co_ctx->cleanup = NULL; ++ } ++ ++ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0); ++ ++ ft_type = u->ft_type; ++ u->ft_type = 0; ++ ++ ngx_stream_lua_ffi_socket_prepare_error_retvals(r, u, ft_type, ++ errbuf, errbuf_size); ++} ++ ++ ++int ++ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *src, ++ u_char *errbuf, size_t *errbuf_size) ++{ ++ size_t len = 0; ++ ngx_int_t rc; ++ ngx_chain_t *cl, *in_cl; ++ ngx_stream_lua_ctx_t *ctx; ++ int tcp_nodelay; ++ ngx_buf_t *b; ++ ngx_connection_t *c; ++ ngx_stream_lua_loc_conf_t *llcf; ++ ngx_stream_core_srv_conf_t *clcf; ++ ngx_stream_lua_co_ctx_t *coctx; ++ ++ if (u == NULL || u->peer.connection == NULL || u->write_closed) { ++ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); ++ ++ if (llcf->log_socket_errors) { ++ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, ++ "attempt to send data on a closed socket: u:%p, " ++ "c:%p, ft:%d eof:%d", ++ u, u ? u->peer.connection : NULL, ++ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0); ++ } ++ ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf; ++ return NGX_ERROR; ++ } ++ ++ if (u->request != r) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "bad request") ++ - errbuf; ++ return NGX_DONE; ++ } ++ ++ ngx_stream_lua_ffi_socket_check_busy_connecting(r, u, errbuf, errbuf_size); ++ ngx_stream_lua_ffi_socket_check_busy_writing(r, u, errbuf, errbuf_size); ++ ++ if (u->body_downstream) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, ++ "attempt to write to request sockets") ++ - errbuf; ++ return NGX_DONE; ++ } ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua tcp socket send timeout: %M", u->send_timeout); ++ ++ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module); ++ ++ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) { ++ b = in_cl->buf; ++ ++ len += b->last - b->pos; ++ ++ ngx_log_debug3(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua tcp socket move cl:%p buf %p, len: %d", ++ in_cl, b, b->last - b->pos); ++ ++ } ++ ++ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua tcp socket move total buf %p, len: %d", ++ src->bufs_in, len); ++ ++ if (len == 0) { ++ return NGX_OK; ++ } ++ ++ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool, ++ &ctx->free_bufs, len); ++ ++ if (cl == NULL) { ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf; ++ return NGX_DONE; ++ } ++ ++ /* TODO: avoid copying (it requires to modify the way cosocket sends data) */ ++ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) { ++ b = in_cl->buf; ++ cl->buf->last = ngx_copy(cl->buf->last, b->pos, b->last - b->pos); ++ } ++ ++ ngx_stream_lua_ffi_socket_reset_buf(ctx, src); ++ ++ u->request_bufs = cl; ++ ++ u->request_len = len; ++ ++ /* mimic ngx_stream_upstream_init_request here */ ++ ++ clcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_core_module); ++ c = u->peer.connection; ++ ++ if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) { ++ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, ++ "lua socket tcp_nodelay"); ++ ++ tcp_nodelay = 1; ++ ++ if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY, ++ (const void *) &tcp_nodelay, sizeof(int)) ++ == -1) ++ { ++ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module); ++ if (llcf->log_socket_errors) { ++ ngx_connection_error(c, ngx_socket_errno, ++ "setsockopt(TCP_NODELAY) " ++ "failed"); ++ } ++ ++ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, ++ "setsocketopt tcp_nodelay failed") ++ - errbuf; ++ return NGX_ERROR; ++ } ++ ++ c->tcp_nodelay = NGX_TCP_NODELAY_SET; ++ } ++ ++ u->write_waiting = 0; ++ u->write_co_ctx = NULL; ++ ++ ngx_stream_lua_probe_socket_tcp_send_start(r, u, b->pos, len); ++ ++ rc = ngx_stream_lua_socket_send(r, u); ++ ++ dd("socket send returned %d", (int) rc); ++ ++ if (rc == NGX_ERROR) { ++ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf, ++ errbuf_size); ++ return NGX_ERROR; ++ } ++ ++ if (rc == NGX_OK) { ++ return rc; ++ } ++ ++ /* rc == NGX_AGAIN */ ++ ++ coctx = ctx->cur_co_ctx; ++ ++ ngx_stream_lua_cleanup_pending_operation(coctx); ++ coctx->cleanup = ngx_stream_lua_coctx_cleanup; ++ coctx->data = u; ++ ++ if (u->raw_downstream) { ++ ctx->writing_raw_req_socket = 1; ++ } ++ ++ if (ctx->entered_content_phase) { ++ r->write_event_handler = ngx_stream_lua_content_wev_handler; ++ ++ } else { ++ r->write_event_handler = ngx_stream_lua_core_run_phases; ++ } ++ ++ u->write_co_ctx = coctx; ++ u->write_waiting = 1; ++ u->write_prepare_retvals = ngx_stream_lua_socket_tcp_dummy_retval_handler; ++ ++ dd("setting data to %p", u); ++ ++ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, ++ "lua socket send yield, u: %p", u); ++ ++ return NGX_AGAIN; ++} ++ ++ ++int ++ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r, ++ ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf, ++ size_t *errbuf_size) ++{ ++ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0, ++ "lua tcp socket get send result"); ++ ++ if (u->ft_type) { ++ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf, ++ errbuf_size); ++ return NGX_ERROR; ++ } ++ ++ return NGX_OK; ++} ++ + static ngx_int_t ngx_stream_lua_socket_tcp_conn_op_resume(ngx_stream_lua_request_t *r) diff --git a/t/stream/xrpc/downstream.t b/t/stream/xrpc/downstream.t index ada12bb..2f50535 100644 --- a/t/stream/xrpc/downstream.t +++ b/t/stream/xrpc/downstream.t @@ -140,3 +140,119 @@ qr/stream lua tcp socket allocate new new buf of size \d+/ stream lua tcp socket allocate new new buf of size 4096 stream lua tcp socket allocate new new buf of size 4096 stream lua tcp socket allocate new new buf of size 4608 + + + +=== TEST 7: read & move +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + local exp = { + "hell", + "o wo", + "r", + } + for i = 1, 3 do + local data = sk:receiveany(128) + if data ~= exp[i] then + ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp[i]) + end + end + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + ds:settimeout(5) + + local us = require("resty.apisix.stream.xrpc.socket").upstream.socket() + us:settimeout(50) + assert(us:connect("127.0.0.1", 1995)) + + local p = assert(ds:read(4)) + assert(us:move(ds)) + ngx.sleep(0.01) + local p = assert(ds:read(4)) + assert(us:move(ds)) + ngx.sleep(0.01) + local p = assert(ds:read(1)) + assert(us:move(ds)) + } +--- stream_request +hello world + + + +=== TEST 8: multiple moves +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + local data = sk:receiveany(128) + local exp = "hello wor" + if data ~= exp then + ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp) + end + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + ds:settimeout(5) + + local us = require("resty.apisix.stream.xrpc.socket").upstream.socket() + us:settimeout(50) + assert(us:connect("127.0.0.1", 1995)) + + local p = assert(ds:read(4)) + local p = assert(ds:read(4)) + local p = assert(ds:read(1)) + assert(us:move(ds)) + } +--- stream_request +hello world + + + +=== TEST 9: multiple moves + read over buffer in the middle +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + local data = sk:receive(9 * 24) + local exp = ("123456789"):rep(24) + if data ~= exp then + ngx.log(ngx.ERR, "actual: ", data, ", expected: ", exp) + end + } + } +--- stream_server_config + lua_socket_buffer_size 128; + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + sk:settimeout(5) + + local us = require("resty.apisix.stream.xrpc.socket").upstream.socket() + us:settimeout(50) + assert(us:connect("127.0.0.1", 1995)) + + local len = 9 * 8 + local p = assert(sk:read(len)) + local len = 9 * 16 + local p = assert(sk:read(len)) + assert(us:move(sk)) + } +--- stream_request eval +"123456789" x 24 +--- grep_error_log eval +qr/stream lua tcp socket allocate new new buf of size \d+/ +--- grep_error_log_out +stream lua tcp socket allocate new new buf of size 128 +stream lua tcp socket allocate new new buf of size 144 diff --git a/t/stream/xrpc/fuzzing.t b/t/stream/xrpc/fuzzing.t index d7130ec..5d75918 100644 --- a/t/stream/xrpc/fuzzing.t +++ b/t/stream/xrpc/fuzzing.t @@ -8,6 +8,8 @@ __DATA__ --- stream_server_config lua_socket_buffer_size 128; content_by_lua_block { + math.randomseed(ngx.time()) + local ffi = require("ffi") local sk = require("resty.apisix.stream.xrpc.socket").downstream.socket() sk:settimeout(5) @@ -50,6 +52,7 @@ __DATA__ return end ngx.flush(true) + ngx.sleep(0.01) idx = idx + len end } @@ -57,10 +60,12 @@ __DATA__ --- stream_server_config lua_socket_buffer_size 128; content_by_lua_block { + math.randomseed(ngx.time()) + local ffi = require("ffi") local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() assert(sk:connect("127.0.0.1", 1995)) - sk:settimeout(5) + sk:settimeout(500) local total = 9 * 1024 while total > 0 do local len = math.random(1, 512) @@ -75,3 +80,96 @@ __DATA__ --- stream_request --- stream_response eval "123456789" x 1024 + + + +=== TEST 3: move +--- stream_config + lua_socket_buffer_size 128; + server { + listen 1995; + content_by_lua_block { + local s = ("123456789"):rep(10240) + local total = 9 * 1024 * 10 + local idx = 1 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local n, err = ngx.print(s:sub(idx, idx + len - 1)) + if not n then + ngx.log(ngx.ERR, err) + return + end + ngx.flush(true) + ngx.sleep(0.01) + idx = idx + len + end + } + } +--- stream_server_config + lua_socket_buffer_size 128; + content_by_lua_block { + math.randomseed(ngx.time()) + + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + local dsk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + sk:settimeout(500) + local total = 9 * 1024 * 10 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local p = assert(sk:read(len)) + dsk:move(sk) + end + } +--- stream_request +--- stream_response eval +"123456789" x 10240 +--- timeout: 8 + + + +=== TEST 4: move (multiple read) +--- stream_config + lua_socket_buffer_size 128; + server { + listen 1995; + content_by_lua_block { + local s = ("123456789"):rep(128) + ngx.say(s) + } + } +--- stream_server_config + lua_socket_buffer_size 128; + content_by_lua_block { + math.randomseed(ngx.time()) + + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + local dsk = require("resty.apisix.stream.xrpc.socket").downstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + sk:settimeout(500) + local total = 9 * 128 + while total > 0 do + local len = math.random(1, 512) + if len > total then + len = total + end + total = total - len + local p = assert(sk:read(len)) + if total % 2 == 0 then + dsk:move(sk) + end + end + } +--- stream_request +--- stream_response eval +"123456789" x 128 diff --git a/t/stream/xrpc/upstream.t b/t/stream/xrpc/upstream.t index 6a0b34d..b49e145 100644 --- a/t/stream/xrpc/upstream.t +++ b/t/stream/xrpc/upstream.t @@ -139,3 +139,128 @@ closed "123456789" x 256 . "\n" . "123456789" x 512 + + + +=== TEST 5: empty move +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send("hello world") + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + assert(ds:move(sk)) + } +--- stream_request +--- stream_response + + + +=== TEST 6: move bigger buffer +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send(("123456789"):rep(1024)) + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + local len = 9 * 512 + local p = assert(sk:read(len)) + + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + assert(ds:move(sk)) + -- mix send operation + assert(ds:send("\n")) + + local p = assert(sk:read(len / 2)) + assert(ds:move(sk)) + } +--- stream_request +--- stream_response eval +"123456789" x 512 . +"\n" . +"123456789" x 256 + + + +=== TEST 7: read over buffer in the middle, move +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send(("123456789"):rep(256)) + sk:send(("abcdefghi"):rep(512)) + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + local len = 9 * 256 + local p = assert(sk:read(len)) + local p = assert(sk:read(len * 2)) + + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + assert(ds:move(sk)) + } +--- stream_request +--- stream_response eval +"123456789" x 256 . +"abcdefghi" x 512 + + + +=== TEST 8: read over buffer in the middle, multiple moves +--- stream_config + server { + listen 1995; + content_by_lua_block { + local sk = ngx.req.socket(true) + sk:send(("123456789"):rep(1024)) + } + } +--- stream_server_config + content_by_lua_block { + local ffi = require("ffi") + local sk = require("resty.apisix.stream.xrpc.socket").upstream.socket() + assert(sk:connect("127.0.0.1", 1995)) + + sk:settimeout(5) + local len = 9 * 256 + local p = assert(sk:read(len)) + + local ds = require("resty.apisix.stream.xrpc.socket").downstream.socket() + assert(ds:move(sk)) + -- mix send operation + assert(ds:send("\n")) + + local p = assert(sk:read(len * 2)) + assert(ds:move(sk)) + } +--- stream_request +--- stream_response eval +"123456789" x 256 . +"\n" . +"123456789" x 512