Skip to content

Commit

Permalink
added test cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
moonming committed Jul 25, 2019
1 parent 36c9da7 commit cf8e40f
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 38 deletions.
32 changes: 31 additions & 1 deletion COPYRIGHT
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,36 @@ https://github.com/iresty/opentracing-openresty

Apache License 2

This module is licensed under the Apache 2.0 license.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

%%%%%%%%%

kong-plugin-zipkin

https://github.com/Kong/kong-plugin-zipkin

Copyright 2018-2019 Kong Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

%%%%%%%%%
55 changes: 38 additions & 17 deletions lua/apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,23 @@ local function report2endpoint(premature, reporter)
if not ok then
core.log.error("reporter flush ", err)
return
end
end

core.log.info("report2endpoint ok")
end


function _M.rewrite(conf, ctx)
local tracer = core.lrucache.plugin_ctx(plugin_name, ctx,
create_tracer, conf)

local wire_context = tracer:extract("http_headers", core.request.headers(ctx))
ctx.opentracing_sample = tracer.sampler:sample()
if not ctx.opentracing_sample then
return
end

local wire_context = tracer:extract("http_headers",
core.request.headers(ctx))

local start_timestamp = ngx.req.start_time()
local request_span = tracer:start_span("apisix.request", {
Expand All @@ -75,49 +83,60 @@ function _M.rewrite(conf, ctx)
["span.kind"] = "server",
["http.method"] = ctx.var.method,
["http.url"] = ctx.var.request_uri,
["peer.ipv4"] = core.request.get_remote_client_ip(ctx), -- TODO: support ipv6
-- TODO: support ipv6
["peer.ipv4"] = core.request.get_remote_client_ip(ctx),
["peer.port"] = core.request.get_remote_client_port(ctx),
}
})

local rewrite_span = ctx.opentracing.request_span:start_child_span(
"apisix.rewrite", start_timestamp)

ctx.opentracing = {
tracer = tracer,
wire_context = wire_context,
request_span = request_span,
rewrite_span = rewrite_span,
rewrite_span = nil,
access_span = nil,
proxy_span = nil,
}

local request_span = ctx.opentracing.request_span
ctx.opentracing.rewrite_span = request_span:start_child_span(
"apisix.rewrite", start_timestamp)
ctx.REWRITE_END_TIME = tracer:time()
ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
end

function _M.access(conf, ctx)
if not ctx.opentracing_sample then
return
end

local opentracing = ctx.opentracing

opentracing.access_span = opentracing.request_span:start_child_span(
"apisix.access", ctx.REWRITE_END_TIME)

-- send headers to upstream
local outgoing_headers = {}
opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
for k, v in pairs(outgoing_headers) do
core.response.set_header(k, v)
end
local tracer = opentracing.tracer

ctx.ACCESS_END_TIME = opentracing.tracer:time()
ctx.ACCESS_END_TIME = tracer:time()
opentracing.access_span:finish(ctx.ACCESS_END_TIME)

opentracing.proxy_span = opentracing.request_span:start_child_span(
"apisix.proxy", ctx.ACCESS_END_TIME)

-- send headers to upstream
local outgoing_headers = {}
tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
for k, v in pairs(outgoing_headers) do
core.response.set_header(k, v)
end
end


function _M.http_header_filter_phase(conf, ctx)
function _M.header_filter(conf, ctx)
if not ctx.opentracing_sample then
return
end

local opentracing = ctx.opentracing

ctx.HEADER_FILTER_END_TIME = opentracing.tracer:time()
Expand All @@ -127,13 +146,15 @@ end


function _M.log(conf, ctx)
if not ctx.opentracing_sample then
return
end

local opentracing = ctx.opentracing

local log_end_time = opentracing.tracer:time()
opentracing.body_filter_span:finish(log_end_time)

core.log.error(" ctx: ", core.json.encode(ctx, true))

local upstream_status = core.response.get_upstream_status(ctx)
opentracing.request_span:set_tag("http.status_code", upstream_status)
opentracing.proxy_span:finish(log_end_time)
Expand Down
27 changes: 16 additions & 11 deletions lua/apisix/plugins/zipkin/codec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ end

local function new_extractor()
return function(headers)
-- X-B3-Sampled: if an upstream decided to sample this request, we do too.
-- X-B3-Sampled: if an upstream decided to sample this request, we do too.
local sample = headers["x-b3-sampled"]
if sample == "1" or sample == "true" then
sample = true
Expand All @@ -26,8 +26,8 @@ local function new_extractor()
sample = nil
end

-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sample header, so do this after the above
-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sample header, so do this after the above
local debug = headers["x-b3-flags"]
if debug == "1" then
sample = true
Expand All @@ -39,21 +39,24 @@ local function new_extractor()

