Skip to content

Commit

Permalink
Rework websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
ReneNulschDE committed Mar 7, 2024
1 parent e2411c1 commit db6c8b3
Showing 1 changed file with 73 additions and 96 deletions.
169 changes: 73 additions & 96 deletions custom_components/mbapi2020/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@
from typing import Optional
import uuid

import aiohttp
from aiohttp import ClientSession, WSMsgType, client_exceptions

from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import async_dispatcher_send

from .const import (
DOMAIN,
REGION_APAC,
REGION_EUROPE,
REGION_NORAM,
Expand All @@ -30,19 +28,14 @@
WEBSOCKET_USER_AGENT,
WEBSOCKET_USER_AGENT_PA,
)
from .errors import WebsocketError
from .helper import UrlHelper as helper
from .oauth import Oauth
from .proto import vehicle_events_pb2

DEFAULT_WATCHDOG_TIMEOUT = 3600

STATE_INIT = "initializing"
STATE_CONNECTING = "connecting"
DEFAULT_WATCHDOG_TIMEOUT = 6600
STATE_CONNECTED = "connected"
STATE_AUTH_INVALID = "auth_invalid"
STATE_AUTH_REQUIRED = "auth_required"
STATE_RECONNECTING = "reconnecting"
STATE_DISCONNECTED = "disconnected"

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,8 +68,6 @@ async def on_expire(self):

async def trigger(self):
"""Trigger the watchdog."""
# LOGGER.debug("Watchdog triggered – sleeping for %s seconds", self._timeout)

if self._timer_task:
self._timer_task.cancel()

Expand All @@ -96,13 +87,8 @@ def __init__(self, hass, oauth, region) -> None:
self._region = region
self.connection_state = "unknown"
self.is_connecting = False
self._watchdog: WebsocketWatchdog = WebsocketWatchdog(self._disconnected)

def set_connection_state(self, state):
"""Change current connection state."""
signal = f"{DOMAIN}"
async_dispatcher_send(self._hass, signal, state)
self.connection_state = state
self._watchdog: WebsocketWatchdog = WebsocketWatchdog(self.initiatiate_connection_reset)
self._queue = asyncio.Queue()

async def async_connect(self, on_data) -> None:
"""Connect to the socket."""
Expand All @@ -115,32 +101,17 @@ async def _async_stop_handler(event):
await self.async_stop()

self._on_data_received = on_data
self.is_connecting = True

await self._watchdog.trigger()

self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_handler)
session = async_get_clientsession(self._hass, VERIFY_SSL)
self.set_connection_state(STATE_CONNECTING)

websocket_url = helper.Websocket_url(self._region)
while True:
try:
headers = await self._websocket_connection_headers()
self.is_connecting = True
LOGGER.info("Connecting to %s", websocket_url)
self._connection = await session.ws_connect(websocket_url, headers=headers, proxy=SYSTEM_PROXY)
except aiohttp.client_exceptions.ClientError as exc:
LOGGER.error("Could not connect to %s, retry in 10 seconds...", websocket_url)
LOGGER.debug(exc)
self.set_connection_state(STATE_RECONNECTING)
await asyncio.sleep(10)
else:
self.is_connecting = False
LOGGER.info("Connected to mercedes websocket at %s", websocket_url)
break

self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_stop_handler)
# Tasks erstellen
queue_task = asyncio.create_task(self._start_queue_handler())
websocket_task = asyncio.create_task(self._start_websocket_handler(session))

asyncio.ensure_future(self._recv())
# Warten, dass Tasks laufen
await asyncio.gather(queue_task, websocket_task)

async def async_stop(self):
"""Close connection."""
Expand All @@ -149,65 +120,35 @@ async def async_stop(self):
if self._connection is not None:
await self._connection.close()

def _decode_message(self, res_raw):
res = vehicle_events_pb2.PushMessage()
res.ParseFromString(res_raw)
return res

async def _disconnected(self):
self.set_connection_state(STATE_DISCONNECTED)
if not self._is_stopping:
asyncio.ensure_future(self.async_connect(self._on_data_received))

