-
Notifications
You must be signed in to change notification settings - Fork 0
/
elenabotlib.py
813 lines (689 loc) · 35.7 KB
/
elenabotlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
"""
This only implements a simple chatbot.
Inherit from Session class to use.
Copyright 2021-2024 ElenaWinters
References:
Twitch IRC Guide
Rinbot by RingoMär
TwitchDev samples
Tested on Windows 11 and Ubuntu 22.04
"""
from typing import Any, Callable, Union
from dataclasses import make_dataclass, asdict, dataclass
from sqlalchemy.types import LargeBinary
from datetime import datetime
from queue import Queue
import functools
import inspect
import aiohttp
import asyncio
import logging
import dataset
import msgpack
import hints
import math
import sys
import re
# Module-level logger shared by the whole library; configure_logger() attaches handlers to it.
log = logging.getLogger(__name__)
SOH = chr(1) # ASCII SOH (Start of Header) Control Character, used for ACTION events
# con = sqlite3.connect('log.sqlite')
# cur = con.cursor()
# con.execute('''CREATE TABLE IF NOT EXISTS msg_sent (channel text, message text)''')
# con.execute('''CREATE TABLE IF NOT EXISTS msg_denied (channel text, message text)''')
# con.execute('''CREATE TABLE IF NOT EXISTS msg_banned (channel text)''')
# con.execute('''CREATE TABLE IF NOT EXISTS outgoing (channel text, message text)''')
def event(name: str = 'any', *extras) -> Callable:  # listener/decorator for any event
    """Register the decorated function as a listener for one or more events.

    Args:
        name: Primary event name (defaults to ``'any'``).
        *extras: Additional event names to register the same function under.

    Returns:
        A decorator that registers the function through ``add_listeners`` and
        hands the function back unchanged.
    """
    names = [*extras, name]

    def wrapper(func: Callable) -> Callable:
        add_listeners(func, names)
        # add_listeners() returns None; return the function itself so the
        # decorated name is not clobbered and decorators can be stacked.
        return func
    return wrapper


events = event
def cooldown(time: int) -> Callable:
    """Skip the decorated listener while it is still on cooldown.

    Args:
        time: Cooldown length in seconds, passed to ``self.func_on_cooldown``.

    Returns:
        A decorator. The wrapped listener returns ``func(self, ctx)`` when the
        cooldown has expired, otherwise an awaitable no-op.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        # Quoted annotation: Session is defined later in the module, so an
        # eager annotation would raise NameError if applied before that point.
        def wrapper(self: 'Session', ctx) -> Callable:
            if self.func_on_cooldown(func, time):
                return asyncio.sleep(0)  # callers always await the result
            return func(self, ctx)
        return wrapper
    return decorator
def ignore_myself() -> Callable:
    """Skip the decorated listener when the event was caused by the bot itself.

    The comparison is case-insensitive on both sides, so a nick configured
    with capital letters is still recognised.

    Returns:
        A decorator. The wrapped listener returns ``func(self, ctx)`` for other
        users and an awaitable no-op for the bot's own events.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: 'Session', ctx: 'hints.Messageable') -> Callable:
            if ctx.user.lower() != self.nick.lower():
                return func(self, ctx)
            return asyncio.sleep(0)
        return wrapper
    return decorator
def author(*names) -> Callable:  # check author
    """Only run the decorated listener for events caused by the given users.

    Args:
        *names: Accepted user names; matching is case-insensitive.

    Returns:
        A decorator. The wrapped listener returns ``func(self, ctx)`` on a
        match and an awaitable no-op otherwise.
    """
    # Lower-case once at decoration time instead of on every event.
    allowed = {name.lower() for name in names}

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: 'Session', ctx: 'hints.Messageable') -> Callable:
            if ctx.user.lower() in allowed:
                return func(self, ctx)
            return asyncio.sleep(0)
        return wrapper
    return decorator


