Skip to content

Commit

Permalink
Adopt Sender.append() and flush()
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Fitzpatrick <rmfitzpatrick@signalfx.com>
  • Loading branch information
Ryan Fitzpatrick committed Jul 16, 2018
1 parent 435b532 commit 0ac58e8
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 95 deletions.
63 changes: 17 additions & 46 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import tornado.gen
import tornado.ioloop
import tornado.queues
import socket
from tornado.concurrent import Future
from .constants import DEFAULT_FLUSH_INTERVAL
from . import thrift
from . import ioloop_util
from .metrics import Metrics, LegacyMetricsFactory
from .senders import UDPSender
Expand Down Expand Up @@ -122,9 +120,6 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
self.flush_interval = flush_interval or None
self.io_loop.spawn_callback(self._consume_queue)

self._process_lock = Lock()
self._process = None

@staticmethod
def fetch_io_loop(channel, sender):
if channel:
Expand All @@ -142,10 +137,7 @@ def _create_default_sender(self, channel):
return sender

def set_process(self, service_name, tags, max_length):
with self._process_lock:
self._process = thrift.make_process(
service_name=service_name, tags=tags, max_length=max_length,
)
self._sender.set_process(service_name, tags, max_length)

def report_span(self, span):
# We should not be calling `queue.put_nowait()` from random threads,
Expand All @@ -168,14 +160,15 @@ def _report_span_from_ioloop(self, span):

@tornado.gen.coroutine
def _consume_queue(self):
spans = []
spans_appended = 0
stopped = False

while not stopped:
while len(spans) < self.batch_size:
while spans_appended < self.batch_size:
try:
# using timeout allows periodic flush with smaller packet
timeout = self.flush_interval + self.io_loop.time() \
if self.flush_interval and spans else None
if self.flush_interval and spans_appended else None
span = yield self.queue.get(timeout=timeout)
except tornado.gen.TimeoutError:
break
Expand All @@ -186,44 +179,22 @@ def _consume_queue(self):
# don't return yet, submit accumulated spans first
break
else:
spans.append(span)
if spans:
yield self._submit(spans)
for _ in spans:
self._sender.append(span)
spans_appended += 1

if spans_appended:
num_spans, exc, error_msg = yield self._sender.flush()
spans_appended = 0
if exc:
self.metrics.reporter_failure(num_spans)
self.error_reporter.error(error_msg, exc)
else:
self.metrics.reporter_success(num_spans)
for _ in range(num_spans):
self.queue.task_done()
spans = spans[:0]
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

@tornado.gen.coroutine
def _submit(self, spans):
    """
    Batch the given spans with the current process and pass the batch to
    _send(), recording the outcome in the reporter metrics. Nothing is
    done when there are no spans or when no process has been set yet.
    """
    if spans:
        # Read the process under the lock, then work with the snapshot.
        with self._process_lock:
            current_process = self._process
        if current_process:
            try:
                payload = thrift.make_jaeger_batch(
                    spans=spans, process=current_process)
                yield self._send(payload)
                self.metrics.reporter_success(len(spans))
            except Exception as err:
                # Socket-level failures are reported with their own message;
                # every other failure uses the generic one.
                if isinstance(err, socket.error):
                    template = 'Failed to submit traces to jaeger-agent socket: %s'
                else:
                    template = 'Failed to submit traces to jaeger-agent: %s'
                self.metrics.reporter_failure(len(spans))
                self.error_reporter.error(template, err)

@tornado.gen.coroutine
def _send(self, batch):
    """
    Hand a batch of spans to the sender's send() and return its result.
    Exceptions raised here are dealt with by the exception handler
    in _submit().
    """
    outcome = self._sender.send(batch)
    return outcome

