Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed May 10, 2024
1 parent 4c6247b commit b550028
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 11 deletions.
35 changes: 35 additions & 0 deletions tests/contrib/grpc_aio/hellostreamingworld_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests/contrib/grpc_aio/hellostreamingworld_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional

DESCRIPTOR: _descriptor.FileDescriptor

class HelloReply(_message.Message):
__slots__ = ["message"]
MESSAGE_FIELD_NUMBER: _ClassVar[int]
message: str
def __init__(self, message: _Optional[str] = ...) -> None: ...

class HelloRequest(_message.Message):
__slots__ = ["name", "num_greetings"]
NAME_FIELD_NUMBER: _ClassVar[int]
NUM_GREETINGS_FIELD_NUMBER: _ClassVar[int]
name: str
num_greetings: str
def __init__(self, name: _Optional[str] = ..., num_greetings: _Optional[str] = ...) -> None: ...
77 changes: 77 additions & 0 deletions tests/contrib/grpc_aio/hellostreamingworld_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from tests.contrib.grpc_aio import hellostreamingworld_pb2 as hellostreamingworld__pb2


class MultiGreeterStub(object):
"""The greeting service definition."""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.sayHello = channel.unary_stream(
"/hellostreamingworld.MultiGreeter/sayHello",
request_serializer=hellostreamingworld__pb2.HelloRequest.SerializeToString,
response_deserializer=hellostreamingworld__pb2.HelloReply.FromString,
)


class MultiGreeterServicer(object):
"""The greeting service definition."""

def sayHello(self, request, context):
"""Sends multiple greetings"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")


def add_MultiGreeterServicer_to_server(servicer, server):
rpc_method_handlers = {
"sayHello": grpc.unary_stream_rpc_method_handler(
servicer.sayHello,
request_deserializer=hellostreamingworld__pb2.HelloRequest.FromString,
response_serializer=hellostreamingworld__pb2.HelloReply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler("hellostreamingworld.MultiGreeter", rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))


# This class is part of an EXPERIMENTAL API.
class MultiGreeter(object):
"""The greeting service definition."""

@staticmethod
def sayHello(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_stream(
request,
target,
"/hellostreamingworld.MultiGreeter/sayHello",
hellostreamingworld__pb2.HelloRequest.SerializeToString,
hellostreamingworld__pb2.HelloReply.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
112 changes: 101 additions & 11 deletions tests/contrib/grpc_aio/test_grpc_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
from tests.contrib.grpc.hello_pb2_grpc import HelloServicer
from tests.contrib.grpc.hello_pb2_grpc import HelloStub
from tests.contrib.grpc.hello_pb2_grpc import add_HelloServicer_to_server
from tests.contrib.grpc_aio.hellostreamingworld_pb2 import HelloReply as HelloReplyStream
from tests.contrib.grpc_aio.hellostreamingworld_pb2 import HelloRequest as HelloRequestStream
from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import MultiGreeterServicer
from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import MultiGreeterStub
from tests.contrib.grpc_aio.hellostreamingworld_pb2_grpc import add_MultiGreeterServicer_to_server
from tests.utils import DummyTracer
from tests.utils import assert_is_measured
from tests.utils import override_config


_GRPC_PORT = 50531
NUMBER_OF_REPLY = 10


class _CoroHelloServicer(HelloServicer):
Expand Down Expand Up @@ -150,6 +156,12 @@ def SayHelloRepeatedly(self, request_iterator, context):
yield HelloReply(message="Good bye")


class Greeter(MultiGreeterServicer):
async def sayHello(self, request, context):
for i in range(NUMBER_OF_REPLY):
yield HelloReplyStream(message=f"Hello number {i}, {request.name}!")


class DummyClientInterceptor(aio.UnaryUnaryClientInterceptor):
async def intercept_unary_unary(self, continuation, client_call_details, request):
undone_call = await continuation(client_call_details, request)
Expand All @@ -175,6 +187,24 @@ def tracer():
tracer.pop()


@pytest.fixture
async def async_server_info(request, tracer, event_loop):
_ServerInfo = namedtuple("_ServerInfo", ("target", "abort_supported"))
_server = grpc.aio.server()
add_MultiGreeterServicer_to_server(Greeter(), _server)
_servicer = request.param
target = f"localhost:{_GRPC_PORT}"
_server.add_insecure_port(target)
# interceptor can not catch AbortError for sync servicer
abort_supported = not isinstance(_servicer, (_SyncHelloServicer,))

await _server.start()
wait_task = event_loop.create_task(_server.wait_for_termination())
yield _ServerInfo(target, abort_supported)
await _server.stop(grace=None)
await wait_task


# `pytest_asyncio.fixture` cannot be used
# with pytest-asyncio 0.16.0 which is the latest version available for Python3.6.
@pytest.fixture
Expand All @@ -183,7 +213,6 @@ async def server_info(request, tracer, event_loop):
tracer fixture is imported to make sure the tracer is pinned to the modules.
"""
_ServerInfo = namedtuple("_ServerInfo", ("target", "abort_supported"))