authors = author
def channel(*names) -> Callable:  # check channel
    """Only run the decorated listener for events from the given channels.

    Args:
        *names: Channel names, with or without the leading ``#``; each is
            normalised through ``self.merge`` before comparison.

    Returns:
        A decorator. The wrapped listener returns ``func(self, ctx)`` on a
        match and an awaitable no-op otherwise.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: 'Session', ctx: 'hints.Messageable') -> Callable:
            if any(ctx.channel == self.merge(name) for name in names):
                return func(self, ctx)
            return asyncio.sleep(0)
        return wrapper
    return decorator


channels = channel
# these need to be set to the same thing for a solo compare to check for mode, reference the message decorator below
def msg_compare(mode: str = 'eq', actual: str = 'test', compare: str = 'test') -> bool:
match mode.lower():
case 'eq' | 'equals':
if actual == compare:
return True
case 'sw' | 'startswith':
return actual.startswith(compare)
case 'ew' | 'endswith':
return actual.endswith(compare)
case 'in' | 'contains':
if compare in actual:
return True
def message(*args: tuple, **kwargs: dict) -> Callable:
    """Only run the decorated listener when the message text matches.

    Args:
        *args: Candidate strings. If one of them is a comparison mode
            understood by ``msg_compare`` (for example ``'sw'``), the first
            such value is used as the mode and removed from the candidates.
        **kwargs: ``mode``/``m`` chooses the comparison mode when none is
            given positionally (default ``'eq'``); ``ignore_self`` (default
            True) suppresses the listener for the bot's own messages.

    Returns:
        A decorator. The wrapped listener returns ``func(self, ctx)`` on a
        match and an awaitable no-op otherwise.
    """
    # Resolve mode and candidates once; they cannot change between events.
    detected = [candidate for candidate in args if msg_compare(candidate)]
    mode = detected[0] if detected else kwargs.get('mode', kwargs.get('m', 'eq'))
    possible = list(args)
    if mode in possible:
        possible.remove(mode)
    ignore_self = kwargs.get('ignore_self', True)

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # keep the listener's name/docstring like the sibling decorators do
        def wrapper(self: 'Session', ctx: 'hints.Messageable') -> Callable:
            if not any(msg_compare(mode, ctx.message.content, candidate) for candidate in possible):
                return asyncio.sleep(0)
            # Case-insensitive on both sides so a capitalised nick still matches.
            if ignore_self and ctx.user.lower() == self.nick.lower():
                return asyncio.sleep(0)
            return func(self, ctx)
        return wrapper
    return decorator
class DebugFilter(logging.Filter):
    """Logging filter that drops records strictly between level 10 and level 30."""

    def filter(self, record):
        """Return True to keep *record*, False to drop it."""
        severity = record.levelno
        # Keep level 30 and above, and level 10 and below; drop the band in between.
        return severity >= 30 or severity <= 10
# you can write your own configuration if you want to, not here though. do it in your own class
def configure_logger(_level=logging.INFO) -> None:
    """Attach a stdout handler and a filtered ``debug.log`` file handler to the library logger.

    Args:
        _level: Level applied to the logger and to both handlers.
    """
    def attach(handler: logging.Handler) -> None:
        handler.setLevel(_level)
        log.addHandler(handler)

    log.setLevel(_level)
    attach(logging.StreamHandler(sys.stdout))

    file_handler = logging.FileHandler('debug.log')
    file_handler.addFilter(DebugFilter())
    attach(file_handler)
_listeners = {}
_handlers = {}  # for internal dispatch


def dispatch(*args) -> Callable:  # listener/decorator for any event
    '''
    Register a function as the internal handler for one or more notices.

    This is used for notice dispatches to internal functions.
    While it can be used in a import case, it will most likely do nothing.
    A notice can only be registered once: the first handler wins and later
    registrations for the same notice are silently ignored.
    '''
    def wrapper(func: Callable) -> Callable:
        for notice in args:
            # setdefault keeps an existing handler, no need to error
            _handlers.setdefault(notice, func)
        return func
    return wrapper
# Adding this make me realize that I may need to split the bot into multiple files.
# I've prided myself on keeping the lib portable in a single file.
# But with the removal of hosts, I need to implement the Twitch API eventually.
def depr_event(date: datetime, before: str, after: str, events: list) -> Callable:
    '''
    Warn the user that the event they are registering is or is about to be deprecated.

    Args:
        date: The deprecation date.
        before: Message template logged while ``date`` is still in the future.
        after: Message template logged once ``date`` has passed.
        events: Event names covered by the warning.

    Both templates may use the ``{event}`` and ``{date}`` placeholders. The
    decorated function is expected to take the event names as its second
    positional argument or as the ``names`` keyword; when neither is supplied
    no warning is issued and the call is passed straight through.
    '''
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Callable:
            # Tolerate calls that omit the names or pass them by keyword.
            names = args[1] if len(args) > 1 else kwargs.get('names', ())
            if matched := set(names).intersection(events):
                pretty = {'date': date.strftime("%B %d, %Y"), 'event': matched.pop()}
                prefix = 'DEPRECIATION({event}): '
                template = before if date > datetime.now() else after
                log.warning(str(prefix + template).format(**pretty))
            return func(*args, **kwargs)
        return wrapper
    return decorator
def expr_event(message: str, events: list, date: datetime = datetime.now()) -> Callable:
    '''
    Alert the user that the event they are using is considered experimental and may be removed at any time.

    Args:
        message: Text appended to the warning; may use ``{event}`` and ``{date}``.
        events: Event names considered experimental.
        date: Date made available to the ``{date}`` placeholder.

    The decorated function is expected to take the event names as its second
    positional argument or as the ``names`` keyword; when neither is supplied
    no warning is issued and the call is passed straight through.
    '''
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Callable:
            # Tolerate calls that omit the names or pass them by keyword.
            names = args[1] if len(args) > 1 else kwargs.get('names', ())
            if matched := set(names).intersection(events):
                pretty = {'date': date.strftime("%B %d, %Y"), 'event': matched.pop()}
                log.warning(str('EXPERIMENT({event}): ' + message).format(**pretty))
            return func(*args, **kwargs)
        return wrapper
    return decorator
# ORDER OF THESE DECORATORS SHOULD BE IN ORDER OF HAPPENINGS
@expr_event(message='https://help.twitch.tv/s/article/cheering-experiment-2022',
            events=['midnightsquid'])  # i don't know if this will ever be depr'd or if it already has
@depr_event(date=datetime(2022, 10, 3),
            before='Event \'{event}\' will be depreciated by Twitch on {date}. You will no longer receive this event after that date.',
            after='Event \'{event}\' has been depreciated by Twitch as of {date}. You can still listen for the event, but you will never receive it.',
            events=['host', 'hosttarget', 'unhost', 'notice:autohost_receive', 'notice:bad_host_error', 'notice:bad_host_hosting',
                    'notice:bad_host_rate_exceeded', 'notice:bad_host_rejected', 'notice:bad_host_self', 'notice:bad_unhost_error',
                    'notice:host_off', 'notice:host_on', 'notice:host_receive', 'notice:host_receive_no_count', 'notice:host_target_went_offline',
                    'notice:hosts_remaining', 'notice:not_hosting', 'notice:usage_host', 'notice:usage_unhost'])
def add_listeners(func, names=['any']) -> None:
    """Append *func* to the listener list of every event in *names*.

    A few aliases are folded onto their canonical event name before
    registration: ``all``/``*`` -> ``any``, ``resubscribe``/``resubscription``
    -> ``resub``, ``subscribe``/``subscription`` -> ``sub``, ``msg`` -> ``message``.
    """
    aliases = {
        'all': 'any', '*': 'any',
        'resubscribe': 'resub', 'resubscription': 'resub',
        'subscribe': 'sub', 'subscription': 'sub',
        'msg': 'message',
    }
    for requested in names:
        canonical = aliases.get(requested, requested)
        _listeners.setdefault(canonical, []).append(func)
# Matches text consisting only of decimal digits; used to decide whether tag values,
# badge versions and IRC command names should be converted to int.
rx_positive = re.compile(r'^\d+$')
@dataclass
class SessionFlags:
    """Switches that alter library behaviour; changing one after creation logs a warning."""

    # When set, Session.start() collects the classes of the hints module and
    # call_listeners() records event fields that the matching class lacks.
    log_hint_differences: bool = False
    # When set, Session.send() transmits even while __debug__ is true.
    send_in_debug: bool = False

    def __setattr__(self, prop, val):
        """Set *prop*, warning first when the value actually changes.

        Raises:
            AttributeError: If *prop* is not an existing flag.
        """
        if (old_val := getattr(self, prop)) != val:
            # Logger.warn is a deprecated alias; use Logger.warning.
            log.warning(f'WARNING: Flag {prop.upper()} was changed from {old_val} to {val}! This may put the program into an invalid state!')
        super().__setattr__(prop, val)
class Session(object):
def __init__(self) -> None:
    """Set connection defaults and create the empty per-session containers."""
    # Connection settings.
    self.host = 'ws://irc-ws.chat.twitch.tv'
    self.port = 80
    self.auto_reconnect = True
    self.dbaddress = None  # start() falls back to a local SQLite file when unset

    # Private bookkeeping.
    self.__cooldowns = {}       # function -> time of last accepted call
    self.__channels = []        # channels requested through join()
    self.__joinqueue = Queue()  # channels waiting for a JOIN to be sent
    self.__outgoing = {}        # channel -> messages sent but not yet confirmed
    self.__proxies = {}         # channel -> cached send coroutine

    # Behaviour flags and their scratch storage.
    self.flags = SessionFlags()
    self.flag_storage = {}
# This is a complex function, and it has a high "Cognitive Complexity", beyond the limit of 15.
def twitch_irc_formatter(self, original: str = '@badge-info=gay/4;badges=lesbian/3,premium/1;user-type=awesome :tmi.twitch.tv GAYMSG #zaquelle :This is kinda gay'):  # tested with every @ based message. parses correctly.
    """Parse one raw Twitch IRC line into a dynamically built dataclass instance.

    The dataclass is named after the IRC command and carries one field per
    parsed tag, plus ``channel``, ``message`` and ``user`` when they can be
    extracted, and ``server`` while ``__debug__`` is true.
    """
    # Indexes | 0 = Server, 1 = Notice, 2 = Channel, 3+ = Message (Broken up, use Regex)
    # ChatGPT says that re.split should be used over a string split for the above. TODO: Test with re.split
    array = original[1:].split(' ')
    tagged = original.startswith('@')
    offset = 1 if tagged else 0  # a tag section shifts every index by one
    info = {}

    if tagged:
        tag_section = re.split(r'tmi\.twitch\.tv', original)[0]
        for k, v, _ in re.findall(r'([-\w]+)=(.*?)(;| :)', tag_section):
            k = k.replace('-', '_')
            if k in ('badge_info', 'badges'):  # converts for specific fields
                badges = []
                for entry in v.split(','):
                    badge = re.search(r'(.+?)/(.+)', entry)
                    if not badge:
                        continue
                    version = badge.group(2).replace('\\s', ' ')
                    if rx_positive.match(version):
                        version = int(version)
                    badges.append(hints.Badge(name=badge.group(1), version=version))
                info[k] = badges
            elif k in ('emote_sets', 'emotes'):
                info[k] = [e for e in v.split(',') if e]
            elif k in ('system_msg', 'reply_parent_msg_body'):
                info[k] = v.replace('\\s', ' ')
            elif k in ('emote_only', 'subscriber', 'first_msg', 'subs_only', 'rituals',
                       'turbo', 'mod', 'r9k'):
                info[k] = bool(int(v))
            else:
                if v == '':
                    v = None
                elif rx_positive.match(v) or re.match(r'^-\d+$', v):  # positive or negative integer
                    v = int(v)
                elif v == 'false':
                    v = False
                elif v == 'true':
                    v = True
                info[k] = v

    command = array[1 + offset]
    prefix = array[0 + offset]
    if len(array) >= 3 + offset:  # this is so sketch but it works. basically, check if a channel is provided
        info['channel'] = array[2 + offset]  # force channel
        # The pattern is deliberately not escaped; other handlers rely on what it captures.
        message = re.search(f"{command} {info['channel']}" + r'..(.*)', original)
        if message:
            info['message'] = message.group(1)
    user = re.search(r"(?P<name>[\w]+)!(?P=name)@(?P=name)", prefix)
    if user:
        info['user'] = user.group(1)
    if __debug__:  # include server if in debug mode. this isn't useful for most cases
        info['server'] = prefix

    # Each field is declared as (name, value): the value doubles as the annotation.
    prs = make_dataclass(command, list(info.items()))
    return prs(**info)
def proxy_send_obj(self, channel: str):
    """Return the cached coroutine function that sends a message to *channel*.

    The function is created on first use and reused afterwards, so listeners
    can call ``ctx.send(message)`` without naming the channel.
    """
    try:
        return self.__proxies[channel]
    except KeyError:
        pass

    async def _relay(message: str):  # bound to this channel through the closure
        await self.send(message, channel)

    self.__proxies[channel] = _relay
    return _relay
# @dispatch(1, 2, 3, 4, 372, 376, 366)
# async def sinkhole(self, ctx): # I'm not currently parsing these as I don't need to.
# print(ctx)
# pass
@dispatch('ping')
async def handle_ping_pong(self, ctx):
    """Answer a server PING with PONG, then notify ``ping`` listeners."""
    await self.sock.send_str('PONG :tmi.twitch.tv')
    fields = {'server': ':tmi.twitch.tv'}
    ping_type = make_dataclass('ping', list(fields.items()))
    await self.call_listeners('ping', ctx=ping_type(**fields))
@dispatch('join', 'part')
async def handle_join_part(self, ctx):
    """Dispatch JOIN/PART as ``join:<user>`` / ``part:<user>``, plus ``:self`` for the bot."""
    if not self.any_listeners('join', 'part'):
        return
    kind = type(ctx).__name__
    fields = ctx.__dict__  # shared with ctx: adding a key also adds the attribute
    fields['send'] = self.proxy_send_obj(ctx.channel)
    prs = make_dataclass(kind, list(fields.items()))(**fields)
    base = kind.lower()
    if ctx.user == self.nick:
        await self._lcall(f'{base}:self', ctx=prs)
    await self.call_listeners(f'{base}:{ctx.user}', ctx=prs)
@dispatch(353)
async def handle_353(self, ctx):
    """Turn a 353 (names list) reply into one ``join:<user>`` event per listed user.

    The first word found in the message is taken as the channel name and the
    remaining words as users; the bot's own nick is skipped.
    """
    if not self.any_listeners('join'):
        return
    names = re.findall(r'\w+', ctx.message)
    if not names:  # nothing to report; avoids IndexError on an empty list
        return
    channel, *users = names
    # Build the event type once instead of once per user.
    join_type = make_dataclass('JOIN', [('user', str), ('channel', str)])
    for user in users:
        if user == self.nick:
            continue
        await self.call_listeners(f'join:{user}', ctx=join_type(user=user, channel=f'#{channel}'))
@dispatch(375)
async def handle_375(self, ctx):
    """On numeric 375, log the connection and start the background join task."""
    environment = 'DEBUG' if __debug__ else 'PRODUCTION'
    log.debug(f"Connected! Environment is {environment}.")
    self.jointask = asyncio.get_running_loop().create_task(self._join())
@dispatch(421)  # UNKNOWN IRC COMMAND
async def unknown_command_irc(self, ctx):
    """Log the server's reply to a command it did not recognise."""
    explanation = "An IRC command was sent to the server, but they didn't recognize it. Here is what the server told us:"
    log.debug(f"{explanation}\n{ctx}")
