Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve robustness for multiple client connections #803

Merged
merged 6 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 22 additions & 19 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from threading import Lock
from threading import Lock, RLock

from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
from rosbridge_library.internal import ros_loader
Expand Down Expand Up @@ -123,7 +123,7 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F
# Create the subscriber and associated member variables
# Subscriptions is initialized with the current client to start with.
self.subscriptions = {client_id: callback}
self.lock = Lock()
self.rlock = RLock()
self.msg_class = msg_class
self.node_handle = node_handle
self.topic = topic
Expand All @@ -138,8 +138,11 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F

def unregister(self):
self.node_handle.destroy_subscription(self.subscriber)
with self.lock:
with self.rlock:
self.subscriptions.clear()
if self.new_subscriber:
self.node_handle.destroy_subscription(self.new_subscriber)
self.new_subscriber = None

def verify_type(self, msg_type):
"""Verify that the subscriber subscribes to messages of this type.
Expand All @@ -165,7 +168,7 @@ def subscribe(self, client_id, callback):
messages

"""
with self.lock:
with self.rlock:
# If the topic is latched, adding a new subscriber will immediately invoke
# the given callback.
# In any case, the first message is handled using new_sub_callback,
Expand All @@ -183,16 +186,16 @@ def unsubscribe(self, client_id):
client_id -- the ID of the client to unsubscribe

"""
with self.lock:
with self.rlock:
if client_id in self.new_subscriptions:
del self.new_subscriptions[client_id]
else:
del self.subscriptions[client_id]

def has_subscribers(self):
"""Return true if there are subscribers"""
with self.lock:
return len(self.subscriptions) != 0
with self.rlock:
return len(self.subscriptions) + len(self.new_subscriptions) != 0

def callback(self, msg, callbacks=None):
"""Callback for incoming messages on the rclpy subscription.
Expand All @@ -206,18 +209,18 @@ def callback(self, msg, callbacks=None):
"""
outgoing = OutgoingMessage(msg)

# Get the callbacks to call
if not callbacks:
with self.lock:
callbacks = self.subscriptions.values()
with self.rlock:
callbacks = callbacks or self.subscriptions.values()

# Pass the JSON to each of the callbacks
for callback in callbacks:
try:
callback(outgoing)
except Exception as exc:
# Do nothing if one particular callback fails except log it
self.node_handle.get_logger().error(f"Exception calling subscribe callback: {exc}")
# Pass the JSON to each of the callbacks
for callback in callbacks:
try:
callback(outgoing)
except Exception as exc:
# Do nothing if one particular callback fails except log it
self.node_handle.get_logger().error(
f"Exception calling subscribe callback: {exc}"
)

def _new_sub_callback(self, msg):
"""
Expand All @@ -230,7 +233,7 @@ def _new_sub_callback(self, msg):
the subscriptions dictionary is updated with the newly incorporated
subscriptors.
"""
with self.lock:
with self.rlock:
self.callback(msg, self.new_subscriptions.values())
self.subscriptions.update(self.new_subscriptions)
self.new_subscriptions = {}
Expand Down
3 changes: 2 additions & 1 deletion rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ def prewrite_message(self, message, binary):
"WebSocketClosedError: Tried to write to a closed websocket",
throttle_duration_sec=1.0,
)
raise
# If we end up here, a client has disconnected before its message callback(s) could be removed.
# To avoid log spamming, we only log a warning and do not re-raise the exception here.
except StreamClosedError:
cls.node_handle.get_logger().warn(
"StreamClosedError: Tried to write to a closed stream",
Expand Down