Skip to content

Commit

Permalink
Use Sender.span_count for initiating flush
Browse files (browse the repository at this point in the history)
Signed-off-by: Ryan Fitzpatrick <rmfitzpatrick@signalfx.com>
  • Loading branch information
Ryan Fitzpatrick committed Jul 19, 2018
1 parent 0ac58e8 commit 29ead94
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 115 deletions.
19 changes: 10 additions & 9 deletions jaeger_client/reporter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -160,15 +160,14 @@ def _report_span_from_ioloop(self, span):

@tornado.gen.coroutine
def _consume_queue(self):
spans_appended = 0
stopped = False

while not stopped:
while spans_appended < self.batch_size:
while self._sender.span_count < self.batch_size:
try:
# using timeout allows periodic flush with smaller packet
timeout = self.flush_interval + self.io_loop.time() \
if self.flush_interval and spans_appended else None
if self.flush_interval and self._sender.span_count else None
span = yield self.queue.get(timeout=timeout)
except tornado.gen.TimeoutError:
break
Expand All @@ -180,18 +179,20 @@ def _consume_queue(self):
break
else:
self._sender.append(span)
spans_appended += 1

if spans_appended:
num_spans, exc, error_msg = yield self._sender.flush()
spans_appended = 0
if exc:
if self._sender.span_count:
num_spans = self._sender.span_count
try:
yield self._sender.flush()
except Exception as exc:
self.metrics.reporter_failure(num_spans)
self.error_reporter.error(error_msg, exc)
self.error_reporter.error(exc)
else:
self.metrics.reporter_success(num_spans)

for _ in range(num_spans):
self.queue.task_done()

self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

Expand Down
86 changes: 29 additions & 57 deletions jaeger_client/senders.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.
from __future__ import absolute_import

import sys
import socket
import logging
import tornado.gen
from six import reraise
from threadloop import ThreadLoop

from . import thrift
Expand All @@ -42,52 +44,30 @@ def append(self, span):
"""Queue a span for subsequent submission calls to flush()"""
self.spans.append(span)

@property
def span_count(self):
return len(self.spans)

@tornado.gen.coroutine
def flush(self):
"""
Examine span and process state before yielding to _flush() for batching and transport.
Returns a tuple for accounting the number of spans attempted to send along with any
raised Exception and error message info, if any, yielded from _flush().
"""
if not self.spans:
return
with self._process_lock:
process = self._process
if not process:
return

num_spans, exc, error_msg = yield self._flush(self.spans, self._process)

self.spans = []

raise tornado.gen.Return((num_spans, exc, error_msg))
"""Examine span and process state before yielding to _flush() for batching and transport."""
if self.spans:
with self._process_lock:
process = self._process
if process:
try:
yield self._flush(self.spans, self._process)
finally:
self.spans = []

@tornado.gen.coroutine
def _flush(self, spans, process):
"""
Batch spans and invokes send() while calculating spans sent and providing all exception
handling. Override with specific batching logic and error messaging, if desired.
"""
num_spans = len(spans)
exc = None
error_msg = ''

"""Batch spans and invokes send(). Override with specific batching logic, if desired."""
batch = thrift.make_jaeger_batch(spans=spans, process=process)

try:
yield self.send(batch)
except Exception as e:
exc = e
error_msg = 'Failed to send batch: %s'

raise tornado.gen.Return((num_spans, exc, error_msg))
yield self.send(batch)

@tornado.gen.coroutine
def send(self, batch):
"""
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught in the flush exception handler of _flush().
"""
raise NotImplementedError('This method should be implemented by subclasses')

def set_process(self, service_name, tags, max_length):
Expand Down Expand Up @@ -117,7 +97,18 @@ def __init__(self, host, port, io_loop=None):

@tornado.gen.coroutine
def send(self, batch):
return self.agent.emitBatch(batch)
"""
Send batch of spans out via thrift transport.
"""
try:
yield self.agent.emitBatch(batch)
except socket.error as e:
reraise(type(e),
type(e)('Failed to submit traces to jaeger-agent socket: {}'.format(e)),
sys.exc_info()[2])
except Exception as e:
reraise(type(e), type(e)('Failed to submit traces to jaeger-agent: {}'.format(e)),
sys.exc_info()[2])

def _create_local_agent_channel(self, io_loop):
"""
Expand All @@ -135,25 +126,6 @@ def _create_local_agent_channel(self, io_loop):
io_loop=io_loop
)

@tornado.gen.coroutine
def _flush(self, spans, process):
num_spans = len(spans)
exc = None
error_msg = ''

batch = thrift.make_jaeger_batch(spans=spans, process=process)

try:
yield self.send(batch)
except socket.error as e:
exc = e
error_msg = 'Failed to submit traces to jaeger-agent socket: %s'
except Exception as e:
exc = e
error_msg = 'Failed to submit traces to jaeger-agent: %s'

raise tornado.gen.Return((num_spans, exc, error_msg))

# method for protocol factory
def getProtocol(self, transport):
"""
Expand Down
31 changes: 13 additions & 18 deletions tests/test_reporter.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -15,7 +15,6 @@

from six.moves import range