@dispatch('cap')
async def handle_cap(self, ctx):
    """Parse a CAP reply into ``capabilities`` and ``response``, then dispatch it as ``cap``.

    ``response`` is the word preceding the capability list, or None when the
    message does not have that shape.
    """
    dprs = ctx.__dict__  # dprs and ctx are linked. changing one changes the other.
    dprs['capabilities'] = re.findall(r'twitch\.tv\/(\w+)', ctx.message)
    response = re.search(r'(\w+) :twitch\.tv\/\w+', ctx.message)
    # Guard the match so an unexpected reply does not raise AttributeError.
    dprs['response'] = response.group(1) if response else None
    del dprs['channel']
    del dprs['message']
    kind = type(ctx).__name__
    prs = make_dataclass(kind, list(dprs.items()))
    await self.call_listeners(kind.lower(), ctx=prs(**dprs))
@dispatch('globaluserstate')  # I usually format display_name to user but I don't want to do that with GUS
async def handle_globaluserstate(self, ctx: hints.GLOBALUSERSTATE):  # ALWAYS SET THIS
    """Rename ``display_name`` to ``user``, dispatch ``globaluserstate`` and keep ctx as ``self.state``."""
    fields = ctx.__dict__
    fields['user'] = ctx.display_name
    del fields['display_name']
    kind = type(ctx).__name__
    rebuilt = make_dataclass(kind, list(fields.items()))(**fields)
    await self.call_listeners(kind.lower(), ctx=rebuilt)
    self.state = ctx
