diff --git a/custom_components/mbapi2020/websocket.py b/custom_components/mbapi2020/websocket.py index 82aa923..24a9092 100644 --- a/custom_components/mbapi2020/websocket.py +++ b/custom_components/mbapi2020/websocket.py @@ -7,15 +7,13 @@ from typing import Optional import uuid -import aiohttp +from aiohttp import ClientSession, WSMsgType, client_exceptions from homeassistant.const import EVENT_HOMEASSISTANT_STOP from homeassistant.core import HomeAssistant from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.helpers.dispatcher import async_dispatcher_send from .const import ( - DOMAIN, REGION_APAC, REGION_EUROPE, REGION_NORAM, @@ -30,19 +28,14 @@ WEBSOCKET_USER_AGENT, WEBSOCKET_USER_AGENT_PA, ) +from .errors import WebsocketError from .helper import UrlHelper as helper from .oauth import Oauth from .proto import vehicle_events_pb2 -DEFAULT_WATCHDOG_TIMEOUT = 3600 - -STATE_INIT = "initializing" -STATE_CONNECTING = "connecting" +DEFAULT_WATCHDOG_TIMEOUT = 6600 STATE_CONNECTED = "connected" -STATE_AUTH_INVALID = "auth_invalid" -STATE_AUTH_REQUIRED = "auth_required" STATE_RECONNECTING = "reconnecting" -STATE_DISCONNECTED = "disconnected" LOGGER = logging.getLogger(__name__) @@ -75,8 +68,6 @@ async def on_expire(self): async def trigger(self): """Trigger the watchdog.""" - # LOGGER.debug("Watchdog triggered – sleeping for %s seconds", self._timeout) - if self._timer_task: self._timer_task.cancel() @@ -96,13 +87,8 @@ def __init__(self, hass, oauth, region) -> None: self._region = region self.connection_state = "unknown" self.is_connecting = False - self._watchdog: WebsocketWatchdog = WebsocketWatchdog(self._disconnected) - - def set_connection_state(self, state): - """Change current connection state.""" - signal = f"{DOMAIN}" - async_dispatcher_send(self._hass, signal, state) - self.connection_state = state + self._watchdog: WebsocketWatchdog = WebsocketWatchdog(self.initiatiate_connection_reset) + self._queue = asyncio.Queue() async def async_connect(self, on_data) -> None: """Connect to the socket.""" @@ -115,32 +101,17 @@ async def _async_stop_handler(event): await self.async_stop() self._on_data_received = on_data + self.is_connecting = True - await self._watchdog.trigger() - + self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_handler) session = async_get_clientsession(self._hass, VERIFY_SSL) - self.set_connection_state(STATE_CONNECTING) - - websocket_url = helper.Websocket_url(self._region) - while True: - try: - headers = await self._websocket_connection_headers() - self.is_connecting = True - LOGGER.info("Connecting to %s", websocket_url) - self._connection = await session.ws_connect(websocket_url, headers=headers, proxy=SYSTEM_PROXY) - except aiohttp.client_exceptions.ClientError as exc: - LOGGER.error("Could not connect to %s, retry in 10 seconds...", websocket_url) - LOGGER.debug(exc) - self.set_connection_state(STATE_RECONNECTING) - await asyncio.sleep(10) - else: - self.is_connecting = False - LOGGER.info("Connected to mercedes websocket at %s", websocket_url) - break - self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_handler) + # Tasks erstellen + queue_task = asyncio.create_task(self._start_queue_handler()) + websocket_task = asyncio.create_task(self._start_websocket_handler(session)) - asyncio.ensure_future(self._recv()) + # Warten, dass Tasks laufen + await asyncio.gather(queue_task, websocket_task) async def async_stop(self): """Close connection.""" @@ -149,65 +120,35 @@ async def async_stop(self): if self._connection is not None: await self._connection.close() - def _decode_message(self, res_raw): - res = vehicle_events_pb2.PushMessage() - res.ParseFromString(res_raw) - return res - - async def _disconnected(self): - self.set_connection_state(STATE_DISCONNECTED) - if not self._is_stopping: - asyncio.ensure_future(self.async_connect(self._on_data_received)) - - async def _recv(self): - while not self._connection.closed: - self.set_connection_state(STATE_CONNECTED) - - try: - data = await self._connection.receive() - except aiohttp.client_exceptions.ClientError as err: - LOGGER.debug("remote websocket connection closed: %s", err) - break - except ConnectionResetError as cr_err: - LOGGER.debug("remote websocket connection closed cr: %s", cr_err) - break - except RuntimeError as err: - LOGGER.debug("remote websocket connection closed cr: %s", err) - break - - await self._watchdog.trigger() - - if not data: - break - - if data.type == aiohttp.WSMsgType.PING: - LOGGER.debug("websocket connection PING ") + async def initiatiate_connection_reset(self): + """Initiate a connection reset.""" + if self._connection is not None: + await self._connection.close() - if data.type == aiohttp.WSMsgType.PONG: - LOGGER.debug("websocket connection PONG ") + async def call(self, message): + """Send a message to the MB websocket servers.""" + try: + await self._connection.send_bytes(message) + except client_exceptions.ClientError as err: + LOGGER.error("remote websocket connection closed: %s", err) - if data.type in ( - aiohttp.WSMsgType.CLOSE, - aiohttp.WSMsgType.CLOSED, - aiohttp.WSMsgType.CLOSING, - ): - LOGGER.debug("websocket connection is closing") - break + async def _start_queue_handler(self): + while not self._is_stopping: + await self._queue_handler() - if data.type == aiohttp.WSMsgType.ERROR: - LOGGER.error("websocket connection had an error") - break + async def _queue_handler(self): + while not self._is_stopping: + data = await self._queue.get() try: - if data.type == aiohttp.WSMsgType.BINARY: - message = self._decode_message(data.data) + message = vehicle_events_pb2.PushMessage() + message.ParseFromString(data) except TypeError as err: LOGGER.error("could not decode data (%s) from websocket: %s", data, err) break if message is None: break - LOGGER.debug("Got notification: %s", message.WhichOneof("msg")) ack_message = self._on_data_received(message) if ack_message: @@ -216,14 +157,50 @@ async def _recv(self): else: await self.call(ack_message.SerializeToString()) - await self._disconnected() + self._queue.task_done() - async def call(self, message): - try: - await self._connection.send_bytes(message) - except aiohttp.client_exceptions.ClientError as err: - LOGGER.error("remote websocket connection closed: %s", err) - await self._disconnected() + async def _start_websocket_handler(self, session: ClientSession): + retry_in: int = 10 + + while not self._is_stopping: + LOGGER.debug("_start_websocket_handler: %s", self.oauth._config_entry.entry_id) + + try: + await self._websocket_handler(session) + except client_exceptions.ClientConnectionError as cce: + LOGGER.error("Could not connect: %s, retry in %s seconds...", cce, retry_in) + LOGGER.debug(cce) + self.connection_state = STATE_RECONNECTING + await asyncio.sleep(retry_in) + retry_in = retry_in * 2 if retry_in < 120 else 120 + except Exception as error: + LOGGER.error("Other error %s", error) + raise WebsocketError from error + + async def _websocket_handler(self, session: ClientSession): + websocket_url = helper.Websocket_url(self._region) + + headers = await self._websocket_connection_headers() + self.is_connecting = True + LOGGER.info("Connecting to %s", websocket_url) + self._connection = await session.ws_connect(websocket_url, headers=headers, proxy=SYSTEM_PROXY) + LOGGER.info("Connected to mercedes websocket at %s", websocket_url) + + while not self._connection.closed: + self.is_connecting = False + msg = await self._connection.receive() + + self.connection_state = STATE_CONNECTED + + if msg.type in ( + WSMsgType.CLOSED, + WSMsgType.ERROR, + ): + LOGGER.info("websocket connection is closing") + break + elif msg.type == WSMsgType.BINARY: + self._queue.put_nowait(msg.data) + await self._watchdog.trigger() async def _websocket_connection_headers(self): token = await self.oauth.async_get_cached_token()