Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
moonming committed Jul 24, 2019
1 parent b0b571e commit f320526
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 52 deletions.
15 changes: 15 additions & 0 deletions lua/apisix/core/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ local _M = {version = 0.1}


local function _headers(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
local headers = ctx.headers
if not headers then
headers = get_headers()
Expand All @@ -20,6 +23,9 @@ _M.headers = _headers


function _M.header(ctx, name)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return _headers(ctx)[name]
end

Expand All @@ -28,18 +34,27 @@ end
-- so if there is a load balancer between downstream client and APISIX,
-- this function will return the ip of load balancer.
function _M.get_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.realip_remote_addr or ctx.var.remote_addr or ''
end


-- get remote address of downstream client,
-- in cases there is a load balancer between downstream client and APISIX.
function _M.get_remote_client_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.remote_addr or ''
end


function _M.get_remote_client_port(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return tonumber(ctx.var.remote_port)
end

Expand Down
1 change: 0 additions & 1 deletion lua/apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ local new_tracer = require("opentracing.tracer").new
local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new
local ipairs = ipairs

local plugin_name = "zipkin"

Expand Down
17 changes: 9 additions & 8 deletions lua/apisix/plugins/zipkin/codec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local core = require("apisix.core")
local to_hex = require "resty.string".to_hex
local new_span_context = require "opentracing.span_context".new
local new_span_context = require("opentracing.span_context").new

local function hex_to_char(c)
return string.char(tonumber(c, 16))
Expand All @@ -12,7 +13,7 @@ local function from_hex(str)
return str
end

local function new_extractor(warn)
local function new_extractor()
return function(headers)
-- X-B3-Sampled: if an upstream decided to sample this request, we do too.
local sample = headers["x-b3-sampled"]
Expand All @@ -21,7 +22,7 @@ local function new_extractor(warn)
elseif sample == "0" or sample == "false" then
sample = false
elseif sample ~= nil then
warn("x-b3-sampled header invalid; ignoring.")
core.log.warn("x-b3-sampled header invalid; ignoring.")
sample = nil
end

Expand All @@ -31,29 +32,29 @@ local function new_extractor(warn)
if debug == "1" then
sample = true
elseif debug ~= nil then
warn("x-b3-flags header invalid; ignoring.")
core.log.warn("x-b3-flags header invalid; ignoring.")
end

local had_invalid_id = false

local trace_id = headers["x-b3-traceid"]
-- Validate trace id
if trace_id and ((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then
warn("x-b3-traceid header invalid; ignoring.")
core.log.warn("x-b3-traceid header invalid; ignoring.")
had_invalid_id = true
end

local parent_span_id = headers["x-b3-parentspanid"]
-- Validate parent_span_id
if parent_span_id and (#parent_span_id ~= 16 or parent_span_id:match("%X")) then
warn("x-b3-parentspanid header invalid; ignoring.")
core.log.warn("x-b3-parentspanid header invalid; ignoring.")
had_invalid_id = true
end

local request_span_id = headers["x-b3-spanid"]
-- Validate request_span_id
if request_span_id and (#request_span_id ~= 16 or request_span_id:match("%X")) then
warn("x-b3-spanid header invalid; ignoring.")
core.log.warn("x-b3-spanid header invalid; ignoring.")
had_invalid_id = true
end

Expand Down Expand Up @@ -84,7 +85,7 @@ local function new_injector()
headers["x-b3-traceid"] = to_hex(span_context.trace_id)
headers["x-b3-parentspanid"] = span_context.parent_id and to_hex(span_context.parent_id) or nil
headers["x-b3-spanid"] = to_hex(span_context.span_id)
local Flags = kong.request.get_header("x-b3-flags") -- Get from request headers
local Flags = core.request.header(nil, "x-b3-flags") -- Get from request headers
headers["x-b3-flags"] = Flags
headers["x-b3-sampled"] = (not Flags) and (span_context.should_sample and "1" or "0") or nil
for key, value in span_context:each_baggage_item() do
Expand Down
2 changes: 1 addition & 1 deletion lua/apisix/plugins/zipkin/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ZipkinLogHandler.VERSION = "scm"
function ZipkinLogHandler.new_tracer(conf)
local tracer = new_tracer(new_zipkin_reporter(conf), new_random_sampler(conf))
tracer:register_injector("http_headers", zipkin_codec.new_injector())
tracer:register_extractor("http_headers", zipkin_codec.new_extractor(kong.log.warn))
tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
return tracer
end

Expand Down
82 changes: 40 additions & 42 deletions lua/apisix/plugins/zipkin/reporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,35 @@ local to_hex = require "resty.string".to_hex
local cjson = require "cjson".new()
cjson.encode_number_precision(16)

local zipkin_reporter_methods = {}
local zipkin_reporter_mt = {
__name = "kong.plugins.zipkin.reporter";
__index = zipkin_reporter_methods;
local _M = {}
local mt = { __index = _M }


local span_kind_map = {
client = "CLIENT",
server = "SERVER",
producer = "PRODUCER",
consumer = "CONSUMER",
}

local function new_zipkin_reporter(conf)

function _M.new(conf)
local http_endpoint = conf.http_endpoint
assert(type(http_endpoint) == "string", "invalid http endpoint")
return setmetatable({
http_endpoint = http_endpoint;
pending_spans = {};
pending_spans_n = 0;
}, zipkin_reporter_mt)
http_endpoint = http_endpoint,
pending_spans = {},
pending_spans_n = 0,
}, mt)
end

local span_kind_map = {
client = "CLIENT";
server = "SERVER";
producer = "PRODUCER";
consumer = "CONSUMER";
}
function zipkin_reporter_methods:report(span)

function _M.report(self, span)
local span_context = span:context()

local zipkin_tags = {}
for k, v in span:each_tag() do
-- Zipkin tag values should be strings
-- see https://zipkin.io/zipkin-api/#/default/post_spans
-- and https://github.com/Kong/kong-plugin-zipkin/pull/13#issuecomment-402389342
zipkin_tags[k] = tostring(v)
end

Expand All @@ -44,11 +43,11 @@ function zipkin_reporter_methods:report(span)
if serviceName then
zipkin_tags["peer.service"] = nil
localEndpoint = {
serviceName = serviceName;
serviceName = serviceName,
-- TODO: ip/port from ngx.var.server_name/ngx.var.server_port?
}
else
-- needs to be null; not the empty object
-- needs to be null, not the empty object
localEndpoint = cjson.null
end
end
Expand All @@ -58,9 +57,9 @@ function zipkin_reporter_methods:report(span)
if peer_port then
zipkin_tags["peer.port"] = nil
remoteEndpoint = {
ipv4 = zipkin_tags["peer.ipv4"];
ipv6 = zipkin_tags["peer.ipv6"];
port = peer_port; -- port is *not* optional
ipv4 = zipkin_tags["peer.ipv4"],
-- ipv6 = zipkin_tags["peer.ipv6"],
port = peer_port, -- port is *not* optional
}
zipkin_tags["peer.ipv4"] = nil
zipkin_tags["peer.ipv6"] = nil
Expand All @@ -70,18 +69,18 @@ function zipkin_reporter_methods:report(span)
end

local zipkin_span = {
traceId = to_hex(span_context.trace_id);
name = span.name;
parentId = span_context.parent_id and to_hex(span_context.parent_id) or nil;
id = to_hex(span_context.span_id);
kind = span_kind_map[span_kind];
timestamp = span.timestamp * 1000000;
duration = math.floor(span.duration * 1000000); -- zipkin wants integer
-- shared = nil; -- We don't use shared spans (server reuses client generated spanId)
traceId = to_hex(span_context.trace_id),
name = span.name,
parentId = span_context.parent_id and to_hex(span_context.parent_id) or nil,
id = to_hex(span_context.span_id),
kind = span_kind_map[span_kind],
timestamp = span.timestamp * 1000000,
duration = math.floor(span.duration * 1000000), -- zipkin wants integer
-- shared = nil, -- We don't use shared spans (server reuses client generated spanId)
-- TODO: debug?
localEndpoint = localEndpoint;
remoteEndpoint = remoteEndpoint;
tags = zipkin_tags;
localEndpoint = localEndpoint,
remoteEndpoint = remoteEndpoint,
tags = zipkin_tags,
annotations = span.logs -- XXX: not guaranteed by documented opentracing-lua API to be in correct format
}

Expand All @@ -90,7 +89,7 @@ function zipkin_reporter_methods:report(span)
self.pending_spans_n = i
end

function zipkin_reporter_methods:flush()
function _M.flush(self)
if self.pending_spans_n == 0 then
return true
end
Expand All @@ -101,11 +100,11 @@ function zipkin_reporter_methods:flush()

local httpc = resty_http.new()
local res, err = httpc:request_uri(self.http_endpoint, {
method = "POST";
method = "POST",
headers = {
["content-type"] = "application/json";
};
body = pending_spans;
["content-type"] = "application/json",
},
body = pending_spans,
})
-- TODO: on failure, retry?
if not res then
Expand All @@ -116,6 +115,5 @@ function zipkin_reporter_methods:flush()
return true
end

return {
new = new_zipkin_reporter;
}

return _M

0 comments on commit f320526

Please sign in to comment.