@dispatch('privmsg')
async def handle_privmsg(self, ctx: Union[hints.PRIVMSG, hints.USERSTATE]):
    """Dispatch a chat message as ``message``, or as ``message:action`` for ACTION messages."""
    if not self.any_listeners('message'):
        return
    fields = ctx.__dict__
    if hasattr(ctx, 'display_name'):
        fields['user'] = ctx.display_name
        del fields['display_name']
    fields['send'] = self.proxy_send_obj(ctx.channel)

    is_action = 'ACTION' in ctx.message and SOH in ctx.message
    fields['action'] = is_action
    # For actions, strip the leading SOH + 'ACTION ' and the trailing SOH.
    if is_action:
        text = ctx.message[len(SOH + 'ACTION '):-len(SOH)]
    else:
        text = ctx.message
    fields['message'] = hints.Message(fields['user'], ctx.channel, text)

    prs = make_dataclass(type(ctx).__name__, list(fields.items()))(**fields)
    await self.call_listeners('message:action' if prs.action else 'message', ctx=prs)
@dispatch('userstate')
async def handle_userstate(self, ctx: Union[hints.PRIVMSG, hints.USERSTATE]):
    """Dispatch USERSTATE as ``userstate``, adding ``send`` and renaming ``display_name`` to ``user``."""
    if not self.any_listeners('userstate'):
        return
    fields = ctx.__dict__
    if hasattr(ctx, 'display_name'):
        fields['user'] = ctx.display_name
        del fields['display_name']
    fields['send'] = self.proxy_send_obj(ctx.channel)
    kind = type(ctx).__name__
    rebuilt = make_dataclass(kind, list(fields.items()))(**fields)
    await self.call_listeners(kind.lower(), ctx=rebuilt)
