Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion awscrt/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
from concurrent.futures import Future
from awscrt import NativeResource
import awscrt.exceptions
from awscrt.io import ClientBootstrap, InputStream, TlsConnectionOptions, SocketOptions
from awscrt.io import ClientBootstrap, InputStream, PipeInputStream, TlsConnectionOptions, SocketOptions
from enum import IntEnum

PipeInputStream # noqa - export from this module


class HttpVersion(IntEnum):
"""HTTP protocol version enumeration"""
Expand Down
31 changes: 29 additions & 2 deletions awscrt/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def __init__(self):

self.min_tls_ver = TlsVersion.DEFAULT
self.cipher_pref = TlsCipherPref.DEFAULT
self.verify_peer = True
self.verify_peer = False # TODO: Temporary to work around lack of pass through config.

@staticmethod
def create_client_with_mtls_from_path(cert_filepath, pk_filepath):
Expand Down Expand Up @@ -689,7 +689,7 @@ class InputStream(NativeResource):
Args:
stream (io.IOBase): Python binary I/O stream to wrap.
"""
__slots__ = ('_stream')
__slots__ = ('_stream', '_read_fd')
# TODO: Implement IOBase interface so Python can read from this class as well.

def __init__(self, stream):
Expand All @@ -700,6 +700,10 @@ def __init__(self, stream):
assert not isinstance(stream, InputStream)

super().__init__()
self._read_fd = None
if callable(getattr(stream, 'read_fd', None)):
self._read_fd = stream.read_fd()

self._stream = stream
self._binding = _awscrt.input_stream_new(self)

Expand Down Expand Up @@ -747,6 +751,29 @@ def wrap(cls, stream, allow_none=False):
return stream
return cls(stream)

class PipeInputStream(InputStream):
"""PipeInputStream allows `awscrt` native code to read from Python binary I/O classes using a pipe.

Args:
stream: Python binary I/O stream to wrap - must implement read_fd()
"""

def __init__(self, stream):
# duck-type instead of checking inheritance
# At the least, stream must have read_fd()
if not callable(getattr(stream, 'read', None)):
raise TypeError('I/O stream type expected')
assert not isinstance(stream, InputStream)

super().__init__(stream)

if not getattr(stream, 'read_fd', None):
raise TypeError('Stream must implement read_fd()')

@property
def read_fd(self):
"""Returns the readable file descriptor associated with the stream."""
return self._stream.read_fd

class Pkcs11Lib(NativeResource):
"""
Expand Down
29 changes: 29 additions & 0 deletions source/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,35 @@ int s_aws_input_stream_py_read(struct aws_input_stream *stream, struct aws_byte_
return AWS_OP_ERR; /* Python has shut down. Nothing matters anymore, but don't crash */
}

// Get the _read_fd attribute
PyObject *read_fd_attr = PyObject_GetAttrString(impl->py_self, "read_fd");
if (!read_fd_attr) {
PyErr_Clear(); // Clear the error if attribute doesn't exist
} else {
ssize_t bytesRead = 0;
if (PyLong_Check(read_fd_attr)) {
long read_fd = PyLong_AsLong(read_fd_attr);
Py_DECREF(read_fd_attr);

PyGILState_Release(state);

size_t available = dest->capacity - dest->len;
char *mem_start = (char *)(dest->buffer + dest->len);
bytesRead = read(read_fd, mem_start, available);

if (bytesRead == 0) {
impl->is_end_of_stream = true;
close(read_fd);
} else {
dest->len += bytesRead;
}

// memory_view and method_result have not been allocated, no need to clean them up.
return aws_result;
}
}
Py_XDECREF(read_fd_attr);

memory_view = aws_py_memory_view_from_byte_buffer(dest);
if (!memory_view) {
aws_result = aws_py_raise_error();
Expand Down