_servicer = request.param
target = f"localhost:{_GRPC_PORT}"
_server = _create_server(_servicer, target)
Expand All @@ -208,16 +237,16 @@ def _get_spans(tracer):
return tracer._writer.spans


def _check_client_span(span, service, method_name, method_kind):
def _check_client_span(span, service, method_name, method_kind, resource="helloworld.Hello"):
assert_is_measured(span)
assert span.name == "grpc"
assert span.resource == "/helloworld.Hello/{}".format(method_name)
assert span.resource == "/{}/{}".format(resource, method_name)
assert span.service == service
assert span.error == 0
assert span.span_type == "grpc"
assert span.get_tag("grpc.method.path") == "/helloworld.Hello/{}".format(method_name)
assert span.get_tag("grpc.method.package") == "helloworld"
assert span.get_tag("grpc.method.service") == "Hello"
assert span.get_tag("grpc.method.path") == "/{}/{}".format(resource, method_name)
assert span.get_tag("grpc.method.package") == resource.split(".")[0]
assert span.get_tag("grpc.method.service") == resource.split(".")[1]
assert span.get_tag("grpc.method.name") == method_name
assert span.get_tag("grpc.method.kind") == method_kind
assert span.get_tag("grpc.status.code") == "StatusCode.OK"
Expand All @@ -228,16 +257,16 @@ def _check_client_span(span, service, method_name, method_kind):
assert span.get_tag("span.kind") == "client"


def _check_server_span(span, service, method_name, method_kind):
def _check_server_span(span, service, method_name, method_kind, resource="helloworld.Hello"):
assert_is_measured(span)
assert span.name == "grpc"
assert span.resource == "/helloworld.Hello/{}".format(method_name)
assert span.resource == "/{}/{}".format(resource, method_name)
assert span.service == service
assert span.error == 0
assert span.span_type == "grpc"
assert span.get_tag("grpc.method.path") == "/helloworld.Hello/{}".format(method_name)
assert span.get_tag("grpc.method.package") == "helloworld"
assert span.get_tag("grpc.method.service") == "Hello"
assert span.get_tag("grpc.method.path") == "/{}/{}".format(resource, method_name)
assert span.get_tag("grpc.method.package") == resource.split(".")[0]
assert span.get_tag("grpc.method.service") == resource.split(".")[1]
assert span.get_tag("grpc.method.name") == method_name
assert span.get_tag("grpc.method.kind") == method_kind
assert span.get_tag("component") == "grpc_aio_server"
Expand Down Expand Up @@ -1005,3 +1034,64 @@ async def test_client_streaming(server_info, tracer):
out, err, status, _ = ddtrace_run_python_code_in_subprocess(code, env=env)
assert status == 0, (err.decode(), out.decode())
assert err == b"", err.decode()


class StreamInterceptor(grpc.aio.UnaryStreamClientInterceptor):
async def intercept_unary_stream(self, continuation, call_details, request):
response_iterator = await continuation(call_details, request)
return response_iterator


async def run_streaming_example(server_info, use_generator=False):
i = 0
async with grpc.aio.insecure_channel(server_info.target, interceptors=[StreamInterceptor()]) as channel:
stub = MultiGreeterStub(channel)

# Read from an async generator
if use_generator:
async for response in stub.sayHello(HelloRequestStream(name="you")):
assert response.message == "Hello number {}, you!".format(i)
i += 1

# Direct read from the stub
else:
hello_stream = stub.sayHello(HelloRequestStream(name="will"))
while True:
response = await hello_stream.read()
if response == grpc.aio.EOF:
break
assert response.message == "Hello number {}, will!".format(i)
i += 1


@pytest.mark.asyncio
@pytest.mark.skip("Bug/error from grpc when adding an async streaming client interceptor throws StopAsyncIteration")
@pytest.mark.parametrize("async_server_info", [_CoroHelloServicer()], indirect=True)
async def test_async_streaming_direct_read(async_server_info, tracer):
await run_streaming_example(async_server_info)

spans = _get_spans(tracer)
assert len(spans) == 2
client_span, server_span = spans

# No error because cancelled after execution
_check_client_span(client_span, "grpc-aio-client", "SayHelloRepeatedly", "bidi_streaming")
_check_server_span(server_span, "grpc-aio-server", "SayHelloRepeatedly", "bidi_streaming")


@pytest.mark.asyncio
@pytest.mark.parametrize("async_server_info", [_CoroHelloServicer()], indirect=True)
async def test_async_streaming_generator(async_server_info, tracer):
await run_streaming_example(async_server_info, use_generator=True)

spans = _get_spans(tracer)
assert len(spans) == 2
client_span, server_span = spans

# No error because cancelled after execution
_check_client_span(
client_span, "grpc-aio-client", "sayHello", "server_streaming", "hellostreamingworld.MultiGreeter"
)
_check_server_span(
server_span, "grpc-aio-server", "sayHello", "server_streaming", "hellostreamingworld.MultiGreeter"
)

0 comments on commit b550028

Please sign in to comment.