/
twitch_monitor.py
121 lines (89 loc) · 3.63 KB
/
twitch_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import traceback
from asyncio import Event, Lock, sleep
import websockets
from websockets.exceptions import ConnectionClosedOK
from websockets.legacy.client import WebSocketClientProtocol
from config import NO_MONITOR, TTV_TOKEN, TTV_USERNAME
from utils import normalize_username
from vote import Vote
class TwitchMonitor:
lock = Lock()
run_loop = Event()
_vote: Vote
_channel: str = ''
_socket: WebSocketClientProtocol | None = None
def load(self, vote: Vote) -> None:
self._vote = vote
async def reconnect(self) -> None:
await self.disconnect()
await self.connect(self._channel)
async def disconnect(self) -> None:
if not self.run_loop.is_set():
return
self.run_loop.clear()
try:
await self._socket.close()
except Exception:
pass
self._socket = None
print(f'[TTV] 🔴 Disconnected from @{self._channel}')
async def connect(self, channel: str) -> None:
if NO_MONITOR:
return
if self._channel != channel:
await self.disconnect()
self._channel = channel
if self.run_loop.is_set():
return
timeout = 0.5
while True:
try:
self._socket = await websockets.connect('wss://irc-ws.chat.twitch.tv:443')
break
except Exception:
traceback.print_exc()
await sleep(timeout)
timeout = min(timeout * 2, 5)
self.run_loop.set()
print(f'[TTV] 🟢 Connected to @{channel}')
async def loop(self) -> None:
if NO_MONITOR:
return
while True:
# reset connection if connected
async with self.lock:
if self.run_loop.is_set():
await self.reconnect()
await self.run_loop.wait()
print(f'[TTV] Started monitoring on @{self._channel}')
try:
await self._socket.send(f'CAP REQ :twitch.tv/membership')
await self._socket.send(f'PASS oauth:{TTV_TOKEN}')
await self._socket.send(f'NICK {TTV_USERNAME}')
await self._socket.send(f'JOIN #{self._channel}')
while True:
raw = (await self._socket.recv()).strip()
parts = raw.split(' ')
assert len(parts) >= 2, f'Message contains no spaces: {raw}'
if parts[0] == 'PING':
nonce = ' '.join(parts[1:])
await self._socket.send(f'PONG {nonce}') # 🏓
elif parts[1] == 'PRIVMSG' and parts[2] == f'#{self._channel}':
username = normalize_username(parts[0].split('!')[0].lstrip(':'))
message = ' '.join(parts[3:]).lstrip(':').lower()
if message.startswith('!'):
message = message.lstrip('!').strip()
# try multiple formats for better compatibility
for new_whitespace in ('', '_'):
if self._vote.cast_user_vote(username, message.replace(' ', new_whitespace)):
break
elif parts[1] in {'JOIN', 'PART', '353'}:
pass
else:
print('[TTV]', raw)
except ConnectionClosedOK:
pass
except Exception:
traceback.print_exc()
await sleep(2)
print(f'[TTV] Stopped monitoring on @{self._channel}')