Skip to content

Commit

Permalink
[Aio] Unary unary client call barebones implementation
Browse files Browse the repository at this point in the history
Implement the minimal stuff for making a unary call with the new
experimental gRPC Python implementation for Asyncio, called Aio.

What has been added:

- Minimal iomgr code for performing the required network and timer
calls.
- Minimal Cython code implementing the channel, call and the callback
context.
- Minimal Python code that mimics the synchronous implementation but
designed to be asynchronous.

Testing considerations:

Tests have to be executed using the `GRPC_ENABLE_FORK_SUPPORT=0`
environment variable for skipping the fork handles installed by
the core library. This is due to the usage of a syncrhonous server
used as a fixture executed in another process.

Co-authored-by: Manuel Miranda <manuel.miranda@skyscanner.net>
Co-authored-by: Mariano Anaya <mariano.anaya@skyscanner.net>
Co-authored-by: Zhanghui Mao <zhanghui.mao@skyscanner.net>
Co-authored-by: Lidi Zheng <lidiz@google.com>
  • Loading branch information
5 people committed Sep 11, 2019
1 parent 1bfdbc1 commit a44e6d7
Show file tree
Hide file tree
Showing 40 changed files with 1,691 additions and 161 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Dropbox, Inc.
Google Inc.
Skyscanner Ltd.
WeWork Companies Inc.
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@
r'macosx-10.7-\1',
util.get_platform())


def cython_extensions_and_necessity():
cython_module_files = [os.path.join(PYTHON_STEM,
name.replace('.', '/') + '.pyx')
Expand Down Expand Up @@ -295,6 +296,8 @@ def cython_extensions_and_necessity():
need_cython = BUILD_WITH_CYTHON
if not BUILD_WITH_CYTHON:
need_cython = need_cython or not commands.check_and_update_cythonization(extensions)
# TODO: the strategy for conditional compiling and exposing the aio Cython
# dependencies will be revisited by https://github.com/grpc/grpc/issues/19728
return commands.try_cythonize(extensions, linetracing=ENABLE_CYTHON_TRACING, mandatory=BUILD_WITH_CYTHON), need_cython

CYTHON_EXTENSION_MODULES, need_cython = cython_extensions_and_necessity()
Expand Down
16 changes: 16 additions & 0 deletions src/python/grpcio/grpc/_cython/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ pyx_library(
"__init__.py",
"_cygrpc/_hooks.pxd.pxi",
"_cygrpc/_hooks.pyx.pxi",
"_cygrpc/aio/call.pxd.pxi",
"_cygrpc/aio/call.pyx.pxi",
"_cygrpc/aio/callbackcontext.pxd.pxi",
"_cygrpc/aio/channel.pxd.pxi",
"_cygrpc/aio/channel.pyx.pxi",
"_cygrpc/aio/grpc_aio.pxd.pxi",
"_cygrpc/aio/grpc_aio.pyx.pxi",
"_cygrpc/aio/iomgr/iomgr.pyx.pxi",
"_cygrpc/aio/iomgr/resolver.pxd.pxi",
"_cygrpc/aio/iomgr/resolver.pyx.pxi",
"_cygrpc/aio/iomgr/socket.pxd.pxi",
"_cygrpc/aio/iomgr/socket.pyx.pxi",
"_cygrpc/aio/iomgr/timer.pxd.pxi",
"_cygrpc/aio/iomgr/timer.pyx.pxi",
"_cygrpc/arguments.pxd.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pxd.pxi",
Expand All @@ -27,6 +41,8 @@ pyx_library(
"_cygrpc/grpc_gevent.pxd.pxi",
"_cygrpc/grpc_gevent.pyx.pxi",
"_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/iomgr.pxd.pxi",
"_cygrpc/iomgr.pyx.pxi",
"_cygrpc/metadata.pxd.pxi",
"_cygrpc/metadata.pyx.pxi",
"_cygrpc/operation.pxd.pxi",
Expand Down
27 changes: 27 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pxd.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cdef class _AioCall:
cdef:
AioChannel _channel
CallbackContext _watcher_call
grpc_completion_queue * _cq
grpc_experimental_completion_queue_functor _functor
object _waiter_call

@staticmethod
cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed)
@staticmethod
cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed)
149 changes: 149 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

_EMPTY_FLAGS = 0
_EMPTY_METADATA = ()
_OP_ARRAY_LENGTH = 6


cdef class _AioCall:


def __cinit__(self, AioChannel channel):
self._channel = channel
self._functor.functor_run = _AioCall.functor_run

self._cq = grpc_completion_queue_create_for_callback(
<grpc_experimental_completion_queue_functor *> &self._functor,
NULL
)

self._watcher_call.functor.functor_run = _AioCall.watcher_call_functor_run
self._watcher_call.waiter = <cpython.PyObject *> self
self._waiter_call = None

def __dealloc__(self):
grpc_completion_queue_shutdown(self._cq)
grpc_completion_queue_destroy(self._cq)

def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"

@staticmethod
cdef void functor_run(grpc_experimental_completion_queue_functor* functor, int succeed):
pass

@staticmethod
cdef void watcher_call_functor_run(grpc_experimental_completion_queue_functor* functor, int succeed):
call = <_AioCall>(<CallbackContext *>functor).waiter

assert call._waiter_call

if succeed == 0:
call._waiter_call.set_exception(Exception("Some error ocurred"))
else:
call._waiter_call.set_result(None)

async def unary_unary(self, method, request):
cdef grpc_call * call
cdef grpc_slice method_slice
cdef grpc_op * ops

cdef Operation initial_metadata_operation
cdef Operation send_message_operation
cdef Operation send_close_from_client_operation
cdef Operation receive_initial_metadata_operation
cdef Operation receive_message_operation
cdef Operation receive_status_on_client_operation

cdef grpc_call_error call_status


method_slice = grpc_slice_from_copied_buffer(
<const char *> method,
<size_t> len(method)
)

call = grpc_channel_create_call(
self._channel.channel,
NULL,
0,
self._cq,
method_slice,
NULL,
_timespec_from_time(None),
NULL
)

grpc_slice_unref(method_slice)

ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * _OP_ARRAY_LENGTH)

