From 5635cb0deef8694afe5ae1620244e3a3e0035390 Mon Sep 17 00:00:00 2001 From: Daniele Rolando Date: Tue, 25 Sep 2018 18:35:59 -0700 Subject: [PATCH] Add V1 to V2 conversion logic. --- py_zipkin/encoding/__init__.py | 96 ++++++ py_zipkin/encoding/_decoders.py | 292 ++++++++++++++++++ tests/conftest.py | 38 --- tests/encoding/__init__.py | 0 tests/encoding/__init__test.py | 58 ++++ tests/encoding/_decoders_test.py | 207 +++++++++++++ tests/integration/decoding_test.py | 58 ++++ tests/integration/encoding_test.py | 2 +- tests/logging_helper_test.py | 4 +- tests/profiling/zipkin_span_benchmark_test.py | 2 +- tests/test_helpers.py | 133 ++++++++ tests/transport_test.py | 2 +- tests/zipkin_test.py | 2 +- tox.ini | 2 +- 14 files changed, 851 insertions(+), 45 deletions(-) create mode 100644 py_zipkin/encoding/_decoders.py create mode 100644 tests/encoding/__init__.py create mode 100644 tests/encoding/__init__test.py create mode 100644 tests/encoding/_decoders_test.py create mode 100644 tests/integration/decoding_test.py create mode 100644 tests/test_helpers.py diff --git a/py_zipkin/encoding/__init__.py b/py_zipkin/encoding/__init__.py index e69de29..20975ad 100644 --- a/py_zipkin/encoding/__init__.py +++ b/py_zipkin/encoding/__init__.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +import json + +import six + +from py_zipkin.encoding._types import Encoding +from py_zipkin.encoding._decoders import get_decoder +from py_zipkin.encoding._encoders import get_encoder +from py_zipkin.exception import ZipkinError + +_V2_ATTRIBUTES = ["tags", "localEndpoint", "remoteEndpoint", "shared", "kind"] + + +def detect_span_version_and_encoding(message): + """Returns the span type and encoding for the message provided. + + The logic in this function is a Python port of + https://github.com/openzipkin/zipkin/blob/master/zipkin/src/main/java/zipkin/internal/DetectingSpanDecoder.java + + :param message: span to perform operations on. + :type message: byte array + :returns: span encoding. 
+ :rtype: Encoding + """ + # In case message is sent in as non-bytearray format, + # safeguard convert to bytearray before handling + if isinstance(message, six.string_types): + message = six.b(message) + + if len(message) < 2: + raise ZipkinError("Invalid span format. Message too short.") + + # Check for binary format + if six.byte2int(message) <= 16: + if message[0] == 10 and message[1] != 0: + # Excluding from coverage since Protobuf encoding is + # not yet implemented. + return Encoding.V2_PROTOBUF # pragma: no cover + return Encoding.V1_THRIFT + + str_msg = message.decode('utf-8') + + # JSON case for list of spans + if str_msg[0] == '[': + span_list = json.loads(str_msg) + if len(span_list) > 0: + # Assumption: All spans in a list are the same version + # Logic: Search for identifying fields in all spans, if any span can + # be strictly identified to a version, return that version. + # Otherwise, if no spans could be strictly identified, default to V2. + for span in span_list: + if any(word in span for word in _V2_ATTRIBUTES): + return Encoding.V2_JSON + elif ( + 'binaryAnnotations' in span or + ( + 'annotations' in span and + 'endpoint' in span['annotations'] + ) + ): + return Encoding.V1_JSON + return Encoding.V2_JSON + + raise ZipkinError("Unknown or unsupported span encoding") + + +def convert_spans(spans, output_encoding, input_encoding=None): + """Converts encoded spans to a different encoding. + + :param spans: encoded input spans. + :type spans: byte array + :param output_encoding: desired output encoding. + :type output_encoding: Encoding + :param input_encoding: optional input encoding. If not specified, it'll + try to understand the encoding automatically by inspecting the input spans. + :type input_encoding: Encoding + :returns: encoded spans. 
+ :rtype: byte array + """ + if not isinstance(input_encoding, Encoding): + input_encoding = detect_span_version_and_encoding(message=spans) + + if input_encoding == output_encoding: + return spans + + decoder = get_decoder(input_encoding) + encoder = get_encoder(output_encoding) + span_builders = decoder.decode_spans(spans) + output_spans = [] + + # Encode each individual span + for sb in span_builders: + output_spans.append(encoder.encode_span(sb)) + + # Outputs from encoder.encode_span() can be easily concatenated in a list + return encoder.encode_queue(output_spans) diff --git a/py_zipkin/encoding/_decoders.py b/py_zipkin/encoding/_decoders.py new file mode 100644 index 0000000..b2a1b42 --- /dev/null +++ b/py_zipkin/encoding/_decoders.py @@ -0,0 +1,292 @@ +# -*- coding: utf-8 -*- +import logging +import socket +import struct + +import six +from thriftpy.protocol.binary import read_list_begin +from thriftpy.protocol.binary import TBinaryProtocol +from thriftpy.thrift import TType +from thriftpy.transport import TMemoryBuffer + +from py_zipkin.encoding._types import Encoding +from py_zipkin.encoding._types import Kind +from py_zipkin.exception import ZipkinError +from py_zipkin.thrift import zipkin_core +from py_zipkin.encoding._helpers import Endpoint +from py_zipkin.encoding._helpers import SpanBuilder + +_HEX_DIGITS = "0123456789abcdef" +_DROP_ANNOTATIONS = {'cs', 'sr', 'ss', 'cr'} + +log = logging.getLogger('py_zipkin.encoding') + + +def get_decoder(encoding): + """Creates decoder object for the given encoding. 
+ + :param encoding: encoding protocol of the spans to decode + :type encoding: Encoding + :return: corresponding IDecoder object + :rtype: IDecoder + """ + if encoding == Encoding.V1_THRIFT: + return _V1ThriftDecoder() + if encoding == Encoding.V1_JSON: + raise NotImplementedError( + '{} decoding not yet implemented'.format(encoding)) + if encoding == Encoding.V2_JSON: + raise NotImplementedError( + '{} decoding not yet implemented'.format(encoding)) + raise ZipkinError('Unknown encoding: {}'.format(encoding)) + + +class IDecoder(object): + """Decoder interface.""" + + def decode_spans(self, spans): + """Decodes an encoded list of spans. + + :param spans: encoded list of spans + :type spans: bytes + :return: list of span builders + :rtype: list + """ + raise NotImplementedError() + + +class _V1ThriftDecoder(IDecoder): + + def decode_spans(self, spans): + """Decodes an encoded list of spans. + + :param spans: encoded list of spans + :type spans: bytes + :return: list of span builders + :rtype: list + """ + span_builders = [] + transport = TMemoryBuffer(spans) + + if six.byte2int(spans) == TType.STRUCT: + _, size = read_list_begin(transport) + else: + size = 1 + + for _ in range(size): + span = zipkin_core.Span() + span.read(TBinaryProtocol(transport)) + span_builders.append(self._decode_thrift_span(span)) + return span_builders + + def _convert_from_thrift_endpoint(self, thrift_endpoint): + """Accepts a thrift decoded endpoint and converts it to an Endpoint. 
+ + :param thrift_endpoint: thrift encoded endpoint + :type thrift_endpoint: thrift endpoint + :returns: decoded endpoint + :rtype: Endpoint + """ + ipv4 = None + ipv6 = None + port = struct.unpack('H', struct.pack('h', thrift_endpoint.port))[0] + + if thrift_endpoint.ipv4 != 0: + ipv4 = socket.inet_ntop( + socket.AF_INET, + struct.pack('!i', thrift_endpoint.ipv4), + ) + + if thrift_endpoint.ipv6: + ipv6 = socket.inet_ntop(socket.AF_INET6, thrift_endpoint.ipv6) + + return Endpoint( + service_name=thrift_endpoint.service_name, + ipv4=ipv4, + ipv6=ipv6, + port=port, + ) + + def _decode_thrift_annotations(self, thrift_annotations): + """Accepts a thrift annotation and converts it to a v1 annotation. + + :param thrift_annotations: list of thrift annotations. + :type thrift_annotations: list of zipkin_core.Span.Annotation + :returns: (annotations, local_endpoint, kind, timestamp, duration) + """ + local_endpoint = None + kind = Kind.LOCAL + all_annotations = {} + timestamp = None + duration = None + + for thrift_annotation in thrift_annotations: + all_annotations[thrift_annotation.value] = thrift_annotation.timestamp + if thrift_annotation.host: + local_endpoint = self._convert_from_thrift_endpoint( + thrift_annotation.host, + ) + + if 'cs' in all_annotations and 'sr' not in all_annotations: + kind = Kind.CLIENT + timestamp = all_annotations['cs'] + duration = all_annotations['cr'] - all_annotations['cs'] + elif 'cs' not in all_annotations and 'sr' in all_annotations: + kind = Kind.SERVER + timestamp = all_annotations['sr'] + duration = all_annotations['ss'] - all_annotations['sr'] + + annotations = { + name: self.seconds(ts) for name, ts in all_annotations.items() + if name not in _DROP_ANNOTATIONS + } + + return annotations, local_endpoint, kind, timestamp, duration + + def _convert_from_thrift_binary_annotations(self, thrift_binary_annotations): + """Accepts a thrift decoded binary annotation and converts it + to a v1 binary annotation. 
+ """ + tags = {} + local_endpoint = None + remote_endpoint = None + + for binary_annotation in thrift_binary_annotations: + if binary_annotation.key == 'sa': + remote_endpoint = self._convert_from_thrift_endpoint( + thrift_endpoint=binary_annotation.host, + ) + else: + key = binary_annotation.key + + annotation_type = binary_annotation.annotation_type + value = binary_annotation.value + + if annotation_type == zipkin_core.AnnotationType.BOOL: + tags[key] = "true" if value == 1 else "false" + elif annotation_type == zipkin_core.AnnotationType.STRING: + tags[key] = str(value) + else: + log.warning('Only STRING and BOOL binary annotations are ' + 'supported right now and can be properly decoded.') + + if binary_annotation.host: + local_endpoint = self._convert_from_thrift_endpoint( + thrift_endpoint=binary_annotation.host, + ) + + return tags, local_endpoint, remote_endpoint + + def seconds(self, us): + return round(float(us) / 1000 / 1000, 6) + + def _decode_thrift_span(self, thrift_span): + """Decodes a thrift span. 
+ + :param thrift_span: thrift span + :type thrift_span: thrift Span object + :returns: span builder representing this span + :rtype: SpanBuilder + """ + parent_id = None + local_endpoint = None + annotations = {} + tags = {} + kind = Kind.LOCAL + service_name = '' + sa_endpoint = None + timestamp = None + duration = None + + if thrift_span.parent_id: + parent_id = self._convert_unsigned_long_to_lower_hex( + thrift_span.parent_id, + ) + + if thrift_span.annotations: + annotations, local_endpoint, kind, timestamp, duration = \ + self._decode_thrift_annotations(thrift_span.annotations) + + if thrift_span.binary_annotations: + tags, local_endpoint, sa_endpoint = \ + self._convert_from_thrift_binary_annotations( + thrift_span.binary_annotations, + ) + + trace_id = self._convert_trace_id_to_string( + thrift_span.trace_id, + thrift_span.trace_id_high, + ) + + return SpanBuilder( + trace_id=trace_id, + name=thrift_span.name, + parent_id=parent_id, + span_id=self._convert_unsigned_long_to_lower_hex(thrift_span.id), + timestamp=self.seconds(timestamp or thrift_span.timestamp), + duration=self.seconds(duration or thrift_span.duration), + annotations=annotations, + tags=tags, + kind=kind, + local_endpoint=local_endpoint, + service_name=service_name, + sa_endpoint=sa_endpoint, + report_timestamp=thrift_span.timestamp is not None, + ) + + def _convert_trace_id_to_string(self, trace_id, trace_id_high=None): + """ + Converts the provided traceId hex value with optional high bits + to a string. 
+ + :param trace_id: the value of the trace ID + :type trace_id: int + :param trace_id_high: the high bits of the trace ID + :type trace_id_high: int + :returns: trace_id_high + trace_id as a string + """ + if trace_id_high is not None: + result = bytearray(32) + self._write_hex_long(result, 0, trace_id_high) + self._write_hex_long(result, 16, trace_id) + return result.decode("utf8") + + result = bytearray(16) + self._write_hex_long(result, 0, trace_id) + return result.decode("utf8") + + def _convert_unsigned_long_to_lower_hex(self, value): + """ + Converts the provided unsigned long value to a hex string. + + :param value: the value to convert + :type value: unsigned long + :returns: value as a hex string + """ + result = bytearray(16) + self._write_hex_long(result, 0, value) + return result.decode("utf8") + + def _write_hex_long(self, data, pos, value): + """ + Writes an unsigned long value across a byte array. + + :param data: the buffer to write the value to + :type data: bytearray + :param pos: the starting position + :type pos: int + :param value: the value to write + :type value: unsigned long + """ + self._write_hex_byte(data, pos + 0, (value >> 56) & 0xff) + self._write_hex_byte(data, pos + 2, (value >> 48) & 0xff) + self._write_hex_byte(data, pos + 4, (value >> 40) & 0xff) + self._write_hex_byte(data, pos + 6, (value >> 32) & 0xff) + self._write_hex_byte(data, pos + 8, (value >> 24) & 0xff) + self._write_hex_byte(data, pos + 10, (value >> 16) & 0xff) + self._write_hex_byte(data, pos + 12, (value >> 8) & 0xff) + self._write_hex_byte(data, pos + 14, (value & 0xff)) + + def _write_hex_byte(self, data, pos, byte): + data[pos + 0] = ord(_HEX_DIGITS[int((byte >> 4) & 0xf)]) + data[pos + 1] = ord(_HEX_DIGITS[int(byte & 0xf)]) diff --git a/tests/conftest.py b/tests/conftest.py index 7f7b7e4..824607b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,5 @@ -import mock import pytest -import six -from py_zipkin.encoding._encoders import IEncoder -from 
py_zipkin.transport import BaseTransportHandler from py_zipkin.zipkin import ZipkinAttrs @@ -25,37 +21,3 @@ def sampled_zipkin_attr(zipkin_attributes): @pytest.fixture def unsampled_zipkin_attr(zipkin_attributes): return ZipkinAttrs(is_sampled=False, **zipkin_attributes) - - -class MockTransportHandler(BaseTransportHandler): - - def __init__(self, max_payload_bytes=None): - self.max_payload_bytes = max_payload_bytes - self.payloads = [] - - def send(self, payload): - self.payloads.append(payload) - return payload - - def get_max_payload_bytes(self): - return self.max_payload_bytes - - def get_payloads(self): - return self.payloads - - -class MockEncoder(IEncoder): - - def __init__(self, fits=True, encoded_span='', encoded_queue=''): - self.fits_bool = fits - self.encode_span = mock.Mock( - return_value=(encoded_span, len(encoded_span)), - ) - self.encode_queue = mock.Mock(return_value=encoded_queue) - - def fits(self, current_count, current_size, max_size, new_span): - assert isinstance(current_count, int) - assert isinstance(current_size, int) - assert isinstance(max_size, int) - assert isinstance(new_span, six.string_types) - return self.fits_bool diff --git a/tests/encoding/__init__.py b/tests/encoding/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/encoding/__init__test.py b/tests/encoding/__init__test.py new file mode 100644 index 0000000..3d36b57 --- /dev/null +++ b/tests/encoding/__init__test.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +import pytest + +from py_zipkin import Encoding +from py_zipkin.encoding import convert_spans +from py_zipkin.encoding import detect_span_version_and_encoding +from py_zipkin.exception import ZipkinError +from tests.test_helpers import generate_list_of_spans + + +@pytest.mark.parametrize('encoding', [ + Encoding.V1_THRIFT, + Encoding.V1_JSON, + Encoding.V2_JSON, +]) +def test_detect_span_version_and_encoding(encoding): + spans, _, _, _ = generate_list_of_spans(encoding) + old_type = type(spans) + + 
assert detect_span_version_and_encoding(spans) == encoding + + if encoding in [Encoding.V1_JSON, Encoding.V2_JSON]: + assert type(spans) == old_type + spans = spans.encode() + assert detect_span_version_and_encoding(spans) == encoding + + +def test_detect_span_version_and_encoding_incomplete_message(): + with pytest.raises(ZipkinError): + detect_span_version_and_encoding('[') + + +def test_detect_span_version_and_encoding_ambiguous_json(): + """JSON spans that don't have any v1 or v2 keyword default to V2""" + assert detect_span_version_and_encoding( + '[{"traceId": "aaa", "id": "bbb"}]', + ) == Encoding.V2_JSON + + +def test_detect_span_version_and_encoding_unknown_encoding(): + with pytest.raises(ZipkinError): + detect_span_version_and_encoding('foobar') + + +def test_convert_spans_thrift_to_v2_json(): + spans, _, _, _ = generate_list_of_spans(Encoding.V1_THRIFT) + + converted_spans = convert_spans(spans=spans, output_encoding=Encoding.V2_JSON) + + assert detect_span_version_and_encoding(converted_spans) == Encoding.V2_JSON + + +def test_convert_spans_v2_json_to_v2_json(): + spans, _, _, _ = generate_list_of_spans(Encoding.V2_JSON) + + converted_spans = convert_spans(spans=spans, output_encoding=Encoding.V2_JSON) + + assert detect_span_version_and_encoding(converted_spans) == Encoding.V2_JSON diff --git a/tests/encoding/_decoders_test.py b/tests/encoding/_decoders_test.py new file mode 100644 index 0000000..fb801be --- /dev/null +++ b/tests/encoding/_decoders_test.py @@ -0,0 +1,207 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import mock +import pytest + +from py_zipkin import thrift +from py_zipkin.encoding._decoders import get_decoder +from py_zipkin.encoding._decoders import IDecoder +from py_zipkin.encoding._decoders import _V1ThriftDecoder +from py_zipkin.encoding._helpers import Endpoint +from py_zipkin.encoding._types import Encoding +from py_zipkin.encoding._types import Kind +from py_zipkin.exception import ZipkinError +from 
py_zipkin.thrift import create_binary_annotation +from py_zipkin.thrift import zipkin_core +from py_zipkin.util import generate_random_128bit_string +from py_zipkin.util import generate_random_64bit_string +from tests.test_helpers import generate_list_of_spans +from tests.test_helpers import generate_single_thrift_span + +USEC = 1000 * 1000 + + +@pytest.fixture +def thrift_endpoint(): + return thrift.create_endpoint(8888, 'test_service', '10.0.0.1', None) + + +def test_get_decoder(): + assert isinstance(get_decoder(Encoding.V1_THRIFT), _V1ThriftDecoder) + with pytest.raises(NotImplementedError): + get_decoder(Encoding.V1_JSON) + with pytest.raises(NotImplementedError): + get_decoder(Encoding.V2_JSON) + with pytest.raises(ZipkinError): + get_decoder(None) + + +def test_idecoder_throws_not_implemented_errors(): + encoder = IDecoder() + with pytest.raises(NotImplementedError): + encoder.decode_spans(b'[]') + + +class TestV1ThriftDecoder(object): + + def test_decode_spans_list(self): + spans, _, _, _ = generate_list_of_spans(Encoding.V1_THRIFT) + decoder = _V1ThriftDecoder() + with mock.patch.object(decoder, '_decode_thrift_span') as mock_decode: + decoder.decode_spans(spans) + assert mock_decode.call_count == 2 + + def test_decode_old_style_thrift_span(self): + """Test it can handle single thrift spans (not a list with 1 span). + + Years ago you'd just thrift encode spans one by one and then write them to + the transport singularly. The zipkin kafka consumer still supports this. + Let's make sure we properly detect this case and don't just assume that + it's a thrift list. 
+ """ + span = generate_single_thrift_span() + decoder = _V1ThriftDecoder() + with mock.patch.object(decoder, '_decode_thrift_span') as mock_decode: + decoder.decode_spans(span) + assert mock_decode.call_count == 1 + + def test__convert_from_thrift_endpoint(self, thrift_endpoint): + decoder = _V1ThriftDecoder() + + ipv4_endpoint = decoder._convert_from_thrift_endpoint(thrift_endpoint) + assert ipv4_endpoint == Endpoint('test_service', '10.0.0.1', None, 8888) + + ipv6_thrift_endpoint = \ + thrift.create_endpoint(8888, 'test_service', None, '::1') + ipv6_endpoint = decoder._convert_from_thrift_endpoint(ipv6_thrift_endpoint) + assert ipv6_endpoint == Endpoint('test_service', None, '::1', 8888) + + def test__decode_thrift_annotations(self, thrift_endpoint): + timestamp = 1.0 + decoder = _V1ThriftDecoder() + thrift_annotations = thrift.annotation_list_builder( + { + 'cs': timestamp, + 'cr': timestamp + 10, + 'my_annotation': timestamp + 15, + }, + thrift_endpoint, + ) + + annotations, end, kind, ts, dur = decoder._decode_thrift_annotations( + thrift_annotations, + ) + assert annotations == {'my_annotation': 16.0} + assert end == Endpoint('test_service', '10.0.0.1', None, 8888) + assert kind == Kind.CLIENT + assert ts == timestamp * USEC + assert dur == 10 * USEC + + def test__decode_thrift_annotations_server_span(self, thrift_endpoint): + timestamp = 1.0 + decoder = _V1ThriftDecoder() + thrift_annotations = thrift.annotation_list_builder( + { + 'sr': timestamp, + 'ss': timestamp + 10, + }, + thrift_endpoint, + ) + + annotations, end, kind, ts, dur = decoder._decode_thrift_annotations( + thrift_annotations, + ) + assert annotations == {} + assert end == Endpoint('test_service', '10.0.0.1', None, 8888) + assert kind == Kind.SERVER + assert ts == timestamp * USEC + assert dur == 10 * USEC + + def test__decode_thrift_annotations_local_span(self, thrift_endpoint): + timestamp = 1.0 + decoder = _V1ThriftDecoder() + thrift_annotations = thrift.annotation_list_builder( + { + 
'cs': timestamp, + 'sr': timestamp, + 'ss': timestamp + 10, + 'cr': timestamp + 10, + }, + thrift_endpoint, + ) + + annotations, end, kind, ts, dur = decoder._decode_thrift_annotations( + thrift_annotations, + ) + assert annotations == {} + assert end == Endpoint('test_service', '10.0.0.1', None, 8888) + assert kind == Kind.LOCAL + # ts and dur are not computed for a local span since those always have + # timestamp and duration set as span arguments. + assert ts is None + assert dur is None + + def test__convert_from_thrift_binary_annotations(self): + decoder = _V1ThriftDecoder() + local_host = thrift.create_endpoint(8888, 'test_service', '10.0.0.1', None) + remote_host = thrift.create_endpoint(9999, 'rem_service', '10.0.0.2', None) + ann_type = zipkin_core.AnnotationType + thrift_binary_annotations = [ + create_binary_annotation('key1', True, ann_type.BOOL, local_host), + create_binary_annotation('key2', 'val2', ann_type.STRING, local_host), + create_binary_annotation('key3', False, ann_type.BOOL, local_host), + create_binary_annotation('key4', b'04', ann_type.I16, local_host), + create_binary_annotation('key5', b'0004', ann_type.I32, local_host), + create_binary_annotation('sa', True, ann_type.BOOL, remote_host), + ] + + tags, local_endpoint, remote_endpoint = \ + decoder._convert_from_thrift_binary_annotations( + thrift_binary_annotations, + ) + + assert tags == { + 'key1': 'true', + 'key2': 'val2', + 'key3': 'false', + } + assert local_endpoint == Endpoint('test_service', '10.0.0.1', None, 8888) + assert remote_endpoint == Endpoint('rem_service', '10.0.0.2', None, 9999) + + @pytest.mark.parametrize('trace_id_generator', [ + (generate_random_64bit_string), + (generate_random_128bit_string), + ]) + def test__convert_trace_id_to_string(self, trace_id_generator): + decoder = _V1ThriftDecoder() + trace_id = trace_id_generator() + span = thrift.create_span( + generate_random_64bit_string(), + None, + trace_id, + 'test_span', + [], + [], + None, + None, + ) + assert 
decoder._convert_trace_id_to_string( + span.trace_id, + span.trace_id_high, + ) == trace_id + + def test__convert_unsigned_long_to_lower_hex(self): + decoder = _V1ThriftDecoder() + span_id = generate_random_64bit_string() + span = thrift.create_span( + span_id, + None, + generate_random_64bit_string(), + 'test_span', + [], + [], + None, + None, + ) + assert decoder._convert_unsigned_long_to_lower_hex(span.id) == span_id diff --git a/tests/integration/decoding_test.py b/tests/integration/decoding_test.py new file mode 100644 index 0000000..5845252 --- /dev/null +++ b/tests/integration/decoding_test.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals +import json + + +from py_zipkin import Encoding +from py_zipkin.encoding import convert_spans +from tests.test_helpers import generate_list_of_spans + + +def us(seconds): + return int(seconds * 1000 * 1000) + + +def test_encoding(): + thrift_spans, zipkin_attrs, inner_span_id, ts = \ + generate_list_of_spans(Encoding.V1_THRIFT) + + json_spans = convert_spans(thrift_spans, Encoding.V2_JSON) + + inner_span, root_span = json.loads(json_spans) + + assert root_span == { + 'traceId': zipkin_attrs.trace_id, + 'name': 'test_span_name', + 'parentId': zipkin_attrs.parent_span_id, + 'id': zipkin_attrs.span_id, + 'timestamp': us(ts), + 'duration': us(10), + 'kind': 'CLIENT', + 'shared': True, + 'localEndpoint': { + 'ipv4': '10.0.0.0', + 'port': 8080, + 'serviceName': 'test_service_name', + }, + 'remoteEndpoint': { + 'ipv6': '2001:db8:85a3::8a2e:370:7334', + 'port': 8888, + 'serviceName': 'sa_service', + }, + 'tags': {'some_key': 'some_value'}, + } + + assert inner_span == { + 'traceId': zipkin_attrs.trace_id, + 'name': 'inner_span', + 'parentId': zipkin_attrs.span_id, + 'id': inner_span_id, + 'timestamp': us(ts), + 'duration': us(5), + 'localEndpoint': { + 'ipv4': '10.0.0.0', + 'port': 8080, + 'serviceName': 'test_service_name', + }, + 'annotations': [{'timestamp': us(ts), 'value': 'ws'}], + } diff 
--git a/tests/integration/encoding_test.py b/tests/integration/encoding_test.py index 588c8c7..2495a20 100644 --- a/tests/integration/encoding_test.py +++ b/tests/integration/encoding_test.py @@ -16,7 +16,7 @@ from py_zipkin import thrift from py_zipkin.util import generate_random_64bit_string from py_zipkin.zipkin import ZipkinAttrs -from tests.conftest import MockTransportHandler +from tests.test_helpers import MockTransportHandler def _decode_binary_thrift_objs(obj): diff --git a/tests/logging_helper_test.py b/tests/logging_helper_test.py index 72c9852..586ed68 100644 --- a/tests/logging_helper_test.py +++ b/tests/logging_helper_test.py @@ -11,8 +11,8 @@ from py_zipkin.exception import ZipkinError from py_zipkin.storage import SpanStorage from py_zipkin.zipkin import ZipkinAttrs -from tests.conftest import MockEncoder -from tests.conftest import MockTransportHandler +from tests.test_helpers import MockEncoder +from tests.test_helpers import MockTransportHandler @pytest.fixture diff --git a/tests/profiling/zipkin_span_benchmark_test.py b/tests/profiling/zipkin_span_benchmark_test.py index fa59a01..df8d6a6 100644 --- a/tests/profiling/zipkin_span_benchmark_test.py +++ b/tests/profiling/zipkin_span_benchmark_test.py @@ -1,7 +1,7 @@ import pytest import py_zipkin.zipkin as zipkin -from tests.conftest import MockTransportHandler +from tests.test_helpers import MockTransportHandler def _create_root_span(is_sampled, firehose_enabled): diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..8fc61ed --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,133 @@ +# -*- coding: utf-8 -*- +import time + +import mock +import six + +from py_zipkin import Kind +from py_zipkin import zipkin +from py_zipkin import thrift +from py_zipkin.encoding._encoders import IEncoder +from py_zipkin.thrift import zipkin_core +from py_zipkin.transport import BaseTransportHandler +from py_zipkin.util import generate_random_128bit_string +from py_zipkin.util 
import generate_random_64bit_string +from py_zipkin.zipkin import ZipkinAttrs + + +class MockTransportHandler(BaseTransportHandler): + + def __init__(self, max_payload_bytes=None): + self.max_payload_bytes = max_payload_bytes + self.payloads = [] + + def send(self, payload): + self.payloads.append(payload) + return payload + + def get_max_payload_bytes(self): + return self.max_payload_bytes + + def get_payloads(self): + return self.payloads + + +class MockEncoder(IEncoder): + + def __init__(self, fits=True, encoded_span='', encoded_queue=''): + self.fits_bool = fits + self.encode_span = mock.Mock( + return_value=(encoded_span, len(encoded_span)), + ) + self.encode_queue = mock.Mock(return_value=encoded_queue) + + def fits(self, current_count, current_size, max_size, new_span): + assert isinstance(current_count, int) + assert isinstance(current_size, int) + assert isinstance(max_size, int) + assert isinstance(new_span, six.string_types) + + return self.fits_bool + + +def generate_list_of_spans(encoding): + zipkin_attrs = ZipkinAttrs( + trace_id=generate_random_64bit_string(), + span_id=generate_random_64bit_string(), + parent_span_id=generate_random_64bit_string(), + is_sampled=True, + flags=None, + ) + inner_span_id = generate_random_64bit_string() + transport_handler = MockTransportHandler() + # Let's hardcode the timestamp rather than call time.time() every time. + # The issue with time.time() is that the conversion to int of the + # returned float value * 1000000 is not precise and in the same test + # sometimes returns N and sometimes N+1. This ts value doesn't have that + # issue afaict, probably since it ends in zeros. 
+ ts = 1538544126.115900 + with mock.patch('time.time', autospec=True) as mock_time: + # zipkin.py start, logging_helper.start, 3 x logging_helper.stop + # I don't understand why logging_helper.stop would run 3 times, but + # that's what I'm seeing in the test + mock_time.side_effect = iter([ts, ts, ts + 10, ts + 10, ts + 10]) + with zipkin.zipkin_span( + service_name='test_service_name', + span_name='test_span_name', + transport_handler=transport_handler, + binary_annotations={'some_key': 'some_value'}, + encoding=encoding, + zipkin_attrs=zipkin_attrs, + host='10.0.0.0', + port=8080, + kind=Kind.CLIENT, + ) as span: + with mock.patch.object( + zipkin, + 'generate_random_64bit_string', + return_value=inner_span_id, + ): + with zipkin.zipkin_span( + service_name='test_service_name', + span_name='inner_span', + timestamp=ts, + duration=5, + annotations={'ws': ts}, + ): + span.add_sa_binary_annotation( + 8888, + 'sa_service', + '2001:0db8:85a3:0000:0000:8a2e:0370:7334', + ) + + return transport_handler.get_payloads()[0], zipkin_attrs, inner_span_id, ts + + +def generate_single_thrift_span(): + trace_id = generate_random_128bit_string() + span_id = generate_random_64bit_string() + timestamp_s = round(time.time(), 3) + duration_s = 2.0 + host = thrift.create_endpoint(port=8000, service_name='host') + host.ipv4 = 2130706433 + span = thrift.create_span( + span_id=span_id, + parent_span_id=None, + trace_id=trace_id, + span_name='foo', + annotations=[ + thrift.create_annotation(1472470996199000, "cs", host), + ], + binary_annotations=[ + thrift.create_binary_annotation( + "key", + "value", + zipkin_core.AnnotationType.STRING, + host, + ), + ], + timestamp_s=timestamp_s, + duration_s=duration_s, + ) + + return thrift.span_to_bytes(span) diff --git a/tests/transport_test.py b/tests/transport_test.py index 012dccd..13afb64 100644 --- a/tests/transport_test.py +++ b/tests/transport_test.py @@ -1,6 +1,6 @@ import mock -from tests.conftest import MockTransportHandler +from 
tests.test_helpers import MockTransportHandler class TestBaseTransportHandler(object): diff --git a/tests/zipkin_test.py b/tests/zipkin_test.py index ac43495..5a42752 100644 --- a/tests/zipkin_test.py +++ b/tests/zipkin_test.py @@ -16,7 +16,7 @@ from py_zipkin.thread_local import get_zipkin_attrs from py_zipkin.util import generate_random_64bit_string from py_zipkin.zipkin import ZipkinAttrs -from tests.conftest import MockTransportHandler +from tests.test_helpers import MockTransportHandler @pytest.fixture diff --git a/tox.ini b/tox.ini index 871d6da..4cd734a 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = pre-commit, py27, py34, py35, py36, flake8 +envlist = pre-commit, py27, py34, py35, py36, pypy, pypy3, flake8 [testenv] deps = -rrequirements-dev.txt