Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions lib/resty/apisix/stream/xrpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,21 @@ int
ngx_stream_lua_ffi_socket_tcp_get_read_buf_result(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u, u_char **res, size_t len,
u_char *errbuf, size_t *errbuf_size);

int
ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *ds,
u_char *errbuf, size_t *errbuf_size);

int
ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
size_t *errbuf_size);
]]
local socket_tcp_read = C.ngx_stream_lua_ffi_socket_tcp_read_buf
local socket_tcp_get_read_result = C.ngx_stream_lua_ffi_socket_tcp_get_read_buf_result
local socket_tcp_move = C.ngx_stream_lua_ffi_socket_tcp_send_from_socket
local socket_tcp_get_move_result = C.ngx_stream_lua_ffi_socket_tcp_get_send_result


local ERR_BUF_SIZE = 256
Expand Down Expand Up @@ -100,6 +112,58 @@ local function read_buf(cosocket, len)
end


-- move the buffers from src cosocket to dst cosocket. The buffers are from previous one or multiple
-- read calls. It is equal to send multiple read buffer in the src cosocket to the dst cosocket.
local function move(dst, src)
local r = get_request()
if not r then
error("no request found", 2)
end

if src == dst then
error("can't move buffer in the same socket", 2)
end

if not src then
error("no source socket found", 2)
end

local dst_sk = get_tcp_socket(dst)
local src_sk = get_tcp_socket(src)
if not src_sk then
error("no source socket found", 2)
end

local errbuf = get_string_buf(ERR_BUF_SIZE)
local errbuf_size = get_size_ptr()
errbuf_size[0] = ERR_BUF_SIZE

local rc = socket_tcp_move(r, dst_sk, src_sk, errbuf, errbuf_size)
if rc == FFI_DONE then
error(ffi_str(errbuf, errbuf_size[0]), 2)
end

while true do
if rc == FFI_ERROR then
return nil, ffi_str(errbuf, errbuf_size[0])
end

if rc >= 0 then
return true
end

assert(rc == FFI_AGAIN)

co_yield()

errbuf = get_string_buf(ERR_BUF_SIZE)
errbuf_size = get_size_ptr()
errbuf_size[0] = ERR_BUF_SIZE
rc = socket_tcp_get_move_result(r, dst_sk, errbuf, errbuf_size)
end
end


local function patch_methods(sk)
local methods = getmetatable(sk).__index
local copy = tab_clone(methods)
Expand All @@ -109,6 +173,7 @@ local function patch_methods(sk)
copy.receiveuntil = nil

copy.read = read_buf
copy.move = move

