Skip to content

Commit 24032fb

Browse files
committed
fix(*) wait both upload and download coroutines to exit,
add timeout options, add backwards compatibility
1 parent 85fa461 commit 24032fb

File tree

2 files changed

+36
-22
lines changed

2 files changed

+36
-22
lines changed

README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,15 @@ Where `protocol` is the identified protocol in lowercase string, and `expected`
221221
API
222222
=======
223223

224+
multiplexer.new
225+
--------------------------
226+
**syntax:** *multiplexer:new(connect_timeout, send_timeout, read_timeout)*
227+
228+
Initialize the multiplexer instance. And sets the connect timeout thresold, send timeout threshold, and read timeout threshold, as in [tcpsock:settimeouts](https://github.com/openresty/lua-nginx-module#tcpsocksettimeouts).
229+
230+
231+
[Back to TOC](#table-of-contents)
232+
224233
multiplexer.load_protocols
225234
--------------------------
226235
**syntax:** *multiplexer:load_protocols("protocol-1", "protocol-2", ...)*

lib/resty/multiplexer/init.lua

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -102,24 +102,22 @@ _M.matcher_config = setmetatable({}, {
102102
end
103103
})
104104

105-
function _M.new(self, bufsize, timeout)
105+
function _M.new(self, connect_timeout, send_timeout, read_timeout)
106106
local srvsock, err = tcp()
107107
if not srvsock then
108108
return nil, err
109109
end
110-
--srvsock:settimeout(timeout or 10000)
110+
srvsock:settimeouts(connect_timeout or 10000, send_timeout or 10000, read_timeout or 3600000)
111111

112112
local reqsock, err = ngx.req.socket()
113113
if not reqsock then
114114
return nil, err
115115
end
116-
--reqsock:settimeout(timeout or 10000)
116+
reqsock:settimeouts(connect_timeout or 10000, send_timeout or 10000, read_timeout or 3600000)
117+
117118
return setmetatable({
118119
srvsock = srvsock,
119120
reqsock = reqsock,
120-
exit_flag = false,
121-
server_name = nil,
122-
bufsize = bufsize or 1024
123121
}, mt)
124122
end
125123

@@ -130,7 +128,9 @@ local function _cleanup(self)
130128
local srvsock = self.srvsock
131129
local reqsock = self.reqsock
132130
if srvsock ~= nil then
133-
srvsock:shutdown("send")
131+
if srvsock.shutdown then
132+
srvsock:shutdown("send")
133+
end
134134
if srvsock.close ~= nil then
135135
local ok, err = srvsock:setkeepalive()
136136
if not ok then
@@ -140,7 +140,9 @@ local function _cleanup(self)
140140
end
141141

142142
if reqsock ~= nil then
143-
reqsock:shutdown("send")
143+
if reqsock.shutdown then
144+
reqsock:shutdown("send")
145+
end
144146
if reqsock.close ~= nil then
145147
local ok, err = reqsock:close()
146148
if not ok then
@@ -158,9 +160,12 @@ local function probe(self)
158160
local bytes_read = 0
159161
local buf = ''
160162
for _, v in pairs(_M.protocols) do
161-
ngx.log(ngx.INFO, "[multiplexer] check on position " .. v[1])
163+
ngx.log(ngx.INFO, "[multiplexer] waiting for ", v[1] - bytes_read, " more bytes")
162164
-- read more bytes
163-
local new_buf, err = self.reqsock:receive(v[1] - bytes_read)
165+
local new_buf, err, partial = self.reqsock:receive(v[1] - bytes_read)
166+
if err then
167+
return 0, nil, buf .. partial
168+
end
164169
-- concat buffer
165170
buf = buf .. new_buf
166171
-- check protocol
@@ -177,14 +182,14 @@ end
177182

178183
local function _upl(self)
179184
-- proxy client request to server
180-
local buf, len, err, hd, _
185+
local buf, err, partial
181186
local rsock = self.reqsock
182187
local ssock = self.srvsock
183188
while true do
184-
buf, err, _ = rsock:receive("*p")
189+
buf, err, partial = rsock:receive("*p")
185190
if err then
186-
if ssock.close ~= nil then
187-
_, err = ssock:send(_)
191+
if ssock.close ~= nil and partial then
192+
_, err = ssock:send(partial)
188193
end
189194
break
190195
elseif buf == nil then
@@ -200,14 +205,14 @@ end
200205

201206
local function _dwn(self)
202207
-- proxy response to client
203-
local buf, len, err, hd, _
208+
local buf, err, partial
204209
local rsock = self.reqsock
205210
local ssock = self.srvsock
206211
while true do
207-
buf, err, _ = ssock:receive("*p")
212+
buf, err, partial = ssock:receive("*p")
208213
if err then
209-
if rsock.close ~= nil then
210-
_, err = rsock:send(_)
214+
if rsock.close ~= nil and partial then
215+
_, err = rsock:send(partial)
211216
end
212217
break
213218
elseif buf == nil then
@@ -265,10 +270,10 @@ function _M.run(self)
265270
-- send buffer
266271
self.srvsock:send(buffer)
267272

268-
wait(
269-
spawn(_upl, self),
270-
spawn(_dwn, self)
271-
)
273+
local co_upl = spawn(_upl, self)
274+
local co_dwn = spawn(_dwn, self)
275+
wait(co_upl)
276+
wait(co_dwn)
272277

273278
break
274279
end

0 commit comments

Comments
 (0)