Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt Sender.append() and flush() #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 18 additions & 46 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import tornado.gen
import tornado.ioloop
import tornado.queues
import socket
from tornado.concurrent import Future
from .constants import DEFAULT_FLUSH_INTERVAL
from . import thrift
from . import ioloop_util
from .metrics import Metrics, LegacyMetricsFactory
from .senders import UDPSender
Expand Down Expand Up @@ -122,9 +120,6 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
self.flush_interval = flush_interval or None
self.io_loop.spawn_callback(self._consume_queue)

self._process_lock = Lock()
self._process = None

@staticmethod
def fetch_io_loop(channel, sender):
if channel:
Expand All @@ -142,10 +137,7 @@ def _create_default_sender(self, channel):
return sender

def set_process(self, service_name, tags, max_length):
with self._process_lock:
self._process = thrift.make_process(
service_name=service_name, tags=tags, max_length=max_length,
)
self._sender.set_process(service_name, tags, max_length)

def report_span(self, span):
# We should not be calling `queue.put_nowait()` from random threads,
Expand All @@ -168,14 +160,14 @@ def _report_span_from_ioloop(self, span):

@tornado.gen.coroutine
def _consume_queue(self):
spans = []
stopped = False

while not stopped:
while len(spans) < self.batch_size:
while self._sender.span_count < self.batch_size:
try:
# using timeout allows periodic flush with smaller packet
timeout = self.flush_interval + self.io_loop.time() \
if self.flush_interval and spans else None
if self.flush_interval and self._sender.span_count else None
span = yield self.queue.get(timeout=timeout)
except tornado.gen.TimeoutError:
break
Expand All @@ -186,44 +178,24 @@ def _consume_queue(self):
# don't return yet, submit accumulated spans first
break
else:
spans.append(span)
if spans:
yield self._submit(spans)
for _ in spans:
self._sender.append(span)

if self._sender.span_count:
num_spans = self._sender.span_count
try:
yield self._sender.flush()
except Exception as exc:
self.metrics.reporter_failure(num_spans)
self.error_reporter.error(exc)
else:
self.metrics.reporter_success(num_spans)

for _ in range(num_spans):
self.queue.task_done()
spans = spans[:0]

self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exited')

@tornado.gen.coroutine
def _submit(self, spans):
if not spans:
return
with self._process_lock:
process = self._process
if not process:
return
try:
batch = thrift.make_jaeger_batch(spans=spans, process=process)
yield self._send(batch)
self.metrics.reporter_success(len(spans))
except socket.error as e:
self.metrics.reporter_failure(len(spans))
self.error_reporter.error(
'Failed to submit traces to jaeger-agent socket: %s', e)
except Exception as e:
self.metrics.reporter_failure(len(spans))
self.error_reporter.error(
'Failed to submit traces to jaeger-agent: %s', e)

@tornado.gen.coroutine
def _send(self, batch):
"""
Send batch of spans out via thrift transport. Any exceptions thrown
will be caught above in the exception handler of _submit().
"""
return self._sender.send(batch)

def close(self):
"""
Ensure that all spans from the queue are submitted.
Expand Down
73 changes: 59 additions & 14 deletions jaeger_client/senders.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
# limitations under the License.
from __future__ import absolute_import

import sys
import socket
import logging
import tornado.gen
from six import reraise
from threadloop import ThreadLoop

from . import thrift
from .local_agent_net import LocalAgentSender
from thrift.protocol import TCompactProtocol

Expand All @@ -28,14 +33,49 @@


class Sender(object):
def __init__(self, host, port, io_loop=None):
self.host = host
self.port = port
def __init__(self, io_loop=None):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

host and port are likely to be needed in every sender implementation. I would keep them in the base class.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my early HTTPSender implementation I found that only a URL is necessary, and constructing one via host, port, and trace endpoint seems like overkill (other jaeger clients use the JAEGER_ENDPOINT env var to specify the collector location).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's see if the jaeger maintainers are OK with this interface.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean before merging into your PR branch?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, no, I mean after merging into my PR branch. The only remaining issue is the one related to tornado.gen.Return; I replied to you there yesterday.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed that comment.

from threading import Lock
self.io_loop = io_loop or self._create_new_thread_loop()

self._process_lock = Lock()
self._process = None
self.spans = []

def append(self, span):
"""Queue a span for subsequent submission calls to flush()"""
self.spans.append(span)

@property
def span_count(self):
return len(self.spans)

@tornado.gen.coroutine
def flush(self):
"""Examine span and process state before yielding to _flush() for batching and transport."""
if self.spans:
with self._process_lock:
process = self._process
if process:
try:
yield self._flush(self.spans, self._process)
finally:
self.spans = []

@tornado.gen.coroutine
def _flush(self, spans, process):
"""Batch spans and invokes send(). Override with specific batching logic, if desired."""
batch = thrift.make_jaeger_batch(spans=spans, process=process)
yield self.send(batch)

@tornado.gen.coroutine
def send(self, batch):
raise NotImplementedError('This method should be implemented by subclasses')

def set_process(self, service_name, tags, max_length):
with self._process_lock:
self._process = thrift.make_process(
service_name=service_name, tags=tags, max_length=max_length,
)

def _create_new_thread_loop(self):
"""
Create a daemonized thread that will run Tornado IOLoop.
Expand All @@ -49,21 +89,26 @@ def _create_new_thread_loop(self):