async def _recv(self):
while not self._connection.closed:
self.set_connection_state(STATE_CONNECTED)

try:
data = await self._connection.receive()
except aiohttp.client_exceptions.ClientError as err:
LOGGER.debug("remote websocket connection closed: %s", err)
break
except ConnectionResetError as cr_err:
LOGGER.debug("remote websocket connection closed cr: %s", cr_err)
break
except RuntimeError as err:
LOGGER.debug("remote websocket connection closed cr: %s", err)
break

await self._watchdog.trigger()

if not data:
break

if data.type == aiohttp.WSMsgType.PING:
LOGGER.debug("websocket connection PING ")
async def initiatiate_connection_reset(self):
"""Initiate a connection reset."""
if self._connection is not None:
await self._connection.close()

if data.type == aiohttp.WSMsgType.PONG:
LOGGER.debug("websocket connection PONG ")
async def call(self, message):
"""Send a message to the MB websocket servers."""
try:
await self._connection.send_bytes(message)
except client_exceptions.ClientError as err:
LOGGER.error("remote websocket connection closed: %s", err)

if data.type in (
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
):
LOGGER.debug("websocket connection is closing")
break
async def _start_queue_handler(self):
while not self._is_stopping:
await self._queue_handler()

if data.type == aiohttp.WSMsgType.ERROR:
LOGGER.error("websocket connection had an error")
break
async def _queue_handler(self):
while not self._is_stopping:
data = await self._queue.get()

try:
if data.type == aiohttp.WSMsgType.BINARY:
message = self._decode_message(data.data)
message = vehicle_events_pb2.PushMessage()
message.ParseFromString(data)
except TypeError as err:
LOGGER.error("could not decode data (%s) from websocket: %s", data, err)
break

if message is None:
break

LOGGER.debug("Got notification: %s", message.WhichOneof("msg"))
ack_message = self._on_data_received(message)
if ack_message:
Expand All @@ -216,14 +157,50 @@ async def _recv(self):
else:
await self.call(ack_message.SerializeToString())

await self._disconnected()
self._queue.task_done()

async def call(self, message):
try:
await self._connection.send_bytes(message)
except aiohttp.client_exceptions.ClientError as err:
LOGGER.error("remote websocket connection closed: %s", err)
await self._disconnected()
async def _start_websocket_handler(self, session: ClientSession):
retry_in: int = 10

while not self._is_stopping:
LOGGER.debug("_start_websocket_handler: %s", self.oauth._config_entry.entry_id)

try:
await self._websocket_handler(session)
except client_exceptions.ClientConnectionError as cce:
LOGGER.error("Could not connect: %s, retry in %s seconds...", cce, retry_in)
LOGGER.debug(cce)
self.connection_state = STATE_RECONNECTING
await asyncio.sleep(retry_in)
retry_in = retry_in * 2 if retry_in < 120 else 120
except Exception as error:
LOGGER.error("Other error %s", error)
raise WebsocketError from error

async def _websocket_handler(self, session: ClientSession):
websocket_url = helper.Websocket_url(self._region)

headers = await self._websocket_connection_headers()
self.is_connecting = True
LOGGER.info("Connecting to %s", websocket_url)
self._connection = await session.ws_connect(websocket_url, headers=headers, proxy=SYSTEM_PROXY)
LOGGER.info("Connected to mercedes websocket at %s", websocket_url)

while not self._connection.closed:
self.is_connecting = False
msg = await self._connection.receive()

self.connection_state = STATE_CONNECTED

if msg.type in (
WSMsgType.CLOSED,
WSMsgType.ERROR,
):
LOGGER.info("websocket connection is closing")
break
elif msg.type == WSMsgType.BINARY:
self._queue.put_nowait(msg.data)
await self._watchdog.trigger()

async def _websocket_connection_headers(self):
token = await self.oauth.async_get_cached_token()
Expand Down

0 comments on commit db6c8b3

Please sign in to comment.