Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move to coroutines for Accessory.run #74

Merged
merged 6 commits into from
Apr 13, 2018
Merged

[WIP] Move to coroutines for Accessory.run #74

merged 6 commits into from
Apr 13, 2018

Conversation

ikalchev
Copy link
Owner

Problems to solve:

  • Support non-coroutine run methods - not all code could switch to
    asyncio, these should be started in a thread as before.
  • Port accessories/*

Problems to solve:
- Support non-coroutine run methods - not all code could switch to
asyncio, these should be started in a thread as before.
- Port accessories/*
@ikalchev ikalchev changed the title Move to coroutines for Accessory.run [WIP] Move to coroutines for Accessory.run Mar 31, 2018
@ikalchev
Copy link
Owner Author

First steps for #64

AccessoryDriver.stop still fails, because the stop_event is an
asyncio.Event being set from another thread.
@thomaspurchas
Copy link

Do you have a plan of work for this? Happy to pitch in, just don't want to end up doing the same work twice.

@cdce8p
Copy link
Contributor

cdce8p commented Mar 31, 2018

I'm by far an expert on async. Quite a while ago Home Assistant converted their core to async and I think that might be a good help to take a look at: https://github.com/home-assistant/home-assistant/blob/dev/homeassistant/core.py

Some takeaways they do:

  • The basis in async and everything that still uses threads is run through a ThreadPoolExecutor
  • Helper methods to append methods / coroutines to the loop / executer, without the need to pass loop to every function

@ikalchev
Copy link
Owner Author

ikalchev commented Apr 1, 2018

@cdce8p this was insightful, thanks. It's pretty similar to where I was heading, except the pool, which is much cleaner.

@thomaspurchas first let me briefly describe the threads in HAP-python and their purpose:

  • The server thread - this listens for iOS clients and spawns a new thread to handle each new connection.
  • The accessory threads - This is either one thread or multiple threads, depending on if there is a single accessory or a bridge containing multiple accessories. Each of these runs the Accessory.run. Eventually, value updates (such as reading the temperature from a sensor) result in an event being sent to iOS clients. This is achieved by each thread writing the event to a thread-safe queue (the event queue).
  • The "send events" thread - Because we don't want the Accessory threads to block waiting for the event to be sent, there is another thread which reads from the event queue and sends updates to clients. This also handles stale connections, etc.

The biggest "thread overhead" comes from the accessory threads because you have N threads for N accessories. What is more, they mostly sleep and do GPIO (e.g. read the sensor every X minutes). So if I have to outline a roadmap for this:

  • Move from accessory threads to a mix of coroutines and threads.
  • Evaluate if moving to aiohttp is worth it. This is not clear, because each client retains an open two-way connection and I am not entirely sure that moving to asyncio will be better. Some factors are the average number of clients and the average frequency of events coming from the accessories.

@thomaspurchas
Copy link

Could you elaborate a bit more on:

Evaluate if moving to aiohttp is worth it. This is not clear, because each client retains an open two-way connection and I am not entirely sure that moving to asyncio will be better. Some factors are the average number of clients and the average frequency of events coming from the accessories

If asyncio is anything like Twisted (which did async stuff and co-routines in Python a decade ago), then using aiohttp will still be worth it. The connection might stay open, but you hand-off waiting for data to the event loop, which will sit on network file-descriptor using select or poll/epoll waiting for data. So you code will only be called when data is moving about. Using asyncio everywhere will prevent botched code where we need to hand data between the event loop, and threads, which is never nice.

I'll do a little research to get a better idea of how hard if will be to do long-polling in aiohttp. I would agree though, that moving to aiohttp is probably lower priority.

@thomaspurchas
Copy link

After doing the little work I did. I realised that it might be a good move to split the Accessory class into an AsyncAccessory and SyncAccessory.

That would make it much easier to reason about the code because their would be clear separation. Then the rest of HAP-Python can be rewritten with async support everywhere, and older stuff can just use the SyncAccessory class for backwards compatibility.

self.loop = asyncio.new_event_loop()
self.stop_event = asyncio.Event()
self.task = self.loop.create_task(run_method(self.stop_event, self.loop))
super(AIOThread, self).__init__(target=self.loop.run_until_complete,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need for super(AIOThread, self).__init__(, this can be shorted to super().__init__( in Python 3 and higher.


def stop(self):
self.loop.call_soon_threadsafe(self.task.cancel)

Copy link

@thomaspurchas thomaspurchas Apr 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this class is safe. It looks like you are using a single event loop from multiple threads here. I'm 99% sure you shouldn't do that. (I've had exceptions thrown as a result).

Instead the event loop should be setup after thread creation like this:

class AIOThread(threading.Thread):
    """
    TODO: temporary convenience class. Create an event loop and event object for
    controlling the Accessory.run method.
    """

    def __init__(self, run_method):
        """
        """
        self.run_method = run_method
        super().__init__()

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

        self.stop_event = asyncio.Event()
        self.task = self.loop.create_task(self.run_method(self.stop_event, self.loop))

        self.loop.run_forever()
        self.loop.close()
        logger.debug("Sucessfully stopped accessory event loop.")

    async def shutdown(self):
        logger.debug("Shutting down accessory event loop")
        self.stop_event.set()
        try:
            await asyncio.wait_for(self.task, timeout=5)
        except asyncio.TimeoutError:
            logger.info("Accessory task did not shutdown within 5 seconds")
        finally:
            self.loop.stop()

    def safe_shutdown(self):
        task = self.loop.create_task(self.shutdown())

    def stop(self):
        self.loop.call_soon_threadsafe(self.safe_shutdown)

This spins up an event loop in each thread, and uses the stop_event to give an accessory an opportunity to shutdown cleanly, before forcing the event loop to stop.

I'm gonna include this in my other pull request as it was necessary to convert the HTTP bridge over to aiohttp.

This was referenced Apr 2, 2018
This change moves to a cleaner use of asyncio. Introduced
Accessory.start which creates a task for Accessory.run. The idea is to
maintain compatibility with clients that cannot support coroutines and
to allow Accessories to be run both as a thread and as a coroutine.

This is still work in progress.
@ikalchev
Copy link
Owner Author

ikalchev commented Apr 4, 2018

Hi all,

I posted a new version and I think I am almost happy with it. There a few things I need to address, but overall I think it ramps up to be a nice solution. I want to hear what's on your mind, if you wish, so go ahead.

A summary will be:

  • Don't break users already using the library.
  • Thus, we want both threads and coroutines from a single interface
  • This single interface is Accessory.start for the Driver side, while we keep Accessory.run as the user side.
  • Accessory.run can be both sync and async. In the former case, Accessory.start wraps that in a coroutine that starts a thread, so that we can iterate over multiple Accessory objects and schedule their run methods as tasks.
  • To make all of this work and avoid using task.cancel, I added event_wait in the util module
  • There is also a fancy decorator that allows new users to drop the use of while not event.set.. and just do:
@Accessory.repeat(3)
async def run(self):
    print("Hello world")  # prints every 3 seconds
  • Finally, to synchronise all these I pass a threading.Event, a asyncio.Event and an event loop to the Accessories. This is the part I would like to refine a bit.

In the meantime, I will try to setup a new async branch as suggested and go trough all the PRs and comments as soon as I can, sorry for the delay.

@@ -377,13 +381,42 @@ def setup_message(self):
self.print_qr()
print('Or enter this code in your HomeKit app on your iOS device: %s' % self.pincode.decode())

async def run(self, stop_event, loop=None):
def repeat(seconds):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename every or interval. repeat give the impression the decorated function will be called 3 times, then stop.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. run_at_interval?

"""Set the same sentinel to all contained accessories."""
super(Bridge, self).set_sentinel(run_sentinel)
super(Bridge, self).set_sentinel(run_sentinel, aio_stop_event, event_loop)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for super(Bridge, self).set_sentinel(). super().set_sentinel is all that is needed.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

self.accessory_task = self.event_loop.create_task(self.accessory.start())
logger.info("Starting event loop")
self.event_loop.run_until_complete(self.accessory_task)
self.event_loop.stop()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to call stop. run_until_complete already does this for you. (In fact, run_until_complete can only return once the loop has stopped)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -185,6 +159,9 @@ def __init__(self, accessory, port, address=None, persist_file="accessory.state"
self.persist()
self.topics = {} # topic: set of (address, port) of subscribed clients
self.topic_lock = threading.Lock() # for exclusive access to the topics
self.event_loop = asyncio.get_event_loop()
self.aio_stop_event = asyncio.Event()
Copy link

@thomaspurchas thomaspurchas Apr 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a better, cleaner way to do this.

It should be possible to call stop_event.wait() using event_loop.run_in_executor() to wait on the threaded event in a separate thread managed by asyncio. Then put a callback on the returned future to call aio_stop_event.set() back in the main thread.

That way calling stop_event.set() anywhere is both threadsafe, and guaranteed to stop both threads and the event_loop.

I'll put together a PR once I figured out the nicest way the structure the code.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it will be better. Do you mind if we leave it for another change?

@thomaspurchas
Copy link

With regards to:

  • This single interface is Accessory.start for the Driver side, while we keep Accessory.run as the user side.
  • Accessory.run can be both sync and async. In the former case, Accessory.start wraps that in a coroutine that starts a thread, so that we can iterate over multiple Accessory objects and schedule their run methods as tasks.

I think it might be an idea to move the async stuff into a separate AsyncAccessory class. Because more than run should be a co-routines. Ideally any char callbacks should also be co-routines.

This way we can encourage either 100% sync or 100% async accessories, and prevent accessories mixing both approaches (which would be nightmare from a execution safely perspective).

@ikalchev
Copy link
Owner Author

ikalchev commented Apr 4, 2018

Hopefully a fancier change

Copy link

@thomaspurchas thomaspurchas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks awesome :)

@ikalchev ikalchev merged commit d675c4b into dev Apr 13, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants