Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

Commit

Permalink
fix(handler) send TCP traffic correctly (#70)
Browse files Browse the repository at this point in the history
This commit fixes a problem when tracing TCP traffic, and adds an
integration test which exercises that branch.
  • Loading branch information
kikito committed Feb 28, 2020
1 parent b50600e commit af61642
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 6 deletions.
2 changes: 1 addition & 1 deletion kong/plugins/zipkin/handler.lua
Expand Up @@ -185,8 +185,8 @@ elseif subsystem == "stream" then

initialize_request = function(conf, ctx)
local request_span = new_span(
"kong.stream",
"SERVER",
"stream",
ngx.req.start_time(),
math_random() < conf.sample_ratio
)
Expand Down
141 changes: 136 additions & 5 deletions spec/zipkin_spec.lua
Expand Up @@ -35,13 +35,13 @@ end


for _, strategy in helpers.each_strategy() do
describe("integration tests with zipkin server [#" .. strategy .. "]", function()
describe("http integration tests with zipkin server [#" .. strategy .. "]", function()
local proxy_client_grpc
local route, grpc_route
local tcp_service
local route, grpc_route, tcp_route
local zipkin_client
local proxy_client


local function wait_for_spans(trace_id, number_of_spans)
local spans = {}
helpers.wait_until(function()
Expand All @@ -52,6 +52,23 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
return utils.unpack(spans)
end

local function wait_for_stream_spans(remoteServiceName, number_of_spans)
local spans = {}
helpers.wait_until(function()
local res = zipkin_client:get("/api/v2/traces", {
query = {
limit = 10,
remoteServiceName = remoteServiceName,
}
})
local all_spans = cjson.decode(assert.response(res).has.status(200))
if #all_spans > 0 then
spans = all_spans[1]
return #spans == number_of_spans
end
end)
return utils.unpack(spans)
end

-- the following assertions should be true on any span list, even in error mode
local function assert_span_invariants(request_span, proxy_span, expected_name, trace_id)
Expand Down Expand Up @@ -113,12 +130,14 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
end


setup(function()
lazy_setup(function()
local bp = helpers.get_db_utils(strategy, { "services", "routes", "plugins" })

-- enable zipkin plugin globally pointing to mock server
bp.plugins:insert({
name = "zipkin",
-- enable on TCP as well (by default it is only enabled on http, https, grpc, grpcs)
protocols = { "http", "https", "tcp", "tls", "grpc", "grpcs" },
config = {
sample_ratio = 1,
http_endpoint = "http://127.0.0.1:9411/api/v2/spans",
Expand Down Expand Up @@ -148,9 +167,24 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
hosts = { "mock-grpc-route" },
}

-- tcp upstream
tcp_service = bp.services:insert({
name = string.lower("tcp-" .. utils.random_string()),
protocol = "tcp",
host = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
})

tcp_route = bp.routes:insert {
destinations = { { port = 19000 } },
protocols = { "tcp" },
service = tcp_service,
}

helpers.start_kong({
database = strategy,
nginx_conf = "spec/fixtures/custom_nginx.template",
stream_listen = helpers.get_proxy_ip(false) .. ":19000",
})

proxy_client = helpers.proxy_client()
Expand Down Expand Up @@ -303,6 +337,103 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
}, balancer_span.tags)
end)

it("generates spans, tags and annotations for regular #stream requests", function()
local tcp = ngx.socket.tcp()
assert(tcp:connect(helpers.get_proxy_ip(false), 19000))

assert(tcp:send("hello\n"))

local body = assert(tcp:receive("*a"))
assert.equal("hello\n", body)

assert(tcp:close())

local balancer_span, proxy_span, request_span = wait_for_stream_spans(tcp_service.name, 3)

-- request span
assert.same("table", type(request_span))
assert.same("string", type(request_span.id))
assert.same("stream", request_span.name)
assert.same(request_span.id, proxy_span.parentId)

assert.same("SERVER", request_span.kind)

assert.same("string", type(request_span.traceId))
assert_is_integer(request_span.timestamp)

if request_span.duration and proxy_span.duration then
assert.truthy(request_span.duration >= proxy_span.duration)
end

assert.is_nil(request_span.annotations)
assert.same({ serviceName = "kong" }, request_span.localEndpoint)

local request_tags = request_span.tags
assert.truthy(request_tags["kong.node.id"]:match("^[%x-]+$"))
request_tags["kong.node.id"] = nil
assert.same({ lc = "kong" }, request_tags)
local consumer_port = request_span.remoteEndpoint.port
assert_is_integer(consumer_port)
assert.same({
ipv4 = "127.0.0.1",
port = consumer_port,
}, request_span.remoteEndpoint)

-- proxy span
assert.same("table", type(proxy_span))
assert.same("string", type(proxy_span.id))
assert.same(request_span.name .. " (proxy)", proxy_span.name)
assert.same(request_span.id, proxy_span.parentId)

assert.same("CLIENT", proxy_span.kind)

assert.same("string", type(proxy_span.traceId))
assert_is_integer(proxy_span.timestamp)

if proxy_span.duration then
assert.truthy(proxy_span.duration >= 0)
end

assert.equals(2, #proxy_span.annotations)
local pann = annotations_to_hash(proxy_span.annotations)

assert_is_integer(pann["kps"])
assert_is_integer(pann["kpf"])

assert.truthy(pann["kps"] <= pann["kpf"])
assert.same({
["kong.route"] = tcp_route.id,
["kong.service"] = tcp_service.id,
["peer.hostname"] = "127.0.0.1",
}, proxy_span.tags)

assert.same({
ipv4 = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
serviceName = tcp_service.name,
}, proxy_span.remoteEndpoint)

-- specific assertions for balancer_span
assert.equals(balancer_span.parentId, request_span.id)
assert.equals(request_span.name .. " (balancer try 1)", balancer_span.name)
assert.equals("number", type(balancer_span.timestamp))
if balancer_span.duration then
assert.equals("number", type(balancer_span.duration))
end

assert.same({
ipv4 = helpers.mock_upstream_host,
port = helpers.mock_upstream_stream_port,
serviceName = tcp_service.name,
}, balancer_span.remoteEndpoint)
assert.same({ serviceName = "kong" }, balancer_span.localEndpoint)
assert.same({
["kong.balancer.try"] = "1",
["kong.route"] = tcp_route.id,
["kong.service"] = tcp_service.id,
}, balancer_span.tags)
end)

it("generates spans, tags and annotations for non-matched requests", function()
local trace_id = gen_trace_id()

Expand Down Expand Up @@ -360,6 +491,7 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
assert.equals(trace_id, request_span.traceId)
end)


describe("b3 single header propagation", function()
it("works on regular calls", function()
local trace_id = gen_trace_id()
Expand Down Expand Up @@ -463,6 +595,5 @@ describe("integration tests with zipkin server [#" .. strategy .. "]", function(
assert.equals(span_id, proxy_span.parentId)
end)
end)

end)
end

1 comment on commit af61642

@kikito
Copy link
Member Author

@kikito kikito commented on af61642 Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this change was introduced in the "remove opentracing" refactor, there's no need to include it on the changelog.

Please sign in to comment.