diff --git a/api_v3.md b/api_v3.md index 0e870e1f..84daa9da 100644 --- a/api_v3.md +++ b/api_v3.md @@ -46,6 +46,7 @@ Method - `serializer`: string - serializer type, default `json`, also support `raw` to keep origin string value. - `extra_headers`: table - adding custom headers for etcd requests. - `sni`: string - adding custom SNI fot etcd TLS requests. + - `unix_socket_proxy`: string - the unix socket path which will be used to proxy the etcd request. The client method returns either a `etcd` object or an `error string`. diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 5371046d..ca27e8ab 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -14,6 +14,7 @@ local str_byte = string.byte local str_char = string.char local ipairs = ipairs local pairs = pairs +local unpack = unpack local re_match = ngx.re.match local type = type local tab_insert = table.insert @@ -56,6 +57,60 @@ local function choose_endpoint(self) end +local function request_uri_via_unix_socket(self, uri, params) + local parsed_uri, err = self:parse_uri(uri, false) + if not parsed_uri then + return nil, err + end + + local path, query + params.scheme, params.host, params.port, path, query = unpack(parsed_uri) + if params.unix_socket_proxy then + if not params.headers then + params.headers = {} + end + + params.headers["Host"] = params.host + params.host = params.unix_socket_proxy + params.port = nil + end + + params.path = params.path or path + params.query = params.query or query + params.ssl_server_name = params.ssl_server_name or params.host + + local res + res, err = self:connect(params) + if not res then + return nil, err + end + + res, err = self:request(params) + if not res then + self:close() + return nil, err + end + + local body + body, err = res:read_body() + if not body then + self:close() + return nil, err + end + + res.body = body + + if params.keepalive == false then + self:close() + + else + self:set_keepalive(params.keepalive_timeout, params.keepalive_pool) + end + + return res, nil +end + + local function http_request_uri(self, http_cli, method, uri, body, headers, keepalive) local endpoint, err = choose_endpoint(self) if not endpoint then @@ -70,7 +125,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep end local res - res, err = http_cli:request_uri(full_uri, { + res, err = request_uri_via_unix_socket(http_cli, full_uri, { method = method, body = body, headers = headers, @@ -79,6 +134,7 @@ local function http_request_uri(self, http_cli, method, uri, body, headers, keep ssl_cert_path = self.ssl_cert_path, ssl_key_path = self.ssl_key_path, ssl_server_name = self.sni, + unix_socket_proxy = self.unix_socket_proxy, }) if err then @@ -199,6 +255,7 @@ function _M.new(opts) local serializer = opts.serializer local extra_headers = opts.extra_headers local sni = opts.sni + local unix_socket_proxy = opts.unix_socket_proxy if not typeof.uint(timeout) then return nil, 'opts.timeout must be unsigned integer' @@ -228,6 +285,10 @@ function _M.new(opts) return nil, 'opts.password must be string or ignore' end + if unix_socket_proxy and not typeof.string(unix_socket_proxy) then + return nil, 'opts.unix_socket_proxy must be string or ignore' + end + local endpoints = {} local http_hosts if type(http_host) == 'string' then -- signle node @@ -243,12 +304,25 @@ function _M.new(opts) return nil, "invalid http host: " .. host .. ", err: " .. (err or "not matched") end + local addr + if unix_socket_proxy then + addr = unix_socket_proxy + else + addr = m[2] or "127.0.0.1" + end + + local port + if not unix_socket_proxy then + port = m[3] or "2379" + end + tab_insert(endpoints, { full_prefix = host .. utils.normalize(api_prefix), http_host = host, scheme = m[1], host = m[2] or "127.0.0.1", - port = m[3] or "2379", + address = addr, + port = port, api_prefix = api_prefix, }) end @@ -282,6 +356,7 @@ function _M.new(opts) ssl_key_path = opts.ssl_key_path, extra_headers = extra_headers, sni = sni, + unix_socket_proxy = unix_socket_proxy, }, mt) end @@ -581,7 +656,7 @@ local function http_request_chunk(self, http_cli) local ok ok, err = http_cli:connect({ scheme = endpoint.scheme, - host = endpoint.host, + host = endpoint.address, port = endpoint.port, ssl_verify = self.ssl_verify, ssl_cert_path = self.ssl_cert_path, @@ -668,6 +743,10 @@ local function request_chunk(self, method, path, opts, timeout) return nil, err end + if self.unix_socket_proxy then + headers["Host"] = endpoint.host + end + local res res, err = http_cli:request({ method = method, diff --git a/t/v3/unix_socket.t b/t/v3/unix_socket.t new file mode 100644 index 00000000..c0fab5c2 --- /dev/null +++ b/t/v3/unix_socket.t @@ -0,0 +1,128 @@ +use Test::Nginx::Socket::Lua; + +log_level('info'); +no_long_string(); +repeat_each(1); + +my $test_dir = html_dir(); +$ENV{TEST_NGINX_HTML_DIR} ||= $test_dir; + +my $etcd_version = `etcd --version`; +if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./) { + plan(skip_all => "etcd is too old, skip v3 protocol"); +} else { + plan 'no_plan'; +} + +our $HttpConfig = <<"_EOC_"; + lua_socket_log_errors off; + lua_package_path 'lib/?.lua;/usr/local/share/lua/5.1/?.lua;;'; + init_by_lua_block { + local cjson = require("cjson.safe") + + function check_res(data, err, val, status) + if err then + ngx.say("err: ", err) + ngx.exit(200) + end + + if val then + if data and data.body.kvs==nil then + ngx.exit(404) + end + if data and data.body.kvs and val ~= data.body.kvs[1].value then + ngx.say("failed to check value") + ngx.log(ngx.ERR, "failed to check value, got: ", data.body.kvs[1].value, + ", expect: ", val) + ngx.exit(200) + else + ngx.say("checked val as expect: ", val) + end + end + + if status and status ~= data.status then + ngx.exit(data.status) + end + end + } + + server { + listen unix:$test_dir/lua-resty-etcd.sock; + location / { + access_by_lua_block { + ngx.log(ngx.WARN, "hit with host ", ngx.var.http_host) + } + proxy_pass http://127.0.0.1:2379; + proxy_http_version 1.1; + proxy_set_header Connection ""; + proxy_set_header Host \$http_host; + } + } +_EOC_ + +run_tests(); + +__DATA__ + +=== TEST 1: request over unix socket +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local etcd, err = require("resty.etcd").new({protocol = "v3", unix_socket_proxy = "unix:$TEST_NGINX_HTML_DIR/lua-resty-etcd.sock"}) + check_res(etcd, err) + + local res, err = etcd:set("/test", "abc") + check_res(res, err) + + ngx.timer.at(0.1, function () + etcd:set("/test", "bcd3") + end) + + ngx.timer.at(0.2, function () + etcd:set("/test", "bcd4") + end) + + local cur_time = ngx.now() + local body_chunk_fun, err, http_cli = etcd:watch("/test", {timeout = 0.5, need_cancel = true}) + + if type(http_cli) ~= "table" then + ngx.say("need_cancel failed") + end + + if not body_chunk_fun then + ngx.say("failed to watch: ", err) + end + + local chunk, err = body_chunk_fun() + ngx.say("created: ", chunk.result.created) + local chunk, err = body_chunk_fun() + ngx.say("value: ", chunk.result.events[1].kv.value) + + local res, err = etcd:watchcancel(http_cli) + if not res then + ngx.say("failed to cancel: ", err) + end + + local chunk, err = body_chunk_fun() + ngx.say(err) + + ngx.say("ok") + } + } +--- request +GET /t +--- no_error_log +[error] +--- grep_error_log eval +qr/hit with host 127.0.0.1/ +--- grep_error_log_out +hit with host 127.0.0.1 +hit with host 127.0.0.1 +hit with host 127.0.0.1 +--- response_body +created: true +value: bcd3 +closed +ok +--- timeout: 5