import socket
import logging
import time
import collections
Expand All @@ -29,7 +28,6 @@
from jaeger_client import Span, SpanContext
from jaeger_client.metrics import LegacyMetricsFactory, Metrics
from jaeger_client.utils import ErrorReporter
from jaeger_client import thrift
from tornado.ioloop import IOLoop
from tornado.testing import AsyncTestCase, gen_test
from jaeger_client.reporter import Reporter
Expand Down Expand Up @@ -192,22 +190,19 @@ def test_submit_batch_size_1(self):
assert 1 == reporter.metrics_factory.counters[span_dropped_key]

@gen_test
def test_submit_failure_with_default_sender(self):
for side_effect in (ValueError(), socket.error()):
reporter, sender = self._new_reporter(batch_size=1)
reporter.error_reporter = ErrorReporter(
metrics=Metrics(), logger=logging.getLogger())

reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters

# simulate exception in send
reporter._sender.send = mock.MagicMock(side_effect=side_effect)
reporter.report_span(self._new_span('1'))

yield self._wait_for(
lambda: reporter_failure_key in reporter.metrics_factory.counters)
assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key)
def test_submit_failure(self):
reporter, sender = self._new_reporter(batch_size=1)
reporter.error_reporter = ErrorReporter(
metrics=Metrics(), logger=logging.getLogger())

reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters

reporter._sender.send = mock.MagicMock(side_effect=ValueError())
reporter.report_span(self._new_span('1'))
yield self._wait_for(
lambda: reporter_failure_key in reporter.metrics_factory.counters)
assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key)

@gen_test
def test_submit_queue_full_batch_size_1(self):
Expand Down
76 changes: 45 additions & 31 deletions tests/test_senders.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import print_function

import time
import socket
import collections

import mock
Expand Down Expand Up @@ -54,60 +55,73 @@ def test_base_sender_set_process_instantiate_jaeger_process():
def test_base_sender_spanless_flush_is_noop():
sender = senders.Sender()
flushed = sender.flush().result()
assert flushed is None
assert flushed == None


def test_base_sender_processless_flush_is_noop():
sender = senders.Sender()
sender.spans.append('foo')
flushed = sender.flush().result()
assert flushed is None
assert flushed == None


class CustomException(Exception):
pass


class SenderFlushTest(AsyncTestCase):
@gen_test
def test_base_sender_flush_yields_exceptions(self):

def span(self):
FakeTracer = collections.namedtuple('FakeTracer', ['ip_address', 'service_name'])
tracer = FakeTracer(ip_address='127.0.0.1', service_name='reporter_test')
sender = senders.Sender()
sender.set_process('service', {}, max_length=0)

ctx = SpanContext(trace_id=1, span_id=1, parent_id=None, flags=1)
span = Span(context=ctx, tracer=tracer, operation_name='foo')
span.start_time = time.time()
span.end_time = span.start_time + 0.001 # 1ms
sender.spans = [span]

sender.send = mock.MagicMock(side_effect=CustomException)
num_spans, exc, error_msg = yield sender.flush()
assert num_spans == 1
assert isinstance(exc, CustomException)
assert error_msg == 'Failed to send batch: %s'
return span

@gen_test
def test_flush_batcher_accounting_information_yielded_from_flush(self):
@gen.coroutine
def stubbed_send(self, batch):
pass

@gen.coroutine
def stubbed__flush(self, spans, process):
raise gen.Return((100, CustomException(), 'SomeMessage'))

def test_base_sender_flush_raises_exceptions(self):
sender = senders.Sender()
sender.set_process('service', {}, max_length=0)
sender.spans = ['foo', 'bar']
sender._flush = stubbed__flush.__get__(sender)
sender.send = stubbed_send.__get__(sender)

num_spans, exc, error_msg = yield sender.flush()
assert num_spans == 100
assert isinstance(exc, CustomException)
assert error_msg == 'SomeMessage'

sender.spans = [self.span()]

sender.send = mock.MagicMock(side_effect=CustomException('Failed to send batch.'))
assert sender.span_count == 1

try:
yield sender.flush()
except Exception as exc:
assert isinstance(exc, CustomException)
assert str(exc) == 'Failed to send batch.'
else:
assert False, "Didn't Raise"
assert sender.span_count == 0

@gen_test
def test_udp_sender_flush_reraises_exceptions(self):
exceptions = ((CustomException, 'Failed to send batch.',
'Failed to submit traces to jaeger-agent: Failed to send batch.'),
(socket.error, 'Connection Failed',
'Failed to submit traces to jaeger-agent socket: Connection Failed'))
for exception, value, expected_value in exceptions:
sender = senders.UDPSender(host='mock', port=4242)
sender.set_process('service', {}, max_length=0)

sender.spans = [self.span()]

sender.agent.emitBatch = mock.MagicMock(side_effect=exception(value))
assert sender.span_count == 1

try:
yield sender.flush()
except Exception as exc:
assert isinstance(exc, exception)
assert str(exc) == expected_value
else:
assert False, "Didn't Raise"
assert sender.span_count == 0


def test_udp_sender_instantiate_thrift_agent():
Expand Down

0 comments on commit 29ead94

Please sign in to comment.