@dispatch('roomstate', 'clearmsg')
async def handle_generic(self, ctx):
    """Dispatch ROOMSTATE/CLEARMSG under their own lower-cased name, with ``send`` attached."""
    if not self.any_listeners('roomstate', 'clearmsg'):
        return
    fields = ctx.__dict__
    fields['send'] = self.proxy_send_obj(ctx.channel)
    kind = type(ctx).__name__
    rebuilt = make_dataclass(kind, list(fields.items()))(**fields)
    await self.call_listeners(kind.lower(), ctx=rebuilt)
@dispatch('notice')
async def handle_notice(self, ctx):
    """Dispatch a NOTICE as ``notice:<msg_id>``, with ``send`` attached."""
    if not self.any_listeners('notice'):
        return
    fields = ctx.__dict__
    fields['send'] = self.proxy_send_obj(ctx.channel)
    rebuilt = make_dataclass(type(ctx).__name__, list(fields.items()))(**fields)
    await self.call_listeners(f'notice:{ctx.msg_id}', ctx=rebuilt)
@dispatch('usernotice')  # i dont know if i'll add individual listener checks yet. unsure on speed of any_listeners
async def handle_usernotice(self, ctx):
    """Dispatch a USERNOTICE under an event chosen by its ``msg_id``, then as ``usernotice``.

    Subscription ids go to ``sub:<msg_id>``, raids to ``raid:<channel>``,
    rituals to ``ritual:<name>``, a few known ids to the id itself. Unknown
    ids are logged and dispatched under the id as a best effort.
    """
    dprs = ctx.__dict__

    def build(class_name: str):
        # Snapshot the current fields into a fresh dataclass instance.
        return make_dataclass(class_name, list(dprs.items()))(**dprs)

    dprs['send'] = self.proxy_send_obj(ctx.channel)
    if hasattr(ctx, 'display_name'):
        dprs['user'] = ctx.display_name
        del dprs['display_name']
    else:
        dprs['user'] = ctx.login
        del dprs['login']

    subscription_ids = ('sub', 'resub', 'extendsub', 'primepaidupgrade', 'communitypayforward', 'standardpayforward',
                        'subgift', 'anonsubgift', 'submysterygift', 'giftpaidupgrade', 'anongiftpaidupgrade')
    # Generic items that don't need to be processed.
    passthrough_ids = ('unraid',          # called when a raid is cancelled. doesn't give a raid target
                       'bitsbadgetier',   # only a few examples, no idea what it does
                       'announcement',    # This is super new.
                       'midnightsquid')   # a 2022 cheering experiment with actual currency.

    msg_id = ctx.msg_id
    if msg_id in subscription_ids:
        body = ctx.message if hasattr(ctx, 'message') else None
        dprs['message'] = hints.Message(dprs['user'], ctx.channel, body)
        prs = build('SUBSCRIPTION')
        await self.call_listeners(f'sub:{prs.msg_id}', ctx=prs)  # this covers anysub, just use "sub" as the event
    elif msg_id == 'raid':
        if 'msg_param_displayName' in dprs:
            dprs['raider'] = dprs['msg_param_displayName']
        else:
            dprs['raider'] = dprs['msg_param_login']
        dprs['viewers'] = dprs['msg_param_viewerCount']
        prs = build(msg_id.upper())
        await self.call_listeners(f'{msg_id}:{prs.channel}', ctx=prs)
    elif msg_id == 'ritual':
        dprs['name'] = dprs['msg_param_ritual_name']
        prs = build(msg_id.upper())
        await self.call_listeners(f'{msg_id}:{prs.name}', ctx=prs)  # category format "listener:sub_type"
        if prs.name != 'new_chatter':
            log.debug(prs)  # just incase new rituals are added
    elif msg_id in passthrough_ids:
        prs = build(msg_id.upper())
        await self.call_listeners(msg_id, ctx=prs)
    else:
        log.debug('THE FOLLOWING MSG_ID IS NOT BEING HANDLED PROPERLY.')
        log.debug(ctx)
        prs = None
        if hasattr(ctx, 'msg_id'):
            prs = build(ctx.msg_id.upper())
            await self.call_listeners(prs.msg_id, ctx=prs)
        log.debug(prs)
        log.debug("WE'VE TRIED TO MAKE IT WORK FOR YOU THIS TIME.\nPLEASE CONTACT THE DEVELOPER.")

    # Every usernotice is also delivered under the generic event name.
    kind = type(ctx).__name__
    prs = build(kind)
    await self.call_listeners(kind.lower(), ctx=prs)
@dispatch('clearchat')  # CLEARCHAT(ban_duration=60, room_id=22484632, target_user_id=42935983, tmi_sent_ts=1652203009894, message='narehawk', server=':tmi.twitch.tv', channel='#forsen')
async def handle_clearchat(self, ctx):
    """Dispatch CLEARCHAT as ``clearchat``, exposing the message text as ``target``."""
    if not self.any_listeners('clearchat'):
        return
    fields = ctx.__dict__
    # 'message' becomes 'target'; target is None when there was no message.
    fields['target'] = fields.pop('message', None)
    kind = type(ctx).__name__
    prs = make_dataclass(kind, list(fields.items()))(**fields)
    await self.call_listeners(kind.lower(), ctx=prs)
    if prs.target is None:
        log.debug(prs)
