From 96475b30e7a219041b62f2d8cc1c523e282258bf Mon Sep 17 00:00:00 2001 From: WenMing Date: Wed, 24 Jul 2019 14:52:14 +0800 Subject: [PATCH] feature: supported zipkin plugin. --- conf/config.yaml | 3 +- lua/apisix/core/request.lua | 20 +++ lua/apisix/core/response.lua | 6 +- lua/apisix/plugins/zipkin.lua | 149 +++++++++++++++++++ lua/apisix/plugins/zipkin/codec.lua | 101 +++++++++++++ lua/apisix/plugins/zipkin/random_sampler.lua | 17 +++ lua/apisix/plugins/zipkin/reporter.lua | 120 +++++++++++++++ rockspec/apisix-dev-0.rockspec | 1 + 8 files changed, 415 insertions(+), 2 deletions(-) create mode 100644 lua/apisix/plugins/zipkin.lua create mode 100644 lua/apisix/plugins/zipkin/codec.lua create mode 100644 lua/apisix/plugins/zipkin/random_sampler.lua create mode 100644 lua/apisix/plugins/zipkin/reporter.lua diff --git a/conf/config.yaml b/conf/config.yaml index 67c961b6fd70b..bcb81ee11a9f8 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -19,7 +19,8 @@ plugins: # plugin list - example-plugin - limit-req - limit-count + - limit-conn - key-auth - prometheus - - limit-conn - node-status + - zipkin diff --git a/lua/apisix/core/request.lua b/lua/apisix/core/request.lua index cfaac65e57846..0071bda607cf7 100644 --- a/lua/apisix/core/request.lua +++ b/lua/apisix/core/request.lua @@ -8,6 +8,9 @@ local _M = {version = 0.1} local function _headers(ctx) + if not ctx then + ctx = ngx.ctx.api_ctx + end local headers = ctx.headers if not headers then headers = get_headers() @@ -20,6 +23,9 @@ _M.headers = _headers function _M.header(ctx, name) + if not ctx then + ctx = ngx.ctx.api_ctx + end return _headers(ctx)[name] end @@ -28,6 +34,9 @@ end -- so if there is a load balancer between downstream client and APISIX, -- this function will return the ip of load balancer. function _M.get_ip(ctx) + if not ctx then + ctx = ngx.ctx.api_ctx + end return ctx.var.realip_remote_addr or ctx.var.remote_addr or '' end @@ -35,8 +44,19 @@ end -- get remote address of downstream client, -- in cases there is a load balancer between downstream client and APISIX. function _M.get_remote_client_ip(ctx) + if not ctx then + ctx = ngx.ctx.api_ctx + end return ctx.var.remote_addr or '' end +function _M.get_remote_client_port(ctx) + if not ctx then + ctx = ngx.ctx.api_ctx + end + return tonumber(ctx.var.remote_port) + end + + return _M diff --git a/lua/apisix/core/response.lua b/lua/apisix/core/response.lua index 24db61781ca86..2e23096c9649c 100644 --- a/lua/apisix/core/response.lua +++ b/lua/apisix/core/response.lua @@ -10,7 +10,7 @@ local type = type local ngx_exit = ngx.exit local insert_tab = table.insert local concat_tab = table.concat - +local str_sub = string.sub local _M = {version = 0.1} @@ -78,4 +78,8 @@ function _M.set_header(...) end +function _M.get_upstream_status(ctx) + return tonumber(str_sub(ctx.var.upstream_status or "", -3)) +end + return _M diff --git a/lua/apisix/plugins/zipkin.lua b/lua/apisix/plugins/zipkin.lua new file mode 100644 index 0000000000000..4d20cffc79563 --- /dev/null +++ b/lua/apisix/plugins/zipkin.lua @@ -0,0 +1,149 @@ +local core = require("apisix.core") +local new_tracer = require("opentracing.tracer").new +local zipkin_codec = require("apisix.plugins.zipkin.codec") +local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new +local new_reporter = require("apisix.plugins.zipkin.reporter").new + +local plugin_name = "zipkin" + + +-- You can follow this document to write schema: +-- https://github.com/Tencent/rapidjson/blob/master/bin/draft-04/schema +-- rapidjson not supported `format` in draft-04 yet +local schema = { + type = "object", + properties = { + endpoint = {type = "string"}, + sample_ratio = {type = "number", + default = 0.001, minimum = 0.00001, maximum = 1} + }, + required = {"endpoint"} +} + + +local _M = { + version = 0.1, + priority = -1000, -- last running plugin + name = plugin_name, + schema = schema, +} + + +function _M.check_schema(conf) + local ok, err = core.schema.check(schema, conf) + + if not ok then + return false, err + end + + return true +end + + +local function create_tracer(conf) + local tracer = new_tracer(new_reporter(conf), new_random_sampler(conf)) + tracer:register_injector("http_headers", zipkin_codec.new_injector()) + tracer:register_extractor("http_headers", zipkin_codec.new_extractor()) + return tracer +end + +local function report2endpoint(premature, reporter) + if premature then + return + end + + local ok, err = reporter:flush() + if not ok then + core.log.error("reporter flush ", err) + return + end +end + + +function _M.rewrite(conf, ctx) + local tracer = core.lrucache.plugin_ctx(plugin_name, ctx, + create_tracer, conf) + + local wire_context = tracer:extract("http_headers", core.request.headers(ctx)) + + local start_timestamp = ngx.req.start_time() + local request_span = tracer:start_span("apisix.request", { + child_of = wire_context, + start_timestamp = start_timestamp, + tags = { + component = "apisix", + ["span.kind"] = "server", + ["http.method"] = ctx.var.method, + ["http.url"] = ctx.var.request_uri, + ["peer.ipv4"] = core.request.get_remote_client_ip(ctx), -- TODO: support ipv6 + ["peer.port"] = core.request.get_remote_client_port(ctx), + } + }) + + local rewrite_span = ctx.opentracing.request_span:start_child_span( + "apisix.rewrite", start_timestamp) + + ctx.opentracing = { + tracer = tracer, + wire_context = wire_context, + request_span = request_span, + rewrite_span = rewrite_span, + access_span = nil, + proxy_span = nil, + } + + ctx.REWRITE_END_TIME = tracer:time() + ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME) +end + +function _M.access(conf, ctx) + local opentracing = ctx.opentracing + + opentracing.access_span = opentracing.request_span:start_child_span( + "apisix.access", ctx.REWRITE_END_TIME) + + -- send headers to upstream + local outgoing_headers = {} + opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers) + for k, v in pairs(outgoing_headers) do + core.response.set_header(k, v) + end + + ctx.ACCESS_END_TIME = opentracing.tracer:time() + opentracing.access_span:finish(ctx.ACCESS_END_TIME) + + opentracing.proxy_span = opentracing.request_span:start_child_span( + "apisix.proxy", ctx.ACCESS_END_TIME) +end + + +function _M.http_header_filter_phase(conf, ctx) + local opentracing = ctx.opentracing + + ctx.HEADER_FILTER_END_TIME = opentracing.tracer:time() + opentracing.body_filter_span = opentracing.proxy_span:start_child_span( + "apisix.body_filter", ctx.HEADER_FILTER_END_TIME) +end + + +function _M.log(conf, ctx) + local opentracing = ctx.opentracing + + local log_end_time = opentracing.tracer:time() + opentracing.body_filter_span:finish(log_end_time) + + core.log.error(" ctx: ", core.json.encode(ctx, true)) + + local upstream_status = core.response.get_upstream_status(ctx) + opentracing.request_span:set_tag("http.status_code", upstream_status) + opentracing.proxy_span:finish(log_end_time) + opentracing.request_span:finish(log_end_time) + + local reporter = opentracing.tracer.reporter + local ok, err = ngx.timer.at(0, report2endpoint, reporter) + if not ok then + core.log.error("failed to create timer: ", err) + end +end + +return _M diff --git a/lua/apisix/plugins/zipkin/codec.lua b/lua/apisix/plugins/zipkin/codec.lua new file mode 100644 index 0000000000000..f80c36061902a --- /dev/null +++ b/lua/apisix/plugins/zipkin/codec.lua @@ -0,0 +1,101 @@ +local core = require("apisix.core") +local to_hex = require "resty.string".to_hex +local new_span_context = require("opentracing.span_context").new + +local function hex_to_char(c) + return string.char(tonumber(c, 16)) +end + +local function from_hex(str) + if str ~= nil then -- allow nil to pass through + str = str:gsub("%x%x", hex_to_char) + end + return str +end + +local function new_extractor() + return function(headers) + -- X-B3-Sampled: if an upstream decided to sample this request, we do too. + local sample = headers["x-b3-sampled"] + if sample == "1" or sample == "true" then + sample = true + elseif sample == "0" or sample == "false" then + sample = false + elseif sample ~= nil then + core.log.warn("x-b3-sampled header invalid; ignoring.") + sample = nil + end + + -- X-B3-Flags: if it equals '1' then it overrides sampling policy + -- We still want to warn on invalid sample header, so do this after the above + local debug = headers["x-b3-flags"] + if debug == "1" then + sample = true + elseif debug ~= nil then + core.log.warn("x-b3-flags header invalid; ignoring.") + end + + local had_invalid_id = false + + local trace_id = headers["x-b3-traceid"] + -- Validate trace id + if trace_id and ((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then + core.log.warn("x-b3-traceid header invalid; ignoring.") + had_invalid_id = true + end + + local parent_span_id = headers["x-b3-parentspanid"] + -- Validate parent_span_id + if parent_span_id and (#parent_span_id ~= 16 or parent_span_id:match("%X")) then + core.log.warn("x-b3-parentspanid header invalid; ignoring.") + had_invalid_id = true + end + + local request_span_id = headers["x-b3-spanid"] + -- Validate request_span_id + if request_span_id and (#request_span_id ~= 16 or request_span_id:match("%X")) then + core.log.warn("x-b3-spanid header invalid; ignoring.") + had_invalid_id = true + end + + if trace_id == nil or had_invalid_id then + return nil + end + + -- Process jaegar baggage header + local baggage = {} + for k, v in pairs(headers) do + local baggage_key = k:match("^uberctx%-(.*)$") + if baggage_key then + baggage[baggage_key] = ngx.unescape_uri(v) + end + end + + trace_id = from_hex(trace_id) + parent_span_id = from_hex(parent_span_id) + request_span_id = from_hex(request_span_id) + + return new_span_context(trace_id, request_span_id, parent_span_id, sample, baggage) + end +end + +local function new_injector() + return function(span_context, headers) + -- We want to remove headers if already present + headers["x-b3-traceid"] = to_hex(span_context.trace_id) + headers["x-b3-parentspanid"] = span_context.parent_id and to_hex(span_context.parent_id) or nil + headers["x-b3-spanid"] = to_hex(span_context.span_id) + local Flags = core.request.header(nil, "x-b3-flags") -- Get from request headers + headers["x-b3-flags"] = Flags + headers["x-b3-sampled"] = (not Flags) and (span_context.should_sample and "1" or "0") or nil + for key, value in span_context:each_baggage_item() do + -- XXX: https://github.com/opentracing/specification/issues/117 + headers["uberctx-"..key] = ngx.escape_uri(value) + end + end +end + +return { + new_extractor = new_extractor, + new_injector = new_injector, +} diff --git a/lua/apisix/plugins/zipkin/random_sampler.lua b/lua/apisix/plugins/zipkin/random_sampler.lua new file mode 100644 index 0000000000000..51c387914a131 --- /dev/null +++ b/lua/apisix/plugins/zipkin/random_sampler.lua @@ -0,0 +1,17 @@ +local _M = {} +local mt = { __index = _M } + +function _M.new(conf) + local sample_ratio = conf.sample_ratio + assert(type(sample_ratio) == "number" and sample_ratio >= 0 and sample_ratio <= 1, "invalid sample_ratio") + return setmetatable({ + sample_ratio = sample_ratio + }, mt) +end + +function _M.sample(self, name) + return math.random() < self.sample_ratio +end + + +return _M diff --git a/lua/apisix/plugins/zipkin/reporter.lua b/lua/apisix/plugins/zipkin/reporter.lua new file mode 100644 index 0000000000000..a3fc65b5a1dd3 --- /dev/null +++ b/lua/apisix/plugins/zipkin/reporter.lua @@ -0,0 +1,120 @@ +local resty_http = require "resty.http" +local to_hex = require "resty.string".to_hex +local cjson = require "cjson".new() +cjson.encode_number_precision(16) + + +local _M = {} +local mt = { __index = _M } + + +local span_kind_map = { + client = "CLIENT", + server = "SERVER", + producer = "PRODUCER", + consumer = "CONSUMER", +} + + +function _M.new(conf) + local http_endpoint = conf.http_endpoint + assert(type(http_endpoint) == "string", "invalid http endpoint") + return setmetatable({ + http_endpoint = http_endpoint, + pending_spans = {}, + pending_spans_n = 0, + }, mt) +end + + +function _M.report(self, span) + local span_context = span:context() + + local zipkin_tags = {} + for k, v in span:each_tag() do + -- Zipkin tag values should be strings + zipkin_tags[k] = tostring(v) + end + + local span_kind = zipkin_tags["span.kind"] + zipkin_tags["span.kind"] = nil + + local localEndpoint do + local serviceName = zipkin_tags["peer.service"] + if serviceName then + zipkin_tags["peer.service"] = nil + localEndpoint = { + serviceName = serviceName, + -- TODO: ip/port from ngx.var.server_name/ngx.var.server_port? + } + else + -- needs to be null, not the empty object + localEndpoint = cjson.null + end + end + + local remoteEndpoint do + local peer_port = span:get_tag "peer.port" -- get as number + if peer_port then + zipkin_tags["peer.port"] = nil + remoteEndpoint = { + ipv4 = zipkin_tags["peer.ipv4"], + -- ipv6 = zipkin_tags["peer.ipv6"], + port = peer_port, -- port is *not* optional + } + zipkin_tags["peer.ipv4"] = nil + zipkin_tags["peer.ipv6"] = nil + else + remoteEndpoint = cjson.null + end + end + + local zipkin_span = { + traceId = to_hex(span_context.trace_id), + name = span.name, + parentId = span_context.parent_id and to_hex(span_context.parent_id) or nil, + id = to_hex(span_context.span_id), + kind = span_kind_map[span_kind], + timestamp = span.timestamp * 1000000, + duration = math.floor(span.duration * 1000000), -- zipkin wants integer + -- shared = nil, -- We don't use shared spans (server reuses client generated spanId) + -- TODO: debug? + localEndpoint = localEndpoint, + remoteEndpoint = remoteEndpoint, + tags = zipkin_tags, + annotations = span.logs -- XXX: not guaranteed by documented opentracing-lua API to be in correct format + } + + local i = self.pending_spans_n + 1 + self.pending_spans[i] = zipkin_span + self.pending_spans_n = i +end + +function _M.flush(self) + if self.pending_spans_n == 0 then + return true + end + + local pending_spans = cjson.encode(self.pending_spans) + self.pending_spans = {} + self.pending_spans_n = 0 + + local httpc = resty_http.new() + local res, err = httpc:request_uri(self.http_endpoint, { + method = "POST", + headers = { + ["content-type"] = "application/json", + }, + body = pending_spans, + }) + -- TODO: on failure, retry? + if not res then + return nil, "failed to request: " .. err + elseif res.status < 200 or res.status >= 300 then + return nil, "failed: " .. res.status .. " " .. res.reason + end + return true +end + + +return _M diff --git a/rockspec/apisix-dev-0.rockspec b/rockspec/apisix-dev-0.rockspec index 38c70fbd8d29a..76f940b5b10ad 100644 --- a/rockspec/apisix-dev-0.rockspec +++ b/rockspec/apisix-dev-0.rockspec @@ -23,6 +23,7 @@ dependencies = { "lua-resty-jit-uuid = 0.0.7", "rapidjson = 0.6.0-1", "lua-resty-healthcheck-iresty = 1.0.0", + "opentracing-openresty = 0.1", } build = {