Skip to content

Commit

Permalink
Merge 3222bf4 into a7a4fbe
Browse files Browse the repository at this point in the history
  • Loading branch information
nic-chen committed Jul 30, 2020
2 parents a7a4fbe + 3222bf4 commit 1c8e0e9
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 6 deletions.
9 changes: 9 additions & 0 deletions apisix/consumer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ function _M.plugin(plugin_name)
end


function _M.consumers()
if not consumers then
return nil, nil
end

return consumers.values, consumers.conf_version
end


function _M.init_worker()
local err
consumers, err = core.config.new("/consumers", {
Expand Down
53 changes: 50 additions & 3 deletions apisix/core/config_etcd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,26 @@ local mt = {
end
}


local function getkey(etcd_cli, key)
if not etcd_cli then
return nil, "not inited"
end

local res, err = etcd_cli:get(key)
if not res then
-- log.error("failed to get key from etcd: ", err)
return nil, err
end

if type(res.body) ~= "table" then
return nil, "failed to get key from etcd"
end

return res
end


local function readdir(etcd_cli, key)
if not etcd_cli then
return nil, nil, "not inited"
Expand All @@ -67,12 +87,12 @@ local function readdir(etcd_cli, key)
return res
end

local function waitdir(etcd_cli, key, modified_index)
local function waitdir(etcd_cli, key, modified_index, timeout)
if not etcd_cli then
return nil, nil, "not inited"
end

local res, err = etcd_cli:waitdir(key, modified_index)
local res, err = etcd_cli:waitdir(key, modified_index, timeout)
if not res then
-- log.error("failed to get key from etcd: ", err)
return nil, err
Expand Down Expand Up @@ -201,9 +221,25 @@ local function sync_data(self)
return true
end

local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1)
-- for fetch the etcd index
local key_res, _ = getkey(self.etcd_cli, self.key)

local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)

log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1)
log.info("res: ", json.delay_encode(dir_res, true))
if err and err == "timeout" then
if key_res and key_res.headers then
local key_index = key_res.headers["X-Etcd-Index"]
local key_idx = key_index and tonumber(key_index) or 0
if key_idx and key_idx > self.prev_index then
-- Avoid the index to exceed 1000 by updating other keys
-- that will causing a full reload
self:upgrade_version(key_index)
end
end
end

if not dir_res then
return false, err
end
Expand Down Expand Up @@ -330,6 +366,15 @@ function _M.get(self, key)
end


function _M.getkey(self, key)
if not self.running then
return nil, "stoped"
end

return getkey(self.etcd_cli, key)
end


local function _automatic_fetch(premature, self)
if premature then
return
Expand Down Expand Up @@ -395,6 +440,7 @@ function _M.new(key, opts)
local automatic = opts and opts.automatic
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local timeout = opts and opts.timeout

local obj = setmetatable({
etcd_cli = etcd_cli,
Expand All @@ -410,6 +456,7 @@ function _M.new(key, opts)
prev_index = nil,
last_err = nil,
last_err_time = nil,
timeout = timeout,
filter = filter_fun,
}, mt)

Expand Down
9 changes: 9 additions & 0 deletions apisix/http/router/radixtree_sni.lua
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ function _M.match_and_set(api_ctx)
end


function _M.ssls()
if not ssl_certificates then
return nil, nil
end

return ssl_certificates.values, ssl_certificates.conf_version
end


