Skip to content

Commit

Permalink
Fix sending extremely large messages getting stuck when using asyncio
Browse files Browse the repository at this point in the history
Apparently the file descriptor returned by `sd_bus_get_fd` must be
monitored for both reading and writing depending on `sd_bus_get_events`
return value. This comes in to play when an extremely large message
needs to be sent over D-Bus. Such extremely large message will need
multiple `sendmsg` calls with file descriptor being monitored to
when the write processing of the message has to be done.

This commit will add or remove asyncio loop file descriptor watchers
based on changes to `sd_bus_get_events` value.
  • Loading branch information
igo95862 committed Feb 24, 2024
1 parent 899a7db commit 03593a1
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 35 deletions.
4 changes: 4 additions & 0 deletions src/sdbus/sd_bus_internals.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ PyObject* set_result_str = NULL;
PyObject* set_exception_str = NULL;
PyObject* add_reader_str = NULL;
PyObject* remove_reader_str = NULL;
PyObject* add_writer_str = NULL;
PyObject* remove_writer_str = NULL;
PyObject* empty_str = NULL;
PyObject* null_str = NULL;
PyObject* extend_str = NULL;
Expand Down Expand Up @@ -171,6 +173,8 @@ PyMODINIT_FUNC PyInit_sd_bus_internals(void) {
create_task_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("create_task"));
remove_reader_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("remove_reader"));
add_reader_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("add_reader"));
add_writer_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("add_writer"));
remove_writer_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("remove_writer"));
empty_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString(""));
null_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromStringAndSize("\0", 1));
extend_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("extend"));
Expand Down
5 changes: 4 additions & 1 deletion src/sdbus/sd_bus_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ extern PyObject* set_result_str;
extern PyObject* set_exception_str;
extern PyObject* add_reader_str;
extern PyObject* remove_reader_str;
extern PyObject* add_writer_str;
extern PyObject* remove_writer_str;
extern PyObject* empty_str;
extern PyObject* null_str;
extern PyObject* extend_str;
Expand Down Expand Up @@ -330,7 +332,8 @@ extern PyObject* SdBusMessage_class;
typedef struct {
PyObject_HEAD;
sd_bus* sd_bus_ref;
PyObject* reader_fd;
PyObject* bus_fd;
int asyncio_watchers_last_state;
} SdBusObject;

extern PyType_Spec SdBusType;
Expand Down
2 changes: 1 addition & 1 deletion src/sdbus/sd_bus_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def call_async(
/) -> Future[SdBusMessage]:
raise NotImplementedError(__STUB_ERROR)

def drive(self) -> None:
def process(self) -> None:
raise NotImplementedError(__STUB_ERROR)

def get_fd(self) -> int:
Expand Down
85 changes: 52 additions & 33 deletions src/sdbus/sd_bus_internals_bus.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include <errno.h>
#include <poll.h>
#include "sd_bus_internals.h"

static void SdBus_dealloc(SdBusObject* self) {
sd_bus_unref(self->sd_bus_ref);
Py_XDECREF(self->reader_fd);
Py_XDECREF(self->bus_fd);

SD_BUS_DEALLOC_TAIL;
}
Expand Down Expand Up @@ -229,46 +230,24 @@ int future_set_exception_from_message(PyObject* future, sd_bus_message* message)
return 0;
}

static PyObject* SdBus_drive(SdBusObject* self, PyObject* Py_UNUSED(args));

static PyObject* SdBus_get_fd(SdBusObject* self, PyObject* Py_UNUSED(args)) {
int file_descriptor = CALL_SD_BUS_AND_CHECK(sd_bus_get_fd(self->sd_bus_ref));

return PyLong_FromLong((long)file_descriptor);
}

#define CHECK_SD_BUS_READER \
({ \
if (self->reader_fd == NULL) { \
CALL_PYTHON_EXPECT_NONE(register_reader(self)); \
} \
})

