11import threading
2- from pubsub import pub
2+ from event_channel . threaded_event_channel import ThreadedEventChannel
33
44class EventManager (object ):
55 _instance = None
@@ -10,25 +10,33 @@ def get_instance(cls, node_name=None):
1010 return cls ._instance
1111
1212 def __init__ (self , node_name ):
13+ self ._channel = ThreadedEventChannel (blocking = False )
14+ self ._publisher_threads = None
15+ self ._subscribers = []
1316 self ._node_name = node_name
1417 self ._event_generators = []
1518
1619 def publish (self , topic , msg ):
17- pub . sendMessage (topic , message = msg )
20+ self . _publisher_threads = self . _channel . publish (topic , msg )
1821
1922 def register_event_listener (self , topic , callback ):
20- pub .subscribe (callback , topic )
23+ self ._channel .subscribe (topic , callback )
24+ self ._subscribers .append ((topic , callback ))
2125
2226 def register_event_generator (self , generator_func ):
2327 generator = threading .Thread (target = generator_func )
2428 self ._event_generators .append (generator )
2529 generator .start ()
2630
2731 def unregister_listeners (self ):
28- pub .unsubAll ()
29-
32+ for l in self ._subscribers :
33+ self ._channel .unsubscribe (l [0 ], l [1 ])
34+ self ._subscribers = []
35+
3036 def unregister_publishers (self ):
31- pass
37+ if self ._publisher_threads :
38+ for t in self ._publisher_threads :
39+ t .join ()
3240
3341 def start_event_generators (self ):
3442 for g in self ._event_generators :
@@ -37,3 +45,4 @@ def start_event_generators(self):
3745 def wait_event_generators (self ):
3846 for g in self ._event_generators :
3947 g .join ()
48+ self ._event_generators = []
0 commit comments