# THIS IS A DEPRECATED EVENT. THIS IS EFFECTIVELY A DEAD CODE PATH.
@dispatch('hosttarget')  # HOSTTARGET(message='froggirlgaming 6', server='tmi.twitch.tv', channel='#xcup_of_joe')
async def handle_hosttarget(self, ctx):
    """Split HOSTTARGET into target and viewer count, then dispatch ``hosttarget`` and ``host``/``unhost``.

    A ``-`` target means hosting stopped (``unhost``); a ``-`` viewer count is
    treated as 0.
    """
    if not self.any_listeners('host', 'unhost', 'hosttarget'):
        return
    fields = ctx.__dict__
    target, raw_viewers = ctx.message.split(' ')
    del fields['message']
    fields['viewers'] = 0 if raw_viewers == '-' else int(raw_viewers)
    if target != '-':
        fields['target'] = target
        focus = 'host'
    else:
        focus = 'unhost'
    prs = make_dataclass(focus.upper(), list(fields.items()))(**fields)
    await self.call_listeners('hosttarget', ctx=prs)  # this is sent regardless of focus
    await self.call_listeners(focus, ctx=prs)
@dispatch('whisper')
async def handle_whisper(self, ctx):  # you cannot send a whisper from irc so i'm not gonna include a send object
    """Dispatch WHISPER as ``whisper`` with the text wrapped in a ``hints.Message``."""
    if not self.any_listeners('whisper'):
        return
    fields = ctx.__dict__
    if hasattr(ctx, 'display_name'):
        fields['user'] = ctx.display_name
        del fields['display_name']
    text = ctx.message if hasattr(ctx, 'message') else None
    fields['message'] = hints.Message(fields['user'], ctx.channel, text)
    kind = type(ctx).__name__
    rebuilt = make_dataclass(kind, list(fields.items()))(**fields)
    await self.call_listeners(kind.lower(), ctx=rebuilt)
async def __wsloop(self, channels):  # i've seperated this out so it's easier to read. also using mangling cuz this is a messy function that i dont want people to touch
    """Connect, authenticate and process incoming lines until the socket closes.

    Each text frame is split on CRLF. PING, a failed login and RECONNECT are
    handled explicitly; every other line is parsed by ``twitch_irc_formatter``
    and routed to the handler registered for its command, or straight to the
    listeners when no handler exists. A failed login also disables
    ``auto_reconnect``.

    The HTTP session is closed together with the socket, and the join task and
    join queue are reset even when the loop ends with an exception.
    """
    try:
        # Own the ClientSession in a context manager so it is always closed.
        async with aiohttp.ClientSession() as http:
            async with http.ws_connect(f'{self.host}:{self.port}', heartbeat=10) as self.sock:
                log.debug(f'Attempting to connect to {self.host}:{self.port}')
                await self.sock.send_str("CAP REQ :twitch.tv/membership twitch.tv/tags twitch.tv/commands")
                await self.sock.send_str(f"PASS {self.token}")
                await self.sock.send_str(f"NICK {self.nick}")
                await self.join(channels)
                async for msg in self.sock:
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        log.debug(f'Unknown WSMessage Type: {msg.type}')
                        continue
                    # The payload ends with CRLF, so the last split element is dropped.
                    for line in msg.data.split("\r\n")[:-1]:
                        if line == 'PING :tmi.twitch.tv':
                            try:
                                await _handlers['ping'](self, ctx=line)
                            except Exception as exc:
                                log.exception(exc)
                            continue
                        elif line == ':tmi.twitch.tv NOTICE * :Login authentication failed':
                            log.error('CRITICAL: TWITCH REJECTED OUR LOGIN. THIS IS ON YOU TO FIX. MOST LIKELY YOUR TOKEN IS INVALID.')
                            self.auto_reconnect = False
                            await self.sock.close()
                            continue
                        elif line == ':tmi.twitch.tv RECONNECT':  # the parser will parse this, but i want it to be very explicitly handled
                            await self.sock.close()
                            continue

                        try:
                            notice = self.twitch_irc_formatter(line)
                        except Exception as exc:
                            log.exception(exc)
                            log.debug(line)
                            continue

                        raw_name = type(notice).__name__.lower()
                        # Numeric replies are registered under their int value.
                        name = int(raw_name) if rx_positive.match(raw_name) else raw_name
                        if name in _handlers:
                            try:
                                await _handlers[name](self, ctx=notice)
                            except Exception as exc:
                                log.exception(exc)
                                log.debug(notice)
                        else:  # NOTICE NOT HANDLED, LOG AND NOTIFY
                            if raw_name not in ['001', '002', '003', '004', '372', '376', '366']:
                                log.debug('THE FOLLOWING NOTICE IS NOT BEING HANDLED PROPERLY.')
                                log.debug(notice)
                                log.debug("WE'VE TRIED TO MAKE IT WORK FOR YOU THIS TIME.\nPLEASE CONTACT THE DEVELOPER.")
                            # NOTE(review): assumed to run for every unhandled notice, not only unlisted ones — confirm against the original indentation.
                            await self.call_listeners(raw_name, ctx=notice)
    finally:
        log.info('WebSocket has been closed!')
        if hasattr(self, 'jointask'):
            self.jointask.cancel()
        self.__joinqueue = Queue()
