Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor performance improvements #809

Merged
merged 4 commits into from
Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions rosbridge_library/src/rosbridge_library/internal/subscribers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from threading import Lock

from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.qos import DurabilityPolicy, QoSProfile, ReliabilityPolicy
from rosbridge_library.internal import ros_loader
from rosbridge_library.internal.message_conversion import msg_class_type_repr
Expand Down Expand Up @@ -129,9 +130,10 @@ def __init__(self, topic, client_id, callback, node_handle, msg_type=None, raw=F
self.topic = topic
self.qos = qos
self.raw = raw
self.callback_group = MutuallyExclusiveCallbackGroup()

self.subscriber = node_handle.create_subscription(
msg_class, topic, self.callback, qos, raw=raw
msg_class, topic, self.callback, qos, raw=raw, callback_group=self.callback_group
)
self.new_subscriber = None
self.new_subscriptions = {}
Expand Down Expand Up @@ -173,7 +175,12 @@ def subscribe(self, client_id, callback):
self.new_subscriptions.update({client_id: callback})
if self.new_subscriber is None:
self.new_subscriber = self.node_handle.create_subscription(
self.msg_class, self.topic, self._new_sub_callback, self.qos, raw=self.raw
self.msg_class,
self.topic,
self._new_sub_callback,
self.qos,
raw=self.raw,
callback_group=self.callback_group,
)

def unsubscribe(self, client_id):
Expand Down
4 changes: 3 additions & 1 deletion rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ def main(args=None):
rclpy.init(args=args)
node = RosbridgeWebsocketNode()

spin_callback = PeriodicCallback(lambda: rclpy.spin_once(node, timeout_sec=0.01), 1)
executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
spin_callback = PeriodicCallback(lambda: executor.spin_once(timeout_sec=0.01), 1)
spin_callback.start()
try:
start_hook()
Expand Down
26 changes: 11 additions & 15 deletions rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
from rosbridge_library.util import bson
from tornado import version_info as tornado_version_info
from tornado.gen import BadYieldError, coroutine
from tornado.gen import BadYieldError
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError, WebSocketHandler
Expand Down Expand Up @@ -146,7 +146,6 @@ def open(self):
self.incoming_queue.start()
self.protocol.outgoing = self.send_message
self.set_nodelay(True)
self._write_lock = threading.RLock()
cls.clients_connected += 1
if cls.client_manager:
cls.client_manager.add_client(self.client_id, self.request.remote_ip)
Expand Down Expand Up @@ -185,24 +184,21 @@ def send_message(self, message):
else:
binary = False

with self._write_lock:
_io_loop.add_callback(partial(self.prewrite_message, message, binary))
_io_loop.add_callback(partial(self.prewrite_message, message, binary))

@coroutine
def prewrite_message(self, message, binary):
async def prewrite_message(self, message, binary):
cls = self.__class__
# Use a try block because the log decorator doesn't cooperate with @coroutine.
# Use a try block because the log decorator doesn't cooperate with coroutines.
try:
with self._write_lock:
future = self.write_message(message, binary)
future = self.write_message(message, binary)

# When closing, self.write_message() return None even if it's an undocument output.
# Consider it as WebSocketClosedError
# For tornado versions <4.3.0 self.write_message() does not have a return value
if future is None and tornado_version_info >= (4, 3, 0, 0):
raise WebSocketClosedError
# When closing, self.write_message() return None even if it's an undocument output.
# Consider it as WebSocketClosedError
# For tornado versions <4.3.0 self.write_message() does not have a return value
if future is None and tornado_version_info >= (4, 3, 0, 0):
raise WebSocketClosedError

yield future
return future
except WebSocketClosedError:
cls.node_handle.get_logger().warn(
"WebSocketClosedError: Tried to write to a closed websocket",
Expand Down