Skip to content

Commit

Permalink
Rework signals integration with libsystemd
Browse files Browse the repository at this point in the history
Instead of relying on asyncio Queues use the `call_soon` of event
loop to schedule callbacks. The callback should be a blocking
function that accepts an SdBusMessage and returns None.

The `get_signal_queue_async` of SdBus was renamed to `match_signal_async`
which matches the sd-bus call used.

Callback system is more flexible. The existing signal API will
create the asyncio.Queue inside the `catch` methods and register
the callbacks to the `put_nowait` method of the Queue. This means
the code using public API is fully backwards compatible.
  • Loading branch information
igo95862 committed Dec 31, 2023
1 parent 63d77b4 commit 8b11d3f
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 79 deletions.
59 changes: 36 additions & 23 deletions src/sdbus/dbus_proxy_async_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

from asyncio import Queue
from contextlib import closing
from types import FunctionType
from typing import (
TYPE_CHECKING,
Expand All @@ -43,7 +44,7 @@
from typing import Any, Callable, Optional, Sequence, Tuple, Type

from .dbus_proxy_async_interface_base import DbusInterfaceBaseAsync
from .sd_bus_internals import SdBus
from .sd_bus_internals import SdBus, SdBusMessage


T = TypeVar('T')
Expand Down Expand Up @@ -97,16 +98,20 @@ def __init__(
self.__doc__ = dbus_signal.__doc__

async def catch(self) -> AsyncIterator[T]:
dbus_queue = await self.proxy_meta.attached_bus.get_signal_queue_async(
message_queue: Queue[SdBusMessage] = Queue()

match_slot = await self.proxy_meta.attached_bus.match_signal_async(
self.proxy_meta.service_name,
self.proxy_meta.object_path,
self.dbus_signal.interface_name,
self.dbus_signal.signal_name,
message_queue.put_nowait,
)

while True:
next_signal_message = await dbus_queue.get()
yield cast(T, next_signal_message.get_contents())
with closing(match_slot):
while True:
next_signal_message = await message_queue.get()
yield cast(T, next_signal_message.get_contents())

__aiter__ = catch

Expand All @@ -121,21 +126,25 @@ async def catch_anywhere(
if service_name is None:
service_name = self.proxy_meta.service_name

message_queue = await bus.get_signal_queue_async(
message_queue: Queue[SdBusMessage] = Queue()

match_slot = await bus.match_signal_async(
service_name,
None,
self.dbus_signal.interface_name,
self.dbus_signal.signal_name,
message_queue.put_nowait,
)

while True:
next_signal_message = await message_queue.get()
signal_path = next_signal_message.path
assert signal_path is not None
yield (
signal_path,
cast(T, next_signal_message.get_contents())
)
with closing(match_slot):
while True:
next_signal_message = await message_queue.get()
signal_path = next_signal_message.path
assert signal_path is not None
yield (
signal_path,
cast(T, next_signal_message.get_contents())
)

def emit(self, args: T) -> None:
raise RuntimeError("Cannot emit signal from D-Bus proxy.")
Expand Down Expand Up @@ -264,21 +273,25 @@ async def catch_anywhere(
if bus is None:
bus = get_default_bus()

message_queue = await bus.get_signal_queue_async(
message_queue: Queue[SdBusMessage] = Queue()

match_slot = await bus.match_signal_async(
service_name,
None,
self.dbus_signal.interface_name,
self.dbus_signal.signal_name,
message_queue.put_nowait,
)

while True:
next_signal_message = await message_queue.get()
signal_path = next_signal_message.path
assert signal_path is not None
yield (
signal_path,
cast(T, next_signal_message.get_contents())
)
with closing(match_slot):
while True:
next_signal_message = await message_queue.get()
signal_path = next_signal_message.path
assert signal_path is not None
yield (
signal_path,
cast(T, next_signal_message.get_contents())
)

def emit(self, args: T) -> None:
raise NotImplementedError(
Expand Down
18 changes: 13 additions & 5 deletions src/sdbus/sd_bus_internals.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@

// Python functions and objects
PyObject* asyncio_get_running_loop = NULL;
PyObject* asyncio_queue_class = NULL;
PyObject* is_coroutine_function = NULL;
// Str objects
PyObject* set_result_str = NULL;
PyObject* set_exception_str = NULL;
PyObject* put_no_wait_str = NULL;
PyObject* add_reader_str = NULL;
PyObject* remove_reader_str = NULL;
PyObject* empty_str = NULL;
Expand Down Expand Up @@ -56,6 +54,18 @@ static void SdBusSlot_dealloc(SdBusSlotObject* self) {
SD_BUS_DEALLOC_TAIL;
}

static PyObject* SdBusSlot_close(SdBusSlotObject* self) {
sd_bus_slot_unref(self->slot_ref);
self->slot_ref = NULL;

Py_RETURN_NONE;
}

static PyMethodDef SdBusSlot_methods[] = {
{"close", (PyCFunction)SdBusSlot_close, METH_NOARGS, PyDoc_STR("Dereference sd-bus slot stopping any associated callbacks.")},
{NULL, NULL, 0, NULL},
};

PyType_Spec SdBusSlotType = {
.name = "sd_bus_internals.SdBusSlot",
.basicsize = sizeof(SdBusSlotObject),
Expand All @@ -65,6 +75,7 @@ PyType_Spec SdBusSlotType = {
(PyType_Slot[]){
{Py_tp_new, PyType_GenericNew},
{Py_tp_dealloc, (destructor)SdBusSlot_dealloc},
{Py_tp_methods, SdBusSlot_methods},
{0, NULL},
},
};
Expand Down Expand Up @@ -154,11 +165,8 @@ PyMODINIT_FUNC PyInit_sd_bus_internals(void) {

asyncio_get_running_loop = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString(asyncio_module, "get_running_loop"));

asyncio_queue_class = CALL_PYTHON_AND_CHECK(PyObject_GetAttrString(asyncio_module, "Queue"));

set_result_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("set_result"));
set_exception_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("set_exception"));
put_no_wait_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("put_nowait"));
call_soon_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("call_soon"));
create_task_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("create_task"));
remove_reader_str = CALL_PYTHON_AND_CHECK(PyUnicode_FromString("remove_reader"));
Expand Down
2 changes: 0 additions & 2 deletions src/sdbus/sd_bus_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,10 @@

// Python functions and objects
extern PyObject* asyncio_get_running_loop;
extern PyObject* asyncio_queue_class;
extern PyObject* is_coroutine_function;
// Str objects
extern PyObject* set_result_str;
extern PyObject* set_exception_str;
extern PyObject* put_no_wait_str;
extern PyObject* add_reader_str;
extern PyObject* remove_reader_str;
extern PyObject* empty_str;
Expand Down
12 changes: 7 additions & 5 deletions src/sdbus/sd_bus_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from __future__ import annotations

from asyncio import Future, Queue
from asyncio import Future
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand Down Expand Up @@ -53,7 +53,9 @@

class SdBusSlot:
"""Holds reference to SdBus slot"""
...

def close(self) -> None:
raise NotImplementedError(__STUB_ERROR)


class SdBusInterface:
Expand Down Expand Up @@ -190,12 +192,12 @@ def add_interface(self, new_interface: SdBusInterface,
object_path: str, interface_name: str, /) -> None:
raise NotImplementedError(__STUB_ERROR)

def get_signal_queue_async(
def match_signal_async(
self,
senders_name: Optional[str], object_path: Optional[str],
interface_name: Optional[str], member_name: Optional[str],
/
) -> Future[Queue[SdBusMessage]]:
callback: Callable[[SdBusMessage], None], /
) -> Future[SdBusSlot]:
raise NotImplementedError(__STUB_ERROR)

def request_name_async(self, name: str, flags: int, /) -> Future[None]:
Expand Down
69 changes: 30 additions & 39 deletions src/sdbus/sd_bus_internals_bus.c
Original file line number Diff line number Diff line change
Expand Up @@ -376,39 +376,32 @@ static PyObject* SdBus_add_interface(SdBusObject* self, PyObject* args) {
}

int _SdBus_signal_callback(sd_bus_message* m, void* userdata, sd_bus_error* Py_UNUSED(ret_error)) {
PyObject* async_queue = userdata;
PyObject* signal_callback = userdata;

SdBusMessageObject* new_message_object CLEANUP_SD_BUS_MESSAGE = (SdBusMessageObject*)SD_BUS_PY_CLASS_DUNDER_NEW(SdBusMessage_class);
if (new_message_object == NULL) {
return -1;
}
PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_CHECK_RETURN_NEG1(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));

SdBusMessageObject* new_message_object CLEANUP_SD_BUS_MESSAGE =
(SdBusMessageObject*)CALL_PYTHON_CHECK_RETURN_NEG1(SD_BUS_PY_CLASS_DUNDER_NEW(SdBusMessage_class));
_SdBusMessage_set_messsage(new_message_object, m);
PyObject* should_be_none CLEANUP_PY_OBJECT = PyObject_CallMethodObjArgs(async_queue, put_no_wait_str, new_message_object, NULL);
if (should_be_none == NULL) {
return -1;
}

Py_XDECREF(CALL_PYTHON_CHECK_RETURN_NEG1(PyObject_CallMethodObjArgs(running_loop, call_soon_str, signal_callback, new_message_object, NULL)));

return 0;
}

int _SdBus_match_signal_instant_callback(sd_bus_message* m, void* userdata, sd_bus_error* Py_UNUSED(ret_error)) {
PyObject* new_future = userdata;

if (!sd_bus_message_is_method_error(m, NULL)) {
PyObject* new_queue CLEANUP_PY_OBJECT = PyObject_GetAttrString(new_future, "_sd_bus_queue");
if (new_queue == NULL) {
return -1;
}
SdBusSlotObject* slot_object CLEANUP_SD_BUS_SLOT =
(SdBusSlotObject*)CALL_PYTHON_CHECK_RETURN_NEG1(PyObject_GetAttrString(new_future, "_sd_bus_slot"));

PyObject* should_be_none CLEANUP_PY_OBJECT = PyObject_CallMethodObjArgs(new_future, set_result_str, new_queue, NULL);
if (should_be_none == NULL) {
return -1;
}
Py_XDECREF(CALL_PYTHON_CHECK_RETURN_NEG1(PyObject_CallMethodObjArgs(new_future, set_result_str, slot_object, NULL)));

SdBusSlotObject* slot_object CLEANUP_SD_BUS_SLOT = (SdBusSlotObject*)PyObject_GetAttrString(new_queue, "_sd_bus_slot");
if (slot_object == NULL) {
return -1;
}
sd_bus_slot_set_userdata(slot_object->slot_ref, new_queue);
PyObject* signal_callback = CALL_PYTHON_CHECK_RETURN_NEG1(PyObject_GetAttrString(new_future, "_sd_bus_signal_callback"));

sd_bus_slot_set_userdata(slot_object->slot_ref, signal_callback);
sd_bus_slot_set_destroy_callback(slot_object->slot_ref, (sd_bus_destroy_t)Py_DecRef);
} else {
if (future_set_exception_from_message(new_future, m) < 0) {
return -1;
Expand All @@ -424,40 +417,38 @@ static int _unicode_or_none(PyObject* some_object) {
return (PyUnicode_Check(some_object) || (Py_None == some_object));
}

static PyObject* SdBus_get_signal_queue(SdBusObject* self, PyObject* const* args, Py_ssize_t nargs) {
SD_BUS_PY_CHECK_ARGS_NUMBER(4);
static PyObject* SdBus_match_signal_async(SdBusObject* self, PyObject* const* args, Py_ssize_t nargs) {
SD_BUS_PY_CHECK_ARGS_NUMBER(5);

SD_BUS_PY_CHECK_ARG_CHECK_FUNC(0, _unicode_or_none);
SD_BUS_PY_CHECK_ARG_CHECK_FUNC(1, _unicode_or_none);
SD_BUS_PY_CHECK_ARG_CHECK_FUNC(2, _unicode_or_none);
SD_BUS_PY_CHECK_ARG_CHECK_FUNC(3, _unicode_or_none);
SD_BUS_PY_CHECK_ARG_CHECK_FUNC(4, PyCallable_Check);

const char* sender_service_char_ptr = SD_BUS_PY_UNICODE_AS_CHAR_PTR_OPTIONAL(args[0]);
const char* path_name_char_ptr = SD_BUS_PY_UNICODE_AS_CHAR_PTR_OPTIONAL(args[1]);
const char* interface_name_char_ptr = SD_BUS_PY_UNICODE_AS_CHAR_PTR_OPTIONAL(args[2]);
const char* member_name_char_ptr = SD_BUS_PY_UNICODE_AS_CHAR_PTR_OPTIONAL(args[3]);
PyObject* signal_callback = args[4];
#else
static PyObject* SdBus_get_signal_queue(SdBusObject* self, PyObject* args) {
static PyObject* SdBus_match_signal_async(SdBusObject* self, PyObject* args) {
const char* sender_service_char_ptr = NULL;
const char* path_name_char_ptr = NULL;
const char* interface_name_char_ptr = NULL;
const char* member_name_char_ptr = NULL;
CALL_PYTHON_BOOL_CHECK(
PyArg_ParseTuple(args, "zzzz", &sender_service_char_ptr, &path_name_char_ptr, &interface_name_char_ptr, &member_name_char_ptr, NULL));
PyObject* signal_callback = NULL;
CALL_PYTHON_BOOL_CHECK(PyArg_ParseTuple(args, "zzzzO", &sender_service_char_ptr, &path_name_char_ptr, &interface_name_char_ptr, &member_name_char_ptr,
&signal_callback, NULL));
#endif
SdBusSlotObject* new_slot CLEANUP_SD_BUS_SLOT = (SdBusSlotObject*)CALL_PYTHON_AND_CHECK(SD_BUS_PY_CLASS_DUNDER_NEW(SdBusSlot_class));

PyObject* new_queue CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_queue_class, NULL));

// Bind lifetime of the slot to the queue
CALL_PYTHON_INT_CHECK(PyObject_SetAttrString(new_queue, "_sd_bus_slot", (PyObject*)new_slot));

PyObject* running_loop CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallFunctionObjArgs(asyncio_get_running_loop, NULL));

PyObject* new_future CLEANUP_PY_OBJECT = CALL_PYTHON_AND_CHECK(PyObject_CallMethod(running_loop, "create_future", ""));

// Bind lifetime of the queue to future
CALL_PYTHON_INT_CHECK(PyObject_SetAttrString(new_future, "_sd_bus_queue", new_queue));
SdBusSlotObject* new_slot CLEANUP_SD_BUS_SLOT = (SdBusSlotObject*)CALL_PYTHON_AND_CHECK(SD_BUS_PY_CLASS_DUNDER_NEW(SdBusSlot_class));

// Bind lifetime of the slot to the Future
CALL_PYTHON_INT_CHECK(PyObject_SetAttrString(new_future, "_sd_bus_slot", (PyObject*)new_slot));
CALL_PYTHON_INT_CHECK(PyObject_SetAttrString(new_future, "_sd_bus_signal_callback", signal_callback));

CALL_SD_BUS_AND_CHECK(sd_bus_match_signal_async(self->sd_bus_ref, &new_slot->slot_ref, sender_service_char_ptr, path_name_char_ptr,
interface_name_char_ptr, member_name_char_ptr, _SdBus_signal_callback,
Expand Down Expand Up @@ -654,8 +645,8 @@ static PyMethodDef SdBus_methods[] = {
{"new_property_set_message", (SD_BUS_PY_FUNC_TYPE)SdBus_new_property_set_message, SD_BUS_PY_METH, PyDoc_STR("Create new empty property set message.")},
{"new_signal_message", (SD_BUS_PY_FUNC_TYPE)SdBus_new_signal_message, SD_BUS_PY_METH, PyDoc_STR("Create new empty signal message.")},
{"add_interface", (SD_BUS_PY_FUNC_TYPE)SdBus_add_interface, SD_BUS_PY_METH, PyDoc_STR("Add interface to the bus.")},
{"get_signal_queue_async", (SD_BUS_PY_FUNC_TYPE)SdBus_get_signal_queue, SD_BUS_PY_METH,
PyDoc_STR("Returns a future that returns a queue that queues signal messages.")},
{"match_signal_async", (SD_BUS_PY_FUNC_TYPE)SdBus_match_signal_async, SD_BUS_PY_METH,
PyDoc_STR("Register signal callback asynchronously. Returns a Future that returns a SdBusSlot.")},
{"request_name_async", (SD_BUS_PY_FUNC_TYPE)SdBus_request_name_async, SD_BUS_PY_METH, PyDoc_STR("Request D-Bus name async.")},
{"request_name", (SD_BUS_PY_FUNC_TYPE)SdBus_request_name, SD_BUS_PY_METH, PyDoc_STR("Request D-Bus name blocking.")},
{"add_object_manager", (SD_BUS_PY_FUNC_TYPE)SdBus_add_object_manager, SD_BUS_PY_METH, PyDoc_STR("Add object manager at the path.")},
Expand Down
18 changes: 13 additions & 5 deletions test/test_sdbus_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,14 +670,22 @@ async def too_long_wait() -> None:
async def test_singal_queue_wildcard_match(self) -> None:
test_object, test_object_connection = initialize_object()

message_queue = await self.bus.get_signal_queue_async(
loop = get_running_loop()
future = loop.create_future()

slot = await self.bus.match_signal_async(
TEST_SERVICE_NAME,
None, None, None)
None, None, None,
future.set_result)

test_object.test_signal.emit(('test', 'signal'))
try:
test_object.test_signal.emit(('test', 'signal'))

message = await wait_for(message_queue.get(), timeout=1)
self.assertEqual(message.member, "TestSignal")
await wait_for(future, timeout=1)
message = future.result()
self.assertEqual(message.member, "TestSignal")
finally:
slot.close()

async def test_class_with_string_subclass_parameter(self) -> None:
from enum import Enum
Expand Down

0 comments on commit 8b11d3f

Please sign in to comment.