-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpypubsub.py
74 lines (53 loc) · 1.95 KB
/
pypubsub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""基于 QuecPython 的订阅/发布机制"""
from usr.threading import Thread, Queue, Lock
class Publisher(object):
def __init__(self):
self.__q = Queue()
self.__topic_manager_lock = Lock()
self.__topic_manager = {}
self.__listen_thread = Thread(target=self.__listen_worker)
def listen(self):
self.__listen_thread.start()
def __listen_worker(self):
while True:
topic, messages = self.__q.get()
# print("topic: {}, messages: {}".format(topic, messages))
with self.__topic_manager_lock:
for listener in self.__topic_manager.setdefault(topic, []):
try:
listener(**messages)
except Exception as e:
print("listener error:", str(e))
def publish(self, topic, **kwargs):
self.__q.put((topic, kwargs))
def subscribe(self, topic, listener):
with self.__topic_manager_lock:
listener_list = self.__topic_manager.setdefault(topic, [])
listener_list.append(listener)
def unsubscribe(self, topic, listener):
with self.__topic_manager_lock:
listener_list = self.__topic_manager.setdefault(topic, [])
try:
listener_list.remove(listener)
except ValueError:
pass
# global publisher
__publisher__ = None
def get_default_publisher():
global __publisher__
if __publisher__ is None:
__publisher__ = Publisher()
__publisher__.listen()
return __publisher__
def publish(topic, **kwargs):
"""订阅消息"""
pub = get_default_publisher()
pub.publish(topic, **kwargs)
def subscribe(topic, listener):
"""订阅消息"""
pub = get_default_publisher()
pub.subscribe(topic, listener)
def unsubscribe(topic, listener):
"""取消订阅消息"""
pub = get_default_publisher()
pub.unsubscribe(topic, listener)