-
Notifications
You must be signed in to change notification settings - Fork 6
/
client.py
205 lines (174 loc) · 6.2 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""Client code"""
import asyncio
import logging
import sys
from datetime import datetime, timedelta
from contextlib import contextmanager
from aionursery import Nursery, MultiError
from . import (
AnswerCodes,
ArcamException,
CommandCodes,
CommandPacket,
ConnectionFailed,
NotConnectedException,
ResponseException,
ResponsePacket,
_read_packet,
_write_packet
)
from .utils import Throttle, async_retry
# Logger for this module, named after the module itself.
_LOGGER = logging.getLogger(__name__)

# Upper bound on how long a single request waits for its matching response.
_REQUEST_TIMEOUT = timedelta(seconds=3)

# Argument handed to ``Throttle``; presumably the minimum spacing between
# requests in seconds -- confirm against ``utils.Throttle``.
_REQUEST_THROTTLE = 0.2

# A ping is sent once this much time has passed since the last write.
_HEARTBEAT_INTERVAL = timedelta(seconds=5)

# The connection is considered dead when nothing has been received for two
# whole heartbeat intervals.
_HEARTBEAT_TIMEOUT = 2 * _HEARTBEAT_INTERVAL
class Client:
    """Asynchronous packet client for a single ``host:port`` endpoint.

    Life cycle: ``start()`` opens the stream connection, ``process()`` runs
    the receive loop and the heartbeat until the connection ends or the
    task is cancelled, and ``stop()`` closes the connection.  While
    ``process()`` is running, ``request()`` can be used to send a command
    and wait for the matching response.
    """

    def __init__(self, host, port, loop=None) -> None:
        """Store connection parameters; no I/O is performed here.

        Args:
            host: Host to connect to.
            port: TCP port to connect to.
            loop: Event loop to associate with the client.  Defaults to
                ``asyncio.get_event_loop()``.
        """
        self._reader = None
        self._writer = None
        self._loop = loop if loop else asyncio.get_event_loop()
        self._task = None
        # Callables invoked synchronously with every received packet.
        self._listen = set()
        self._host = host
        self._port = port
        self._throttle = Throttle(_REQUEST_THROTTLE)
        # Time of the last write; the heartbeat is scheduled relative to it.
        self._timestamp = datetime.now()

    @property
    def host(self):
        """Host this client connects to."""
        return self._host

    @property
    def port(self):
        """Port this client connects to."""
        return self._port

    @property
    def loop(self):
        """Event loop associated with this client."""
        return self._loop

    @contextmanager
    def listen(self, listener):
        """Register ``listener`` for received packets within a ``with`` block.

        The listener is called with each packet read by ``process()``.  It
        is always unregistered when the block is left, including when the
        block raises or the surrounding task is cancelled.

        Args:
            listener: Callable taking a single packet argument.

        Yields:
            This client.
        """
        self._listen.add(listener)
        try:
            yield self
        finally:
            # Without the ``finally`` a timed-out or cancelled request would
            # leave its listener registered forever.  ``discard`` keeps the
            # cleanup from masking the original exception.
            self._listen.discard(listener)

    async def _process_heartbeat(self, writer):
        """Send a ping whenever nothing was written for a heartbeat interval.

        Runs until cancelled or until writing fails.

        Args:
            writer: Stream writer the ping packets are written to.
        """
        while True:
            delay = self._timestamp + _HEARTBEAT_INTERVAL - datetime.now()
            if delay > timedelta():
                # A write happened recently; sleep until the interval since
                # that write has elapsed, then re-evaluate.
                await asyncio.sleep(delay.total_seconds())
            else:
                _LOGGER.debug("Sending ping")
                await _write_packet(
                    writer,
                    CommandPacket(1, CommandCodes.POWER, bytes([0xF0]))
                )
                self._timestamp = datetime.now()

    async def _process_data(self, reader):
        """Read packets from ``reader`` and dispatch them to the listeners.

        Returns when ``_read_packet`` yields ``None`` (logged as a server
        disconnect).  On exit, for any reason, ``self._reader`` is cleared
        so that ``connected`` reports ``False``.

        Args:
            reader: Stream reader to read packets from.

        Raises:
            ConnectionFailed: No packet arrived within ``_HEARTBEAT_TIMEOUT``.
        """
        try:
            while True:
                try:
                    # Read from the reader handed in rather than
                    # ``self._reader``: the attribute can be reset to None by
                    # a concurrent ``stop()``.
                    packet = await asyncio.wait_for(
                        _read_packet(reader),
                        _HEARTBEAT_TIMEOUT.total_seconds()
                    )
                except asyncio.TimeoutError as exception:
                    _LOGGER.warning("Missed all pings")
                    raise ConnectionFailed() from exception

                if packet is None:
                    _LOGGER.info("Server disconnected")
                    return

                _LOGGER.debug("Packet received: %s", packet)
                # Iterate over a snapshot so that a listener may register or
                # unregister listeners without breaking the iteration.
                for listener in tuple(self._listen):
                    listener(packet)
        finally:
            self._reader = None

    async def process(self):
        """Run the receive loop and the heartbeat until one of them ends.

        Both are started as children of a nursery together with a watcher
        task whose only job is to record that it was cancelled.

        Raises:
            asyncio.CancelledError: The watcher observed a cancellation and
                no single child exception was re-raised instead.
            Exception: If the nursery reports exactly one exception, that
                exception is re-raised (chained to the ``MultiError``).
        """
        cancelled = set()

        async def cancelled_watcher():
            # Sleeps forever; being cancelled is the only way out, and that
            # fact is recorded for the check after the nursery has closed.
            try:
                while True:
                    await asyncio.sleep(100)
            except asyncio.CancelledError:
                cancelled.add(True)

        try:
            async with Nursery() as nursery:
                nursery.start_soon(cancelled_watcher())
                nursery.start_soon(self._process_data(self._reader))
                nursery.start_soon(self._process_heartbeat(self._writer))
        except MultiError as e:
            # Unwrap a lone exception so callers can catch it by its own type.
            if len(e.exceptions) == 1:
                raise e.exceptions[0] from e

        if cancelled:
            raise asyncio.CancelledError

    @property
    def connected(self):
        """True while a reader exists and has not reached end of stream."""
        return self._reader is not None and not self._reader.at_eof()

    @property
    def started(self):
        """True between a successful ``start()`` and the next ``stop()``."""
        return self._writer is not None

    async def start(self):
        """Open the connection to ``host:port``.

        Raises:
            ArcamException: The client is already started.
            ConnectionFailed: The connection could not be established.
        """
        if self._writer:
            raise ArcamException("Already started")

        _LOGGER.debug("Connecting to %s:%d", self._host, self._port)

        # ``open_connection`` lost its ``loop`` argument in Python 3.10
        # (deprecated since 3.8); only pass it where it is still expected.
        extra = {}
        if sys.version_info < (3, 8):
            extra["loop"] = self._loop

        try:
            self._reader, self._writer = await asyncio.open_connection(
                self._host, self._port, **extra)
        except OSError as exception:
            # ConnectionError is a subclass of OSError, so one clause
            # covers both.
            raise ConnectionFailed() from exception
        _LOGGER.info("Connected to %s:%d", self._host, self._port)

    async def stop(self):
        """Close the connection if one is open; safe to call repeatedly.

        Errors raised while closing are ignored on purpose: the connection
        is being discarded either way.
        """
        if self._writer:
            try:
                _LOGGER.info("Disconnecting from %s:%d", self._host, self._port)
                self._writer.close()
                # ``StreamWriter.wait_closed`` only exists from Python 3.7.
                if sys.version_info >= (3, 7):
                    await self._writer.wait_closed()
            except (ConnectionError, OSError):
                pass
            finally:
                self._writer = None
                self._reader = None

    @async_retry(2, asyncio.TimeoutError)
    async def _request(self, request: CommandPacket):
        """Send ``request`` and wait for the response with the same zn/cc.

        Decorated with ``async_retry(2, asyncio.TimeoutError)``; presumably
        this retries the call on timeout -- confirm against ``utils``.

        Args:
            request: Packet to send.

        Returns:
            The first received packet whose ``zn`` and ``cc`` equal those of
            ``request``.

        Raises:
            NotConnectedException: The client has not been started.
            asyncio.TimeoutError: No matching response arrived within
                ``_REQUEST_TIMEOUT``.
        """
        if not self._writer:
            raise NotConnectedException()
        writer = self._writer  # keep copy around if stopped by another task
        future = asyncio.Future()

        def listen(response: ResponsePacket):
            matches = (response.zn == request.zn and
                       response.cc == request.cc)
            # ``done()`` is also true for a cancelled future.
            if matches and not future.done():
                future.set_result(response)

        await self._throttle.get()

        async def req():
            _LOGGER.debug("Requesting %s", request)
            # Register before writing so the response cannot be missed.
            with self.listen(listen):
                await _write_packet(writer, request)
                self._timestamp = datetime.now()
                return await future

        return await asyncio.wait_for(
            req(),
            _REQUEST_TIMEOUT.total_seconds())

    async def request(self, zn, cc, data):
        """Send a command and return the payload of its status response.

        Args:
            zn: Zone value placed in the command packet.
            cc: Command code placed in the command packet.
            data: Payload placed in the command packet.

        Returns:
            ``response.data`` when the answer code is
            ``AnswerCodes.STATUS_UPDATE``.

        Raises:
            ResponseException: The response carried any other answer code.
        """
        response = await self._request(CommandPacket(zn, cc, data))

        if response.ac == AnswerCodes.STATUS_UPDATE:
            return response.data

        raise ResponseException.from_response(response)
class ClientContext:
    """Async context manager that starts, runs and stops a client.

    On entry the client is started and its ``process()`` coroutine is
    scheduled as a background task; on exit that task is cancelled and the
    client is stopped.
    """

    def __init__(self, client: "Client"):
        """Wrap ``client``; nothing is started until the context is entered."""
        self._client = client
        self._task = None

    async def __aenter__(self):
        """Start the client, schedule its processing task, return the client."""
        await self._client.start()
        self._task = asyncio.ensure_future(
            self._client.process(),
            loop=self._client.loop
        )
        return self._client

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Cancel the processing task and stop the client.

        An exception other than ``CancelledError`` raised by the processing
        task propagates to the caller, but the client is stopped first so
        the connection is never left open.
        """
        try:
            if self._task:
                self._task.cancel()
                try:
                    await self._task
                except asyncio.CancelledError:
                    pass
        finally:
            # Must run even when awaiting the task raised (e.g. the task had
            # already failed), otherwise the connection would leak.
            self._task = None
            await self._client.stop()