Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ocagent stats exporter. #617

Merged
merged 4 commits into from
Apr 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ Trace Exporter
Stats Exporter
--------------

- `OCAgent`_
- `Prometheus`_
- `Stackdriver`_

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
import unittest

import mock
import grpc

from opencensus.ext.google_cloud_clientlibs import trace


class Test_google_cloud_clientlibs_trace(unittest.TestCase):
def setUp(self):
self._insecure_channel_func = getattr(grpc, 'insecure_channel')

def tearDown(self):
setattr(grpc, 'insecure_channel', self._insecure_channel_func)

def test_trace_integration(self):
mock_trace_grpc = mock.Mock()
mock_trace_http = mock.Mock()
Expand Down
1 change: 1 addition & 0 deletions contrib/opencensus-ext-grpc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Create WrappedResponseIterator for intercepted bi-directional rpc stream.

## 0.1.1
Released 2019-04-08
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,9 @@ def intercept_unary_stream(
request_iterator=iter((request,)),
grpc_type=oc_grpc.UNARY_STREAM)

response_it = continuation(
new_details,
next(new_request_iterator))
response_it = grpc_utils.wrap_iter_with_message_events(
request_or_response_iter=response_it,
span=current_span,
message_event_type=time_event.Type.RECEIVED
)
response_it = grpc_utils.wrap_iter_with_end_span(response_it)

return response_it
return grpc_utils.WrappedResponseIterator(
continuation(new_details, next(new_request_iterator)),
current_span)

def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
Expand Down Expand Up @@ -225,17 +217,8 @@ def intercept_stream_stream(
request_iterator=request_iterator,
grpc_type=oc_grpc.STREAM_STREAM)

response_it = continuation(
new_details,
new_request_iterator)
response_it = grpc_utils.wrap_iter_with_message_events(
request_or_response_iter=response_it,
span=current_span,
message_event_type=time_event.Type.RECEIVED
)
response_it = grpc_utils.wrap_iter_with_end_span(response_it)

return response_it
return grpc_utils.WrappedResponseIterator(
continuation(new_details, new_request_iterator), current_span)


def _get_span_name(client_call_details):
Expand Down
94 changes: 93 additions & 1 deletion contrib/opencensus-ext-grpc/opencensus/ext/grpc/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from datetime import datetime

from opencensus.trace import time_event
from grpc.framework.foundation import future
from grpc.framework.interfaces.face import face
from opencensus.trace import execution_context
from opencensus.trace import time_event


def add_message_event(proto_message, span, message_event_type, message_id=1):
Expand Down Expand Up @@ -43,3 +45,93 @@ def wrap_iter_with_end_span(response_iter):
for response in response_iter:
yield response
execution_context.get_opencensus_tracer().end_span()


class WrappedResponseIterator(future.Future, face.Call):
"""Wraps the rpc response iterator.

The grpc.StreamStreamClientInterceptor abstract class states stream
interceptor method should return an object that's both a call (implementing
the response iterator) and a future. Thus, this class is a thin wrapper
around the rpc response to provide the opencensus extension.

:type iterator: (future.Future, face.Call)
:param iterator: rpc response iterator

:type span: opencensus.trace.Span
:param span: rpc span
"""
def __init__(self, iterator, span):
self._iterator = iterator
self._span = span

self._messages_received = 0

def add_done_callback(self, fn):
self._iterator.add_done_callback(lambda ignored_callback: fn(self))

def __iter__(self):
return self

def __next__(self):
try:
message = next(self._iterator)
except StopIteration:
execution_context.get_opencensus_tracer().end_span()
raise

self._messages_received += 1
add_message_event(
proto_message=message,
span=self._span,
message_event_type=time_event.Type.RECEIVED,
message_id=self._messages_received)
return message

def next(self):
return self.__next__()

def cancel(self):
return self._iterator.cancel()

def is_active(self):
return self._iterator.is_active()

def cancelled(self):
raise NotImplementedError() # pragma: NO COVER

def running(self):
raise NotImplementedError() # pragma: NO COVER

def done(self):
raise NotImplementedError() # pragma: NO COVER

def result(self, timeout=None):
raise NotImplementedError() # pragma: NO COVER

def exception(self, timeout=None):
raise NotImplementedError() # pragma: NO COVER

def traceback(self, timeout=None):
raise NotImplementedError() # pragma: NO COVER

def initial_metadata(self):
raise NotImplementedError() # pragma: NO COVER

def terminal_metadata(self):
raise NotImplementedError() # pragma: NO COVER

def code(self):
raise NotImplementedError() # pragma: NO COVER

def details(self):
raise NotImplementedError() # pragma: NO COVER

def time_remaining(self):
raise NotImplementedError() # pragma: NO COVER

def add_abortion_callback(self, abortion_callback):
raise NotImplementedError() # pragma: NO COVER

def protocol_context(self):
raise NotImplementedError() # pragma: NO COVER
110 changes: 109 additions & 1 deletion contrib/opencensus-ext-grpc/tests/test_client_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import mock
import threading
import unittest

import mock
from google.api_core import bidi
from google.protobuf import proto_builder
from grpc.framework.foundation import logging_pool
import grpc

