diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 035d850d..3ee066da 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -20,10 +20,8 @@ import tornado.gen import tornado.ioloop import tornado.queues -import socket from tornado.concurrent import Future from .constants import DEFAULT_FLUSH_INTERVAL -from . import thrift from . import ioloop_util from .metrics import Metrics, LegacyMetricsFactory from .senders import UDPSender @@ -122,9 +120,6 @@ def __init__(self, channel, queue_capacity=100, batch_size=10, self.flush_interval = flush_interval or None self.io_loop.spawn_callback(self._consume_queue) - self._process_lock = Lock() - self._process = None - @staticmethod def fetch_io_loop(channel, sender): if channel: @@ -142,10 +137,7 @@ def _create_default_sender(self, channel): return sender def set_process(self, service_name, tags, max_length): - with self._process_lock: - self._process = thrift.make_process( - service_name=service_name, tags=tags, max_length=max_length, - ) + self._sender.set_process(service_name, tags, max_length) def report_span(self, span): # We should not be calling `queue.put_nowait()` from random threads, @@ -168,14 +160,14 @@ def _report_span_from_ioloop(self, span): @tornado.gen.coroutine def _consume_queue(self): - spans = [] stopped = False + while not stopped: - while len(spans) < self.batch_size: + while self._sender.span_count < self.batch_size: try: # using timeout allows periodic flush with smaller packet timeout = self.flush_interval + self.io_loop.time() \ - if self.flush_interval and spans else None + if self.flush_interval and self._sender.span_count else None span = yield self.queue.get(timeout=timeout) except tornado.gen.TimeoutError: break @@ -186,44 +178,24 @@ def _consume_queue(self): # don't return yet, submit accumulated spans first break else: - spans.append(span) - if spans: - yield self._submit(spans) - for _ in spans: + self._sender.append(span) + + if self._sender.span_count: + num_spans = self._sender.span_count + try: + yield self._sender.flush() + except Exception as exc: + self.metrics.reporter_failure(num_spans) + self.error_reporter.error(exc) + else: + self.metrics.reporter_success(num_spans) + + for _ in range(num_spans): self.queue.task_done() - spans = spans[:0] + self.metrics.reporter_queue_length(self.queue.qsize()) self.logger.info('Span publisher exited') - @tornado.gen.coroutine - def _submit(self, spans): - if not spans: - return - with self._process_lock: - process = self._process - if not process: - return - try: - batch = thrift.make_jaeger_batch(spans=spans, process=process) - yield self._send(batch) - self.metrics.reporter_success(len(spans)) - except socket.error as e: - self.metrics.reporter_failure(len(spans)) - self.error_reporter.error( - 'Failed to submit traces to jaeger-agent socket: %s', e) - except Exception as e: - self.metrics.reporter_failure(len(spans)) - self.error_reporter.error( - 'Failed to submit traces to jaeger-agent: %s', e) - - @tornado.gen.coroutine - def _send(self, batch): - """ - Send batch of spans out via thrift transport. Any exceptions thrown - will be caught above in the exception handler of _submit(). - """ - return self._sender.send(batch) - def close(self): """ Ensure that all spans from the queue are submitted. diff --git a/jaeger_client/senders.py b/jaeger_client/senders.py index 30eb5277..f88ddc35 100644 --- a/jaeger_client/senders.py +++ b/jaeger_client/senders.py @@ -13,9 +13,14 @@ # limitations under the License. from __future__ import absolute_import +import sys +import socket import logging +import tornado.gen +from six import reraise from threadloop import ThreadLoop +from . import thrift from .local_agent_net import LocalAgentSender from thrift.protocol import TCompactProtocol @@ -28,14 +33,49 @@ class Sender(object): - def __init__(self, host, port, io_loop=None): - self.host = host - self.port = port + def __init__(self, io_loop=None): + from threading import Lock self.io_loop = io_loop or self._create_new_thread_loop() - + self._process_lock = Lock() + self._process = None + self.spans = [] + + def append(self, span): + """Queue a span for subsequent submission calls to flush()""" + self.spans.append(span) + + @property + def span_count(self): + return len(self.spans) + + @tornado.gen.coroutine + def flush(self): + """Examine span and process state before yielding to _flush() for batching and transport.""" + if self.spans: + with self._process_lock: + process = self._process + if process: + try: + yield self._flush(self.spans, self._process) + finally: + self.spans = [] + + @tornado.gen.coroutine + def _flush(self, spans, process): + """Batch spans and invokes send(). Override with specific batching logic, if desired.""" + batch = thrift.make_jaeger_batch(spans=spans, process=process) + yield self.send(batch) + + @tornado.gen.coroutine def send(self, batch): raise NotImplementedError('This method should be implemented by subclasses') + def set_process(self, service_name, tags, max_length): + with self._process_lock: + self._process = thrift.make_process( + service_name=service_name, tags=tags, max_length=max_length, + ) + def _create_new_thread_loop(self): """ Create a daemonized thread that will run Tornado IOLoop. @@ -49,21 +89,26 @@ def _create_new_thread_loop(self): class UDPSender(Sender): def __init__(self, host, port, io_loop=None): - super(UDPSender, self).__init__( - host=host, - port=port, - io_loop=io_loop - ) + super(UDPSender, self).__init__(io_loop=io_loop) + self.host = host + self.port = port self.channel = self._create_local_agent_channel(self.io_loop) self.agent = Agent.Client(self.channel, self) + @tornado.gen.coroutine def send(self, batch): - """ Send batch of spans out via thrift transport. - - Any exceptions thrown will be caught by the caller. """ - - return self.agent.emitBatch(batch) + Send batch of spans out via thrift transport. + """ + try: + yield self.agent.emitBatch(batch) + except socket.error as e: + reraise(type(e), + type(e)('Failed to submit traces to jaeger-agent socket: {}'.format(e)), + sys.exc_info()[2]) + except Exception as e: + reraise(type(e), type(e)('Failed to submit traces to jaeger-agent: {}'.format(e)), + sys.exc_info()[2]) def _create_local_agent_channel(self, io_loop): """ diff --git a/tests/test_reporter.py b/tests/test_reporter.py index ce875467..ca767805 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -96,7 +96,7 @@ def test_composite_reporter(): class FakeSender(object): """ - Mock the _send() method of the reporter by capturing requests + Mock the send() method of the reporter's Sender by capturing requests and returning incomplete futures that can be completed from inside the test. """ @@ -158,7 +158,7 @@ def _new_reporter(batch_size, flush=None, queue_cap=100): queue_capacity=queue_cap) reporter.set_process('service', {}, max_length=0) sender = FakeSender() - reporter._send = sender + reporter._sender.send = sender return reporter, sender @tornado.gen.coroutine @@ -198,17 +198,12 @@ def test_submit_failure(self): reporter_failure_key = 'jaeger:reporter_spans.result_err' assert reporter_failure_key not in reporter.metrics_factory.counters - # simulate exception in send - reporter._send = mock.MagicMock(side_effect=ValueError()) + reporter._sender.send = mock.MagicMock(side_effect=ValueError()) reporter.report_span(self._new_span('1')) - yield self._wait_for( lambda: reporter_failure_key in reporter.metrics_factory.counters) assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key) - # silly test, for code coverage only - yield reporter._submit([]) - @gen_test def test_submit_queue_full_batch_size_1(self): reporter, sender = self._new_reporter(batch_size=1, queue_cap=1) @@ -275,7 +270,7 @@ def send(_): count[0] += 1 return future_result(True) - reporter._send = send + reporter._sender.send = send reporter.batch_size = 3 for i in range(10): reporter.report_span(self._new_span('%s' % i)) @@ -294,22 +289,6 @@ def send(_): class TestReporterUnit: - def test_reporter_calls_sender_correctly(self): - reporter = Reporter( - channel=None, - sender=mock.MagicMock(), - io_loop=IOLoop.current(), - batch_size=10, - flush_interval=None, - metrics_factory=FakeMetricsFactory(), - error_reporter=HardErrorReporter(), - queue_capacity=100 - ) - test_data = {'foo': 'bar'} - - reporter._send(test_data) - reporter._sender.send.assert_called_once_with(test_data) - @pytest.mark.parametrize( 'channel, sender, expected', [ diff --git a/tests/test_senders.py b/tests/test_senders.py index 41b5860d..25717187 100644 --- a/tests/test_senders.py +++ b/tests/test_senders.py @@ -13,23 +13,117 @@ # limitations under the License. from __future__ import print_function +import time +import socket +import collections import mock +import pytest from tornado import ioloop +from tornado import gen +from tornado.testing import AsyncTestCase, gen_test from jaeger_client import senders +from jaeger_client import Span, SpanContext from jaeger_client.local_agent_net import LocalAgentSender from jaeger_client.thrift_gen.agent import Agent +from jaeger_client.thrift_gen.jaeger import ttypes from thrift.protocol import TCompactProtocol def test_base_sender_create_io_loop_if_not_provided(): - sender = senders.Sender(host='mock', port=4242) + sender = senders.Sender() assert sender.io_loop is not None assert isinstance(sender.io_loop, ioloop.IOLoop) + +def test_base_sender_send_not_implemented(): + sender = senders.Sender() + with pytest.raises(NotImplementedError): + sender.send(1).result() + + +def test_base_sender_set_process_instantiate_jaeger_process(): + sender = senders.Sender() + sender.set_process('service', {}, max_length=0) + assert isinstance(sender._process, ttypes.Process) + assert sender._process.serviceName == 'service' + + +def test_base_sender_spanless_flush_is_noop(): + sender = senders.Sender() + flushed = sender.flush().result() + assert flushed is None + + +def test_base_sender_processless_flush_is_noop(): + sender = senders.Sender() + sender.spans.append('foo') + flushed = sender.flush().result() + assert flushed is None + + +class CustomException(Exception): + pass + + +class SenderFlushTest(AsyncTestCase): + + def span(self): + FakeTracer = collections.namedtuple('FakeTracer', ['ip_address', 'service_name']) + tracer = FakeTracer(ip_address='127.0.0.1', service_name='reporter_test') + ctx = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1) + span = Span(context=ctx, tracer=tracer, operation_name='foo') + span.start_time = time.time() + span.end_time = span.start_time + 0.001 # 1ms + return span + + @gen_test + def test_base_sender_flush_raises_exceptions(self): + sender = senders.Sender() + sender.set_process('service', {}, max_length=0) + + sender.spans = [self.span()] + + sender.send = mock.MagicMock(side_effect=CustomException('Failed to send batch.')) + assert sender.span_count == 1 + + try: + yield sender.flush() + except Exception as exc: + assert isinstance(exc, CustomException) + assert str(exc) == 'Failed to send batch.' + else: + assert False, "Didn't Raise" + assert sender.span_count == 0 + + @gen_test + def test_udp_sender_flush_reraises_exceptions(self): + exceptions = ((CustomException, 'Failed to send batch.', + 'Failed to submit traces to jaeger-agent: Failed to send batch.'), + (socket.error, 'Connection Failed', + 'Failed to submit traces to jaeger-agent socket: Connection Failed')) + for exception, value, expected_value in exceptions: + sender = senders.UDPSender(host='mock', port=4242) + sender.set_process('service', {}, max_length=0) + + sender.spans = [self.span()] + + sender.agent.emitBatch = mock.MagicMock(side_effect=exception(value)) + assert sender.span_count == 1 + + try: + yield sender.flush() + except Exception as exc: + assert isinstance(exc, exception) + assert str(exc) == expected_value + else: + assert False, "Didn't Raise" + assert sender.span_count == 0 + + def test_udp_sender_instantiate_thrift_agent(): sender = senders.UDPSender(host='mock', port=4242) @@ -46,7 +140,8 @@ def test_udp_sender_intantiate_local_agent_channel(): assert sender.channel.io_loop == sender.io_loop assert isinstance(sender.channel, LocalAgentSender) -def test_udp_sender_calls_agent_emitBatch_on_sending(): + +def test_udp_sender_calls_agent_emitBatch_on_send(): test_data = {'foo': 'bar'} sender = senders.UDPSender(host='mock', port=4242)