Skip to content

Commit

Permalink
Cleanup and change a few things in websocket (Fixes a heartbeat bug)
Browse files Browse the repository at this point in the history
  • Loading branch information
EvieePy committed May 13, 2024
1 parent b527114 commit 455a023
Showing 1 changed file with 45 additions and 27 deletions.
72 changes: 45 additions & 27 deletions twitchio/conduits/websockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

import aiohttp

from ..exceptions import WebsocketTimeoutException
from ..types_.conduits import (
KeepAliveMessage,
MessageTypes,
MetaData,
NotificationMessage,
Expand All @@ -40,8 +40,7 @@
WelcomeMessage,
WelcomePayload,
)
from ..utils import _from_json, a_timeout, parse_timestamp # type: ignore
from ..exceptions import WebsocketTimeoutException
from ..utils import _from_json, a_timeout # type: ignore


logger: logging.Logger = logging.getLogger(__name__)
Expand All @@ -52,18 +51,24 @@

class Websocket:
def __init__(
self, *, keep_alive_timeout: float = 60, session: aiohttp.ClientSession | None = None, id: int
self,
*,
keep_alive_timeout: float = 60,
session: aiohttp.ClientSession | None = None,
id: str,
) -> None:
self._keep_alive_timeout: int = max(10, min(int(keep_alive_timeout), 600))
self._last_keepalive: datetime.datetime | None = None
self._keep_alive_task: asyncio.Task[None] | None = None

self._session: aiohttp.ClientSession | None = session
self._id: int = id
self._id: str = id
self._session_id: str | None = None

self._socket: aiohttp.ClientWebSocketResponse | None = None
self._listen_task: asyncio.Task[None] | None = None

self._ready: asyncio.Event = asyncio.Event()
self._last_keepalive: datetime.datetime | None = None

@property
def keep_alive_timeout(self) -> int:
Expand All @@ -74,7 +79,7 @@ def connected(self) -> bool:
return bool(self._socket and not self._socket.closed)

@property
def id(self) -> int:
def id(self) -> str:
return self._id

async def connect(self) -> None:
Expand All @@ -87,17 +92,25 @@ async def connect(self) -> None:
if not self._session:
self._session = aiohttp.ClientSession()

self._socket = await self._session.ws_connect(url)
self._ready.clear()

self._socket = await self._session.ws_connect(url, heartbeat=15.0)
self._listen_task = asyncio.create_task(self._listen())

self._ready.clear()

try:
async with a_timeout(10):
await self._ready.wait()
except TimeoutError:
raise WebsocketTimeoutException

if self._keep_alive_task:
try:
self._keep_alive_task.cancel()
except Exception:
pass

self._keep_alive_task = asyncio.create_task(self._process_keepalive())

async def _listen(self) -> None:
assert self._socket

Expand All @@ -110,13 +123,19 @@ async def _listen(self) -> None:

type_: aiohttp.WSMsgType = message.type
if type_ in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING):
logger.debug("Received close message on conduit websocket: %s", self._session_id)
logger.debug(
"Received close message [%s] on conduit websocket: %s",
self._socket.close_code,
self._session_id,
)
return await self.close()

if type_ is not aiohttp.WSMsgType.TEXT:
logger.debug("Received unknown message from conduit websocket: %s", self._session_id)
continue

self._last_keepalive = datetime.datetime.now()

try:
data: WebsocketMessages = cast(WebsocketMessages, _from_json(message.data))
except Exception:
Expand All @@ -137,43 +156,42 @@ async def _listen(self) -> None:

elif message_type == "session_keepalive":
logger.debug('Received "session_keepalive" message from conduit websocket: %s', self._session_id)

keepalive_data: KeepAliveMessage = cast(KeepAliveMessage, data)
await self._process_keepalive(keepalive_data)

elif message_type == "revocation":
logger.debug('Received "revocation" message from conduit websocket: %s', self._session_id)

revocation_data: RevocationMessage = cast(RevocationMessage, data)
await self._process_revocation(revocation_data)

elif message_type == "notification":
logger.debug('Received "notification" message from conduit websocket: %s', self._session_id)

notification_data: NotificationMessage = cast(NotificationMessage, data)
await self._process_notification(notification_data)

else:
logger.warning("Received an unknown message type in conduit websocket: %s", self._session_id)

async def _process_keepalive(self) -> None:
assert self._last_keepalive
logger.debug("Started keep_alive task on conduit websocket: %s", self._session_id)

while True:
await asyncio.sleep(self._keep_alive_timeout)

now: datetime.datetime = datetime.datetime.now()
if self._last_keepalive + datetime.timedelta(seconds=self._keep_alive_timeout + 5) < now:
return await self.close()

async def _process_welcome(self, data: WelcomeMessage) -> None:
payload: WelcomePayload = data["payload"]
self._session_id = payload["session"]["id"]
self._ready.set()

logger.debug('Received "session_welcome" message from conduit websocket: %s', self._session_id)

async def _process_reconnect(self, data: ReconnectMessage) -> None: ...

async def _process_keepalive(self, data: KeepAliveMessage) -> None:
now: datetime.datetime = datetime.datetime.now()

if self._last_keepalive and self._last_keepalive + datetime.timedelta(seconds=self._keep_alive_timeout) < now:
# TODO: Reconnect and resubscribe
return await self.close()

self._last_keepalive = now

async def _process_revocation(self, data: RevocationMessage) -> None: ...

async def _process_notification(self, data: NotificationMessage) -> None: ...
Expand All @@ -196,5 +214,5 @@ async def close(self) -> None:
self._listen_task.cancel()
except Exception:
...

logger.debug("Successfully closed conduit websocket: %s", self._session_id)

0 comments on commit 455a023

Please sign in to comment.