PyObject* register_reader(SdBusObject* self) {
PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));
PyObject* new_reader_fd CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(SdBus_get_fd(self, NULL));
PyObject* drive_method CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString((PyObject*)self, "drive"));
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, add_reader_str, new_reader_fd, drive_method, NULL)));
Py_INCREF(new_reader_fd);
self->reader_fd = new_reader_fd;
Py_RETURN_NONE;
}
static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self);

PyObject* unregister_reader(SdBusObject* self) {
PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, remove_reader_str, self->reader_fd, NULL)));
Py_RETURN_NONE;
}
#define CHECK_ASYNCIO_WATCHERS ({ CALL_PYTHON_EXPECT_NONE(SdBus_asyncio_update_fd_watchers(self)); })

static PyObject* SdBus_drive(SdBusObject* self, PyObject* Py_UNUSED(args)) {
static PyObject* SdBus_process(SdBusObject* self, PyObject* Py_UNUSED(args)) {
int return_value = 1;
while (return_value > 0) {
return_value = sd_bus_process(self->sd_bus_ref, NULL);
if (return_value < 0) {
CALL_PYTHON_AND_CHECK(unregister_reader(self));
if (-ECONNRESET == return_value) {
// Connection gracefully terminated
Py_RETURN_NONE;
break;
} else {
// Error occurred processing sdbus
CALL_SD_BUS_AND_CHECK(return_value);
Expand All @@ -280,6 +259,7 @@ static PyObject* SdBus_drive(SdBusObject* self, PyObject* Py_UNUSED(args)) {
return NULL;
}
}
CHECK_ASYNCIO_WATCHERS;

Py_RETURN_NONE;
}
Expand All @@ -291,7 +271,7 @@ int SdBus_async_callback(sd_bus_message* m,
PyObject* py_future = userdata;
PyObject* is_cancelled CLEANUP_PY_OBJECT = PyObject_CallMethod(py_future, "cancelled", "");
if (Py_True == is_cancelled) {
// A bit unpythonic but SdBus_drive does not error out
// A bit unpythonic but SdBus_process does not error out
return 0;
}

Expand Down Expand Up @@ -340,7 +320,7 @@ static PyObject* SdBus_call_async(SdBusObject* self, PyObject* args) {
if (PyObject_SetAttrString(new_future, "_sd_bus_py_slot", (PyObject*)new_slot_object) < 0) {
return NULL;
}
CHECK_SD_BUS_READER;
CHECK_ASYNCIO_WATCHERS;
return new_future;
}

Expand Down Expand Up @@ -454,7 +434,7 @@ static PyObject* SdBus_match_signal_async(SdBusObject* self, PyObject* args) {
interface_name_char_ptr, member_name_char_ptr, _SdBus_signal_callback,
_SdBus_match_signal_instant_callback, new_future));

CHECK_SD_BUS_READER;
CHECK_ASYNCIO_WATCHERS;
Py_INCREF(new_future);
return new_future;
}
Expand All @@ -465,7 +445,7 @@ int SdBus_request_name_callback(sd_bus_message* m,
PyObject* py_future = userdata;
PyObject* is_cancelled CLEANUP_PY_OBJECT = PyObject_CallMethod(py_future, "cancelled", "");
if (Py_True == is_cancelled) {
// A bit unpythonic but SdBus_drive does not error out
// A bit unpythonic but SdBus_process does not error out
return 0;
}

Expand Down Expand Up @@ -531,7 +511,7 @@ static PyObject* SdBus_request_name_async(SdBusObject* self, PyObject* args) {
sd_bus_request_name_async(self->sd_bus_ref, &new_slot_object->slot_ref, service_name_char_ptr, flags, SdBus_request_name_callback, new_future));

CALL_PYTHON_INT_CHECK(PyObject_SetAttrString(new_future, "_sd_bus_py_slot", (PyObject*)new_slot_object));
CHECK_SD_BUS_READER;
CHECK_ASYNCIO_WATCHERS;
return new_future;
}

Expand Down Expand Up @@ -635,10 +615,49 @@ static PyObject* SdBus_start(SdBusObject* self, PyObject* Py_UNUSED(args)) {
Py_RETURN_NONE;
}

static inline int sd_bus_get_events_zero_on_closed(SdBusObject* self) {
int events = sd_bus_get_events(self->sd_bus_ref);
if (-ENOTCONN == events) {
return 0;
}
return events;
};

static PyObject* SdBus_asyncio_update_fd_watchers(SdBusObject* self) {
int events_to_watch = CALL_SD_BUS_AND_CHECK(sd_bus_get_events_zero_on_closed(self));
if (events_to_watch == self->asyncio_watchers_last_state) {
// Do not update the watchers because state is the same
Py_RETURN_NONE;
} else {
self->asyncio_watchers_last_state = events_to_watch;
}

PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));
PyObject* drive_method CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString((PyObject*)self, "process"));

