From 0ef05228e0c8f971e809ce148e63b0a35e9d6469 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Fri, 21 Mar 2025 12:17:39 -0700 Subject: [PATCH 1/3] Use a pipe to get non-blocking body reads --- awscrt/io.py | 8 ++++++-- source/io.c | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/awscrt/io.py b/awscrt/io.py index ad1443f11..60a45bb0a 100644 --- a/awscrt/io.py +++ b/awscrt/io.py @@ -323,7 +323,7 @@ def __init__(self): self.min_tls_ver = TlsVersion.DEFAULT self.cipher_pref = TlsCipherPref.DEFAULT - self.verify_peer = True + self.verify_peer = False # TODO: Temporary to work around lack of pass through config. @staticmethod def create_client_with_mtls_from_path(cert_filepath, pk_filepath): @@ -689,7 +689,7 @@ class InputStream(NativeResource): Args: stream (io.IOBase): Python binary I/O stream to wrap. """ - __slots__ = ('_stream') + __slots__ = ('_stream', '_read_fd') # TODO: Implement IOBase interface so Python can read from this class as well. def __init__(self, stream): @@ -700,6 +700,10 @@ def __init__(self, stream): assert not isinstance(stream, InputStream) super().__init__() + self._read_fd = None + if callable(getattr(stream, 'read_fd', None)): + self._read_fd = stream.read_fd() + self._stream = stream self._binding = _awscrt.input_stream_new(self) diff --git a/source/io.c b/source/io.c index de218299c..2e585bbba 100644 --- a/source/io.c +++ b/source/io.c @@ -772,6 +772,35 @@ int s_aws_input_stream_py_read(struct aws_input_stream *stream, struct aws_byte_ return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */ } + // Get the _read_fd attribute + PyObject *read_fd_attr = PyObject_GetAttrString(impl->py_self, "_read_fd"); + if (!read_fd_attr) { + PyErr_Clear(); // Clear the error if attribute doesn't exist + } else { + ssize_t bytesRead = 0; + if (PyLong_Check(read_fd_attr)) { + long read_fd = PyLong_AsLong(read_fd_attr); + Py_DECREF(read_fd_attr); + + PyGILState_Release(state); + + size_t available = dest->capacity - dest->len; + char *mem_start = (char *)(dest->buffer + dest->len); + bytesRead = read(read_fd, mem_start, available); + + if (bytesRead == 0) { + impl->is_end_of_stream = true; + close(read_fd); + } else { + dest->len += bytesRead; + } + + // memory_view and method_result have not been allocated, no need to clean them up. + return aws_result; + } + } + Py_XDECREF(read_fd_attr); + memory_view = aws_py_memory_view_from_byte_buffer(dest); if (!memory_view) { aws_result = aws_py_raise_error(); From 53cd4156a25daf506dc722ada15a83bd252d432b Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Fri, 21 Mar 2025 14:18:37 -0700 Subject: [PATCH 2/3] Add PipeInputStream to improve forward/backwrads compat --- awscrt/io.py | 23 +++++++++++++++++++++++ source/io.c | 2 +- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/awscrt/io.py b/awscrt/io.py index 60a45bb0a..c487fa0e5 100644 --- a/awscrt/io.py +++ b/awscrt/io.py @@ -751,6 +751,29 @@ def wrap(cls, stream, allow_none=False): return stream return cls(stream) +class PipeInputStream(InputStream): + """PipeInputStream allows `awscrt` native code to read from Python binary I/O classes using a pipe. + + Args: + stream: Python binary I/O stream to wrap - must implement read_fd() + """ + + def __init__(self, stream): + # duck-type instead of checking inheritance + # At the least, stream must have read_fd() + if not callable(getattr(stream, 'read', None)): + raise TypeError('I/O stream type expected') + assert not isinstance(stream, InputStream) + + super().__init__(stream) + + if not getattr(stream, 'read_fd', None): + raise TypeError('Stream must implement read_fd()') + + @property + def read_fd(self): + """Returns the readable file descriptor associated with the stream.""" + return self._stream.read_fd class Pkcs11Lib(NativeResource): """ diff --git a/source/io.c b/source/io.c index 2e585bbba..db7ea815b 100644 --- a/source/io.c +++ b/source/io.c @@ -773,7 +773,7 @@ int s_aws_input_stream_py_read(struct aws_input_stream *stream, struct aws_byte_ } // Get the _read_fd attribute - PyObject *read_fd_attr = PyObject_GetAttrString(impl->py_self, "_read_fd"); + PyObject *read_fd_attr = PyObject_GetAttrString(impl->py_self, "read_fd"); if (!read_fd_attr) { PyErr_Clear(); // Clear the error if attribute doesn't exist } else { From e06bf4c642616899eb411d44847932a906959240 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Fri, 21 Mar 2025 14:30:57 -0700 Subject: [PATCH 3/3] add export --- awscrt/http.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/awscrt/http.py b/awscrt/http.py index b3a89b868..e92ae5e56 100644 --- a/awscrt/http.py +++ b/awscrt/http.py @@ -11,9 +11,11 @@ from concurrent.futures import Future from awscrt import NativeResource import awscrt.exceptions -from awscrt.io import ClientBootstrap, InputStream, TlsConnectionOptions, SocketOptions +from awscrt.io import ClientBootstrap, InputStream, PipeInputStream, TlsConnectionOptions, SocketOptions from enum import IntEnum +PipeInputStream # noqa - export from this module + class HttpVersion(IntEnum): """HTTP protocol version enumeration"""