diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bccabcb..1f860dd 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,8 @@ Unreleased **Fixed** +* Fixed blocking issues on the Twisted/Autobahn-based implementation of websockets + **Deprecated** **Removed** diff --git a/src/roslibpy/comm/comm_autobahn.py b/src/roslibpy/comm/comm_autobahn.py index cf50461..a292559 100644 --- a/src/roslibpy/comm/comm_autobahn.py +++ b/src/roslibpy/comm/comm_autobahn.py @@ -45,11 +45,11 @@ def onClose(self, wasClean, code, reason): LOGGER.info('WebSocket connection closed: Code=%s, Reason=%s', str(code), reason) def send_message(self, payload): - return self.sendMessage(payload, isBinary=False, fragmentSize=None, sync=False, doNotCompress=False) + return reactor.callFromThread(self.sendMessage, payload, isBinary=False, fragmentSize=None, sync=False, doNotCompress=False) def send_close(self): self._manual_disconnect = True - self.sendClose() + reactor.callFromThread(self.sendClose) class AutobahnRosBridgeClientFactory(EventEmitterMixin, ReconnectingClientFactory, WebSocketClientFactory): @@ -190,7 +190,7 @@ def call_later(self, delay, callback): delay (:obj:`int`): Number of seconds to wait before invoking the callback. callback (:obj:`callable`): Callable function to be invoked when the delay has elapsed. """ - reactor.callLater(delay, callback) + reactor.callFromThread(reactor.callLater, delay, callback) def call_in_thread(self, callback): """Call the given function on a thread. @@ -198,7 +198,7 @@ def call_in_thread(self, callback): Args: callback (:obj:`callable`): Callable function to be invoked in a thread. """ - reactor.callInThread(callback) + reactor.callFromThread(reactor.callInThread, callback) def blocking_call_from_thread(self, callback, timeout): """Call the given function from a thread, and wait for the result synchronously @@ -258,6 +258,9 @@ def inner_errback(error): def terminate(self): """Signals the termination of the main event loop.""" if reactor.running: - reactor.stop() + reactor.callFromThread(reactor.stop) + + if self._thread: + self._thread.join() self._log_observer.stop()