function _M.init_worker()
local err
ssl_certificates, err = core.config.new("/ssl", {
Expand Down
9 changes: 9 additions & 0 deletions apisix/plugins/grpc-transcode/proto.lua
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ function _M.fetch(proto_id)
end


function _M.protos()
if not protos then
return nil, nil
end

return protos.values, protos.conf_version
end


function _M.init()
local err
protos, err = core.config.new("/proto", {
Expand Down
109 changes: 106 additions & 3 deletions apisix/plugins/prometheus/exporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,19 @@ local ngx_capture = ngx.location.capture
local re_gmatch = ngx.re.gmatch
local tonumber = tonumber
local select = select
local type = type
local prometheus
local router = require("apisix.router")
local get_routes = router.http_routes
local get_ssls = router.ssls
local get_services = require("apisix.http.service").services
local get_consumers = require("apisix.consumer").consumers
local get_upstreams = require("apisix.upstream").upstreams
local clear_tab = core.table.clear
local get_stream_routes = router.stream_routes
local get_protos = require("apisix.plugins.grpc-transcode.proto").protos



-- Default set of latency buckets, 1ms to 60s:
local DEFAULT_BUCKETS = { 1, 2, 5, 7, 10, 15, 20, 25, 30, 40, 50, 60, 70,
Expand All @@ -34,7 +46,6 @@ local metrics = {}


local inner_tab_arr = {}
local clear_tab = core.table.clear
local function gen_arr(...)
clear_tab(inner_tab_arr)

Expand All @@ -56,7 +67,7 @@ function _M.init()
return
end

core.table.clear(metrics)
clear_tab(metrics)

-- across all services
prometheus = base_prometheus.init("prometheus-metrics", "apisix_")
Expand All @@ -67,6 +78,10 @@ function _M.init()
metrics.etcd_reachable = prometheus:gauge("etcd_reachable",
"Config server etcd reachable from APISIX, 0 is unreachable")

metrics.etcd_modify_indexes = prometheus:gauge("etcd_modify_indexes",
"Etcd modify index for APISIX keys",
{"key"})

-- per service
metrics.status = prometheus:counter("http_status",
"HTTP status codes per service in APISIX",
Expand Down Expand Up @@ -152,20 +167,100 @@ local function nginx_status()

label_values[1] = name
metrics.connections:set(val[0], label_values)

end
end


local key_values = {}
local function set_modify_index(key, items, items_ver, global_max_index)
clear_tab(key_values)
local max_idx = 0
if items_ver and items then
for _, item in ipairs(items) do
if type(item) == "table" and item.modifiedIndex > max_idx then
max_idx = item.modifiedIndex
end
end
end

key_values[1] = key
metrics.etcd_modify_indexes:set(max_idx, key_values)


global_max_index = max_idx > global_max_index and max_idx or global_max_index

return global_max_index
end


local function etcd_modify_index()
clear_tab(key_values)
local global_max_idx = 0

-- routes
local routes, routes_ver = get_routes()
global_max_idx = set_modify_index("routes", routes, routes_ver, global_max_idx)

-- services
local services, services_ver = get_services()
global_max_idx = set_modify_index("services", services, services_ver, global_max_idx)

-- ssls
local ssls, ssls_ver = get_ssls()
global_max_idx = set_modify_index("ssls", ssls, ssls_ver, global_max_idx)

-- consumers
local consumers, consumers_ver = get_consumers()
global_max_idx = set_modify_index("consumers", consumers, consumers_ver, global_max_idx)

-- global_rules
local global_rules = router.global_rules
if global_rules then
global_max_idx = set_modify_index("global_rules", global_rules.values,
global_rules.conf_version, global_max_idx)

-- prev_index
key_values[1] = "prev_index"
metrics.etcd_modify_indexes:set(global_rules.prev_index, key_values)

else
global_max_idx = set_modify_index("global_rules", nil, nil, global_max_idx)
end

-- upstreams
local upstreams, upstreams_ver = get_upstreams()
global_max_idx = set_modify_index("upstreams", upstreams, upstreams_ver, global_max_idx)

-- stream_routes
local stream_routes, stream_routes_ver = get_stream_routes()
global_max_idx = set_modify_index("stream_routes", stream_routes,
stream_routes_ver, global_max_idx)

-- proto
local protos, protos_ver = get_protos()
global_max_idx = set_modify_index("protos", protos, protos_ver, global_max_idx)

-- global max
key_values[1] = "max_modify_index"
metrics.etcd_modify_indexes:set(global_max_idx, key_values)

end


function _M.collect()
if not prometheus or not metrics then
core.log.err("prometheus: plugin is not initialized, please make sure ",
core.log.error("prometheus: plugin is not initialized, please make sure ",
" 'prometheus_metrics' shared dict is present in nginx template")
return 500, {message = "An unexpected error occurred"}
end

-- across all services
nginx_status()

-- etcd modify index
etcd_modify_index()

-- config server status
local config = core.config.new()
local version, err = config:server_version()
Expand All @@ -178,6 +273,14 @@ function _M.collect()
"processing metrics endpoint: ", err)
end

local res, _ = config:getkey("/routes")
if res and res.headers then
clear_tab(key_values)
-- global max
key_values[1] = "x_etcd_index"
metrics.etcd_modify_indexes:set(res.headers["X-Etcd-Index"], key_values)
end

core.response.set_header("content_type", "text/plain")
return 200, core.table.concat(prometheus:metric_data())
end
Expand Down
12 changes: 12 additions & 0 deletions apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,22 @@ function _M.stream_init_worker()
end


function _M.ssls()
return _M.router_ssl.ssls()
end

function _M.http_routes()
return _M.router_http.routes()
end

function _M.stream_routes()
-- maybe it's not inited.
if not _M.router_stream then
return nil, nil
end
return _M.router_stream.routes()
end


-- for test
_M.filter_test = filter
Expand Down
1 change: 1 addition & 0 deletions apisix/schema_def.lua
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ _M.service = {
_M.consumer = {
type = "object",
properties = {
id = id_schema,
username = {
type = "string", minLength = 1, maxLength = 32,
pattern = [[^[a-zA-Z0-9_]+$]]
Expand Down

0 comments on commit 1c8e0e9

Please sign in to comment.