# This is a complex function, and it has a high "Cognitive Complexity", beyond the limit of 15.
def start(self, token, nick, channels=None) -> None:
    """Store credentials, prepare the database and run the connection loop.

    Args:
        token: Value sent with ``PASS`` when connecting.
        nick: Value sent with ``NICK``; also used to recognise the bot's own events.
        channels: Channel name or list of names handed to ``join`` on connect.

    Blocks until the connection ends. While ``auto_reconnect`` is true the
    connection is re-established after every disconnect.
    """
    self.token = token
    self.nick = nick
    if self.flags.log_hint_differences:
        self.flag_storage['hint_classes'] = [
            obj for _, obj in inspect.getmembers(sys.modules['hints'], inspect.isclass)
        ]

    if not self.dbaddress:
        self.dbaddress = 'sqlite:///elenabot.sqlite'  # default
    self.database = dataset.connect(self.dbaddress, engine_kwargs={'pool_recycle': 3600})
    tab = self.database.create_table('incoming')
    tab.create_column_by_example('timestamp', datetime.utcnow())
    tab.create_column_by_example('channel', self.nick)
    tab.create_column_by_example('event', self.nick)
    tab.create_column('data', LargeBinary())
    try:  # we want this but things like SQLite don't do this so yeah
        self.database.query("ALTER TABLE `incoming` COLLATE='utf8mb4_unicode_ci', CHANGE COLUMN `timestamp` `timestamp` DATETIME(6) NULL DEFAULT NULL AFTER `id`;")
    except Exception as exc:
        # Best effort: leave a trace instead of swallowing the error silently.
        log.debug(f'Optional schema tweak for `incoming` was skipped: {exc!r}')

    def attempt_connection():
        success, ret = self.attempt(asyncio.run, self.__wsloop(channels))
        if not success:
            # ret is the exception raised by the connection loop.
            log.error('Connection loop ended with an error.', exc_info=ret)
        self.attempt(asyncio.run, asyncio.sleep(1))  # short pause before any reconnect

    if not self.auto_reconnect:
        attempt_connection()
        return
    while self.auto_reconnect:
        attempt_connection()
    log.info('Auto Reconnect has been disabled, and the program has stopped.')
async def join(self, channels: Union[list, str]):
    """Queue channels to be joined by the background join task.

    Args:
        channels: A channel name, a list of names, or None (treated as no
            channels). Names are stored lower-cased without the leading ``#``,
            matching how ``part`` looks them up.

    When nothing new is requested and a previous join task has finished (for
    example after a reconnect), every known channel is queued again.
    """
    if channels is None:  # start() defaults to None
        channels = []
    elif isinstance(channels, str):
        channels = [channels]

    fresh = []
    for raw in channels:
        if not raw:
            continue
        name = self.split(raw)
        if name not in self.__channels and name not in fresh:
            fresh.append(name)

    if not fresh:
        if hasattr(self, 'jointask') and self.jointask.done():
            for chan in self.__channels:
                self.__joinqueue.put(chan)
        return
    for chan in fresh:
        self.__channels.append(chan)
        self.__joinqueue.put(chan)
async def _join(self) -> None:  # i dont like this, but there's no other way to really do this
    """Background task: send one queued JOIN per tick, forever.

    Runs until the task is cancelled. Each joined channel gets an empty
    outgoing-message list.
    """
    while True:
        backlog = self.__joinqueue
        if not backlog.empty():
            target = self.merge(backlog.get())
            await self.sock.send_str(f"JOIN {target}")
            log.info(f'Joined {target}')
            self.__outgoing[target] = []
        # NOTE(review): sleep assumed to sit at loop level (the source lost its indentation) — confirm.
        await asyncio.sleep(0.5)  # 20 times per 10 seconds, 2 times a second
async def part(self, channels: Union[list, str]) -> None:  # i made it work
    """Send PART for each given channel that is currently tracked, and stop tracking it."""
    requested = [channels] if isinstance(channels, str) else channels
    normalised = [self.split(entry) for entry in requested]
    leaving = [name for name in normalised if name in self.__channels]
    if not leaving:
        return
    for name in leaving:
        target = self.merge(name)
        await self.sock.send_str(f"PART {target}")
        log.info(f'Left {target}')
        self.__channels.remove(name)
@event('any')
async def log_incoming(self, ctx):
    """Store every event except JOIN/PART in the ``incoming`` database table.

    The row holds a UTC timestamp, the channel (if any), ``str(ctx)`` and the
    msgpack-encoded fields with the ``send`` callable removed.
    """
    kind = type(ctx).__name__
    if kind in ['JOIN', 'PART']:
        return
    payload = asdict(ctx)
    payload.pop('send', None)  # the send proxy is not data
    row = dict(
        timestamp=datetime.utcnow(),
        channel=getattr(ctx, 'channel', None),
        event=str(ctx),
        data=msgpack.packb({kind: payload})
    )
    self.database['incoming'].insert(row)
@event('message', 'sub')
async def log_messageable(self, ctx: hints.Messageable) -> None:
    """Log chat messages and subscriptions; ACTION messages are labelled ``ACTION``."""
    kind = type(ctx).__name__
    label = 'ACTION' if kind == 'PRIVMSG' and ctx.action else kind
    log.info(f'{label} {ctx.message.channel} >>> {ctx.message.author}: {ctx.message.content}')
@event('userstate')
async def verify_outgoing_approve(self, ctx):
    """Treat a USERSTATE as confirmation of the oldest pending outgoing message for that channel.

    Does nothing when the channel has no pending messages or was never
    registered in the outgoing table.
    """
    pending = self.__outgoing.get(ctx.channel)  # .get: unknown channel must not raise KeyError
    if not pending:
        return
    msg = pending.pop(0)
    log.info(f'SENT {ctx.channel} >>> {ctx.user}: {msg}')
@event('notice')
async def verify_outgoing_deny(self, ctx):
    """Treat a ``msg*`` NOTICE as a rejection of the oldest pending outgoing message.

    ``msg_banned`` is ignored on purpose. Notices without a string ``msg_id``,
    or for a channel with no pending messages, are ignored as well.
    """
    msg_id = getattr(ctx, 'msg_id', None)
    if not isinstance(msg_id, str) or not msg_id.startswith('msg'):
        return
    if msg_id == 'msg_banned':  # ban me bitches, im tired of this throwing errors cuz im on a bot list
        return
    pending = self.__outgoing.get(ctx.channel)
    if not pending:  # nothing queued: avoid KeyError / IndexError
        return
    msg = pending.pop(0)
    log.info(f'FAIL {ctx.channel} >>> {self.nick}: {msg}')