local trace_id = headers["x-b3-traceid"]
-- Validate trace id
if trace_id and ((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then
if trace_id and
((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then
core.log.warn("x-b3-traceid header invalid; ignoring.")
had_invalid_id = true
end

local parent_span_id = headers["x-b3-parentspanid"]
-- Validate parent_span_id
if parent_span_id and (#parent_span_id ~= 16 or parent_span_id:match("%X")) then
if parent_span_id and
(#parent_span_id ~= 16 or parent_span_id:match("%X")) then
core.log.warn("x-b3-parentspanid header invalid; ignoring.")
had_invalid_id = true
end

local request_span_id = headers["x-b3-spanid"]
-- Validate request_span_id
if request_span_id and (#request_span_id ~= 16 or request_span_id:match("%X")) then
if request_span_id and
(#request_span_id ~= 16 or request_span_id:match("%X")) then
core.log.warn("x-b3-spanid header invalid; ignoring.")
had_invalid_id = true
end
Expand All @@ -75,19 +78,21 @@ local function new_extractor()
parent_span_id = from_hex(parent_span_id)
request_span_id = from_hex(request_span_id)

return new_span_context(trace_id, request_span_id, parent_span_id, sample, baggage)
return new_span_context(trace_id, request_span_id, parent_span_id,
sample, baggage)
end
end

local function new_injector()
return function(span_context, headers)
-- We want to remove headers if already present
headers["x-b3-traceid"] = to_hex(span_context.trace_id)
headers["x-b3-parentspanid"] = span_context.parent_id and to_hex(span_context.parent_id) or nil
headers["x-b3-parentspanid"] = span_context.parent_id
and to_hex(span_context.parent_id) or nil
headers["x-b3-spanid"] = to_hex(span_context.span_id)
local Flags = core.request.header(nil, "x-b3-flags") -- Get from request headers
headers["x-b3-flags"] = Flags
headers["x-b3-sampled"] = (not Flags) and (span_context.should_sample and "1" or "0") or nil
local flags = core.request.header(nil, "x-b3-flags")
headers["x-b3-flags"] = flags
headers["x-b3-sampled"] = (not flags)
for key, value in span_context:each_baggage_item() do
-- XXX: https://github.com/opentracing/specification/issues/117
headers["uberctx-"..key] = ngx.escape_uri(value)
Expand Down
5 changes: 3 additions & 2 deletions lua/apisix/plugins/zipkin/random_sampler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ local mt = { __index = _M }

function _M.new(conf)
local sample_ratio = conf.sample_ratio
assert(type(sample_ratio) == "number" and sample_ratio >= 0 and sample_ratio <= 1, "invalid sample_ratio")
assert(type(sample_ratio) == "number" and
sample_ratio >= 0 and sample_ratio <= 1, "invalid sample_ratio")
return setmetatable({
sample_ratio = sample_ratio
}, mt)
end

function _M.sample(self, name)
function _M.sample(self)
return math.random() < self.sample_ratio
end

Expand Down
17 changes: 10 additions & 7 deletions lua/apisix/plugins/zipkin/reporter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ local span_kind_map = {


function _M.new(conf)
local http_endpoint = conf.http_endpoint
assert(type(http_endpoint) == "string", "invalid http endpoint")
local endpoint = conf.endpoint
assert(type(endpoint) == "string", "invalid http endpoint")
return setmetatable({
http_endpoint = http_endpoint,
endpoint = endpoint,
pending_spans = {},
pending_spans_n = 0,
}, mt)
Expand Down Expand Up @@ -72,17 +72,17 @@ function _M.report(self, span)
local zipkin_span = {
traceId = to_hex(span_context.trace_id),
name = span.name,
parentId = span_context.parent_id and to_hex(span_context.parent_id) or nil,
parentId = span_context.parent_id and
to_hex(span_context.parent_id) or nil,
id = to_hex(span_context.span_id),
kind = span_kind_map[span_kind],
timestamp = span.timestamp * 1000000,
duration = math.floor(span.duration * 1000000), -- zipkin wants integer
-- shared = nil, -- We don't use shared spans (server reuses client generated spanId)
-- TODO: debug?
localEndpoint = localEndpoint,
remoteEndpoint = remoteEndpoint,
tags = zipkin_tags,
annotations = span.logs -- XXX: not guaranteed by documented opentracing-lua API to be in correct format
annotations = span.logs
}

local i = self.pending_spans_n + 1
Expand All @@ -92,6 +92,7 @@ end

function _M.flush(self)
if self.pending_spans_n == 0 then

return true
end

Expand All @@ -100,19 +101,21 @@ function _M.flush(self)
self.pending_spans_n = 0

local httpc = resty_http.new()
local res, err = httpc:request_uri(self.http_endpoint, {
local res, err = httpc:request_uri(self.endpoint, {
method = "POST",
headers = {
["content-type"] = "application/json",
},
body = pending_spans,
})

-- TODO: on failure, retry?
if not res then
return nil, "failed to request: " .. err
elseif res.status < 200 or res.status >= 300 then
return nil, "failed: " .. res.status .. " " .. res.reason
end

return true
end

Expand Down
26 changes: 26 additions & 0 deletions t/lib/server.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
local json_decode = require("cjson").decode

local _M = {}


Expand Down Expand Up @@ -31,6 +33,30 @@ function _M.sleep1()
end


function _M.opentracing()
ngx.say("opentracing")
end


function _M.mock_zipkin()
ngx.req.read_body()
local data = ngx.req.get_body_data()
local spans = json_decode(data)
if #spans < 5 then
ngx.exit(400)
end

for _, span in pairs(spans) do
if string.sub(span.name, 1, 6) ~= 'apisix' then
ngx.exit(400)
end
if not span.traceId then
ngx.exit(400)
end
end
end


function _M.go()
local action = string.sub(ngx.var.uri, 2)
if not _M[action] then
Expand Down
Loading

0 comments on commit cf8e40f

Please sign in to comment.