Skip to content
Permalink
Browse files

Add API Server Tracing with Zipkin (#511)

  • Loading branch information
hrmthw committed Feb 13, 2020
1 parent d95a887 commit e00756834f7c6dddde83ebdd0c620cc65728efcd
Showing with 188 additions and 2 deletions.
  1. +5 −0 bentoml/configuration/default_bentoml.cfg
  2. +11 −2 bentoml/server/bento_api_server.py
  3. +170 −0 bentoml/utils/trace.py
  4. +2 −0 setup.py
@@ -49,6 +49,11 @@ feedback_log_filename = feedback.log
feedback_log_json_format = "(service_name) (service_version) (request_id) (asctime)"


[tracing]
# example: http://127.0.0.1:9411/api/v1/spans
zipkin_api_url =


[apiserver]
default_port = 5000
enable_metrics = true
@@ -30,10 +30,12 @@

from bentoml import config
from bentoml.utils.usage_stats import track_server
from bentoml.utils.trace import trace
from bentoml.exceptions import BentoMLException
from .middlewares import InstrumentMiddleware


# Zipkin collector URL read from the "tracing" config section; handed to
# trace() as its server_url for every wrapped API call.
ZIPKIN_API_URL = config("tracing").get("zipkin_api_url")
CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")

# Logger registered under the name "bentoml.prediction".
prediction_logger = logging.getLogger("bentoml.prediction")
@@ -316,7 +318,7 @@ def bento_service_api_func_wrapper(self, api):
service_name = self.bento_service.name
service_version = self.bento_service.version

def api_func_wrapper():
def api_func():
# Log image files in request if there is any
image_paths = self.log_image(request, request_id)

@@ -376,7 +378,14 @@ def api_func_wrapper():

return response

return api_func_wrapper
def _wrapped_api_func():
with trace(
ZIPKIN_API_URL, request.headers, service_name=self.__class__.__name__
):
resp = api_func()
return resp

return _wrapped_api_func

def log_exception(self, exc_info):
"""Logs an exception. This is called by :meth:`handle_exception`
@@ -0,0 +1,170 @@
# Copyright 2019 Atalaya Tech, Inc.

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
import random
from contextlib import contextmanager
from contextvars import ContextVar
from functools import partial

import aiohttp
import requests
from py_zipkin import Tracer
from py_zipkin.zipkin import ZipkinAttrs, zipkin_span
from py_zipkin.transport import BaseTransportHandler
from py_zipkin.util import generate_random_64bit_string


# Stack of the ZipkinAttrs of the spans currently open, innermost last.
# NOTE: the default is one list object; every context that has never called
# set() on this variable gets that very same list back from get().
trace_stack_var = ContextVar("trace_stack", default=[])


def load_http_headers(headers):
    """Build ZipkinAttrs from the B3 propagation headers of a request.

    Returns None when `headers` is empty/None or has no "X-B3-TraceId"
    entry. Otherwise the flags fall back to '0' when missing or empty, and
    the span counts as sampled unless "X-B3-Sampled" is exactly '0'.
    """
    if not headers:
        return None
    if "X-B3-TraceId" not in headers:
        return None

    trace_id = headers.get("X-B3-TraceId")
    span_id = headers.get("X-B3-SpanId")
    parent_span_id = headers.get("X-B3-ParentSpanId")
    flags = headers.get("X-B3-Flags") or '0'
    is_sampled = headers.get("X-B3-Sampled") != '0'

    return ZipkinAttrs(
        trace_id=trace_id,
        span_id=span_id,
        parent_span_id=parent_span_id,
        flags=flags,
        is_sampled=is_sampled,
    )


def make_http_headers(attrs):
    """Render `attrs` as a dict of B3 propagation headers.

    `attrs` must expose trace_id, span_id, parent_span_id, flags and
    is_sampled. "X-B3-Sampled" is '1' for a truthy is_sampled and '0'
    otherwise; "X-B3-ParentSpanId" is only present when the parent span id
    is truthy.
    """
    b3_headers = {
        "X-B3-TraceId": attrs.trace_id,
        "X-B3-SpanId": attrs.span_id,
        "X-B3-Flags": attrs.flags,
        "X-B3-Sampled": '1' if attrs.is_sampled else '0',
    }

    parent_span_id = attrs.parent_span_id
    if not parent_span_id:
        return b3_headers

    b3_headers["X-B3-ParentSpanId"] = parent_span_id
    return b3_headers


def make_child_attrs(attrs):
    """Derive the ZipkinAttrs of a child span of `attrs`.

    The child keeps the parent's trace id, flags and sampling decision,
    gets a freshly generated span id, and records the parent's span id as
    its parent_span_id.
    """
    trace_id = attrs.trace_id
    child_span_id = generate_random_64bit_string()
    return ZipkinAttrs(
        trace_id=trace_id,
        span_id=child_span_id,
        parent_span_id=attrs.span_id,
        flags=attrs.flags,
        is_sampled=attrs.is_sampled,
    )


def make_new_attrs(sample_rate=1.0):
    """Create ZipkinAttrs for a brand-new root span.

    Trace id and span id are freshly generated, there is no parent span and
    the flags are '0'. The span is sampled when a random draw from [0, 1)
    falls below `sample_rate`; a falsy `sample_rate` (0, None) never
    samples and makes no draw.
    """
    # Ids are generated before the sampling draw, as callers observing the
    # random stream would see with the original positional construction.
    trace_id = generate_random_64bit_string()
    span_id = generate_random_64bit_string()

    if sample_rate:
        is_sampled = random.random() < sample_rate
    else:
        is_sampled = False

    return ZipkinAttrs(
        trace_id=trace_id,
        span_id=span_id,
        parent_span_id=None,
        flags='0',
        is_sampled=is_sampled,
    )


class HttpTransport(BaseTransportHandler):
    """Blocking transport that POSTs encoded span payloads over HTTP.

    Sending happens inline in the caller's thread, so the request is bounded
    by `timeout` and delivery failures are logged rather than raised:
    losing trace data must not fail the operation being traced.

    Args:
        server_url: URL the payloads are POSTed to.
        timeout: seconds passed to `requests.post` as its timeout.
    """

    def __init__(self, server_url, timeout=3.0):
        super().__init__()
        self.server_url = server_url
        self.timeout = timeout

    def get_max_payload_bytes(self):
        """Return None (presumably read by py_zipkin as "no size limit")."""
        return None

    def send(self, payload):
        """POST `payload` to the collector; log and swallow request errors."""
        try:
            requests.post(
                self.server_url,
                data=payload,
                headers={'Content-Type': 'application/x-thrift'},
                timeout=self.timeout,
            )
        except requests.RequestException as err:
            # Connection errors and timeouts both derive from
            # RequestException; report them without breaking the caller.
            logging.getLogger(__name__).warning(
                "Failed to send trace data to %s: %s", self.server_url, err
            )


class AsyncHttpTransport(BaseTransportHandler):
    """Transport that schedules the HTTP POST as a task on the event loop.

    `send` does not wait for delivery: it creates a task on the loop
    returned by `asyncio.get_event_loop()` and returns immediately.

    Args:
        server_url: URL the payloads are POSTed to.
    """

    # The event loop only keeps weak references to tasks, so a task nobody
    # else references may be garbage-collected before it finishes. The set
    # lives on the class (not the instance) because transport instances are
    # short-lived and would otherwise only form a collectable cycle with
    # their own tasks.
    _pending_tasks = set()

    def __init__(self, server_url):
        super().__init__()
        self.server_url = server_url

    def get_max_payload_bytes(self):
        """Return None (presumably read by py_zipkin as "no size limit")."""
        return None

    @staticmethod
    async def _async_post(url, data, headers):
        """POST `data` to `url` in a fresh client session; return the body text."""
        async with aiohttp.ClientSession() as client:
            async with client.post(url, data=data, headers=headers) as resp:
                return await resp.text()

    def send(self, payload):
        """Schedule delivery of `payload` without blocking the caller."""
        task = asyncio.get_event_loop().create_task(
            self._async_post(
                self.server_url,
                data=payload,
                headers={'Content-Type': 'application/x-thrift'},
            )
        )
        # Hold the task until it completes, then let it go.
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)


@contextmanager
def _pushed_attrs(attrs):
    """Make `attrs` the innermost entry of the trace stack inside the block."""
    # Store a fresh list instead of appending in place: the value returned
    # by get() may be the ContextVar's default, which is a single object
    # shared by every context, and must not be mutated.
    token = trace_stack_var.set([*trace_stack_var.get(), attrs])
    try:
        yield
    finally:
        # Runs on exceptions too, so a failing traced body cannot leave its
        # attrs behind as the parent of unrelated later spans.
        trace_stack_var.reset(token)


@contextmanager
def trace(
    server_url,
    request_headers=None,
    async_transport=False,
    sample_rate=1.0,
    service_name="some service",
    span_name="service procedure",
    port=0,
):
    """Run the enclosed block inside a Zipkin span.

    The parent span is taken from the B3 headers in `request_headers` when
    present, otherwise from the innermost span already open in the current
    context; with neither, a new root span is started.

    Args:
        server_url: URL spans are reported to. When falsy, the span attrs
            are still tracked and yielded but nothing is reported.
        request_headers: optional mapping of incoming HTTP headers.
        async_transport: report through AsyncHttpTransport instead of the
            blocking HttpTransport.
        sample_rate: sampling rate used only when a new root span is made.
        service_name: forwarded to `zipkin_span`.
        span_name: forwarded to `zipkin_span`.
        port: forwarded to `zipkin_span`.

    Yields:
        The ZipkinAttrs of the span covering the block.
    """
    trace_stack = trace_stack_var.get()

    parent_attrs = load_http_headers(request_headers) or (
        trace_stack[-1] if trace_stack else None
    )

    if parent_attrs:
        attrs = make_child_attrs(parent_attrs)
    else:
        attrs = make_new_attrs(sample_rate)

    if not attrs.is_sampled or not server_url:
        # Nothing to report; only keep the attrs visible to nested spans.
        with _pushed_attrs(attrs):
            yield attrs
        return

    if async_transport:
        transport_handler = AsyncHttpTransport(server_url)
    else:
        transport_handler = HttpTransport(server_url)

    with zipkin_span(
        service_name=service_name,
        span_name=span_name,
        zipkin_attrs=attrs,
        transport_handler=transport_handler,
        port=port,
        _tracer=Tracer(),
    ) as ctx:
        with _pushed_attrs(attrs):
            yield ctx.zipkin_attrs


# `trace` with async_transport=True pre-bound: sampled spans are reported
# through AsyncHttpTransport instead of the blocking HttpTransport.
async_trace = partial(trace, async_transport=True)
@@ -40,6 +40,8 @@
"humanfriendly",
"alembic",
"aiohttp",
"py_zipkin",
'contextvars;python_version < "3.7"',
# python-dateutil required by pandas and boto3, this makes sure the version
# works for both
"python-dateutil>=2.1,<2.8.1",

0 comments on commit e007568

Please sign in to comment.
You can’t perform that action at this time.