return {__index = copy}
end
Expand Down
263 changes: 255 additions & 8 deletions patch/1.19.9/ngx_stream_lua-xrpc.patch
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
diff --git src/ngx_stream_lua_socket_tcp.c src/ngx_stream_lua_socket_tcp.c
index 7fcfb45..601673d 100644
index 7fcfb45..8fc96cf 100644
--- src/ngx_stream_lua_socket_tcp.c
+++ src/ngx_stream_lua_socket_tcp.c
@@ -234,6 +234,41 @@ enum {
Expand Down Expand Up @@ -44,10 +44,39 @@ index 7fcfb45..601673d 100644

static char ngx_stream_lua_raw_req_socket_metatable_key;
static char ngx_stream_lua_tcp_socket_metatable_key;
@@ -6005,6 +6040,329 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
@@ -6005,6 +6040,576 @@ static ngx_int_t ngx_stream_lua_socket_insert_buffer(
}


+static void
+ngx_stream_lua_ffi_socket_reset_buf(ngx_stream_lua_ctx_t *ctx,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_chain_t *cl = u->bufs_in;
+ ngx_chain_t **ll = NULL;
+
+ if (cl->next) {
+ ll = &cl->next;
+ }
+
+ if (ll) {
+ *ll = ctx->free_recv_bufs;
+ ctx->free_recv_bufs = u->bufs_in;
+ u->bufs_in = u->buf_in;
+ }
+
+ if (u->buffer.pos == u->buffer.last) {
+ u->buffer.pos = u->buffer.start;
+ u->buffer.last = u->buffer.pos;
+ }
+
+ if (u->bufs_in) {
+ u->buf_in->buf->last = u->buffer.pos;
+ u->buf_in->buf->pos = u->buffer.pos;
+ }
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_dummy_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
Expand Down Expand Up @@ -257,11 +286,13 @@ index 7fcfb45..601673d 100644
+ u->buffer = *u->buf_in->buf;
+
+ } else {
+ size_t remain = u->buffer.end - u->buffer.pos;
+ size_t remain_space = u->buffer.end - u->buffer.pos;
+ size_t remain_data = u->buffer.last - u->buffer.pos;
+ size_t buf_len;
+ ngx_chain_t *cl;
+ u_char *pos;
+ ngx_chain_t *cl, *tmp_cl;
+
+ if (remain < len) {
+ if (remain_space < len) {
+ buf_len = len > u->conf->buffer_size ? len : u->conf->buffer_size;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
Expand All @@ -277,13 +308,17 @@ index 7fcfb45..601673d 100644
+ return NGX_DONE;
+ }
+
+ u->buf_in->next = cl;
+ for (tmp_cl = u->bufs_in; tmp_cl->next; tmp_cl = tmp_cl->next) {}
+ tmp_cl->next = cl;
+ u->buf_in = cl;
+
+ cl->buf->last = ngx_copy(cl->buf->last, u->buffer.pos, remain);
+ u->buffer.last = u->buffer.pos;
+
+ pos = u->buffer.pos;
+ u->buffer = *cl->buf;
+
+ if (remain_data > 0) {
+ u->buffer.last = ngx_copy(u->buffer.last, pos, remain_data);
+ }
+ }
+ }
+
Expand Down Expand Up @@ -370,6 +405,218 @@ index 7fcfb45..601673d 100644
+ errbuf, errbuf_size);
+}
+
+
+static void
+ngx_stream_lua_ffi_socket_write_error_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ u_char *errbuf, size_t *errbuf_size)
+{
+ ngx_uint_t ft_type;
+
+ if (u->write_co_ctx) {
+ u->write_co_ctx->cleanup = NULL;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);
+
+ ft_type = u->ft_type;
+ u->ft_type = 0;
+
+ ngx_stream_lua_ffi_socket_prepare_error_retvals(r, u, ft_type,
+ errbuf, errbuf_size);
+}
+
+
+int
+ngx_stream_lua_ffi_socket_tcp_send_from_socket(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_socket_tcp_upstream_t *src,
+ u_char *errbuf, size_t *errbuf_size)
+{
+ size_t len = 0;
+ ngx_int_t rc;
+ ngx_chain_t *cl, *in_cl;
+ ngx_stream_lua_ctx_t *ctx;
+ int tcp_nodelay;
+ ngx_buf_t *b;
+ ngx_connection_t *c;
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_core_srv_conf_t *clcf;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ if (u == NULL || u->peer.connection == NULL || u->write_closed) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "attempt to send data on a closed socket: u:%p, "
+ "c:%p, ft:%d eof:%d",
+ u, u ? u->peer.connection : NULL,
+ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
+ }
+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "closed") - errbuf;
+ return NGX_ERROR;
+ }
+
+ if (u->request != r) {
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "bad request")
+ - errbuf;
+ return NGX_DONE;
+ }
+
+ ngx_stream_lua_ffi_socket_check_busy_connecting(r, u, errbuf, errbuf_size);
+ ngx_stream_lua_ffi_socket_check_busy_writing(r, u, errbuf, errbuf_size);
+
+ if (u->body_downstream) {
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size,
+ "attempt to write to request sockets")
+ - errbuf;
+ return NGX_DONE;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua tcp socket send timeout: %M", u->send_timeout);
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) {
+ b = in_cl->buf;
+
+ len += b->last - b->pos;
+
+ ngx_log_debug3(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua tcp socket move cl:%p buf %p, len: %d",
+ in_cl, b, b->last - b->pos);
+
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua tcp socket move total buf %p, len: %d",
+ src->bufs_in, len);
+
+ if (len == 0) {
+ return NGX_OK;
+ }
+
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
+ &ctx->free_bufs, len);
+
+ if (cl == NULL) {
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size, "no memory") - errbuf;
+ return NGX_DONE;
+ }
+
+ /* TODO: avoid copying (it requires to modify the way cosocket sends data) */
+ for (in_cl = src->bufs_in; in_cl; in_cl = in_cl->next) {
+ b = in_cl->buf;
+ cl->buf->last = ngx_copy(cl->buf->last, b->pos, b->last - b->pos);
+ }
+
+ ngx_stream_lua_ffi_socket_reset_buf(ctx, src);
+
+ u->request_bufs = cl;
+
+ u->request_len = len;
+
+ /* mimic ngx_stream_upstream_init_request here */
+
+ clcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_core_module);
+ c = u->peer.connection;
+
+ if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "lua socket tcp_nodelay");
+
+ tcp_nodelay = 1;
+
+ if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
+ (const void *) &tcp_nodelay, sizeof(int))
+ == -1)
+ {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ ngx_connection_error(c, ngx_socket_errno,
+ "setsockopt(TCP_NODELAY) "
+ "failed");
+ }
+
+ *errbuf_size = ngx_snprintf(errbuf, *errbuf_size,
+ "setsocketopt tcp_nodelay failed")
+ - errbuf;
+ return NGX_ERROR;
+ }
+
+ c->tcp_nodelay = NGX_TCP_NODELAY_SET;
+ }
+
+ u->write_waiting = 0;
+ u->write_co_ctx = NULL;
+
+ ngx_stream_lua_probe_socket_tcp_send_start(r, u, b->pos, len);
+
+ rc = ngx_stream_lua_socket_send(r, u);
+
+ dd("socket send returned %d", (int) rc);
+
+ if (rc == NGX_ERROR) {
+ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf,
+ errbuf_size);
+ return NGX_ERROR;
+ }
+
+ if (rc == NGX_OK) {
+ return rc;
+ }
+
+ /* rc == NGX_AGAIN */
+
+ coctx = ctx->cur_co_ctx;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ if (u->raw_downstream) {
+ ctx->writing_raw_req_socket = 1;
+ }
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ u->write_co_ctx = coctx;
+ u->write_waiting = 1;
+ u->write_prepare_retvals = ngx_stream_lua_socket_tcp_dummy_retval_handler;
+
+ dd("setting data to %p", u);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "lua socket send yield, u: %p", u);
+
+ return NGX_AGAIN;
+}
+
+
+int
+ngx_stream_lua_ffi_socket_tcp_get_send_result(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, u_char *errbuf,
+ size_t *errbuf_size)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua tcp socket get send result");
+
+ if (u->ft_type) {
+ ngx_stream_lua_ffi_socket_write_error_retval_handler(r, u, errbuf,
+ errbuf_size);
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
static ngx_int_t
ngx_stream_lua_socket_tcp_conn_op_resume(ngx_stream_lua_request_t *r)
Expand Down
Loading