initial_metadata_operation = SendInitialMetadataOperation(_EMPTY_METADATA, GRPC_INITIAL_METADATA_USED_MASK)
initial_metadata_operation.c()
ops[0] = <grpc_op> initial_metadata_operation.c_op

send_message_operation = SendMessageOperation(request, _EMPTY_FLAGS)
send_message_operation.c()
ops[1] = <grpc_op> send_message_operation.c_op

send_close_from_client_operation = SendCloseFromClientOperation(_EMPTY_FLAGS)
send_close_from_client_operation.c()
ops[2] = <grpc_op> send_close_from_client_operation.c_op

receive_initial_metadata_operation = ReceiveInitialMetadataOperation(_EMPTY_FLAGS)
receive_initial_metadata_operation.c()
ops[3] = <grpc_op> receive_initial_metadata_operation.c_op

receive_message_operation = ReceiveMessageOperation(_EMPTY_FLAGS)
receive_message_operation.c()
ops[4] = <grpc_op> receive_message_operation.c_op

receive_status_on_client_operation = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
receive_status_on_client_operation.c()
ops[5] = <grpc_op> receive_status_on_client_operation.c_op

self._waiter_call = asyncio.get_event_loop().create_future()

call_status = grpc_call_start_batch(
call,
ops,
_OP_ARRAY_LENGTH,
&self._watcher_call.functor,
NULL
)

try:
if call_status != GRPC_CALL_OK:
self._waiter_call = None
raise Exception("Error with grpc_call_start_batch {}".format(call_status))

await self._waiter_call

finally:
initial_metadata_operation.un_c()
send_message_operation.un_c()
send_close_from_client_operation.un_c()
receive_initial_metadata_operation.un_c()
receive_message_operation.un_c()
receive_status_on_client_operation.un_c()

grpc_call_unref(call)
gpr_free(ops)

return receive_message_operation.message()
20 changes: 20 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/callbackcontext.pxd.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cimport cpython

cdef struct CallbackContext:
grpc_experimental_completion_queue_functor functor
cpython.PyObject *waiter

18 changes: 18 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pxd.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cdef class AioChannel:
cdef:
grpc_channel * channel
bytes _target
30 changes: 30 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/channel.pyx.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

cdef class AioChannel:
def __cinit__(self, bytes target):
self.channel = grpc_insecure_channel_create(<char *>target, NULL, NULL)
self._target = target

def __repr__(self):
class_name = self.__class__.__name__
id_ = id(self)
return f"<{class_name} {id_}>"

def close(self):
grpc_channel_destroy(self.channel)

async def unary_unary(self, method, request):
call = _AioCall(self)
return await call.unary_unary(method, request)
25 changes: 25 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pxd.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# distutils: language=c++

cdef extern from "src/core/lib/iomgr/timer_manager.h":
void grpc_timer_manager_set_threading(bint enabled);

cdef extern from "src/core/lib/iomgr/iomgr_internal.h":
void grpc_set_default_iomgr_platform();

cdef extern from "src/core/lib/iomgr/executor.h" namespace "grpc_core":
cdef cppclass Executor:
@staticmethod
void SetThreadingAll(bint enable);
37 changes: 37 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/grpc_aio.pyx.pxi
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


cdef bint _grpc_aio_initialized = 0


def init_grpc_aio():
global _grpc_aio_initialized

if _grpc_aio_initialized:
return

install_asyncio_iomgr()
grpc_init()

# Timers are triggered by the Asyncio loop. We disable
# the background thread that is being used by the native
# gRPC iomgr.
grpc_timer_manager_set_threading(0)

# gRPC callbaks are executed within the same thread used by the Asyncio
# event loop, as it is being done by the other Asyncio callbacks.
Executor.SetThreadingAll(0)

_grpc_aio_initialized = 1
Loading

0 comments on commit a44e6d7

Please sign in to comment.