if (NULL == self->bus_fd) {
self->bus_fd = CALL_PYTHON_AND_CHECK(SdBus_get_fd(self, NULL));
}

if (events_to_watch & POLLIN) {
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, add_reader_str, self->bus_fd, drive_method, NULL)));
} else {
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, remove_reader_str, self->bus_fd, NULL)));
}

if (events_to_watch & POLLOUT) {
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, add_writer_str, self->bus_fd, drive_method, NULL)));
} else {
Py_XDECREF(CALL_PYTHON_AND_CHECK(PyObject_CallMethodObjArgs(running_loop, remove_writer_str, self->bus_fd, NULL)));
}

Py_RETURN_NONE;
}

static PyMethodDef SdBus_methods[] = {
{"call", (SD_BUS_PY_FUNC_TYPE)SdBus_call, SD_BUS_PY_METH, PyDoc_STR("Send message and block until the reply.")},
{"call_async", (SD_BUS_PY_FUNC_TYPE)SdBus_call_async, SD_BUS_PY_METH, PyDoc_STR("Async send message, returns awaitable future.")},
{"drive", (PyCFunction)SdBus_drive, METH_NOARGS, PyDoc_STR("Drive connection.")},
{"process", (PyCFunction)SdBus_process, METH_NOARGS, PyDoc_STR("Process pending IO work.")},
{"get_fd", (SD_BUS_PY_FUNC_TYPE)SdBus_get_fd, SD_BUS_PY_METH, PyDoc_STR("Get file descriptor to poll on.")},
{"new_method_call_message", (SD_BUS_PY_FUNC_TYPE)SdBus_new_method_call_message, SD_BUS_PY_METH, PyDoc_STR("Create new empty method call message.")},
{"new_property_get_message", (SD_BUS_PY_FUNC_TYPE)SdBus_new_property_get_message, SD_BUS_PY_METH, PyDoc_STR("Create new empty property get message.")},
Expand Down
25 changes: 25 additions & 0 deletions test/test_sdbus_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ async def takes_struct_method(
a, b, c, d = int_struct
return a*b*c*d

@dbus_method_async("s", "x")
async def return_length(self, input_str: str) -> int:
return len(input_str)


class DbusErrorTest(DbusFailedError):
dbus_error_name = 'org.example.Error'
Expand Down Expand Up @@ -939,3 +943,24 @@ async def two(self) -> int:

class CombinedInterface(OneInterface, TwoInterface):
...

async def test_extremely_large_string(self) -> None:
test_object, test_object_connection = initialize_object()

extremely_large_string = "a" * 8423681

remote_len = await wait_for(
test_object_connection.return_length(
extremely_large_string
),
timeout=10,
)

self.assertEqual(
remote_len,
len(extremely_large_string),
)

# Check that calling regular methods still works.
for _ in range(5):
await test_object_connection.returns_none_method()

0 comments on commit 03593a1

Please sign in to comment.