Skip to content

Commit

Permalink
feature: supported zipkin plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
moonming committed Jul 25, 2019
1 parent 3eaa17c commit 96475b3
Show file tree
Hide file tree
Showing 8 changed files with 415 additions and 2 deletions.
3 changes: 2 additions & 1 deletion conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ plugins: # plugin list
- example-plugin
- limit-req
- limit-count
- limit-conn
- key-auth
- prometheus
- limit-conn
- node-status
- zipkin
20 changes: 20 additions & 0 deletions lua/apisix/core/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ local _M = {version = 0.1}


local function _headers(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
local headers = ctx.headers
if not headers then
headers = get_headers()
Expand All @@ -20,6 +23,9 @@ _M.headers = _headers


function _M.header(ctx, name)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return _headers(ctx)[name]
end

Expand All @@ -28,15 +34,29 @@ end
-- so if there is a load balancer between downstream client and APISIX,
-- this function will return the ip of load balancer.
function _M.get_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.realip_remote_addr or ctx.var.remote_addr or ''
end


-- get remote address of downstream client,
-- in cases there is a load balancer between downstream client and APISIX.
function _M.get_remote_client_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.remote_addr or ''
end


function _M.get_remote_client_port(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return tonumber(ctx.var.remote_port)
end


return _M
6 changes: 5 additions & 1 deletion lua/apisix/core/response.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ local type = type
local ngx_exit = ngx.exit
local insert_tab = table.insert
local concat_tab = table.concat

local str_sub = string.sub

local _M = {version = 0.1}

Expand Down Expand Up @@ -78,4 +78,8 @@ function _M.set_header(...)
end


function _M.get_upstream_status(ctx)
return tonumber(str_sub(ctx.var.upstream_status or "", -3))
end

return _M
149 changes: 149 additions & 0 deletions lua/apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
local core = require("apisix.core")
local new_tracer = require("opentracing.tracer").new
local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new

local plugin_name = "zipkin"


-- You can follow this document to write schema:
-- https://github.com/Tencent/rapidjson/blob/master/bin/draft-04/schema
-- rapidjson not supported `format` in draft-04 yet
local schema = {
type = "object",
properties = {
endpoint = {type = "string"},
sample_ratio = {type = "number",
default = 0.001, minimum = 0.00001, maximum = 1}
},
required = {"endpoint"}
}


local _M = {
version = 0.1,
priority = -1000, -- last running plugin
name = plugin_name,
schema = schema,
}


function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)

if not ok then
return false, err
end

return true
end


local function create_tracer(conf)
local tracer = new_tracer(new_reporter(conf), new_random_sampler(conf))
tracer:register_injector("http_headers", zipkin_codec.new_injector())
tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
return tracer
end

local function report2endpoint(premature, reporter)
if premature then
return
end

local ok, err = reporter:flush()
if not ok then
core.log.error("reporter flush ", err)
return
end
end


function _M.rewrite(conf, ctx)
local tracer = core.lrucache.plugin_ctx(plugin_name, ctx,
create_tracer, conf)

local wire_context = tracer:extract("http_headers", core.request.headers(ctx))

local start_timestamp = ngx.req.start_time()
local request_span = tracer:start_span("apisix.request", {
child_of = wire_context,
start_timestamp = start_timestamp,
tags = {
component = "apisix",
["span.kind"] = "server",
["http.method"] = ctx.var.method,
["http.url"] = ctx.var.request_uri,
["peer.ipv4"] = core.request.get_remote_client_ip(ctx), -- TODO: support ipv6
["peer.port"] = core.request.get_remote_client_port(ctx),
}
})

local rewrite_span = ctx.opentracing.request_span:start_child_span(
"apisix.rewrite", start_timestamp)

ctx.opentracing = {
tracer = tracer,
wire_context = wire_context,
request_span = request_span,
rewrite_span = rewrite_span,
access_span = nil,
proxy_span = nil,
}

ctx.REWRITE_END_TIME = tracer:time()
ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
end

function _M.access(conf, ctx)
local opentracing = ctx.opentracing

opentracing.access_span = opentracing.request_span:start_child_span(
"apisix.access", ctx.REWRITE_END_TIME)

-- send headers to upstream
local outgoing_headers = {}
opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
for k, v in pairs(outgoing_headers) do
core.response.set_header(k, v)
end

ctx.ACCESS_END_TIME = opentracing.tracer:time()
opentracing.access_span:finish(ctx.ACCESS_END_TIME)

opentracing.proxy_span = opentracing.request_span:start_child_span(
"apisix.proxy", ctx.ACCESS_END_TIME)
end


function _M.http_header_filter_phase(conf, ctx)
local opentracing = ctx.opentracing

ctx.HEADER_FILTER_END_TIME = opentracing.tracer:time()
opentracing.body_filter_span = opentracing.proxy_span:start_child_span(
"apisix.body_filter", ctx.HEADER_FILTER_END_TIME)
end


function _M.log(conf, ctx)
local opentracing = ctx.opentracing

local log_end_time = opentracing.tracer:time()
opentracing.body_filter_span:finish(log_end_time)

core.log.error(" ctx: ", core.json.encode(ctx, true))

local upstream_status = core.response.get_upstream_status(ctx)
opentracing.request_span:set_tag("http.status_code", upstream_status)
opentracing.proxy_span:finish(log_end_time)
opentracing.request_span:finish(log_end_time)

local reporter = opentracing.tracer.reporter
local ok, err = ngx.timer.at(0, report2endpoint, reporter)
if not ok then
core.log.error("failed to create timer: ", err)
end
end

return _M
101 changes: 101 additions & 0 deletions lua/apisix/plugins/zipkin/codec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
local core = require("apisix.core")
local to_hex = require "resty.string".to_hex
local new_span_context = require("opentracing.span_context").new

local function hex_to_char(c)
return string.char(tonumber(c, 16))
end

local function from_hex(str)
if str ~= nil then -- allow nil to pass through
str = str:gsub("%x%x", hex_to_char)
end
return str
end

local function new_extractor()
return function(headers)
-- X-B3-Sampled: if an upstream decided to sample this request, we do too.
local sample = headers["x-b3-sampled"]
if sample == "1" or sample == "true" then
sample = true
elseif sample == "0" or sample == "false" then
sample = false
elseif sample ~= nil then
core.log.warn("x-b3-sampled header invalid; ignoring.")
sample = nil
end

-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sample header, so do this after the above
local debug = headers["x-b3-flags"]
if debug == "1" then
sample = true
elseif debug ~= nil then
core.log.warn("x-b3-flags header invalid; ignoring.")
end

local had_invalid_id = false

local trace_id = headers["x-b3-traceid"]
-- Validate trace id
if trace_id and ((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then
core.log.warn("x-b3-traceid header invalid; ignoring.")
had_invalid_id = true
end

local parent_span_id = headers["x-b3-parentspanid"]
-- Validate parent_span_id
if parent_span_id and (#parent_span_id ~= 16 or parent_span_id:match("%X")) then
core.log.warn("x-b3-parentspanid header invalid; ignoring.")
had_invalid_id = true
end

local request_span_id = headers["x-b3-spanid"]
-- Validate request_span_id
if request_span_id and (#request_span_id ~= 16 or request_span_id:match("%X")) then
core.log.warn("x-b3-spanid header invalid; ignoring.")
had_invalid_id = true
end

if trace_id == nil or had_invalid_id then
return nil
end

-- Process jaegar baggage header
local baggage = {}
for k, v in pairs(headers) do
local baggage_key = k:match("^uberctx%-(.*)$")
if baggage_key then
baggage[baggage_key] = ngx.unescape_uri(v)
end
end

trace_id = from_hex(trace_id)
parent_span_id = from_hex(parent_span_id)
request_span_id = from_hex(request_span_id)

return new_span_context(trace_id, request_span_id, parent_span_id, sample, baggage)
end
end

local function new_injector()
return function(span_context, headers)
-- We want to remove headers if already present
headers["x-b3-traceid"] = to_hex(span_context.trace_id)
headers["x-b3-parentspanid"] = span_context.parent_id and to_hex(span_context.parent_id) or nil
headers["x-b3-spanid"] = to_hex(span_context.span_id)
local Flags = core.request.header(nil, "x-b3-flags") -- Get from request headers
headers["x-b3-flags"] = Flags
headers["x-b3-sampled"] = (not Flags) and (span_context.should_sample and "1" or "0") or nil
for key, value in span_context:each_baggage_item() do
-- XXX: https://github.com/opentracing/specification/issues/117
headers["uberctx-"..key] = ngx.escape_uri(value)
end
end
end

return {
new_extractor = new_extractor,
new_injector = new_injector,
}
17 changes: 17 additions & 0 deletions lua/apisix/plugins/zipkin/random_sampler.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
local _M = {}
local mt = { __index = _M }

function _M.new(conf)
local sample_ratio = conf.sample_ratio
assert(type(sample_ratio) == "number" and sample_ratio >= 0 and sample_ratio <= 1, "invalid sample_ratio")
return setmetatable({
sample_ratio = sample_ratio
}, mt)
end

function _M.sample(self, name)
return math.random() < self.sample_ratio
end


return _M
Loading

0 comments on commit 96475b3

Please sign in to comment.