-
Notifications
You must be signed in to change notification settings - Fork 0
/
liveloading.py
40 lines (36 loc) · 942 Bytes
/
liveloading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import logging
import threading
import time

import websocket
class Liveloading:
    """Entry point for live-update subscriptions, backed by background workers.

    At most one ``Worker`` is created, lazily, the first time one is needed.
    """

    def __init__(self, socketize=None):
        """Store the configuration and start with no workers.

        Args:
            socketize: Opaque value kept on the instance; nothing in this
                class reads it.
        """
        self.socketize = socketize
        # Workers created so far; getWorker() never lets this grow past one.
        self.workers = []
        # Not read or written by any active code in this class.
        # NOTE(review): presumably a queue of pending [slug, callback]
        # subscriptions -- confirm before relying on it.
        self.sluggiesToBeMadeIntoMen = []

    def subscribe(self, slug, callback):
        """Make sure a worker exists for a subscription request.

        Args:
            slug: Identifier of the thing to subscribe to (currently unused).
            callback: Callable for updates (currently unused).

        Returns:
            None.
        """
        # Only the side effect matters here: the worker is created on demand.
        self.getWorker()
        # TODO(review): the subscription itself is never recorded. An earlier
        # revision appended [slug, callback] to self.sluggiesToBeMadeIntoMen;
        # confirm the intended hand-off to the worker before enabling it.

    def getWorker(self):
        """Return the shared worker, creating it on first use.

        Returns:
            The first entry of ``self.workers``.
        """
        if not self.workers:
            self.workers.append(Worker())
        return self.workers[0]

    def run(self):
        """Block the calling thread forever, sleeping in 0.1 s steps.

        The workers run on daemon threads, so something has to keep the
        process alive; this loop never returns.
        """
        while True:
            time.sleep(0.1)
class Worker:
    """Runs a connect/handle/reconnect loop on a daemon thread.

    Set ``abort`` to a true value to make the loop stop after the current
    cycle.

    NOTE(review): ``salad()`` calls ``self.handle()``, which is not defined
    in the visible source. Until it is provided (here or in a subclass),
    every cycle fails with ``AttributeError`` right after connecting, is
    logged, and is retried one second later.
    """

    # Endpoint opened by connect().
    SOCKET_URL = "wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket"

    def __init__(self, socketize=None):
        """Initialise the worker state, then start the background thread.

        Args:
            socketize: Opaque value kept on the instance; nothing in this
                class reads it.
        """
        # Every attribute must exist before the thread starts, because
        # salad() reads self.abort as soon as it runs.
        self.socketize = socketize
        self.abort = False
        self.ws = None
        self.sluggies = []
        self.thread = threading.Thread(target=self.salad, daemon=True)
        self.thread.start()

    def salad(self):
        """Thread body: connect and handle repeatedly until ``abort`` is set.

        A failed cycle is logged and retried after one second. The socket
        opened by a cycle is always closed before the next one starts.
        """
        while not self.abort:
            try:
                self.connect()
                self.handle()
            except Exception:
                # Best-effort loop: keep retrying, but leave a trace instead
                # of swallowing the error silently.
                logging.getLogger(__name__).warning(
                    "worker cycle failed; reconnecting", exc_info=True
                )
            finally:
                self._disconnect()
            if not self.abort:
                # Back off before reconnecting; skipped on shutdown so that
                # stopping the worker is not delayed.
                time.sleep(1)

    def connect(self):
        """Open a new websocket to ``SOCKET_URL`` and keep it in ``self.ws``."""
        self.ws = websocket.WebSocket()
        self.ws.connect(self.SOCKET_URL)

    def _disconnect(self):
        """Close and forget the current socket, if there is one."""
        ws, self.ws = self.ws, None
        if ws is None:
            return
        try:
            ws.close()
        except Exception:
            # The connection may already be dead; nothing more to release.
            logging.getLogger(__name__).debug(
                "ignoring error while closing socket", exc_info=True
            )