Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscriber concurrency review #478

Merged
merged 3 commits into from
Apr 8, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ def verify_type(self, msg_type):
if not ros_loader.get_message_class(msg_type) is self.msg_class:
raise TypeConflictException(self.topic,
self.msg_class._type, msg_type)
return

def subscribe(self, client_id, callback):
""" Subscribe the specified client to this subscriber.
Expand Down Expand Up @@ -150,8 +149,7 @@ def unsubscribe(self, client_id):
def has_subscribers(self):
""" Return true if there are subscribers """
with self.lock:
ret = len(self.subscriptions) != 0
return ret
return len(self.subscriptions) != 0

def callback(self, msg, callbacks=None):
""" Callback for incoming messages on the rospy.Subscriber
Expand All @@ -177,7 +175,6 @@ def callback(self, msg, callbacks=None):
except Exception as exc:
# Do nothing if one particular callback fails except log it
logerr("Exception calling subscribe callback: %s", exc)
pass


class SubscriberManager():
Expand All @@ -186,6 +183,7 @@ class SubscriberManager():
"""

def __init__(self):
self._lock = Lock()
self._subscribers = {}

def subscribe(self, client_id, topic, callback, msg_type=None):
Expand All @@ -198,13 +196,14 @@ def subscribe(self, client_id, topic, callback, msg_type=None):
msg_type -- (optional) the type of the topic

"""
if not topic in self._subscribers:
self._subscribers[topic] = MultiSubscriber(topic, msg_type)
with self._lock:
if not topic in self._subscribers:
self._subscribers[topic] = MultiSubscriber(topic, msg_type)

if msg_type is not None:
self._subscribers[topic].verify_type(msg_type)
if msg_type is not None:
self._subscribers[topic].verify_type(msg_type)

self._subscribers[topic].subscribe(client_id, callback)
self._subscribers[topic].subscribe(client_id, callback)

def unsubscribe(self, client_id, topic):
""" Unsubscribe from a topic
Expand All @@ -214,14 +213,12 @@ def unsubscribe(self, client_id, topic):
topic -- the topic to unsubscribe from

"""
if not topic in self._subscribers:
return

self._subscribers[topic].unsubscribe(client_id)
with self._lock:
self._subscribers[topic].unsubscribe(client_id)

if not self._subscribers[topic].has_subscribers():
self._subscribers[topic].unregister()
del self._subscribers[topic]
if not self._subscribers[topic].has_subscribers():
self._subscribers[topic].unregister()
del self._subscribers[topic]


manager = SubscriberManager()
Expand Down