Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions getstream/video/rtc/coordinator/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

logger = logging.getLogger(__name__)

# in case you need to debug websockets, here's how to enable debug logs
# logging.getLogger("websockets.client").setLevel(logging.DEBUG)

DEFAULT_WS_URI = "wss://video.stream-io-api.com/api/v2/connect"

Expand Down Expand Up @@ -136,8 +138,9 @@ async def _open_socket(self) -> dict:
self._websocket, auth_payload = await asyncio.gather(
websockets.connect(
self.uri,
# ping_interval=None,
# ping_timeout=None,
ping_interval=None,
ping_timeout=None,
close_timeout=1.0,
),
self._build_auth_payload(),
)
Expand Down Expand Up @@ -247,26 +250,26 @@ async def _reader_task_func(self) -> None:
pass

self._logger.error(
"WebSocket connection closed by server",
f"WebSocket connection closed by server close_code: {close_code} close_reason: {close_reason}",
extra={
"close_code": close_code,
"close_reason": close_reason,
},
)
if self._connected and not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect("ws connection closed by server")
break
except websockets.exceptions.WebSocketException as e:
self._logger.error(
"WebSocket protocol error in reader task", exc_info=e
)
if self._connected and not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect(f"ws exception {e}")
break
except Exception as e:
self._logger.error("Unexpected error in reader task", exc_info=e)
if self._connected and not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect(f"ws read error {e}")
break

except asyncio.CancelledError:
Expand Down Expand Up @@ -301,7 +304,7 @@ async def _heartbeat_task_func(self) -> None:
},
)
if not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect("healthcheck timeout")
break

# Send heartbeat
Expand Down Expand Up @@ -331,22 +334,22 @@ async def _heartbeat_task_func(self) -> None:
},
)
if not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect("ws connection closed")
break
except websockets.exceptions.WebSocketException as e:
self._logger.error(
"WebSocket protocol error while sending heartbeat",
exc_info=e,
)
if not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect("healthcheck write error")
break
except Exception as e:
self._logger.error(
"Unexpected error while sending heartbeat", exc_info=e
except Exception:
self._logger.exception(
"Unexpected error while sending heartbeat"
)
if not self._reconnect_in_progress:
await self._trigger_reconnect()
await self._trigger_reconnect("heartbeat unknown error")
break

except asyncio.CancelledError:
Expand All @@ -355,14 +358,14 @@ async def _heartbeat_task_func(self) -> None:
finally:
self._logger.debug("Heartbeat task ended")

async def _trigger_reconnect(self) -> None:
async def _trigger_reconnect(self, reason) -> None:
"""
Trigger reconnection process.
"""
if self._reconnect_in_progress:
return

self._logger.warning("Triggering reconnection")
self._logger.warning(f"Triggering reconnection reason: '{reason}'")
self._reconnect_in_progress = True

try:
Expand Down
Loading