Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring user-level systemd units status change #155

Open
honglei opened this issue Jun 30, 2023 · 0 comments
Open

Monitoring user-level systemd units status change #155

honglei opened this issue Jun 30, 2023 · 0 comments

Comments

@honglei
Copy link

honglei commented Jun 30, 2023

by #61, and changed bus_type to BusType.SESSION, but canot recv any message

import asyncio
import signal
import os


from dbus_next.aio import MessageBus
from dbus_next.message import Message
from dbus_next.constants import BusType, MessageType
from dbus_next.signature import Variant

def dbus_path_to_name(path):
    name = os.path.basename(path)
    name = name.replace('_40', '@')
    name = name.replace('_2e', '.')
    name = name.replace('_5f', '_')
    name = name.replace('_2d', '-')
    name = name.replace('_5c', '\\')
    return name

def message_handler(msg: Message ):
    name: str = dbus_path_to_name(msg.path)
    properties: dict[str, Variant] = msg.body[1]
    if 'ActiveState' not in properties:
        return False
    print(f"'unit: {name}, "
          f"ActiveState: {properties['ActiveState'].value}, "
          f"SubState:{properties['SubState'].value} ")
    return True

async def main():
    stop_event = asyncio.Event()
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, stop_event.set)
    loop.add_signal_handler(signal.SIGTERM, stop_event.set)

    bus = await MessageBus(bus_type=BusType.SESSION).connect()

    reply = await bus.call(Message(
        destination='org.freedesktop.DBus',
        path='/org/freedesktop/DBus',
        interface='org.freedesktop.DBus',
        member='AddMatch',
        signature='s',
        body=["path_namespace='/org/freedesktop/systemd1/unit',type='signal',interface='org.freedesktop.DBus.Properties'"],
        serial=bus.next_serial()
    ))
    assert reply.message_type == MessageType.METHOD_RETURN
    bus.add_message_handler(message_handler)
    await stop_event.wait()

asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant