Skip to content

Commit

Permalink
replace existing MsgpackEncoder
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle-Verhoog committed Jun 9, 2020
1 parent 40c4d39 commit 5e58b1d
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 107 deletions.
34 changes: 2 additions & 32 deletions ddtrace/encoding.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import json
import struct

try:
import msgpack
except ImportError:
pass

from .internal.logger import get_logger
from .internal._encoding import TraceMsgpackEncoder
from .internal._encoding import MsgpackEncoder


log = get_logger(__name__)
Expand Down Expand Up @@ -118,33 +117,4 @@ def _decode_id_to_hex(hex_id):
return int(hex_id, 16)


class MsgpackEncoder(_EncoderBase):
content_type = 'application/msgpack'

@staticmethod
def encode(obj):
return msgpack.packb(obj)

@staticmethod
def decode(data):
if msgpack.version[:2] < (0, 6):
return msgpack.unpackb(data)
return msgpack.unpackb(data, raw=True)

@staticmethod
def join_encoded(objs):
"""Join a list of encoded objects together as a msgpack array"""
buf = b''.join(objs)

# Prepend array header to buffer
# https://github.com/msgpack/msgpack-python/blob/f46523b1af7ff2d408da8500ea36a4f9f2abe915/msgpack/fallback.py#L948-L955
count = len(objs)
if count <= 0xf:
return struct.pack('B', 0x90 + count) + buf
elif count <= 0xffff:
return struct.pack('>BH', 0xdc, count) + buf
else:
return struct.pack('>BI', 0xdd, count) + buf


Encoder = TraceMsgpackEncoder
Encoder = MsgpackEncoder
2 changes: 1 addition & 1 deletion ddtrace/internal/_encoding.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ cdef class Packer(object):
return buff_to_buff(self.pk.buf, self.pk.length)


cdef class TraceMsgpackEncoder(object):
cdef class MsgpackEncoder(object):
content_type = "application/msgpack"

cpdef decode(self, data):
Expand Down
62 changes: 5 additions & 57 deletions tests/benchmarks/test_encoding.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import random
import string

import msgpack
from msgpack.fallback import Packer
import pytest

from ddtrace import Span, Tracer
from ddtrace.encoding import _EncoderBase, MsgpackEncoder, TraceMsgpackEncoder
from ddtrace.encoding import _EncoderBase, MsgpackEncoder

from ..test_encoders import RefMsgpackEncoder, gen_trace


msgpack_encoder = MsgpackEncoder()
trace_encoder = TraceMsgpackEncoder()
msgpack_encoder = RefMsgpackEncoder()
trace_encoder = MsgpackEncoder()


class PPMsgpackEncoder(_EncoderBase):
Expand All @@ -27,56 +25,6 @@ def decode(data):
return msgpack.unpackb(data, raw=True)


def rands(size=6, chars=string.ascii_uppercase + string.digits):
return "".join(random.choice(chars) for _ in range(size))


def gen_span(length=None, **span_attrs):
# Helper to generate spans
name = span_attrs.pop("name", None)
if name is None:
name = "a" * length

span = Span(None, **span_attrs)

for attr in span_attrs:
if hasattr(span, attr):
setattr(span, attr, attr)
else:
pass

if length is not None:
pass


def gen_trace(nspans=1000, ntags=50, key_size=15, value_size=20, nmetrics=10):
t = Tracer()

root = None
trace = []
for i in range(0, nspans):
parent_id = root.span_id if root else None
with Span(
t, "span_name", resource="/fsdlajfdlaj/afdasd%s" % i, service="myservice", parent_id=parent_id,
) as span:
span._parent = root
span.set_tags({rands(key_size): rands(value_size) for _ in range(0, ntags)})

# only apply a span type to the root span
if not root:
span.span_type = "web"

for _ in range(0, nmetrics):
span.set_tag(rands(key_size), random.randint(0, 2 ** 16))

trace.append(span)

if not root:
root = span

return trace


trace_large = gen_trace(nspans=1000)
trace_small = gen_trace(nspans=50, key_size=10, ntags=5, nmetrics=4)

Expand Down
113 changes: 98 additions & 15 deletions tests/test_encoders.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,94 @@
import json
import random
import string
import struct
from unittest import TestCase

import msgpack

from ddtrace.tracer import Tracer
from ddtrace.span import Span
from ddtrace.compat import msgpack_type, string_type
from ddtrace.encoding import JSONEncoder, JSONEncoderV2, MsgpackEncoder, TraceMsgpackEncoder
from ddtrace.encoding import _EncoderBase, JSONEncoder, JSONEncoderV2, MsgpackEncoder

from .benchmarks.test_encoding import gen_trace

def rands(size=6, chars=string.ascii_uppercase + string.digits):
return "".join(random.choice(chars) for _ in range(size))


def gen_span(length=None, **span_attrs):
# Helper to generate spans
name = span_attrs.pop("name", None)
if name is None:
name = "a" * length

span = Span(None, **span_attrs)

for attr in span_attrs:
if hasattr(span, attr):
setattr(span, attr, attr)
else:
pass

if length is not None:
pass


