Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port accessories to asyncio #76

Open
wants to merge 4 commits into
base: asyncio_run
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pyhap/accessories/AM2302.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
The DHT22 module was taken from
https://www.raspberrypi.org/forums/viewtopic.php?f=37&t=71336
"""
import asyncio
import time
import random

Expand All @@ -19,7 +20,7 @@ class AM2302(Accessory):
category = Category.SENSOR

def __init__(self, *args, pin=4, **kwargs):
super(AM2302, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
self.pin = pin

self.temp_char = self.get_service("TemperatureSensor")\
Expand All @@ -31,25 +32,26 @@ def __init__(self, *args, pin=4, **kwargs):
self.sensor = DHT22.sensor(pigpio.pi(), pin)

def _set_services(self):
super(AM2302, self)._set_services()
super()._set_services()
self.add_service(
loader.get_serv_loader().get("TemperatureSensor"))
self.add_service(
loader.get_serv_loader().get("HumiditySensor"))

def __getstate__(self):
state = super(AM2302, self).__getstate__()
state = super().__getstate__()
state["sensor"] = None
return state

def __setstate__(self, state):
self.__dict__.update(state)
self.sensor = DHT22.sensor(pigpio.pi(), self.pin)

def run(self):
while not self.run_sentinel.wait(10):
async def run(self):
while not stop_event.is_set():
await asyncio.sleep(10)
self.sensor.trigger()
time.sleep(0.2)
await acyncio.sleep(0.2)
t = self.sensor.temperature()
h = self.sensor.humidity()
self.temp_char.set_value(t)
Expand Down
7 changes: 5 additions & 2 deletions pyhap/accessories/BMP180.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
# Assume you have a bmp module with BMP180 class with read() method.
from sensors.bmp180 import BMP180 as sensor

import asyncio

from pyhap.accessory import Accessory, Category
import pyhap.loader as loader

Expand Down Expand Up @@ -33,7 +35,8 @@ def __setstate__(self, state):
self.__dict__.update(state)
self.sensor = sensor()

def run(self):
while not self.run_sentinel.wait(30):
async def run(self, stop_event, loop=None):
while not stop_event.is_set():
await asyncio.sleep(10)
temp, _pressure = self.sensor.read()
self.temp_char.set_value(temp)
10 changes: 6 additions & 4 deletions pyhap/accessories/DisplaySwitch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# An Accessory for viewing/controlling the status of a Mac display.
import asyncio
import subprocess

from pyhap.accessory import Accessory, Category
Expand Down Expand Up @@ -26,19 +27,20 @@ class DisplaySwitch(Accessory):
category = Category.SWITCH

def __init__(self, *args, **kwargs):
super(DisplaySwitch, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)

self.display = self.get_service("Switch")\
.get_characteristic("On")
self.display.setter_callback = self.set_display

def _set_services(self):
super(DisplaySwitch, self)._set_services()
super()._set_services()
self.add_service(
loader.get_serv_loader().get("Switch"))

def run(self):
while not self.run_sentinel.wait(1):
async def run(self, stop_event, loop=None):
while not stop_event.is_set():
await asyncio.sleep(1)
# We can't just use .set_value(state), because that will
# trigger our listener.
state = get_display_state()
Expand Down
112 changes: 30 additions & 82 deletions pyhap/accessories/Http.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,73 +6,52 @@
import threading
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from aiohttp import web

from pyhap.accessory import Bridge, Category

logger = logging.getLogger(__name__)


class HttpBridgeHandler(BaseHTTPRequestHandler):
class HttpBridgeHandler(web.Application):
"""Handles requests and passes value updates to an HttpAccessory.

