Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement metadata in __call__ for aio unaryunary call #21336

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
# limitations under the License.

cimport cpython

from collections import namedtuple

import grpc

_EMPTY_FLAGS = 0
_EMPTY_MASK = 0
_EMPTY_METADATA = None

UnaryUnaryOpsResult = namedtuple('UnaryUnaryOpsResult',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💅 you can make it in a more pythonic way [1]

[1] https://github.com/grpc/grpc/pull/21246/files

['initial_metadata', 'message', 'code', 'details', 'trailing_metadata'])


cdef class _AioCall:
Expand Down Expand Up @@ -66,7 +71,8 @@ cdef class _AioCall:
"""Destroys the corresponding Core object for this RPC."""
grpc_call_unref(self._grpc_call_wrapper.call)

async def unary_unary(self, bytes method, bytes request, object timeout, AioCancelStatus cancel_status):
async def unary_unary(self, bytes method, bytes request, object timeout, object metadata,
AioCancelStatus cancel_status):
cdef object loop = asyncio.get_event_loop()

cdef tuple operations
Expand All @@ -79,7 +85,7 @@ cdef class _AioCall:

cdef char *c_details = NULL

initial_metadata_operation = SendInitialMetadataOperation(_EMPTY_METADATA, GRPC_INITIAL_METADATA_USED_MASK)
initial_metadata_operation = SendInitialMetadataOperation(metadata, GRPC_INITIAL_METADATA_USED_MASK)
initial_metadata_operation.c()

send_message_operation = SendMessageOperation(request, _EMPTY_FLAGS)
Expand Down Expand Up @@ -139,7 +145,12 @@ cdef class _AioCall:
self._destroy_grpc_call()

if receive_status_on_client_operation.code() == StatusCode.ok:
return receive_message_operation.message()
return UnaryUnaryOpsResult(
initial_metadata=receive_initial_metadata_operation.initial_metadata(),
message=receive_message_operation.message(),
code=receive_status_on_client_operation.code(),
details=receive_status_on_client_operation.details(),
trailing_metadata=receive_status_on_client_operation.trailing_metadata())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest think twice about the UnaryUnaryOpsResult design. Under streaming cases, the information is not going to arrive in one batch, and allows it to return just once. Otherwise, there will be two totally different designs of returning information from C-Core to application.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PR for implementing the interceptors Im gonna segregate these
attributed as attributes belonging to the aio call object. Just for
unblocking the work here, if you agree, I would consider this namedtuple as
a temporary thing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, if this PR is blocking your work in interceptors, I would say they are probably better check in together? The amount of code is not huge, should be fine merging into what you have in your refactor version.

This temporary design here is on critical data path. My intention is that if you are sensing that this needs to be refactored soon, why not perform the refactor all together?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really blocking me but if it gets merged first than mine the namedtuple would be removed.

But true that the number of changes is not so big, If everybody agrees I can ask @ZHmao for opening the PR against my branch and deliver later a PR which will address both things, hte interceptor and support for metadata.

Do you kinda agree @ZHmao? If so, could you open the PR against this branch [1] and close this?

[1] https://github.com/Skyscanner/grpc/tree/client_unaryunary_interceptors


