This repository has been archived by the owner on Apr 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
universal_wrapper.py
95 lines (76 loc) · 2.88 KB
/
universal_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# This file is part of curious.
#
# curious is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# curious is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with curious. If not, see <http://www.gnu.org/licenses/>.
import threading
import anyio
from lomond.persist import persist
from lomond.websocket import WebSocket
from curious import USER_AGENT
class UniversalWrapper:
"""
Represents a universal websocket wrapper.
"""
_DONE = object()
def __init__(self, url: str, task_group: anyio.TaskGroup):
self._url = url
self._ws: WebSocket = None
self._cancelled = threading.Event()
self._queue = anyio.create_queue(5)
self._task_group = task_group
def _generator(self):
"""
Runs the websocket generator. This is used by calling next() over it.
"""
ws = WebSocket(self._url, agent=USER_AGENT)
websocket = persist(ws, ping_rate=0, poll=1, exit_event=self._cancelled)
self._ws = ws
for item in websocket:
# for some reason lomond doesn't exit the loop??
if self._cancelled.is_set():
break
anyio.run_async_from_thread(self._queue.put, item)
anyio.run_async_from_thread(self._queue.put, self._DONE)
async def run(self):
"""
Runs the websocket.
This returns an async generator.
"""
if self._cancelled.is_set():
return
await self._task_group.spawn(anyio.run_in_thread, self._generator)
while True:
item = await self._queue.get()
if item is self._DONE:
return
yield item
# this will work because if it's cancelled the persist will spew a cancelled
if self._cancelled.is_set():
break
async def send_text(self, message: str):
"""
Sends some text over the generator.
"""
if self._ws is not None:
await anyio.run_in_thread(self._ws.send_text, message)
async def close(self, code: int = 1006, reason: str = "No reason", *, kill: bool = False):
"""
Closes the websocket.
"""
if kill:
self._cancelled.set()
if self._ws is not None:
# NB: This can't run in a thread because if we're cancelled (trio) this will never
# happen
# So we just pray this doesn't block!
self._ws.close(code, reason)