class UDPSender(Sender):
def __init__(self, host, port, io_loop=None):
super(UDPSender, self).__init__(
host=host,
port=port,
io_loop=io_loop
)
super(UDPSender, self).__init__(io_loop=io_loop)
self.host = host
self.port = port
self.channel = self._create_local_agent_channel(self.io_loop)
self.agent = Agent.Client(self.channel, self)

@tornado.gen.coroutine
def send(self, batch):
""" Send batch of spans out via thrift transport.

Any exceptions thrown will be caught by the caller.
"""

return self.agent.emitBatch(batch)
Send batch of spans out via thrift transport.
"""
try:
yield self.agent.emitBatch(batch)
except socket.error as e:
reraise(type(e),
type(e)('Failed to submit traces to jaeger-agent socket: {}'.format(e)),
sys.exc_info()[2])
except Exception as e:
reraise(type(e), type(e)('Failed to submit traces to jaeger-agent: {}'.format(e)),
sys.exc_info()[2])

def _create_local_agent_channel(self, io_loop):
"""
Expand Down
29 changes: 4 additions & 25 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def test_composite_reporter():

class FakeSender(object):
"""
Mock the _send() method of the reporter by capturing requests
Mock the send() method of the reporter's Sender by capturing requests
and returning incomplete futures that can be completed from
inside the test.
"""
Expand Down Expand Up @@ -158,7 +158,7 @@ def _new_reporter(batch_size, flush=None, queue_cap=100):
queue_capacity=queue_cap)
reporter.set_process('service', {}, max_length=0)
sender = FakeSender()
reporter._send = sender
reporter._sender.send = sender
return reporter, sender

@tornado.gen.coroutine
Expand Down Expand Up @@ -198,17 +198,12 @@ def test_submit_failure(self):
reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters

# simulate exception in send
reporter._send = mock.MagicMock(side_effect=ValueError())
reporter._sender.send = mock.MagicMock(side_effect=ValueError())
reporter.report_span(self._new_span('1'))

yield self._wait_for(
lambda: reporter_failure_key in reporter.metrics_factory.counters)
assert 1 == reporter.metrics_factory.counters.get(reporter_failure_key)

# silly test, for code coverage only
yield reporter._submit([])

@gen_test
def test_submit_queue_full_batch_size_1(self):
reporter, sender = self._new_reporter(batch_size=1, queue_cap=1)
Expand Down Expand Up @@ -275,7 +270,7 @@ def send(_):
count[0] += 1
return future_result(True)

reporter._send = send
reporter._sender.send = send
reporter.batch_size = 3
for i in range(10):
reporter.report_span(self._new_span('%s' % i))
Expand All @@ -294,22 +289,6 @@ def send(_):


class TestReporterUnit:
def test_reporter_calls_sender_correctly(self):
reporter = Reporter(
channel=None,
sender=mock.MagicMock(),
io_loop=IOLoop.current(),
batch_size=10,
flush_interval=None,
metrics_factory=FakeMetricsFactory(),
error_reporter=HardErrorReporter(),
queue_capacity=100
)
test_data = {'foo': 'bar'}

reporter._send(test_data)
reporter._sender.send.assert_called_once_with(test_data)

@pytest.mark.parametrize(
'channel, sender, expected',
[
Expand Down
Loading