def gen_trace(nspans=1000, ntags=50, key_size=15, value_size=20, nmetrics=10):
t = Tracer()

root = None
trace = []
for i in range(0, nspans):
parent_id = root.span_id if root else None
with Span(
t, "span_name", resource="/fsdlajfdlaj/afdasd%s" % i, service="myservice", parent_id=parent_id,
) as span:
span._parent = root
span.set_tags({rands(key_size): rands(value_size) for _ in range(0, ntags)})

# only apply a span type to the root span
if not root:
span.span_type = "web"

for _ in range(0, nmetrics):
span.set_tag(rands(key_size), random.randint(0, 2 ** 16))

trace.append(span)

if not root:
root = span

return trace


class RefMsgpackEncoder(_EncoderBase):
content_type = "application/msgpack"

@staticmethod
def encode(obj):
return msgpack.packb(obj)

@staticmethod
def decode(data):
if msgpack.version[:2] < (0, 6):
return msgpack.unpackb(data)
return msgpack.unpackb(data, raw=True)

@staticmethod
def join_encoded(objs):
"""Join a list of encoded objects together as a msgpack array"""
buf = b"".join(objs)

# Prepend array header to buffer
# https://github.com/msgpack/msgpack-python/blob/f46523b1af7ff2d408da8500ea36a4f9f2abe915/msgpack/fallback.py#L948-L955
count = len(objs)
if count <= 0xF:
return struct.pack("B", 0x90 + count) + buf
elif count <= 0xFFFF:
return struct.pack(">BH", 0xDC, count) + buf
else:
return struct.pack(">BI", 0xDD, count) + buf


class TestEncoders(TestCase):
Expand Down Expand Up @@ -197,43 +278,45 @@ def test_join_encoded_msgpack(self):


def decode(obj):
if msgpack.version[:2] < (0, 6):
return msgpack.unpackb(obj)
return msgpack.unpackb(obj, raw=True)


def test_custom_msgpack_encode():
tencoder = TraceMsgpackEncoder()
mencoder = MsgpackEncoder()
encoder = MsgpackEncoder()
refencoder = RefMsgpackEncoder()

trace = gen_trace(nspans=50)

# Note that we assert on the decoded versions because the encoded
# can vary due to non-deterministic map key/value positioning
assert decode(mencoder.encode_trace(trace)) == decode(tencoder.encode_trace(trace))
assert decode(refencoder.encode_trace(trace)) == decode(encoder.encode_trace(trace))

ref_encoded = mencoder.encode_traces([trace, trace])
encoded = tencoder.encode_traces([trace, trace])
ref_encoded = refencoder.encode_traces([trace, trace])
encoded = encoder.encode_traces([trace, trace])
assert decode(encoded) == decode(ref_encoded)

# Empty trace (not that this should be done in practice)
assert decode(mencoder.encode_trace([])) == decode(tencoder.encode_trace([]))
assert decode(refencoder.encode_trace([])) == decode(encoder.encode_trace([]))

s = Span(None, None)
# Need to .finish() to have a duration since the old implementation will not encode
# duration_ns, the new one will encode as None
s.finish()
assert decode(mencoder.encode_trace([s])) == decode(tencoder.encode_trace([s]))
assert decode(refencoder.encode_trace([s])) == decode(encoder.encode_trace([s]))


def test_custom_msgpack_join_encoded():
tencoder = TraceMsgpackEncoder()
mencoder = MsgpackEncoder()
encoder = MsgpackEncoder()
refencoder = RefMsgpackEncoder()

trace = gen_trace(nspans=50)

ref = mencoder.join_encoded([mencoder.encode_trace(trace) for _ in range(10)])
custom = tencoder.join_encoded([tencoder.encode_trace(trace) for _ in range(10)])
ref = refencoder.join_encoded([refencoder.encode_trace(trace) for _ in range(10)])
custom = encoder.join_encoded([encoder.encode_trace(trace) for _ in range(10)])
assert decode(ref) == decode(custom)

ref = mencoder.join_encoded([mencoder.encode_trace(trace) for _ in range(1)])
custom = tencoder.join_encoded([tencoder.encode_trace(trace) for _ in range(1)])
ref = refencoder.join_encoded([refencoder.encode_trace(trace) for _ in range(1)])
custom = encoder.join_encoded([encoder.encode_trace(trace) for _ in range(1)])
assert decode(ref) == decode(custom)
4 changes: 2 additions & 2 deletions tests/utils/tracer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ddtrace.encoding import JSONEncoder, TraceMsgpackEncoder
from ddtrace.encoding import JSONEncoder, MsgpackEncoder
from ddtrace.internal.writer import AgentWriter
from ddtrace.tracer import Tracer

Expand All @@ -15,7 +15,7 @@ def __init__(self, *args, **kwargs):
self.traces = []
self.services = {}
self.json_encoder = JSONEncoder()
self.msgpack_encoder = TraceMsgpackEncoder()
self.msgpack_encoder = MsgpackEncoder()

def write(self, spans=None, services=None):
if spans:
Expand Down

0 comments on commit 5e58b1d

Please sign in to comment.