actor system: low-level signal layer #6

Closed
DrPyser opened this issue Nov 22, 2019 · 2 comments

@DrPyser
Owner

DrPyser commented Nov 22, 2019

The actor system in concurrency/threads/thread_actor.py needs a low-level signal interface to handle things such as exit signals.
Basically, there needs to be a side channel alongside the mailbox to carry exit signals. Maybe another queue?
The trick is to handle events from this low-level signal queue while also running user code, which may block waiting on mailbox events.
This suggests that user code has to cooperate with the underlying framework and let it process the signal queue alongside the user code. Two models come to mind:

  • Use an async/await coroutine framework such as asyncio or Trio as the basis for the framework. Blocking queues would be replaced with the async equivalents from those libraries, and interaction with the mailbox or signal queue would be done with async code.
    • Pro: Can exploit existing async/await-friendly libraries and tools.
    • Con: Adds complexity when dealing with blocking sync code (it must be executed in a separate thread).
    • Con: Adds overall complexity to the system by adding another layer of concurrency.
  • Use a custom coroutine framework: user code must be a generator-based coroutine. Interaction with the mailbox and actor system could be done by yielding request objects, giving the framework a chance to also check for low-level signals, e.g.:
    # send, receive and self would be provided by the framework (proposed API)
    def ping(pong):
        yield send("ping", pong, self())
        while True:
            # wait for the first mailbox message matching one of the keyed predicates
            message, key = yield receive([
                ("pong", lambda m: m.message == "pong"),
                ("terminate", lambda m: m.message == "terminate")
            ])
            if key == "pong":
                print(f"Received pong from {message.sender}")
            elif key == "terminate":
                print(f"Received terminate message from {message.sender}")
                # we could ignore the message and continue running
                break
    On each yield, the framework running this code could first check whether a signal has arrived in the signal queue. For example, if an exit signal is received, the framework would stop running the user code and terminate the thread.
    It could also call throw(ExitSignalError(signal.reason)) on the generator to give user code a chance to clean up before terminating. Or, as in Erlang, different types of exit signals could have different behaviors: e.g. a "kill" signal would just break out of the loop and terminate as quickly as possible, without giving user code an opportunity to clean up.
    • Pro: Custom-made for the use case, which could mean minimal complexity.
    • Con: Harder to integrate with async/await code.
    • Pro: Simple and flexible: new features can be added to the framework by adding handlers for yielded requests.
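
The custom coroutine model above could be driven by a small loop like the following. This is only a sketch under assumptions: `run_actor`, `Receive`, `ExitSignal`, its `kill` flag, and the `echo` actor are hypothetical names that are not in the repository, and only a single `Receive` request type is handled. It checks the signal queue each time user code yields, throws `ExitSignalError` into the generator for ordinary exit signals, and simply stops driving the generator for a "kill".

```python
import queue
from dataclasses import dataclass


class ExitSignalError(Exception):
    """Thrown into user code when a catchable exit signal arrives."""


@dataclass
class ExitSignal:
    reason: str
    kill: bool = False  # hypothetical flag: a "kill" skips user cleanup


class Receive:
    """Request object: user code yields this to ask for the next message."""


def run_actor(gen, mailbox, signals, poll_interval=0.01):
    """Drive a generator-based actor, checking for signals at every yield."""
    try:
        request = next(gen)  # run user code up to its first yield
        while True:
            try:
                sig = signals.get_nowait()
            except queue.Empty:
                sig = None
            if sig is not None:
                if sig.kill:
                    return "killed"  # stop driving the generator right away
                # Let user code clean up; it may re-raise, return, or yield again.
                request = gen.throw(ExitSignalError(sig.reason))
                continue
            if not isinstance(request, Receive):
                raise TypeError(f"unsupported request: {request!r}")
            try:
                # Short timeout so a quiet mailbox does not starve the signal check.
                message = mailbox.get(timeout=poll_interval)
            except queue.Empty:
                continue
            request = gen.send(message)
    except StopIteration:
        return "finished"
    except ExitSignalError:
        return "exited"


def echo(log):
    """Example user code: record messages until an exit signal arrives."""
    try:
        while True:
            log.append((yield Receive()))
    except ExitSignalError as exc:
        log.append(f"cleanup: {exc}")
        raise


if __name__ == "__main__":
    mailbox, signals = queue.Queue(), queue.Queue()
    signals.put(ExitSignal("shutdown"))
    events = []
    print(run_actor(echo(events), mailbox, signals), events)
```

New request types (send, spawn, and so on) would be added as further `isinstance` branches in the loop, which is the flexibility the last "Pro" refers to.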
@DrPyser
Owner Author

DrPyser commented Nov 24, 2019

Turns out, just using a coroutine to interact with the mailbox while having a separate queue for signals is not enough. Both queues must be waited on together, since blocking on one prevents the other from being processed. A few approaches:

  • All blocking calls on queues (signal or mailbox) must use poll looping. A receive call on the mailbox would always be implemented as a poll loop on the queue (e.g. a blocking get with a small timeout value), yielding on each iteration so the signal queue is also checked. The problem is that this would make it difficult to implement a timeout on the receive call (which is already broken, though, since a receive with a timeout can block multiple times on the queue, as long as each individual block is shorter than the timeout).
  • Implement a "selectable queue" that would allow select.select to be used for simultaneous waiting on the two queues. This would involve associating OS file descriptors with queues, and overriding the write methods (e.g. put, put_nowait) to manipulate the file descriptors.
  • Use a single queue: signals would go through the mailbox as well, but the implementation would make sure signals are processed first, independently of user expectations.
  • Use an async framework (e.g. Trio) as the underlying implementation to manage all events (signals, messages). The user code doesn't need to know: it can still be a generator-based coroutine and use sync code. The coroutine would only need to yield often enough to allow the framework to handle events. That requirement applies in any case.
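
To make the "selectable queue" option more concrete, here is a rough sketch, not code from the repository. `SelectableQueue` and `receive` are hypothetical names. It assumes a single consumer per queue and uses a `socket.socketpair()` as the file descriptor, with one byte written per queued item so that `select.select` reports the queue as readable while it holds items.

```python
import queue
import select
import socket


class SelectableQueue(queue.Queue):
    """queue.Queue that select() reports as readable while it holds items."""

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        # One byte sits in the socket buffer for every queued item.
        self._reader, self._writer = socket.socketpair()

    def fileno(self):
        # select.select accepts any object exposing fileno().
        return self._reader.fileno()

    def put(self, item, block=True, timeout=None):
        # put_nowait() delegates to put(), so it is covered as well.
        super().put(item, block, timeout)
        self._writer.send(b"\0")

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        self._reader.recv(1)  # consume the byte paired with this item
        return item

    def close(self):
        self._reader.close()
        self._writer.close()


def receive(mailbox, signals, timeout=None):
    """Block until either queue has an item; signals win ties."""
    ready, _, _ = select.select([signals, mailbox], [], [], timeout)
    if signals in ready:
        return "signal", signals.get_nowait()
    if mailbox in ready:
        return "message", mailbox.get_nowait()
    return "timeout", None


if __name__ == "__main__":
    mailbox, signals = SelectableQueue(), SelectableQueue()
    mailbox.put("ping")
    signals.put("exit")
    print(receive(mailbox, signals, timeout=1))
    print(receive(mailbox, signals, timeout=1))
```

One caveat of this sketch: if a very large number of items pile up unread, the socket buffer can fill and `put` would block on the `send` call. A real implementation would need to handle that case, for instance with a non-blocking socket.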

@DrPyser
Owner Author

DrPyser commented Nov 24, 2019

The async-framework approach (the last one listed above) was implemented as of ff50961 using asyncio. A Trio-based reimplementation could be attempted.

  • Each ThreadActor runs its own asyncio event loop.
  • The signal queue and the mailbox queue become asyncio.Queue objects.
  • Handling Receive calls involves waiting on the signal queue and the mailbox queue simultaneously, using concurrency.utils.select (itself implemented using concurrency.utils.race).
  • User-specified Receive timeouts are implemented using asyncio.wait_for on the queue iteration, which gives more correct timeout semantics.
  • System methods that interact with actors (e.g. sending a mailbox message using send, sending signals using send_signal) now use asyncio.run_coroutine_threadsafe to interact with the asyncio.Queue objects in the actor's thread.
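
The shape of this design can be illustrated with a standalone sketch. It is not the code from ff50961: `Inbox`, `ActorThread`, and `post` are hypothetical names, and `asyncio.wait` with a timeout stands in for the repository's `concurrency.utils.select`/`race` helpers and its `asyncio.wait_for` usage, whose actual signatures are not shown in this thread. Each actor thread owns an event loop, both queues are waited on together, and other threads enqueue through `asyncio.run_coroutine_threadsafe`.

```python
import asyncio
import threading


class Inbox:
    """Mailbox plus signal queue, waited on together (create inside the loop)."""

    def __init__(self):
        self.queues = {"signal": asyncio.Queue(), "message": asyncio.Queue()}
        self._getters = {}  # kind -> outstanding Queue.get() task

    async def receive(self, timeout=None):
        # Keep one pending get() per queue. A getter that loses the race is
        # reused by the next call, so no item is dropped.
        for kind, q in self.queues.items():
            if kind not in self._getters:
                self._getters[kind] = asyncio.ensure_future(q.get())
        done, _ = await asyncio.wait(
            list(self._getters.values()),
            timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for kind in ("signal", "message"):  # signals win ties
            if self._getters[kind] in done:
                return kind, self._getters.pop(kind).result()
        raise asyncio.TimeoutError

    async def close(self):
        tasks = list(self._getters.values())
        self._getters.clear()
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


class ActorThread(threading.Thread):
    """Thread running its own event loop and logging what it receives."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.ready = threading.Event()
        self.log = []

    def run(self):
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self._main())
        finally:
            self.loop.close()

    async def _main(self):
        self.inbox = Inbox()
        self.ready.set()
        while True:
            kind, item = await self.inbox.receive()
            self.log.append((kind, item))
            if kind == "signal":  # this sketch treats every signal as an exit
                await self.inbox.close()
                return


def post(actor, kind, item, timeout=1.0):
    """Thread-safe put from outside the actor's thread."""
    put = actor.inbox.queues[kind].put(item)
    asyncio.run_coroutine_threadsafe(put, actor.loop).result(timeout)


if __name__ == "__main__":
    actor = ActorThread()
    actor.start()
    actor.ready.wait(1)
    post(actor, "signal", "exit")
    actor.join(2)
    print(actor.log)
```

Keeping the losing `get()` task alive between calls, rather than cancelling it, is one way to avoid losing an item when both queues become ready at nearly the same time.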

@DrPyser DrPyser closed this as completed Dec 4, 2019