def close(self):
"""
Ensure that all spans from the queue are submitted.
Expand Down
99 changes: 86 additions & 13 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
# limitations under the License.
from __future__ import absolute_import

import socket
import logging
import tornado.gen
from threadloop import ThreadLoop

from . import thrift
from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

Expand All @@ -28,14 +31,71 @@


class Sender(object):
def __init__(self, host, port, io_loop=None):
self.host = host
self.port = port
def __init__(self, io_loop=None):
from threading import Lock
self.io_loop = io_loop or self._create_new_thread_loop()
self._process_lock = Lock()
self._process = None
self.spans = []

def append(self, span):
    """Buffer a span so that a later call to flush() can submit it."""
    pending = self.spans
    pending.append(span)

@tornado.gen.coroutine
def flush(self):
    """
    Submit the spans queued by append() as one batch through _flush().

    Always resolves to a (num_spans, exc, error_msg) tuple so callers can
    unpack the result unconditionally. When no spans are queued, or no
    process has been set through set_process() yet, the tuple is
    (0, None, ''); in the latter case the queued spans stay buffered for a
    later flush(). Otherwise the tuple is whatever _flush() yields.
    """
    nothing_sent = (0, None, '')

    if not self.spans:
        raise tornado.gen.Return(nothing_sent)
    # Snapshot the process under the lock and use that snapshot below
    # instead of reading self._process again without the lock.
    with self._process_lock:
        process = self._process
    if not process:
        raise tornado.gen.Return(nothing_sent)

    # Detach the buffer before yielding: a span appended while the send is
    # in flight is then kept for the next flush() instead of being thrown
    # away by a reset that happens after the send completes.
    spans, self.spans = self.spans, []

    result = yield self._flush(spans, process)

    raise tornado.gen.Return(result)

@tornado.gen.coroutine
def _flush(self, spans, process):
    """
    Batch spans and invoke send(), counting the spans attempted and
    providing all exception handling. Override with specific batching logic
    and error messaging, if desired.

    Resolves to a (num_spans, exc, error_msg) tuple: num_spans is
    len(spans); exc is the exception raised while batching or sending, or
    None on success; error_msg is a '%s'-style message template for exc, or
    '' on success.
    """
    num_spans = len(spans)
    exc = None
    error_msg = ''

    try:
        # Batch construction is inside the try so that a failure while
        # building the batch is reported through the returned tuple, the
        # same way a send failure is, instead of escaping to the caller.
        batch = thrift.make_jaeger_batch(spans=spans, process=process)
        yield self.send(batch)
    except Exception as e:
        exc = e
        error_msg = 'Failed to send batch: %s'

    raise tornado.gen.Return((num_spans, exc, error_msg))

@tornado.gen.coroutine
def send(self, batch):
    """
    Transport hook for sending a batch of spans. Subclasses are expected
    to override it; this base version only raises NotImplementedError.
    Exceptions from send() are caught by the handler in _flush().
    """
    message = 'This method should be implemented by subclasses'
    raise NotImplementedError(message)

def set_process(self, service_name, tags, max_length):
    """
    Build a process with thrift.make_process() from the given service
    name, tags and max_length, and store it while holding the process lock.
    """
    process_args = dict(
        service_name=service_name,
        tags=tags,
        max_length=max_length,
    )
    with self._process_lock:
        self._process = thrift.make_process(**process_args)

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
Expand All @@ -49,20 +109,14 @@ def _create_new_thread_loop(self):

class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(
host=host,
port=port,
io_loop=io_loop
)
super(UDPSender, self).__init__(io_loop=io_loop)
self.host = host
self.port = port
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

@tornado.gen.coroutine
def send(self, batch):
    """
    Emit a batch of spans through the agent client's emitBatch().
    Exceptions are left for the caller to handle.
    """
    emitted = self.agent.emitBatch(batch)
    return emitted

def _create_local_agent_channel(self, io_loop):
Expand All @@ -81,6 +135,25 @@ def _create_local_agent_channel(self, io_loop):
io_loop=io_loop
)

@tornado.gen.coroutine
def _flush(self, spans, process):
    """
    Batch spans and submit them through send(), reporting failures in the
    result instead of raising.

    Resolves to a (num_spans, exc, error_msg) tuple: num_spans is
    len(spans); exc is the exception raised while batching or sending, or
    None on success; error_msg is a '%s'-style message template for exc
    (socket errors get a socket-specific message), or '' on success.
    """
    num_spans = len(spans)
    exc = None
    error_msg = ''

    try:
        # Batch construction is inside the try so that a failure while
        # building the batch is reported through the returned tuple, the
        # same way a send failure is, instead of escaping to the caller.
        batch = thrift.make_jaeger_batch(spans=spans, process=process)
        yield self.send(batch)
    except socket.error as e:
        exc = e
        error_msg = 'Failed to submit traces to jaeger-agent socket: %s'
    except Exception as e:
        exc = e
        error_msg = 'Failed to submit traces to jaeger-agent: %s'

    raise tornado.gen.Return((num_spans, exc, error_msg))

# method for protocol factory
def getProtocol(self, transport):
"""
Expand Down
52 changes: 18 additions & 34 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from six.moves import range