The POST request should contain json data with the format:
{ "aid": <aid>
{ "aid": <aid>,
"services": {
<service>: {
<characteristic>: value,
<characteristic>: <value>
}
}
}

Example:
{ "aid": 2
{ "aid": 2,
"services": {
TemperatureSensor" : {
"TemperatureSensor" : {
"CurrentTemperature": 20
}
}
}
"""

def __init__(self, http_accessory, sock, client_addr, server):
def __init__(self, http_accessory):
"""Create a handler that passes updates to the given HttpAccessory.
"""
super().__init__()

self.http_accessory = http_accessory
super(HttpBridgeHandler, self).__init__(sock, client_addr, server)
self.add_routes([web.post('/', self.post_handler)])

def respond_ok(self):
"""Reply with code 200 (OK) and close the connection.
"""
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", 0)
self.end_headers()
self.close_connection = 1

def respond_err(self):
"""Reply with code 400 and close the connection.
"""
self.send_response(400)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", 0)
self.end_headers()
self.close_connection = 1

def do_POST(self):
"""Read the payload as json and update the state of the accessory.
"""
length = int(self.headers["Content-Length"])
async def post_handler(self, request):
try:
# The below decode is necessary only for python <3.6, because loads prior 3.6
# doesn't know bytes/bytearray.
content = self.rfile.read(length).decode("utf-8")
data = json.loads(content)
data = await request.json()
await self.http_accessory.update_state(data)
except Exception as e:
logger.error("Bad POST request; Error was: %s", str(e))
self.respond_err()
else:
self.http_accessory.update_state(data)
self.respond_ok()
return web.Response(text="Bad POST", status=400)

return web.Response(text="OK")


class HttpBridge(Bridge):
Expand Down Expand Up @@ -127,48 +106,19 @@ class HttpBridge(Bridge):

category = Category.OTHER

def __init__(self, address, *args, **kwargs):
def __init__(self, *args, address, **kwargs):
"""Initialise and add the given services.

@param address: The address-port on which to listen for requests.
@type address: tuple(str, int)

@param accessories:
"""
super(HttpBridge, self).__init__(*args, **kwargs)

# For exclusive access to updates. Slight overkill...
self.update_lock = None
self.server_thread = None
self._set_server(address)
super().__init__(*args, **kwargs)

def _set_server(self, address):
"""Set up a HTTPServer to listen on the given address.
"""
self.server = HTTPServer(address, lambda *a: HttpBridgeHandler(self, *a))
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.update_lock = threading.Lock()
self.address = address

def __getstate__(self):
"""Return the state of this instance, less the server and server thread.

Also add the server address. All this is because we cannot pickle such
objects and to allow to recover the server using the address.
"""
state = super(HttpBridge, self).__getstate__()
state["server"] = None
state["server_thread"] = None
state["update_lock"] = None
state["address"] = self.server.server_address
return state

def __setstate__(self, state):
"""Load the state and set up the server with the address in the state.
"""
self.__dict__.update(state)
self._set_server(state["address"])

def update_state(self, data):
async def update_state(self, data):
"""Update the characteristics from the received data.

Expected to be called from HapHttpHandler. Updates are thread-safe.
Expand All @@ -192,19 +142,17 @@ def update_state(self, data):
service_obj = accessory.get_service(service)
for char, value in char_data.items():
char_obj = service_obj.get_characteristic(char)
with self.update_lock:
char_obj.set_value(value)

def stop(self):
"""Stop the server.
"""
super(HttpBridge, self).stop()
logger.debug("Stopping HTTP bridge server.")
self.server.shutdown()
self.server.server_close()
char_obj.set_value(value)

def run(self):
async def run(self, stop_event, loop=None):
"""Start the server - can listen for requests.
"""
logger.debug("Starting HTTP bridge server.")
self.server_thread.start()
app = HttpBridgeHandler(self)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self.address[0], self.address[1])
await site.start()

await stop_event.wait()
await runner.cleanup()
7 changes: 5 additions & 2 deletions pyhap/accessories/TSL2591.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import tsl2591

import asyncio

from pyhap.accessory import Accessory, Category
import pyhap.loader as loader

Expand Down Expand Up @@ -32,8 +34,9 @@ def __setstate__(self, state):
self.__dict__.update(state)
self.tsl = tsl2591.Tsl2591()

def run(self):
while not self.run_sentinel.wait(10):
async def run(self, stop_event, loop=None):
while not stop_event.is_set():
await asyncio.sleep(10)
full, ir = self.tsl.get_full_luminosity()
lux = min(max(0.001, self.tsl.calculate_lux(full, ir)), 10000)
self.lux_char.set_value(lux)
Expand Down
31 changes: 22 additions & 9 deletions pyhap/accessory_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,22 +108,35 @@ class AIOThread(threading.Thread):
def __init__(self, run_method):
"""
"""
self.run_method = run_method
super().__init__()

def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)

self.stop_event = asyncio.Event()
self.task = self.loop.create_task(run_method(self.stop_event, self.loop))
super(AIOThread, self).__init__(target=self.loop.run_until_complete,
args=(self.task,))
self.task = self.loop.create_task(self.run_method(self.stop_event, self.loop))

def run(self):
self.loop.run_forever()
self.loop.close()
logger.debug("Sucessfully stopped accessory event loop.")

async def shutdown(self):
logger.debug("Shutting down accessory event loop")
self.stop_event.set()
try:
super(AIOThread, self).run()
except CancelledError:
await asyncio.wait_for(self.task, timeout=5)
except asyncio.TimeoutError:
logger.info("Accessory task did not shutdown within 5 seconds")
finally:
self.loop.stop()
self.loop.close()
logger.info("Sucessfully stopped accessory event loop.")

def safe_shutdown(self):
task = self.loop.create_task(self.shutdown())

def stop(self):
self.loop.call_soon_threadsafe(self.task.cancel)
self.loop.call_soon_threadsafe(self.safe_shutdown)

class AccessoryDriver(object):
"""
Expand Down
3 changes: 2 additions & 1 deletion requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ ed25519
zeroconf
curve25519-donna
pyqrcode
base36
base36
aiohttp==3.1.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that we add a dependency that is only used for an optional accessory. If that's the way to go, we should move all optional accessories elsewhere #43.