@event('notice')
async def log_notices(self, ctx: hints.NOTICE):
    """Log every NOTICE: at info level while ``__debug__`` is true, otherwise at debug level."""
    emit = log.info if __debug__ else log.debug
    emit(f'NOTICE({ctx.msg_id}): {ctx.channel} >>> {ctx.message}')
@event('cap')
async def get_cap(self, ctx):
    """Log the parsed CAP reply at info level."""
    capability_reply = ctx
    log.info(capability_reply)
def attempt(self, func, *args, **kwargs) -> Any:
    """Call ``func(*args, **kwargs)`` and report the outcome instead of raising.

    Returns:
        ``(True, result)`` on success, ``(False, exception)`` when an
        ``Exception`` was raised.
    """
    try:
        outcome = func(*args, **kwargs)
    except Exception as exc:
        return False, exc
    return True, outcome
def any_listeners(self, *events):  # this is in an attempt to not construct a event object unless there's an event registered
    """Report whether any listener is registered for one of *events*.

    A sub-event such as ``notice:msg_banned`` counts for its base event. While
    ``__debug__`` is true and hint logging is off this always returns True.
    """
    if __debug__ and not self.flags.log_hint_differences:
        return True
    # split(':')[0] is the base name, or the whole name when there is no ':'.
    return any(registered.split(':')[0] in events for registered in _listeners)
async def _lcall(self, event: str, **kwargs):
    """Await, in registration order, every listener registered under exactly *event*."""
    for listener in _listeners.get(event, ()):
        await listener(self, **kwargs)
async def call_listeners(self, event: str, **kwargs) -> None:  # this is all overcomplicated
    """Deliver an event to its listeners.

    For ``base:sub`` the ``base`` listeners run first, then the exact event's
    listeners, then the ``any`` listeners (unless the event itself is
    ``any``). When the ``log_hint_differences`` flag is set and a ``ctx`` is
    supplied, fields present on the event but missing from the same-named
    class collected from ``hints`` are recorded in the
    ``log_hint_differences`` table.
    """
    if ':' in event:
        await self._lcall(event.split(':')[0], **kwargs)  # we call the base event here. simplifies coding subevents
    await self._lcall(event, **kwargs)
    # Exact comparison: a substring test would skip events like 'join:tiffany'.
    if event != 'any':
        await self._lcall('any', **kwargs)

    if not (self.flags.log_hint_differences and 'ctx' in kwargs):
        return
    ctx_type = type(kwargs['ctx'])
    hint_class = next(
        (candidate for candidate in self.flag_storage['hint_classes'] if candidate.__name__ == ctx_type.__name__),
        None,
    )
    if hint_class is None:
        return
    c1 = hint_class.__annotations__
    c2 = ctx_type.__annotations__
    difference = set(c2) - set(c1) - {'server'}  # 'server' only exists in debug mode
    if not difference:
        return
    # NOTE(review): a set and two annotation dicts are inserted as-is; verify the database layer can store them.
    self.database['log_hint_differences'].insert(dict(
        classname=hint_class.__name__,
        difference=difference,
        c1=c1,
        c2=c2
    ))
def func_on_cooldown(self, func: Callable, time: int) -> bool:
    """Report whether *func* is still cooling down, starting a new cooldown when it is not.

    Args:
        func: The function used as the cooldown key.
        time: Cooldown length in seconds.

    Returns:
        True when *func* was last accepted less than *time* seconds ago.
        False otherwise, in which case the current time is recorded as the
        start of a new cooldown.
    """
    time_now = datetime.utcnow()
    last_run = self.__cooldowns.get(func)
    # total_seconds() covers the whole interval; .seconds wraps around every 24 hours.
    if last_run is not None and (time_now - last_run).total_seconds() < time:
        return True
    self.__cooldowns[func] = time_now
    return False
async def send(self, message: str, channel: str) -> None:
    """Send *message* to *channel* and remember it as pending confirmation.

    Nothing is sent while ``__debug__`` is true unless the ``send_in_debug``
    flag is set.
    """
    if __debug__ and not self.flags.send_in_debug:
        return  # since i do a lot of debugging, i dont want to accidentally send something in a chat
    await self.sock.send_str(f'PRIVMSG {channel} :{message}')  # placement of the : is important :zaqPbt:
    # setdefault: the channel may not have an outgoing list yet.
    self.__outgoing.setdefault(channel, []).append(message)
def maximize_msg(self, content: str, offset: int = 0) -> str:
    """Repeat *content* as often as fits into 500 characters minus *offset*."""
    budget = 500 - offset
    return self.fill_msg(content, budget)
def fill_msg(self, content: str, length: int = 500) -> str:
    """Repeat *content* as many whole times as fit into *length* characters.

    Returns an empty string for empty content (instead of dividing by zero)
    and when *length* is too small for a single copy.
    """
    if not content:
        return ''
    return content * (length // len(content))  # floor division: always round down
def merge(self, channel: str):  # name isn't appropriate for use-case
    """Return *channel* lower-cased with exactly one leading ``#`` added when missing.

    An empty string yields ``'#'`` rather than raising IndexError.
    """
    lowered = channel.lower()
    return lowered if lowered.startswith('#') else '#' + lowered
def split(self, channel: str):  # name isn't appropriate for use-case
    """Return *channel* lower-cased with one leading ``#`` removed when present.

    An empty string yields ``''`` rather than raising IndexError.
    """
    lowered = channel.lower()
    return lowered[1:] if lowered.startswith('#') else lowered