import socket
import logging
import time
import collections
Expand All @@ -28,6 +29,7 @@
from jaeger_client import Span, SpanContext
from jaeger_client.metrics import LegacyMetricsFactory, Metrics
from jaeger_client.utils import ErrorReporter
from jaeger_client import thrift
from tornado.ioloop import IOLoop
from tornado.testing import AsyncTestCase, gen_test
from jaeger_client.reporter import Reporter
Expand Down Expand Up @@ -96,7 +98,7 @@ def test_composite_reporter():

class FakeSender(object):
"""
Mock the _send() method of the reporter by capturing requests
Mock the send() method of the reporter's Sender by capturing requests
and returning incomplete futures that can be completed from
inside the test.
"""
Expand Down Expand Up @@ -158,7 +160,7 @@ def _new_reporter(batch_size, flush=None, queue_cap=100):
queue_capacity=queue_cap)
reporter.set_process('service', {}, max_length=0)
sender = FakeSender()
reporter._send = sender
reporter._sender.send = sender
return reporter, sender

@tornado.gen.coroutine
Expand Down Expand Up @@ -190,24 +192,22 @@ def test_submit_batch_size_1(self):
assert 1 == reporter.metrics_factory.counters[span_dropped_key]

@gen_test
def test_submit_failure(self):
reporter, sender = self._new_reporter(batch_size=1)
reporter.error_reporter = ErrorReporter(
metrics=Metrics(), logger=logging.getLogger())

reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters
def test_submit_failure_with_default_sender(self):
for side_effect in (ValueError(), socket.error()):
reporter, sender = self._new_reporter(batch_size=1)
reporter.error_reporter = ErrorReporter(
metrics=Metrics(), logger=logging.getLogger())

# simulate exception in send
reporter._send = mock.MagicMock(side_effect=ValueError())
reporter.report_span(self._new_span('1'))
reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters

yield self._wait_for(
lambda: reporter_failure_key in reporter.metrics_factory.counters)
assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key)
# simulate exception in send
reporter._sender.send = mock.MagicMock(side_effect=side_effect)
reporter.report_span(self._new_span('1'))

# silly test, for code coverage only
yield reporter._submit([])
yield self._wait_for(
lambda: reporter_failure_key in reporter.metrics_factory.counters)
assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key)

@gen_test
def test_submit_queue_full_batch_size_1(self):
Expand Down Expand Up @@ -275,7 +275,7 @@ def send(_):
count[0] += 1
return future_result(True)

reporter._send = send
reporter._sender.send = send
reporter.batch_size = 3
for i in range(10):
reporter.report_span(self._new_span('%s' % i))
Expand All @@ -294,22 +294,6 @@ def send(_):


class TestReporterUnit:
def test_reporter_calls_sender_correctly(self):
    """Reporter._send() must pass its argument unchanged to the sender's send()."""
    reporter_kwargs = dict(
        channel=None,
        sender=mock.MagicMock(),
        io_loop=IOLoop.current(),
        batch_size=10,
        flush_interval=None,
        metrics_factory=FakeMetricsFactory(),
        error_reporter=HardErrorReporter(),
        queue_capacity=100,
    )
    reporter = Reporter(**reporter_kwargs)
    payload = {'foo': 'bar'}

    reporter._send(payload)

    reporter._sender.send.assert_called_once_with(payload)

@pytest.mark.parametrize(
'channel, sender, expected',
[
Expand Down
Loading

0 comments on commit 0ac58e8

Please sign in to comment.