Skip to content
This repository was archived by the owner on Jul 8, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ file `guide <https://github.com/grpc/grpc/blob/master/BUILDING.md>`_ and follow
$ cd grpc-asyncio/vendor/grpc
$ git submodule init
$ git submodule update --recursive
$ git submodule update --init
$ make

For debugging purposes the library can be compiled in debug mode, as can be seen in the following command::
Expand Down
21 changes: 15 additions & 6 deletions grpc_asyncio/socket.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import asyncio
import socket

from libc cimport string

include "streams.pyx"


cdef class Socket:
def __cinit__(self):
self.g_socket = NULL
Expand Down Expand Up @@ -80,19 +84,24 @@ cdef class Socket:
cdef void connect(self, object host, object port, grpc_custom_connect_callback g_connect_cb):
assert not self.task_connect

self.g_connect_cb = g_connect_cb
self.task_connect = asyncio.create_task(
asyncio.open_connection(host, port)
open_connection(host, port)
)
self.task_connect.add_done_callback(self._connect_cb)
self.g_connect_cb = g_connect_cb

cdef void read(self, char * buffer_, size_t length, grpc_custom_read_callback g_read_cb):
assert not self.task_read

self.task_read = asyncio.create_task(
self.reader.read(n=length)
)
self.task_read.add_done_callback(self._read_cb)
if self.reader.is_data_available(length):
self._read_cb(self.reader.read(n=length))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a coro and a coro does not implement the Future interface, indeed the coro won't be scheduled in that scenario, not even started.

My suggestion is to implement a new method in the StreamReader for reading without having to wait and then if you want to wrap the result in a Future this will allow you to reuse the _read_cb function, for example

data = self.reader.read_nowait(n)
if data:
    f = Future()
    f.set_result(data)
    self._read_cb(f)
else:
    # awaitable version of the code


else:
self.task_read = asyncio.create_task(
self.reader.read(n=length)
)
self.task_read.add_done_callback(self._read_cb)

self.g_read_cb = g_read_cb
self.read_buffer = buffer_

Expand Down
Empty file added grpc_asyncio/streams.pxd
Empty file.
25 changes: 25 additions & 0 deletions grpc_asyncio/streams.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import asyncio


async def open_connection(
host=None, port=None, *, loop=None, limit=2 ** 16, **kwargs):

if loop is None:
loop = asyncio.get_event_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
lambda: protocol, host, port, **kwargs)
writer = asyncio.StreamWriter(transport, protocol, reader, loop)
return reader, writer


class StreamReader(asyncio.StreamReader):
def is_data_available(self, n):
"""
Check if there is n length data in the buffer
"""
if len(self._buffer) >= n:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of comments here.

The final implementation of the read(n) [1] function always returns immediately if there is any data but having the data size bound to n. So theoretically the is_data_available should check only if there is something in the buffer. BTW this might explain why you are not seeing your code path executed in the acceptance test

Second, but TBH taking into account that this is a POC is not a stopper for me is the accessing of an internal attribute like _buffer that theoretically could be changed so breaking derivated classes. If we would go for a code that would have to be executed in production I won't rely on having always that attribute, so we will be forced to implement our own buffer attribute.

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/streams.py#L1006

return True

return False