-
Notifications
You must be signed in to change notification settings - Fork 0
99_Posix_ipc
Warning
This page will not teach you what IPC is.
In the vast majority of cases, you should use multiprocessing or threading inter-process communication instead.
You need to install nob.py with ipc extra: nob.py[ipc]. It is based on - and installs the posix_ipc library on supported platforms.
The handle_existence parameter controls the behavior regarding the existence of the various named IPC objects. It accepts the following values:
-
RAISE_IF_EXISTS: Creates a new posix handle, raises an error if it already exists. -
LINK_OR_CREATE: Links to the existing posix handle if it exists. -
RAISE_IF_NOT_EXISTS: Links to the existing posix handle if it exists, raises an error otherwise. -
UNLINK_AND_CREATE: Deletes the existing posix handle and creates a new one.
The handle is automatically unlinked when the object is deleted if it was created by this handle. Else, it is only closed.
from nob.ipc import Flags, NamedSemaphore
sem = NamedSemaphore("/my_semaphore", handle_existence=Flags.LINK_OR_CREATE)Semaphore supports context manager, which automatically acquires and releases (once) the semaphore when the block is entered and exited, respectively:
with NamedSemaphore("/my_semaphore", handle_existence=Flags.LINK_OR_CREATE) as sem:
# critical sectionThe NamedSemaphore object has a similar interface to threading.Semaphore and multiprocessing.Semaphore. You can use acquire and release to wait and post the semaphore, respectively.
The acquire method also accepts:
-
blocking: Whether to block until the semaphore is available. IfFalse, the method will return immediately withTrueif the semaphore was acquired, andFalseotherwise. -
timeout: If provided, the method will block for at mosttimeoutseconds. If the semaphore is not acquired within this time, the method will return False. If not provided, the method will block indefinitely ifblockingis True. Not supported on macOS.
You may release the semaphore multiple times by calling release with n greater than 1.
sem.acquire()
# critical section
sem.release()from nob.ipc import Flags, NamedSharedMemory
shm = NamedSharedMemory("/my_shared_memory", size=1024, handle_existence=Flags.LINK_OR_CREATE)import mmap
memory = mmap.mmap(shm.fd, shm.size)You may also use .mmap() directly on the NamedSharedMemory object.
The NamedSharedMemory object can also be used as a context manager, which will automatically close the shared memory when the block is exited.
with NamedSharedMemory("/my_shared_memory", size=1024, handle_existence=Flags.LINK_OR_CREATE) as shm:
memory = shm.mmap()
# use memory or do something else with shmOr:
with NamedSharedMemory("/my_shared_memory", size=1024, handle_existence=Flags.LINK_OR_CREATE).mmap() as memory:
# use memoryfrom nob.ipc import Flags, NamedMessageQueue
mq = NamedMessageQueue("/my_message_queue", handle_existence=Flags.LINK_OR_CREATE)The message queue also supports context manager, which will automatically close the message queue when the block is exited:
with NamedMessageQueue("/my_message_queue", handle_existence=Flags.LINK_OR_CREATE) as mq:
# use mqYou can use the send and receive methods to send and receive messages, respectively. The send method accepts a bytes-like object or a string (which will be encoded to bytes using UTF-8), while the receive method returns a tuple of the received message and its priority.
mq.send(b"Hello, world!")
message, prio = mq.receive()Both methods are blocking by default if timeout=None is passed (which is the default value). This means that:
-
.send()blocks until the message is sent (it waits until the queue has an empty slot to put the message in). -
.receive()blocks until a message is received (it waits until the queue has a message to receive).
Additionally, you can order messages by priority. The send method accepts an optional priority parameter (default is 0), and the receive method returns the priority of the received message as the second element of the returned tuple - unless timeout is exhausted, in which case it returns None.
from nob import cli
from nob.ipc import Flags, NamedMessageQueue
MQ_NAME = "/test_queue"
@cli.grp()
def main(): ...
@cli.cmd(main)
@cli.arg("msg", default=None, help="Message to send in SEND phase")
def send(lg: cli.Logger, msg: str | None):
"""Send a message to an existing message queue."""
with NamedMessageQueue(MQ_NAME, handle_existence=Flags.RAISE_IF_NOT_EXISTS) as mq:
mq.send(msg or "Hello from POSIX IPC")
lg.info("Message sent")
@cli.cmd(main)
def receive(lg: cli.Logger):
"""Create a message queue and receive a message from it."""
with NamedMessageQueue(MQ_NAME, handle_existence=Flags.RAISE_IF_EXISTS) as mq:
message, prio = mq.receive()
lg.info("Received message: '%s' with priority %d", message.decode(), prio)
if __name__ == "__main__":
main()Start the receiver in one terminal:
uv run <file.py> receiveAnd then, the sender in another terminal:
uv run <file.py> send "Hello there!"Powered by caffeine and uv.
MIT license.