Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Method
- `serializer`: string - serializer type, default `json`, also support `raw` to keep origin string value.
- `extra_headers`: table - adding custom headers for etcd requests.
- `sni`: string - adding custom SNI fot etcd TLS requests.
- `unix_socket_proxy`: string - the unix socket path which will be used to proxy the etcd request.

The client method returns either a `etcd` object or an `error string`.

Expand Down
85 changes: 82 additions & 3 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ local str_byte = string.byte
local str_char = string.char
local ipairs = ipairs
local pairs = pairs
local unpack = unpack
local re_match = ngx.re.match
local type = type
local tab_insert = table.insert
Expand Down Expand Up @@ -56,6 +57,60 @@ local function choose_endpoint(self)
end


local function request_uri_via_unix_socket(self, uri, params)
local parsed_uri, err = self:parse_uri(uri, false)
if not parsed_uri then
return nil, err
end

local path, query
params.scheme, params.host, params.port, path, query = unpack(parsed_uri)
if params.unix_socket_proxy then
if not params.headers then
params.headers = {}
end

params.headers["Host"] = params.host
params.host = params.unix_socket_proxy
params.port = nil
end

params.path = params.path or path
params.query = params.query or query
params.ssl_server_name = params.ssl_server_name or params.host

local res
res, err = self:connect(params)
if not res then
return nil, err
end

res, err = self:request(params)
if not res then
self:close()
return nil, err
end

local body
body, err = res:read_body()
if not body then
self:close()
return nil, err
end

res.body = body

if params.keepalive == false then
self:close()

else
self:set_keepalive(params.keepalive_timeout, params.keepalive_pool)
end

return res, nil
end


local function http_request_uri(self, http_cli, method, uri, body, headers, keepalive)
local endpoint, err = choose_endpoint(self)
if not endpoint then
Expand All @@ -70,7 +125,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep
end

local res
res, err = http_cli:request_uri(full_uri, {
res, err = request_uri_via_unix_socket(http_cli, full_uri, {
method = method,
body = body,
headers = headers,
Expand All @@ -79,6 +134,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep
ssl_cert_path = self.ssl_cert_path,
ssl_key_path = self.ssl_key_path,
ssl_server_name = self.sni,
unix_socket_proxy = self.unix_socket_proxy,
})

if err then
Expand Down Expand Up @@ -199,6 +255,7 @@ function _M.new(opts)
local serializer = opts.serializer
local extra_headers = opts.extra_headers
local sni = opts.sni
local unix_socket_proxy = opts.unix_socket_proxy

if not typeof.uint(timeout) then
return nil, 'opts.timeout must be unsigned integer'
Expand Down Expand Up @@ -228,6 +285,10 @@ function _M.new(opts)
return nil, 'opts.password must be string or ignore'
end

if unix_socket_proxy and not typeof.string(unix_socket_proxy) then
return nil, 'opts.unix_socket_proxy must be string or ignore'
end

local endpoints = {}
local http_hosts
if type(http_host) == 'string' then -- signle node
Expand All @@ -243,12 +304,25 @@ function _M.new(opts)
return nil, "invalid http host: " .. host .. ", err: " .. (err or "not matched")
end

local addr
if unix_socket_proxy then
addr = unix_socket_proxy
else
addr = m[2] or "127.0.0.1"
end

local port
if not unix_socket_proxy then
port = m[3] or "2379"
end

tab_insert(endpoints, {
full_prefix = host .. utils.normalize(api_prefix),
http_host = host,
scheme = m[1],
host = m[2] or "127.0.0.1",
port = m[3] or "2379",
address = addr,
port = port,
api_prefix = api_prefix,
})
end
Expand Down Expand Up @@ -282,6 +356,7 @@ function _M.new(opts)
ssl_key_path = opts.ssl_key_path,
extra_headers = extra_headers,
sni = sni,
unix_socket_proxy = unix_socket_proxy,
},
mt)
end
Expand Down Expand Up @@ -581,7 +656,7 @@ local function http_request_chunk(self, http_cli)
local ok
ok, err = http_cli:connect({
scheme = endpoint.scheme,
host = endpoint.host,
host = endpoint.address,
port = endpoint.port,
ssl_verify = self.ssl_verify,
ssl_cert_path = self.ssl_cert_path,
Expand Down Expand Up @@ -668,6 +743,10 @@ local function request_chunk(self, method, path, opts, timeout)
return nil, err
end

if self.unix_socket_proxy then
headers["Host"] = endpoint.host
end

local res
res, err = http_cli:request({
method = method,
Expand Down
128 changes: 128 additions & 0 deletions t/v3/unix_socket.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use Test::Nginx::Socket::Lua;

log_level('info');
no_long_string();
repeat_each(1);

my $test_dir = html_dir();
$ENV{TEST_NGINX_HTML_DIR} ||= $test_dir;

my $etcd_version = `etcd --version`;
if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./) {
plan(skip_all => "etcd is too old, skip v3 protocol");
} else {
plan 'no_plan';
}

our $HttpConfig = <<"_EOC_";
lua_socket_log_errors off;
lua_package_path 'lib/?.lua;/usr/local/share/lua/5.1/?.lua;;';
init_by_lua_block {
local cjson = require("cjson.safe")

function check_res(data, err, val, status)
if err then
ngx.say("err: ", err)
ngx.exit(200)
end

if val then
if data and data.body.kvs==nil then
ngx.exit(404)
end
if data and data.body.kvs and val ~= data.body.kvs[1].value then
ngx.say("failed to check value")
ngx.log(ngx.ERR, "failed to check value, got: ", data.body.kvs[1].value,
", expect: ", val)
ngx.exit(200)
else
ngx.say("checked val as expect: ", val)
end
end

if status and status ~= data.status then
ngx.exit(data.status)
end
end
}

server {
listen unix:$test_dir/lua-resty-etcd.sock;
location / {
access_by_lua_block {
ngx.log(ngx.WARN, "hit with host ", ngx.var.http_host)
}
proxy_pass http://127.0.0.1:2379;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Host \$http_host;
}
}
_EOC_

run_tests();

__DATA__

=== TEST 1: request over unix socket
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local etcd, err = require("resty.etcd").new({protocol = "v3", unix_socket_proxy = "unix:$TEST_NGINX_HTML_DIR/lua-resty-etcd.sock"})
check_res(etcd, err)

local res, err = etcd:set("/test", "abc")
check_res(res, err)

ngx.timer.at(0.1, function ()
etcd:set("/test", "bcd3")
end)

ngx.timer.at(0.2, function ()
etcd:set("/test", "bcd4")
end)

local cur_time = ngx.now()
local body_chunk_fun, err, http_cli = etcd:watch("/test", {timeout = 0.5, need_cancel = true})

if type(http_cli) ~= "table" then
ngx.say("need_cancel failed")
end

if not body_chunk_fun then
ngx.say("failed to watch: ", err)
end

local chunk, err = body_chunk_fun()
ngx.say("created: ", chunk.result.created)
local chunk, err = body_chunk_fun()
ngx.say("value: ", chunk.result.events[1].kv.value)

local res, err = etcd:watchcancel(http_cli)
if not res then
ngx.say("failed to cancel: ", err)
end

local chunk, err = body_chunk_fun()
ngx.say(err)

ngx.say("ok")
}
}
--- request
GET /t
--- no_error_log
[error]
--- grep_error_log eval
qr/hit with host 127.0.0.1/
--- grep_error_log_out
hit with host 127.0.0.1
hit with host 127.0.0.1
hit with host 127.0.0.1
--- response_body
created: true
value: bcd3
closed
ok
--- timeout: 5