In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from opentelemetry import metrics
from opentelemetry.sdk.metrics import Counter, MeterProvider
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporterToo
from opentelemetry.sdk.metrics.export.controller import PushController

# global metrics config
metrics.set_meter_provider(MeterProvider())
meter = metrics.get_meter(__name__, False)

def compact_formatter(record):
    return '{}\t{}\t{}'.format(
        record.metric.name,
        ','.join("{}:{}".format(k, v) for k, v in record.labels),
        record.aggregator.checkpoint
    )

# hacked up to work like ConsoleSpanExporter on this branch, won't work on master...
exporter = ConsoleMetricsExporter(formatter=compact_formatter)
# collect and export metrics every second
controller = PushController(meter, exporter, 1)

In [3]:
from functools import wraps
from contextlib import contextmanager

from opentelemetry.util import time_ns

def timer(measure):
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            start = time_ns()
            try:
                return func(*args, **kwargs)
            finally:
                measure.add(time_ns() - start, {"func": func.__name__})
        return inner
    return outer

def exception_counter(measure):
    def outer(func):
        @wraps(func)
        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as ex:
                measure.add(1, {"func": func.__name__, "error": ex.__class__.__name__})
                raise
        return inner
    return outer


timings = meter.create_metric(
    name="timings",
    description="Cumulative ns run time per function",
    unit="1",
    value_type=int,
    metric_type=Counter,
    label_keys=("func")
)

errors = meter.create_metric(
    name="errors",
    description="Error count per function",
    unit="1",
    value_type=int,
    metric_type=Counter,
    label_keys=("func", "error")
)

In [4]:
import time
from concurrent.futures import ThreadPoolExecutor
from random import random

@timer(timings)
@exception_counter(errors)
def buggy_sleep(x):
    rr = random()
    if rr < .1:
        raise ValueError()
    if rr < .2:
        raise ArgumentError()
    time.sleep(random())
    

with ThreadPoolExecutor(max_workers=10) as tpe:
    start = time.time()
    while time.time() - start < 10:
        tpe.submit(buggy_sleep)

timings	func:buggy_sleep	5617517000
errors	error:TypeError,func:buggy_sleep	336
timings	func:buggy_sleep	61375312000
errors	error:TypeError,func:buggy_sleep	205
timings	func:buggy_sleep	16682692000
errors	error:TypeError,func:buggy_sleep	45
timings	func:buggy_sleep	24293582000
errors	error:TypeError,func:buggy_sleep	22919
timings	func:buggy_sleep	9639899000
errors	error:TypeError,func:buggy_sleep	25312
timings	func:buggy_sleep	9639775000
errors	error:TypeError,func:buggy_sleep	25661
timings	func:buggy_sleep	9604135000
errors	error:TypeError,func:buggy_sleep	25370
timings	func:buggy_sleep	9609176000
errors	error:TypeError,func:buggy_sleep	25370
timings	func:buggy_sleep	9634748000
errors	error:TypeError,func:buggy_sleep	25356
timings	func:buggy_sleep	9633206000
errors	error:TypeError,func:buggy_sleep	25290
timings	func:buggy_sleep	9632477000
errors	error:TypeError,func:buggy_sleep	24493
timings	func:buggy_sleep	9629794000
errors	error:TypeError,func:buggy_sleep	24704
timings	func:buggy_s