Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: support openresty 1.15.8 and later
  • Loading branch information
fffonion committed Aug 31, 2020
1 parent 24032fb commit c898715
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 41 deletions.
44 changes: 36 additions & 8 deletions README.md
Expand Up @@ -30,9 +30,13 @@ This library implemented a transparent port service multiplexer, which can be us

Note that nginx [stream module](https://nginx.org/en/docs/stream/ngx_stream_core_module.html) and [stream-lua-nginx-module](https://github.com/openresty/stream-lua-nginx-module) is required.

Also a customed [patch](patches/stream-lua-readpartial.patch) from [@fcicq](https://github.com/fcicq) is needed. The origin discussion can be found [here](https://github.com/fffonion/lua-resty-sniproxy/issues/1).
Tested on Openresty >= 1.13.6.1.

Tested on Openresty 1.13.6.1.
With OpenResty 1.13.6.1, a customed [patch](patches/stream-lua-readpartial.patch) from [@fcicq](https://github.com/fcicq) is needed. The origin discussion can be found [here](https://github.com/fffonion/lua-resty-sniproxy/issues/1). And native
proxying is not supported as `reqsock:peek` is missing.

Starting OpenResty 1.15.8.1, only native proxying is supported and no patch is needed. Lua land proxying will be
possible when stream-lua-nginx-module implemented `tcpsock:receiveany`.

[Back to TOC](#table-of-contents)

Expand Down Expand Up @@ -70,13 +74,37 @@ stream {

resolver 8.8.8.8;

# for OpenResty >= 1.13.6.1, native Nginx proxying
lua_add_variable $multiplexer_upstream;
server {
listen 80;
content_by_lua_block {
local mul = require("resty.multiplexer")
local mp = mul:new()
mp:run()
}
error_log /var/log/nginx/multiplexer-error.log error;
listen 443;

resolver 8.8.8.8;

preread_by_lua_block {
local mul = require("resty.multiplexer")
local mp = mul:new()
mp:preread_by()
}
proxy_pass $multiplexer_upstream;
}

# for OpenResty < 1.13.6.1, Lua land proxying
server {
error_log /var/log/nginx/multiplexer-error.log error;
listen 443;

resolver 8.8.8.8;

server {
listen 80;
content_by_lua_block {
local mul = require("resty.multiplexer")
local mp = mul:new()
mp:content_by()
}
}
}
}
```
Expand Down
124 changes: 91 additions & 33 deletions lib/resty/multiplexer/init.lua
Expand Up @@ -103,12 +103,16 @@ _M.matcher_config = setmetatable({}, {
})

function _M.new(self, connect_timeout, send_timeout, read_timeout)
if _M.rules == nil or _M.matchers == nil then
return nil, "[multiplexer] no rule is defined"
end

local srvsock, err = tcp()
if not srvsock then
return nil, err
end
srvsock:settimeouts(connect_timeout or 10000, send_timeout or 10000, read_timeout or 3600000)

local reqsock, err = ngx.req.socket()
if not reqsock then
return nil, err
Expand Down Expand Up @@ -138,7 +142,7 @@ local function _cleanup(self)
end
end
end

if reqsock ~= nil then
if reqsock.shutdown then
reqsock:shutdown("send")
Expand All @@ -150,10 +154,27 @@ local function _cleanup(self)
end
end
end

end

local function probe(self)
local function probe(sock, is_preread)
local f
if is_preread then
local read = 0
-- peek always start from beginning
f = function(sock, len)
local b, err = sock:peek(len + read)
if err then
return b, err
end
b = b:sub(read+1)
read = read + len
return b
end
else
f = sock.receive
end

if _M.protocols == nil then
return 0, nil, ""
end
Expand All @@ -162,7 +183,7 @@ local function probe(self)
for _, v in pairs(_M.protocols) do
ngx.log(ngx.INFO, "[multiplexer] waiting for ", v[1] - bytes_read, " more bytes")
-- read more bytes
local new_buf, err, partial = self.reqsock:receive(v[1] - bytes_read)
local new_buf, err, partial = f(sock, v[1] - bytes_read)
if err then
return 0, nil, buf .. partial
end
Expand Down Expand Up @@ -218,45 +239,67 @@ local function _dwn(self)
elseif buf == nil then
break
end

_, err = rsock:send(buf)
if err then
break
end
end
end

function _M.run(self)
while true do
if _M.matchers == nil then
ngx.log(ngx.ERR, "[multiplexer] no rule is defined")
local function _select_upstream(protocol_name)
local upstream, port
for _, v in pairs(_M.rules) do
local is_match = false
-- stop before last to elements of rules, which is server addr and port
for i = 1, #v - 2, 1 do
local m = _M.matchers[v[i][1]]
if not m then
ngx.log(ngx.WARN, "[multiplexer] try to use a matcher '", v[i][1], "', which is not loaded ")
elseif m.match(protocol_name, v[i][2]) then
is_match = true
end
end
if is_match then
upstream = v[#v - 1]
port = v[#v]
break
end
local code, protocol, buffer = probe(self)
end
return upstream, port, nil
end

function _M.preread_by(self)
local code, protocol, _ = probe(self.reqsock, true)
if code ~= 0 then
ngx.log(ngx.INFO, "[multiplexer] cleaning up with an exit code ", code)
return
end
ngx.log(ngx.NOTICE, format("[multiplexer] protocol:%s exit:%d", protocol, code))

local upstream, port, _ = _select_upstream(protocol)
if upstream == nil or port == nil then
ngx.log(ngx.NOTICE, "[multiplexer] no matches found for this request")
return
end

if upstream:sub(1, 5) ~= "unix:" then
upstream = upstream .. ":" .. tostring(port)
end
ngx.log(ngx.INFO, "[multiplexer] selecting upstream: ", upstream)
ngx.var.multiplexer_upstream = upstream
end

function _M.content_by(self)
while true do
local code, protocol, buffer = probe(self.reqsock)
if code ~= 0 then
ngx.log(ngx.INFO, "[multiplexer] cleaning up with an exit code ", code)
break
end
ngx.log(ngx.NOTICE, format("[multiplexer] protocol:%s exit:%d", protocol, code))
local upstream, port

for _, v in pairs(_M.rules) do
local is_match = false
-- stop before last to elements of rules, which is server addr and port
for i = 1, #v - 2, 1 do
local m = _M.matchers[v[i][1]]
if not m then
ngx.log(ngx.WARN, "[multiplexer] try to use a matcher '", v[i][1], "', which is not loaded ")
elseif m.match(protocol, v[i][2]) then
is_match = true
end
end
if is_match then
upstream = v[#v - 1]
port = v[#v]
break
end
end
local upstream, port = _select_upstream(protocol)

if upstream == nil or port == nil then
ngx.log(ngx.NOTICE, "[multiplexer] no matches found for this request")
break
Expand All @@ -269,16 +312,31 @@ function _M.run(self)
end
-- send buffer
self.srvsock:send(buffer)

local co_upl = spawn(_upl, self)
local co_dwn = spawn(_dwn, self)
wait(co_upl)
wait(co_dwn)

break
end
_cleanup(self)


end

-- backward compatibility
function _M.run(self)
local phase = ngx.get_phase()
if phase == 'content' then
ngx.log(ngx.ERR, "content_by")
self:content_by()
elseif phase == 'preread' then
ngx.log(ngx.ERR, "preread_by")
self:preread_by()
else
ngx.log(ngx.ERR, "multiplexer doesn't support running in ", phase)
ngx.exit(ngx.ERROR)
end
end


Expand Down

0 comments on commit c898715

Please sign in to comment.