from opencensus.ext.grpc import client_interceptor
from opencensus.trace import execution_context
Expand Down Expand Up @@ -282,6 +288,108 @@ def test_intercept_stream_stream_not_trace(self):
self.assertFalse(mock_tracer.end_span.called)


class TestGrpcInterface(unittest.TestCase):

def setUp(self):
self._server = _start_server()
self._port = self._server.add_insecure_port('[::]:0')
self._channel = grpc.insecure_channel('localhost:%d' % self._port)

def tearDown(self):
self._server.stop(None)
self._channel.close()

def _intercepted_channel(self, tracer=None):
return grpc.intercept_channel(
self._channel,
client_interceptor.OpenCensusClientInterceptor(tracer=tracer))

def test_bidi_rpc_stream(self):
event = threading.Event()

def _helper(request_iterator, context):
counter = 0
for _ in request_iterator:
counter += 1
if counter == 2:
event.set()
yield

self._server.add_generic_rpc_handlers(
(StreamStreamRpcHandler(_helper),))
self._server.start()

rpc = bidi.BidiRpc(
self._intercepted_channel().stream_stream(
'', EmptyMessage.SerializeToString),
initial_request=EmptyMessage())
done_event = threading.Event()
rpc.add_done_callback(lambda _: done_event.set())

rpc.open()
rpc.send(EmptyMessage())
self.assertTrue(event.wait(timeout=1))
rpc.close()
self.assertTrue(done_event.wait(timeout=1))

@mock.patch('opencensus.trace.execution_context.get_opencensus_tracer')
def test_close_span_on_done(self, mock_tracer):
def _helper(request_iterator, context):
for _ in request_iterator:
yield EmptyMessage()
yield

self._server.add_generic_rpc_handlers(
(StreamStreamRpcHandler(_helper), ))
self._server.start()

mock_tracer.return_value = mock_tracer
rpc = self._intercepted_channel(NoopTracer()).stream_stream(
method='',
request_serializer=EmptyMessage.SerializeToString,
response_deserializer=EmptyMessage.FromString)(iter(
[EmptyMessage()]))

for resp in rpc:
pass

self.assertEqual(mock_tracer.end_span.call_count, 1)


EmptyMessage = proto_builder.MakeSimpleProtoClass(
collections.OrderedDict([]),
full_name='tests.test_client_interceptor.EmptyMessage')


def _start_server():
"""Starts an insecure grpc server."""
return grpc.server(logging_pool.pool(max_workers=1),
options=(('grpc.so_reuseport', 0), ))


class StreamStreamMethodHandler(grpc.RpcMethodHandler):

def __init__(self, stream_handler_func):
self.request_streaming = True
self.response_streaming = True
self.request_deserializer = None
self.response_serializer = EmptyMessage.SerializeToString
self.unary_unary = None
self.unary_stream = None
self.stream_unary = None
self.stream_stream = stream_handler_func


class StreamStreamRpcHandler(grpc.GenericRpcHandler):

def __init__(self, stream_stream_handler):
self._stream_stream_handler = stream_stream_handler

def service(self, handler_call_details):
resp = StreamStreamMethodHandler(self._stream_stream_handler)
return resp


class MockTracer(object):
def __init__(self, current_span):
self.current_span = current_span
Expand Down
12 changes: 12 additions & 0 deletions contrib/opencensus-ext-grpc/tests/test_server_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from google.rpc import code_pb2

from opencensus.ext.grpc import server_interceptor
from opencensus.ext.grpc import utils as grpc_utils
from opencensus.trace import execution_context
from opencensus.trace import span as span_module

Expand Down Expand Up @@ -149,6 +150,17 @@ def test_intercept_handler_exception(self):
self.assertEqual(current_span.status.code, code_pb2.UNKNOWN)
self.assertEqual(current_span.status.message, 'Test')

@mock.patch(
'opencensus.trace.execution_context.get_opencensus_tracer')
def test_resp_streaming_span_end(self, mock_tracer):
mock_tracer.return_value = mock_tracer

it = grpc_utils.wrap_iter_with_end_span(iter(['test']))
for i in it:
pass

self.assertEqual(mock_tracer.end_span.call_count, 1)

def test__wrap_rpc_behavior_none(self):
new_handler = server_interceptor._wrap_rpc_behavior(None, lambda: None)
self.assertEqual(new_handler, None)
Expand Down
1 change: 1 addition & 0 deletions contrib/opencensus-ext-ocagent/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## Unreleased
- Add stats exporter

## 0.2.0
Released 2019-04-08
Expand Down
11 changes: 9 additions & 2 deletions contrib/opencensus-ext-ocagent/README.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
OpenCensus OC-Agent Trace Exporter
OpenCensus OC-Agent Exporter
============================================================================

|pypi|
Expand All @@ -16,6 +16,13 @@ Installation
Usage
-----

Stats
~~~~~

.. code:: python

# TBD
from opencensus.ext.ocagent import stats_exporter as ocagent_stats_exporter

ocagent_stats_exporter.new_stats_exporter(
service_name='service_name',
endpoint='localhost:55678')