raise AioRpcError(
receive_initial_metadata_operation.initial_metadata(),
Expand Down
4 changes: 2 additions & 2 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ cdef class AioChannel:
def close(self):
grpc_channel_destroy(self.channel)

async def unary_unary(self, method, request, timeout, cancel_status):
async def unary_unary(self, method, request, timeout, metadata, cancel_status):
call = _AioCall(self)
return await call.unary_unary(method, request, timeout, cancel_status)
return await call.unary_unary(method, request, timeout, metadata, cancel_status)
30 changes: 23 additions & 7 deletions src/python/grpcio/grpc/experimental/aio/_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import asyncio
import enum
from typing import Callable, Dict, Optional, ClassVar
from typing import Sequence, Tuple, Text, AnyStr

import grpc
from grpc import _common
Expand Down Expand Up @@ -182,11 +183,23 @@ def done(self) -> bool:
"""
return self._state is not _RpcState.ONGOING

async def initial_metadata(self):
raise NotImplementedError()
async def initial_metadata(self) -> Sequence[Tuple[Text, AnyStr]]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opening the can of worms, would make sense on changing this Sequence[Tuple[Text, AnyStr] for a Dict[Text, AnyStr]? Having the feeling that it would make life easier for the developers. Otherwise, they would be forced to wrap the sequence for implementing a dict view on their own.

Thoughts @ZHmao @gnossen @lidi? If so, let's do that this in this PR or another one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning a plain old Dict would lose the order of the metadata entries, which we currently preserve with Sequence[Tuple[Text, AnyStr]]. Perhaps OrderedDict instead?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a pity that we can not rely on default dict implementation that
preserves insertion order since Python 3.6, because we are giving support
to python 3.5+.

In any case, agree on using ordered dict.

if not self.done():
try:
await self
except (asyncio.CancelledError, AioRpcError):
pass

return self._initial_metadata

async def trailing_metadata(self):
raise NotImplementedError()
async def trailing_metadata(self) -> Sequence[Tuple[Text, AnyStr]]:
if not self.done():
try:
await self
except (asyncio.CancelledError, AioRpcError):
pass

return self._trailing_metadata

async def code(self) -> grpc.StatusCode:
"""Returns the `grpc.StatusCode` if the RPC is finished,
Expand Down Expand Up @@ -236,7 +249,7 @@ def __await__(self):
raise self._exception

try:
buffer_ = yield from self._call.__await__()
ops_result = yield from self._call.__await__()
except cygrpc.AioRpcError as aio_rpc_error:
self._state = _RpcState.ABORT
self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[
Expand All @@ -255,8 +268,11 @@ def __await__(self):
self._exception = cancel_error
raise

self._response = _common.deserialize(buffer_,
self._response = _common.deserialize(ops_result.message,
self._response_deserializer)
self._code = grpc.StatusCode.OK
self._code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE[ops_result.code]
self._details = ops_result.details
self._state = _RpcState.FINISHED
self._initial_metadata = ops_result.initial_metadata
self._trailing_metadata = ops_result.trailing_metadata
return self._response
5 changes: 1 addition & 4 deletions src/python/grpcio/grpc/experimental/aio/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ def __call__(self,
metadata, status code, and details.
"""

if metadata:
raise NotImplementedError("TODO: metadata not implemented yet")

if credentials:
raise NotImplementedError("TODO: credentials not implemented yet")

Expand All @@ -92,7 +89,7 @@ def __call__(self,
aio_cancel_status = cygrpc.AioCancelStatus()
aio_call = asyncio.ensure_future(
self._channel.unary_unary(self._method, serialized_request, timeout,
aio_cancel_status),
metadata, aio_cancel_status),
loop=self._loop)
return Call(aio_call, self._response_deserializer, aio_cancel_status)

Expand Down
17 changes: 17 additions & 0 deletions src/python/grpcio_tests/tests_aio/unit/_test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,27 @@
from src.proto.grpc.testing import test_pb2_grpc
from tests.unit.framework.common import test_constants

_INITIAL_METADATA_KEY = "initial-md-key"
_TRAILING_METADATA_KEY = "trailing-md-key-bin"


def _maybe_echo_metadata(servicer_context):
"""Copies metadata from request to response if it is present."""
invocation_metadata = dict(servicer_context.invocation_metadata())
if _INITIAL_METADATA_KEY in invocation_metadata:
initial_metadatum = (_INITIAL_METADATA_KEY,
invocation_metadata[_INITIAL_METADATA_KEY])
servicer_context.send_initial_metadata((initial_metadatum,))
if _TRAILING_METADATA_KEY in invocation_metadata:
trailing_metadatum = (_TRAILING_METADATA_KEY,
invocation_metadata[_TRAILING_METADATA_KEY])
servicer_context.set_trailing_metadata((trailing_metadatum,))


class _TestServiceServicer(test_pb2_grpc.TestServiceServicer):

async def UnaryCall(self, request, context):
# _maybe_echo_metadata(context)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can not get metadata from here until server-side implement it, the context is a placeholder[1] and server-side just send fake metadata[2] now, so I comment this line.

[1] https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi#L78
[2] https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi#L88-L94

return messages_pb2.SimpleResponse()

async def EmptyCall(self, request, context):
Expand Down
2 changes: 1 addition & 1 deletion src/python/grpcio_tests/tests_aio/unit/call_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ async def coro():
response_deserializer=messages_pb2.SimpleResponse.FromString
)
call = hi(messages_pb2.SimpleRequest())
self.assertEqual(await call.details(), None)
self.assertEqual(await call.details(), '')

self.loop.run_until_complete(coro())

Expand Down
27 changes: 27 additions & 0 deletions src/python/grpcio_tests/tests_aio/unit/channel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@
_UNARY_CALL_METHOD = '/grpc.testing.TestService/UnaryCall'
_EMPTY_CALL_METHOD = '/grpc.testing.TestService/EmptyCall'

_INVOCATION_METADATA = (
(
'initial-md-key',
'initial-md-value',
),
(
'trailing-md-key-bin',
b'\x00\x02',
),
)


class TestChannel(AioTestBase):

Expand Down Expand Up @@ -114,6 +125,22 @@ async def coro():

self.loop.run_until_complete(coro())

def test_unary_unary_metadata(self):

async def coro():
server_target, _ = await start_test_server() # pylint: disable=unused-variable

async with aio.insecure_channel(server_target) as channel:
hi = channel.unary_unary(
_UNARY_CALL_METHOD,
request_serializer=messages_pb2.SimpleRequest.SerializeToString,
response_deserializer=messages_pb2.SimpleResponse.FromString)
call = hi(messages_pb2.SimpleRequest(), metadata=_INVOCATION_METADATA)
self.assertIsNotNone(await call.initial_metadata())
self.assertIsNotNone(await call.trailing_metadata())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I'm wrong, but we are basically testing the default value here? because the _INVOCATION_METADATA is not finally wired in the server-side since _maybe_echo_metadata does not work, amb I wrong?

If so, maybe for having a reliable coverage we would need to implement something that allows the code to test that the metadata is received. For that - one solution that does not require the context- is create another gRPC endpoint, for example, MetadataEcho which would go through to all of hte metadata and will make an echo of the metadata found as metadata.

Besides that, and in the same way as being done in the cal_test.py for the details, I would implement a couple of tests for testing that initial_metadata and trailing_metadata are awaitable, in these tests we could double-check that the value returned is the default value when there is no metada returned by the user.


self.loop.run_until_complete(coro())


if __name__ == '__main__':
logging.basicConfig()
Expand Down