From 8004ae09a7019c6cc31e82a935abc0a45f4ef1ce Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Mon, 19 Apr 2021 09:33:28 -0700 Subject: [PATCH 01/11] RosConnection 2.0 --- .../Runtime/TcpConnector/ROSConnection.cs | 307 ++++++------------ 1 file changed, 98 insertions(+), 209 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index a7546b08..92624886 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -11,6 +11,8 @@ using System.Globalization; using UnityEngine; using UnityEngine.Serialization; +using System.Collections.Concurrent; +using System.Threading; namespace Unity.Robotics.ROSTCPConnector { @@ -20,32 +22,22 @@ public class ROSConnection : MonoBehaviour [FormerlySerializedAs("hostName")] public string rosIPAddress = "127.0.0.1"; [FormerlySerializedAs("hostPort")] public int rosPort = 10000; - [Tooltip("If blank, determine IP automatically.")] - public string overrideUnityIP = ""; + private int networkTimeout = 2000; - public int unityPort = 5005; - bool alreadyStartedServer = false; + [Tooltip("While reading received messages, read this many bytes at a time.")] + public int readChunkSize = 2048; - private int networkTimeout = 2000; + [Tooltip("Send keepalive message if nothing has been sent for this long (seconds).")] + public float keepaliveTime = 10; + // Remove? [Tooltip("While waiting for a service to respond, check this many times before giving up.")] public int awaitDataMaxRetries = 10; + // Remove? [Tooltip("While waiting for a service to respond, wait this many seconds between checks.")] public float awaitDataSleepSeconds = 1.0f; - [Tooltip("While reading received messages, read this many bytes at a time.")] - public int readChunkSize = 2048; - - [Tooltip("While waiting to read a full message, check this many times before giving up.")] - public int awaitDataReadRetry = 10; - - [Tooltip("Close connection if nothing has been sent for this long (seconds).")] - public float timeoutOnIdle = 10; - - static object _lock = new object(); // sync lock - static List activeConnectionTasks = new List(); // pending connections - const string ERROR_TOPIC_NAME = "__error"; const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; const string HANDSHAKE_TOPIC_NAME = "__handshake"; @@ -106,6 +98,8 @@ public void ImplementService(string topic, Func callback) public async void SendServiceMessage(string rosServiceName, Message serviceRequest, Action callback) where RESPONSE : Message, new() { + // For phase 2, gut this and rewrite + // Serialize the message in service name, message size, and message bytes format byte[] messageBytes = GetMessageBytes(rosServiceName, serviceRequest); @@ -225,6 +219,11 @@ void OnEnable() _instance = this; } + Thread connectionThread; + Thread readerThread; + ConcurrentQueue> outgoingMessages = new ConcurrentQueue>(); + ConcurrentQueue> incomingMessages = new ConcurrentQueue>(); + private void Start() { if(!IPFormatIsCorrect(rosIPAddress)) @@ -232,15 +231,12 @@ private void Start() InitializeHUD(); Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); - if (overrideUnityIP != "") - { - if(!IPFormatIsCorrect(overrideUnityIP)) - Debug.LogError("Override Unity IP address is not correct"); - StartMessageServer(overrideUnityIP, unityPort); // no reason to wait, if we already know the IP - } + connectionThread = new Thread(ConnectionThread); + connectionThread.Start(); - SendServiceMessage(HANDSHAKE_TOPIC_NAME, - new MUnityHandshakeRequest(overrideUnityIP, (ushort) unityPort), RosUnityHandshakeCallback); + // Phase 2: send handshakes again + //SendServiceMessage(HANDSHAKE_TOPIC_NAME, + // new MUnityHandshakeRequest(overrideUnityIP, (ushort) unityPort), RosUnityHandshakeCallback); } void OnValidate() @@ -262,79 +258,99 @@ private void InitializeHUD() hudPanel.isEnabled = showHUD; } + /* Phase 2 void RosUnityHandshakeCallback(MUnityHandshakeResponse response) { StartMessageServer(response.ip, unityPort); - } + }*/ void RosUnityErrorCallback(MRosUnityError error) { Debug.LogError("ROS-Unity error: " + error.message); } - /// TcpClient to read byte stream from. - protected async Task HandleConnectionAsync(TcpClient tcpClient) + private void Update() { - await Task.Yield(); + m_RealTimeSinceStartup = Time.realtimeSinceStartup; - // continue asynchronously on another thread - await ReadFromStream(tcpClient.GetStream()); + Tuple data; + while(incomingMessages.TryDequeue(out data)) + { + (string topic, byte[] contents) = data; + // notify whatever is interested in this incoming message + SubscriberCallback callback; + if(subscribers.TryGetValue(topic, out callback)) + { + Message message = (Message)callback.messageConstructor.Invoke(new object[]{ }); + message.Deserialize(contents, 0); + callback.callbacks.ForEach(item=>item.Invoke(message)); + } + } } - async Task ReadFromStream(NetworkStream networkStream) - { - if (!networkStream.CanRead) - return; + float m_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here - SubscriberCallback subs; + void ConnectionThread() + { + TcpClient client = null; - float lastDataReceivedRealTimestamp = 0; - do + while (true) { - // try to keep reading messages as long as the networkstream has data. - // But if it's taking too long, don't freeze forever. - float frameLimitRealTimestamp = Time.realtimeSinceStartup + 0.1f; - while (networkStream.DataAvailable && Time.realtimeSinceStartup < frameLimitRealTimestamp) + try { - if (!Application.isPlaying) - { - networkStream.Close(); - return; - } - - (string topicName, byte[] content) = await ReadMessageContents(networkStream); - lastDataReceivedRealTimestamp = Time.realtimeSinceStartup; - - if (!subscribers.TryGetValue(topicName, out subs)) - continue; // not interested in this topic + client = new TcpClient(); + client.Connect(rosIPAddress, rosPort); - Message msg = (Message)subs.messageConstructor.Invoke(new object[0]); - msg.Deserialize(content, 0); + NetworkStream networkStream = client.GetStream(); + networkStream.ReadTimeout = networkTimeout; - if (hudPanel != null) - hudPanel.SetLastMessageReceived(topicName, msg); + readerThread = new Thread(ReaderThread); + readerThread.Start(networkStream); - foreach (Func callback in subs.callbacks) + // connected ok, now just watch our queue for outgoing messages to send (or else send a keepalive message occasionally) + while(networkStream.CanWrite) { - try + Tuple data; + float waitingSinceRealTime = m_RealTimeSinceStartup; + while (!outgoingMessages.TryDequeue(out data)) { - Message response = callback(msg); - if (response != null) + Thread.Yield(); + if (m_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) { - // if the callback has a response, it's implementing a service - WriteDataStaggered(networkStream, topicName, response); + // send a keepalive message (8 zeroes = a ros message with topic "" and no message data.) + networkStream.Write(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }, 0, 8); + waitingSinceRealTime = m_RealTimeSinceStartup; } } - catch (Exception e) - { - Debug.LogError("Subscriber callback problem: " + e); - } + + WriteDataStaggered(networkStream, data.Item1, data.Item2); } } - await Task.Yield(); - } - while (Time.realtimeSinceStartup < lastDataReceivedRealTimestamp + timeoutOnIdle); // time out if idle too long. - networkStream.Close(); + catch (Exception e) + { + if (readerThread != null) + readerThread.Abort(); + + if (client != null) + client.Close(); + } + } + } + + async void ReaderThread(object param) + { + NetworkStream networkStream = (NetworkStream)param; + while (networkStream.CanRead) + { + try + { + Tuple content = await ReadMessageContents(networkStream); + incomingMessages.Enqueue(content); + } + catch(Exception e) + { + } + } } async Task> ReadMessageContents(NetworkStream networkStream) @@ -357,16 +373,9 @@ async Task> ReadMessageContents(NetworkStream networkStrea int bytesRemaining = full_message_size; int totalBytesRead = 0; - int attempts = 0; - // Read in message contents until completion, or until attempts are maxed out - while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry) + // Read in message contents until completion + while (bytesRemaining > 0) { - if (attempts == this.awaitDataReadRetry) - { - Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts."); - return Tuple.Create(topicName, readBuffer); - } - // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining)); totalBytesRead += bytesRead; @@ -374,109 +383,22 @@ async Task> ReadMessageContents(NetworkStream networkStrea if (!networkStream.DataAvailable) { - attempts++; await Task.Yield(); } + if (!networkStream.CanRead) + return null; } return Tuple.Create(topicName, readBuffer); } - /// - /// Handles multiple connections and locks. - /// - /// TcpClient to read byte stream from. - private async Task StartHandleConnectionAsync(TcpClient tcpClient) + void OnApplicationQuit() { - var connectionTask = HandleConnectionAsync(tcpClient); - - lock (_lock) - activeConnectionTasks.Add(connectionTask); - - try - { - await connectionTask; - // we may be on another thread after "await" - } - catch (Exception ex) - { - Debug.LogError(ex.ToString()); - } - finally - { - lock (_lock) - activeConnectionTasks.Remove(connectionTask); - } + if (connectionThread != null) + connectionThread.Abort(); + if (readerThread != null) + readerThread.Abort(); } - TcpListener tcpListener; - - protected async void StartMessageServer(string ip, int port) - { - if (alreadyStartedServer) - return; - - alreadyStartedServer = true; - while (true) - { - try - { - if (!Application.isPlaying) - break; - tcpListener = new TcpListener(IPAddress.Parse(ip), port); - tcpListener.Start(); - - Debug.Log("ROS-Unity server listening on " + ip + ":" + port); - - while (true) //we wait for a connection - { - var tcpClient = await tcpListener.AcceptTcpClientAsync(); - - var task = StartHandleConnectionAsync(tcpClient); - // if already faulted, re-throw any error on the calling context - if (task.IsFaulted) - await task; - - // try to get through the message queue before doing another await - // but if messages are arriving faster than we can process them, don't freeze up - float abortAtRealtime = Time.realtimeSinceStartup + 0.1f; - while (tcpListener.Pending() && Time.realtimeSinceStartup < abortAtRealtime) - { - tcpClient = tcpListener.AcceptTcpClient(); - task = StartHandleConnectionAsync(tcpClient); - if (task.IsFaulted) - await task; - } - } - } - catch (ObjectDisposedException e) - { - if (!Application.isPlaying) - { - // This only happened because we're shutting down. Not a problem. - } - else - { - Debug.LogError("Exception raised!! " + e); - } - } - catch (Exception e) - { - Debug.LogError("Exception raised!! " + e); - } - - // to avoid infinite loops, wait a frame before trying to restart the server - await Task.Yield(); - } - } - - private void OnApplicationQuit() - { - if (tcpListener != null) - tcpListener.Stop(); - tcpListener = null; - } - - /// /// Given some input values, fill a byte array in the desired format to use with /// https://github.com/Unity-Technologies/Robotics-Tutorials/tree/master/catkin_ws/src/tcp_endpoint @@ -550,42 +472,9 @@ void SendSysCommand(string command, object param) Send(SYSCOMMAND_TOPIC_NAME, new MRosUnitySysCommand(command, JsonUtility.ToJson(param))); } - public async void Send(string rosTopicName, Message message) + public void Send(string rosTopicName, Message message) { - TcpClient client = null; - try - { - client = new TcpClient(); - await client.ConnectAsync(rosIPAddress, rosPort); - - NetworkStream networkStream = client.GetStream(); - networkStream.ReadTimeout = networkTimeout; - - WriteDataStaggered(networkStream, rosTopicName, message); - } - catch (NullReferenceException e) - { - Debug.LogError("TCPConnector.SendMessage Null Reference Exception: " + e); - } - catch (Exception e) - { - Debug.LogError("TCPConnector Exception: " + e); - } - finally - { - if (client != null && client.Connected) - { - try - { - if (hudPanel != null) hudPanel.SetLastMessageSent(rosTopicName, message); - client.Close(); - } - catch (Exception) - { - //Ignored. - } - } - } + outgoingMessages.Enqueue(new Tuple(rosTopicName, message)); } /// From 812dcbf9e8a9b933a0cebc74bb739e57ff370307 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Wed, 21 Apr 2021 14:33:52 -0700 Subject: [PATCH 02/11] Better read logic --- .../Editor/ROSSettingsEditor.cs | 16 -------- .../Runtime/TcpConnector/ROSConnection.cs | 40 +++++++++---------- 2 files changed, 19 insertions(+), 37 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 7fd13302..1ca0443a 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -43,14 +43,6 @@ protected virtual void OnGUI() prefab.rosIPAddress = EditorGUILayout.TextField("ROS IP Address", prefab.rosIPAddress); prefab.rosPort = EditorGUILayout.IntField("ROS Port", prefab.rosPort); EditorGUILayout.Space(); - prefab.overrideUnityIP = EditorGUILayout.TextField( - new GUIContent("Override Unity IP Address", "If blank, determine IP automatically."), - prefab.overrideUnityIP); - prefab.unityPort = EditorGUILayout.IntField("Unity Port", prefab.unityPort); - if ((prefab.overrideUnityIP != "" && !ROSConnection.IPFormatIsCorrect(prefab.overrideUnityIP))) - { - EditorGUILayout.HelpBox("Unity Override IP invalid", MessageType.Warning); - } if(!ROSConnection.IPFormatIsCorrect(prefab.rosIPAddress)) { @@ -70,14 +62,6 @@ protected virtual void OnGUI() new GUIContent("Read chunk size", "While reading received messages, read this many bytes at a time."), prefab.readChunkSize); - prefab.awaitDataReadRetry = EditorGUILayout.IntField( - new GUIContent("Max Read retries", - "While waiting to read a full message, check this many times before giving up."), - prefab.awaitDataReadRetry); - prefab.timeoutOnIdle = EditorGUILayout.FloatField( - new GUIContent("Timeout on idle (seconds)", - "If no messages have been sent for this long, close the connection."), - prefab.timeoutOnIdle); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 92624886..f8c64183 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -147,7 +147,6 @@ public async void SendServiceMessage(string rosServiceName, Message se try { - string serviceName; (string topicName, byte[] content) = await ReadMessageContents(networkStream); serviceResponse.Deserialize(content, 0); } @@ -353,41 +352,40 @@ async void ReaderThread(object param) } } + void ReadToByteArray(NetworkStream networkStream, byte[] array) + { + int read = 0; + while (read < array.Length && networkStream.CanRead) + { + if (!networkStream.DataAvailable) + Thread.Yield(); + + read += networkStream.Read(array, 0, array.Length - read); + } + + if (read < array.Length) + throw new SocketException(); // the connection has closed + } + async Task> ReadMessageContents(NetworkStream networkStream) { // Get first bytes to determine length of topic name byte[] rawTopicBytes = new byte[4]; - networkStream.Read(rawTopicBytes, 0, rawTopicBytes.Length); + ReadToByteArray(networkStream, rawTopicBytes); int topicLength = BitConverter.ToInt32(rawTopicBytes, 0); // Read and convert topic name byte[] topicNameBytes = new byte[topicLength]; - networkStream.Read(topicNameBytes, 0, topicNameBytes.Length); + ReadToByteArray(networkStream, topicNameBytes); string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); byte[] full_message_size_bytes = new byte[4]; - networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length); + ReadToByteArray(networkStream, full_message_size_bytes); int full_message_size = BitConverter.ToInt32(full_message_size_bytes, 0); byte[] readBuffer = new byte[full_message_size]; - int bytesRemaining = full_message_size; - int totalBytesRead = 0; - - // Read in message contents until completion - while (bytesRemaining > 0) - { - // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining)); - totalBytesRead += bytesRead; - bytesRemaining -= bytesRead; + ReadToByteArray(networkStream, readBuffer); - if (!networkStream.DataAvailable) - { - await Task.Yield(); - } - if (!networkStream.CanRead) - return null; - } return Tuple.Create(topicName, readBuffer); } From ff02f27d722e4b958279fdc3034ebbf27aea89b4 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Thu, 22 Apr 2021 17:08:33 -0700 Subject: [PATCH 03/11] Basic connection works, hanndles disconnect gracefully-ish --- .../Runtime/TcpConnector/ROSConnection.cs | 148 ++++++++++++------ 1 file changed, 98 insertions(+), 50 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index f8c64183..f233cad7 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -147,7 +147,7 @@ public async void SendServiceMessage(string rosServiceName, Message se try { - (string topicName, byte[] content) = await ReadMessageContents(networkStream); + (string topicName, byte[] content) = await ReadMessageContents(networkStream, new CancellationToken()); serviceResponse.Deserialize(content, 0); } catch (Exception e) @@ -218,8 +218,6 @@ void OnEnable() _instance = this; } - Thread connectionThread; - Thread readerThread; ConcurrentQueue> outgoingMessages = new ConcurrentQueue>(); ConcurrentQueue> incomingMessages = new ConcurrentQueue>(); @@ -230,8 +228,8 @@ private void Start() InitializeHUD(); Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); - connectionThread = new Thread(ConnectionThread); - connectionThread.Start(); + connectionThreadCancellation = new CancellationTokenSource(); + Task.Run(() => ConnectionThread(rosIPAddress, rosPort, networkTimeout, keepaliveTime, outgoingMessages, incomingMessages, connectionThreadCancellation.Token)); // Phase 2: send handshakes again //SendServiceMessage(HANDSHAKE_TOPIC_NAME, @@ -270,7 +268,7 @@ void RosUnityErrorCallback(MRosUnityError error) private void Update() { - m_RealTimeSinceStartup = Time.realtimeSinceStartup; + s_RealTimeSinceStartup = Time.realtimeSinceStartup; Tuple data; while(incomingMessages.TryDequeue(out data)) @@ -287,14 +285,34 @@ private void Update() } } - float m_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here + CancellationTokenSource connectionThreadCancellation; + static bool m_ShouldConnect; + static float s_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here - void ConnectionThread() + static void SendKeepalive(NetworkStream stream) { - TcpClient client = null; + // 8 zeroes = a ros message with topic "" and no message data. + stream.Write(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }, 0, 8); + } + + static async Task ConnectionThread( + string rosIPAddress, + int rosPort, + int networkTimeout, + float keepaliveTime, + ConcurrentQueue> outgoingQueue, + ConcurrentQueue> incomingQueue, + CancellationToken token) + { + Debug.Log("ConnectionThread begins"); + int nextReaderIdx = 101; + int nextReconnectionDelay = 1000; - while (true) + while (!token.IsCancellationRequested) { + TcpClient client = null; + CancellationTokenSource readerCancellation = null; + try { client = new TcpClient(); @@ -303,98 +321,128 @@ void ConnectionThread() NetworkStream networkStream = client.GetStream(); networkStream.ReadTimeout = networkTimeout; - readerThread = new Thread(ReaderThread); - readerThread.Start(networkStream); + SendKeepalive(networkStream); + + readerCancellation = new CancellationTokenSource(); + _ = Task.Run(() => ReaderThread(nextReaderIdx, networkStream, incomingQueue, readerCancellation.Token)); + nextReaderIdx++; - // connected ok, now just watch our queue for outgoing messages to send (or else send a keepalive message occasionally) - while(networkStream.CanWrite) + // connected, now just watch our queue for outgoing messages to send (or else send a keepalive message occasionally) + while (true) { Tuple data; - float waitingSinceRealTime = m_RealTimeSinceStartup; - while (!outgoingMessages.TryDequeue(out data)) + float waitingSinceRealTime = s_RealTimeSinceStartup; + token.ThrowIfCancellationRequested(); + while (!outgoingQueue.TryDequeue(out data)) { Thread.Yield(); - if (m_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) + if (s_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) { - // send a keepalive message (8 zeroes = a ros message with topic "" and no message data.) - networkStream.Write(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }, 0, 8); - waitingSinceRealTime = m_RealTimeSinceStartup; + // send a keepalive message + SendKeepalive(networkStream); + waitingSinceRealTime = s_RealTimeSinceStartup; } + token.ThrowIfCancellationRequested(); } + //Debug.Log("Sending " + data.Item1); WriteDataStaggered(networkStream, data.Item1, data.Item2); } } + catch (OperationCanceledException e) + { + } catch (Exception e) { - if (readerThread != null) - readerThread.Abort(); + Debug.Log($"Connection to {rosIPAddress}:{rosPort} failed - " + e); + await Task.Delay(nextReconnectionDelay); + } + finally + { + if (readerCancellation != null) + readerCancellation.Cancel(); if (client != null) client.Close(); + + // clear the message queue + Tuple unused; + while (outgoingQueue.TryDequeue(out unused)) + { + } } + await Task.Yield(); } } - async void ReaderThread(object param) + static async Task ReaderThread(int readerIdx, NetworkStream networkStream, ConcurrentQueue> queue, CancellationToken token) { - NetworkStream networkStream = (NetworkStream)param; - while (networkStream.CanRead) + while (!token.IsCancellationRequested) { try { - Tuple content = await ReadMessageContents(networkStream); - incomingMessages.Enqueue(content); + Tuple content = await ReadMessageContents(networkStream, token); + Debug.Log($"Message {content.Item1} received"); + queue.Enqueue(content); } - catch(Exception e) + catch (OperationCanceledException e) { } + /*catch (System.IO.IOException e) + { + }*/ + catch (Exception e) + { + Debug.Log("Reader "+readerIdx+" exception! " + e); + } } } - void ReadToByteArray(NetworkStream networkStream, byte[] array) + static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int length, CancellationToken token) { int read = 0; - while (read < array.Length && networkStream.CanRead) + while (read < length && networkStream.CanRead) { - if (!networkStream.DataAvailable) - Thread.Yield(); + while (!token.IsCancellationRequested && !networkStream.DataAvailable) + await Task.Yield(); - read += networkStream.Read(array, 0, array.Length - read); + token.ThrowIfCancellationRequested(); + read += await networkStream.ReadAsync(array, read, length - read, token); } - if (read < array.Length) + if (read < length) throw new SocketException(); // the connection has closed } - async Task> ReadMessageContents(NetworkStream networkStream) + static byte[] fourBytes = new byte[4]; + static byte[] topicScratchSpace = new byte[64]; + + static async Task> ReadMessageContents(NetworkStream networkStream, CancellationToken token) { // Get first bytes to determine length of topic name - byte[] rawTopicBytes = new byte[4]; - ReadToByteArray(networkStream, rawTopicBytes); - int topicLength = BitConverter.ToInt32(rawTopicBytes, 0); + await ReadToByteArray(networkStream, fourBytes, 4, token); + int topicLength = BitConverter.ToInt32(fourBytes, 0); + + // If our topic buffer isn't large enough, make a larger one (and keep it that size; assume that's the new standard) + if (topicLength > topicScratchSpace.Length) + topicScratchSpace = new byte[topicLength]; // Read and convert topic name - byte[] topicNameBytes = new byte[topicLength]; - ReadToByteArray(networkStream, topicNameBytes); - string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); + await ReadToByteArray(networkStream, topicScratchSpace, topicLength, token); + string topicName = Encoding.ASCII.GetString(topicScratchSpace, 0, topicLength); - byte[] full_message_size_bytes = new byte[4]; - ReadToByteArray(networkStream, full_message_size_bytes); - int full_message_size = BitConverter.ToInt32(full_message_size_bytes, 0); + await ReadToByteArray(networkStream, fourBytes, 4, token); + int full_message_size = BitConverter.ToInt32(fourBytes, 0); byte[] readBuffer = new byte[full_message_size]; - ReadToByteArray(networkStream, readBuffer); + await ReadToByteArray(networkStream, readBuffer, full_message_size, token); return Tuple.Create(topicName, readBuffer); } void OnApplicationQuit() { - if (connectionThread != null) - connectionThread.Abort(); - if (readerThread != null) - readerThread.Abort(); + connectionThreadCancellation.Cancel(); } /// @@ -488,7 +536,7 @@ public void Send(string rosTopicName, Message message) /// The network stream that is transmitting the messsage /// The ROS topic or service name that is receiving the messsage /// The ROS message to send to a ROS publisher or service - private void WriteDataStaggered(NetworkStream networkStream, string rosTopicName, Message message) + static void WriteDataStaggered(NetworkStream networkStream, string rosTopicName, Message message) { byte[] topicName = message.SerializeString(rosTopicName); List segments = message.SerializationStatements(); From 98c34a16a12221e238c3d4e433f5702910848d93 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Mon, 26 Apr 2021 15:09:52 -0700 Subject: [PATCH 04/11] Services! --- .../RosTcpEndpoint/msg/MRosUnitySrvMessage.cs | 79 +++++++ .../MRosUnitySrvMessage.cs.meta} | 2 +- .../srv/MUnityHandshakeRequest.cs | 56 ----- .../srv/MUnityHandshakeResponse.cs | 49 ----- .../Runtime/TcpConnector/ROSConnection.cs | 206 +++++++++--------- .../Runtime/TcpConnector/TaskPauser.cs | 34 +++ .../TaskPauser.cs.meta} | 2 +- 7 files changed, 213 insertions(+), 215 deletions(-) create mode 100644 com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs rename com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/{srv/MUnityHandshakeRequest.cs.meta => msg/MRosUnitySrvMessage.cs.meta} (83%) delete mode 100644 com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs delete mode 100644 com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs create mode 100644 com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs rename com.unity.robotics.ros-tcp-connector/Runtime/{MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta => TcpConnector/TaskPauser.cs.meta} (83%) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs new file mode 100644 index 00000000..163288fc --- /dev/null +++ b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs @@ -0,0 +1,79 @@ +//Do not edit! This file was generated by Unity-ROS MessageGeneration. +using System; +using System.Linq; +using System.Collections.Generic; +using System.Text; +using Unity.Robotics.ROSTCPConnector.MessageGeneration; + +namespace RosMessageTypes.RosTcpEndpoint +{ + public class MRosUnitySrvMessage : Message + { + public const string RosMessageName = "ros_tcp_endpoint/RosUnitySrvMessage"; + + public int srv_id; + public bool is_request; + public string topic; + public byte[] payload; + + public MRosUnitySrvMessage() + { + this.srv_id = 0; + this.is_request = false; + this.topic = ""; + this.payload = new byte[0]; + } + + public MRosUnitySrvMessage(int srv_id, bool is_request, string topic, byte[] payload) + { + this.srv_id = srv_id; + this.is_request = is_request; + this.topic = topic; + this.payload = payload; + } + public override List SerializationStatements() + { + var listOfSerializations = new List(); + listOfSerializations.Add(BitConverter.GetBytes(this.srv_id)); + listOfSerializations.Add(BitConverter.GetBytes(this.is_request)); + listOfSerializations.Add(SerializeString(this.topic)); + + listOfSerializations.Add(BitConverter.GetBytes(payload.Length)); + listOfSerializations.Add(this.payload); + + return listOfSerializations; + } + + public override int Deserialize(byte[] data, int offset) + { + this.srv_id = BitConverter.ToInt32(data, offset); + offset += 4; + this.is_request = BitConverter.ToBoolean(data, offset); + offset += 1; + var topicStringBytesLength = DeserializeLength(data, offset); + offset += 4; + this.topic = DeserializeString(data, offset, topicStringBytesLength); + offset += topicStringBytesLength; + + var payloadArrayLength = DeserializeLength(data, offset); + offset += 4; + this.payload= new byte[payloadArrayLength]; + for(var i = 0; i < payloadArrayLength; i++) + { + this.payload[i] = data[offset]; + offset += 1; + } + + return offset; + } + + public override string ToString() + { + return "MRosUnitySrvMessage: " + + "\nsrv_id: " + srv_id.ToString() + + "\nis_request: " + is_request.ToString() + + "\ntopic: " + topic.ToString() + + "\npayload: " + System.String.Join(", ", payload.ToList()); + } + } +} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta similarity index 83% rename from com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta rename to com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta index 866160a0..f1b1f543 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta +++ b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta @@ -1,5 +1,5 @@ fileFormatVersion: 2 -guid: 5e9ad0d71580f0a439cdd07e18f4fe0e +guid: 948d826c167dec548b1ac5e0c8393d86 MonoImporter: externalObjects: {} serializedVersion: 2 diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs deleted file mode 100644 index 0f8e3c35..00000000 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs +++ /dev/null @@ -1,56 +0,0 @@ -//Do not edit! This file was generated by Unity-ROS MessageGeneration. -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text; -using Unity.Robotics.ROSTCPConnector.MessageGeneration; - -namespace RosMessageTypes.RosTcpEndpoint -{ - public class MUnityHandshakeRequest : Message - { - public const string RosMessageName = "Ros_Tcp_Endpoint/UnityHandshake"; - - public string ip; - public ushort port; - - public MUnityHandshakeRequest() - { - this.ip = ""; - this.port = 0; - } - - public MUnityHandshakeRequest(string ip, ushort port) - { - this.ip = ip; - this.port = port; - } - public override List SerializationStatements() - { - var listOfSerializations = new List(); - listOfSerializations.Add(SerializeString(this.ip)); - listOfSerializations.Add(BitConverter.GetBytes(this.port)); - - return listOfSerializations; - } - - public override int Deserialize(byte[] data, int offset) - { - var ipStringBytesLength = DeserializeLength(data, offset); - offset += 4; - this.ip = DeserializeString(data, offset, ipStringBytesLength); - offset += ipStringBytesLength; - this.port = BitConverter.ToUInt16(data, offset); - offset += 2; - - return offset; - } - - public override string ToString() - { - return "MUnityHandshakeRequest: " + - "\nip: " + ip.ToString() + - "\nport: " + port.ToString(); - } - } -} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs deleted file mode 100644 index e2191fec..00000000 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs +++ /dev/null @@ -1,49 +0,0 @@ -//Do not edit! This file was generated by Unity-ROS MessageGeneration. -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text; -using Unity.Robotics.ROSTCPConnector.MessageGeneration; - -namespace RosMessageTypes.RosTcpEndpoint -{ - public class MUnityHandshakeResponse : Message - { - public const string RosMessageName = "Ros_Tcp_Endpoint/UnityHandshake"; - - public string ip; - - public MUnityHandshakeResponse() - { - this.ip = ""; - } - - public MUnityHandshakeResponse(string ip) - { - this.ip = ip; - } - public override List SerializationStatements() - { - var listOfSerializations = new List(); - listOfSerializations.Add(SerializeString(this.ip)); - - return listOfSerializations; - } - - public override int Deserialize(byte[] data, int offset) - { - var ipStringBytesLength = DeserializeLength(data, offset); - offset += 4; - this.ip = DeserializeString(data, offset, ipStringBytesLength); - offset += ipStringBytesLength; - - return offset; - } - - public override string ToString() - { - return "MUnityHandshakeResponse: " + - "\nip: " + ip.ToString(); - } - } -} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index f233cad7..351a8edb 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -41,9 +41,12 @@ public class ROSConnection : MonoBehaviour const string ERROR_TOPIC_NAME = "__error"; const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; const string HANDSHAKE_TOPIC_NAME = "__handshake"; + const string SERVICE_TOPIC_NAME = "__srv"; const string SYSCOMMAND_SUBSCRIBE = "subscribe"; const string SYSCOMMAND_PUBLISH = "publish"; + const string SYSCOMMAND_ROSSERVICE = "ros_service"; + const string SYSCOMMAND_UNITYSERVICE = "unity_service"; // GUI window variables internal HUDPanel hudPanel = null; @@ -53,122 +56,88 @@ public class ROSConnection : MonoBehaviour struct SubscriberCallback { public ConstructorInfo messageConstructor; - public List> callbacks; + public List> callbacks; } - Dictionary subscribers = new Dictionary(); + struct UnityServiceCallback + { + public ConstructorInfo messageConstructor; + public Func callback; + } + + Dictionary m_Subscribers = new Dictionary(); + Dictionary m_UnityServices = new Dictionary(); public void Subscribe(string topic, Action callback) where T : Message, new() { SubscriberCallback subCallbacks; - if (!subscribers.TryGetValue(topic, out subCallbacks)) + if (!m_Subscribers.TryGetValue(topic, out subCallbacks)) { subCallbacks = new SubscriberCallback { messageConstructor = typeof(T).GetConstructor(new Type[0]), - callbacks = new List> { } + callbacks = new List> { } }; - subscribers.Add(topic, subCallbacks); + m_Subscribers.Add(topic, subCallbacks); } subCallbacks.callbacks.Add((Message msg) => { - callback((T) msg); - return null; + callback((T)msg); }); } public void ImplementService(string topic, Func callback) where T : Message, new() { - SubscriberCallback subCallbacks; - if (!subscribers.TryGetValue(topic, out subCallbacks)) + UnityServiceCallback srvCallbacks; + if (!m_UnityServices.TryGetValue(topic, out srvCallbacks)) { - subCallbacks = new SubscriberCallback + srvCallbacks = new UnityServiceCallback { messageConstructor = typeof(T).GetConstructor(new Type[0]), - callbacks = new List> { } + callback = (Message msg) => callback((T)msg) }; - subscribers.Add(topic, subCallbacks); + m_UnityServices.Add(topic, srvCallbacks); } - - subCallbacks.callbacks.Add((Message msg) => { return callback((T) msg); }); } - public async void SendServiceMessage(string rosServiceName, Message serviceRequest, - Action callback) where RESPONSE : Message, new() - { - // For phase 2, gut this and rewrite - - // Serialize the message in service name, message size, and message bytes format - byte[] messageBytes = GetMessageBytes(rosServiceName, serviceRequest); - - TcpClient client = new TcpClient(); - await client.ConnectAsync(rosIPAddress, rosPort); - - NetworkStream networkStream = client.GetStream(); - networkStream.ReadTimeout = networkTimeout; - - RESPONSE serviceResponse = new RESPONSE(); - - int serviceID = 0; - - // Send the message - try - { - if (hudPanel != null) serviceID = hudPanel.AddServiceRequest(rosServiceName, serviceRequest); - networkStream.Write(messageBytes, 0, messageBytes.Length); - } - catch (Exception e) - { - Debug.LogError("SocketException: " + e); - goto finish; - } - - if (!networkStream.CanRead) - { - Debug.LogError("Sorry, you cannot read from this NetworkStream."); - goto finish; - } - - // Poll every 1 second(s) for available data on the stream - int attempts = 0; - while (!networkStream.DataAvailable && attempts <= this.awaitDataMaxRetries) - { - if (attempts == this.awaitDataMaxRetries) - { - Debug.LogError("No data available on network stream after " + awaitDataMaxRetries + " attempts."); - goto finish; - } - - attempts++; - await Task.Delay((int) (awaitDataSleepSeconds * 1000)); - } + readonly object m_ServiceRequestLock = new object(); + int m_NextSrvID = 101; + Dictionary m_ServiceCallbacks = new Dictionary(); + public async void SendServiceMessage(string rosServiceName, Message serviceRequest, Action callback) where RESPONSE : Message, new() + { + RESPONSE response = await SendServiceMessage(rosServiceName, serviceRequest); try { - (string topicName, byte[] content) = await ReadMessageContents(networkStream, new CancellationToken()); - serviceResponse.Deserialize(content, 0); + callback(response); } catch (Exception e) { - Debug.LogError("Exception raised!! " + e); + Debug.LogError("Exception in service callback: " + e); } - - finish: - callback(serviceResponse); - if (hudPanel != null) hudPanel.AddServiceResponse(serviceID, serviceResponse); - if (client.Connected) - client.Close(); } public async Task SendServiceMessage(string rosServiceName, Message serviceRequest) where RESPONSE : Message, new() { - var t = new TaskCompletionSource(); + byte[] requestBytes = serviceRequest.Serialize(); + TaskPauser pauser = new TaskPauser(); + + int srvID; + lock (m_ServiceRequestLock) + { + srvID = m_NextSrvID++; + m_ServiceCallbacks.Add(srvID, pauser); + } - SendServiceMessage(rosServiceName, serviceRequest, s => t.TrySetResult(s)); + MRosUnitySrvMessage srvMessage = new MRosUnitySrvMessage(srvID, true, rosServiceName, requestBytes); + Send(SERVICE_TOPIC_NAME, srvMessage); - return await t.Task; + byte[] rawResponse = (byte[])await pauser.PauseUntilResumed(); + RESPONSE result = new RESPONSE(); + result.Deserialize(rawResponse, 0); + return result; } public void GetTopicList(Action callback) @@ -178,13 +147,22 @@ public void GetTopicList(Action callback) public void RegisterSubscriber(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_SUBSCRIBE, - new SysCommand_Subscribe {topic = topic, message_name = rosMessageName}); + SendSysCommand(SYSCOMMAND_SUBSCRIBE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } public void RegisterPublisher(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_PUBLISH, new SysCommand_Publish {topic = topic, message_name = rosMessageName}); + SendSysCommand(SYSCOMMAND_PUBLISH, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + } + + public void RegisterRosService(string topic, string rosMessageName) + { + SendSysCommand(SYSCOMMAND_ROSSERVICE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + } + + public void RegisterUnityService(string topic, string rosMessageName) + { + SendSysCommand(SYSCOMMAND_UNITYSERVICE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } private static ROSConnection _instance; @@ -223,17 +201,14 @@ void OnEnable() private void Start() { - if(!IPFormatIsCorrect(rosIPAddress)) + if (!IPFormatIsCorrect(rosIPAddress)) Debug.LogError("ROS IP address is not correct"); InitializeHUD(); Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); + Subscribe(SERVICE_TOPIC_NAME, ProcessServiceMessage); connectionThreadCancellation = new CancellationTokenSource(); Task.Run(() => ConnectionThread(rosIPAddress, rosPort, networkTimeout, keepaliveTime, outgoingMessages, incomingMessages, connectionThreadCancellation.Token)); - - // Phase 2: send handshakes again - //SendServiceMessage(HANDSHAKE_TOPIC_NAME, - // new MUnityHandshakeRequest(overrideUnityIP, (ushort) unityPort), RosUnityHandshakeCallback); } void OnValidate() @@ -255,12 +230,6 @@ private void InitializeHUD() hudPanel.isEnabled = showHUD; } - /* Phase 2 - void RosUnityHandshakeCallback(MUnityHandshakeResponse response) - { - StartMessageServer(response.ip, unityPort); - }*/ - void RosUnityErrorCallback(MRosUnityError error) { Debug.LogError("ROS-Unity error: " + error.message); @@ -271,17 +240,49 @@ private void Update() s_RealTimeSinceStartup = Time.realtimeSinceStartup; Tuple data; - while(incomingMessages.TryDequeue(out data)) + while (incomingMessages.TryDequeue(out data)) { (string topic, byte[] contents) = data; + // notify whatever is interested in this incoming message SubscriberCallback callback; - if(subscribers.TryGetValue(topic, out callback)) + if (m_Subscribers.TryGetValue(topic, out callback)) { - Message message = (Message)callback.messageConstructor.Invoke(new object[]{ }); + Message message = (Message)callback.messageConstructor.Invoke(new object[] { }); message.Deserialize(contents, 0); - callback.callbacks.ForEach(item=>item.Invoke(message)); + callback.callbacks.ForEach(item => item.Invoke(message)); + } + } + } + + void ProcessServiceMessage(MRosUnitySrvMessage message) + { + if (message.is_request) + { + UnityServiceCallback callback; + if(m_UnityServices.TryGetValue(message.topic, out callback)) + { + Message requestMessage = (Message)callback.messageConstructor.Invoke(new object[] { }); + requestMessage.Deserialize(message.payload, 0); + Message responseMessage = callback.callback(requestMessage); + byte[] responseBytes = responseMessage.Serialize(); + Send(SERVICE_TOPIC_NAME, new MRosUnitySrvMessage(message.srv_id, false, message.topic, responseBytes)); + } + } + else + { + TaskPauser resumer; + lock (m_ServiceRequestLock) + { + if (!m_ServiceCallbacks.TryGetValue(message.srv_id, out resumer)) + { + Debug.LogError($"Unable to route service response on \"{message.topic}\"! SrvID {message.srv_id} does not exist."); + return; + } + + m_ServiceCallbacks.Remove(message.srv_id); } + resumer.Resume(message.payload); } } @@ -338,18 +339,16 @@ static async Task ConnectionThread( Thread.Yield(); if (s_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) { - // send a keepalive message SendKeepalive(networkStream); waitingSinceRealTime = s_RealTimeSinceStartup; } token.ThrowIfCancellationRequested(); } - //Debug.Log("Sending " + data.Item1); WriteDataStaggered(networkStream, data.Item1, data.Item2); } } - catch (OperationCanceledException e) + catch (OperationCanceledException) { } catch (Exception e) @@ -382,15 +381,12 @@ static async Task ReaderThread(int readerIdx, NetworkStream networkStream, Concu try { Tuple content = await ReadMessageContents(networkStream, token); - Debug.Log($"Message {content.Item1} received"); + //Debug.Log($"Message {content.Item1} received"); queue.Enqueue(content); } - catch (OperationCanceledException e) + catch (OperationCanceledException) { } - /*catch (System.IO.IOException e) - { - }*/ catch (Exception e) { Debug.Log("Reader "+readerIdx+" exception! " + e); @@ -501,13 +497,7 @@ public byte[] GetMessageBytes(string topicServiceName, Message message) return messageBuffer; } - struct SysCommand_Subscribe - { - public string topic; - public string message_name; - } - - struct SysCommand_Publish + struct SysCommand_TopicAndType { public string topic; public string message_name; diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs new file mode 100644 index 00000000..07b71eee --- /dev/null +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs @@ -0,0 +1,34 @@ +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using UnityEngine; + +public class TaskPauser +{ + CancellationTokenSource m_Source = new CancellationTokenSource(); + object m_Result; + public object Result => m_Result; + + public async Task PauseUntilResumed() + { + try + { + while (!m_Source.Token.IsCancellationRequested) + { + await Task.Delay(10000, m_Source.Token); + } + } + catch(TaskCanceledException) + { + + } + return m_Result; + } + + public void Resume(object result) + { + m_Result = result; + m_Source.Cancel(); + } +} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta similarity index 83% rename from com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta rename to com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta index 71d81647..f1b0404a 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta @@ -1,5 +1,5 @@ fileFormatVersion: 2 -guid: 800428bac01226e4185243abce6a9e34 +guid: d0e0d908abf7adb4c8100dfeebafd18c MonoImporter: externalObjects: {} serializedVersion: 2 From f06ca9114bbe6d919bb9077d87acbbc1d24771b0 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Mon, 26 Apr 2021 17:07:30 -0700 Subject: [PATCH 05/11] Cleanup unnecessary fields --- .../Editor/ROSSettingsEditor.cs | 18 ++----- .../Runtime/TcpConnector/ROSConnection.cs | 49 +++++++------------ 2 files changed, 22 insertions(+), 45 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 1ca0443a..56f33540 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -49,19 +49,11 @@ protected virtual void OnGUI() EditorGUILayout.HelpBox("ROS IP is invalid", MessageType.Warning); } EditorGUILayout.Space(); - EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel); - prefab.awaitDataMaxRetries = EditorGUILayout.IntField( - new GUIContent("Max Service Retries", - "While waiting for a service to respond, check this many times before giving up."), - prefab.awaitDataMaxRetries); - prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField( - new GUIContent("Sleep (seconds)", - "While waiting for a service to respond, wait this many seconds between checks."), - prefab.awaitDataSleepSeconds); - prefab.readChunkSize = EditorGUILayout.IntField( - new GUIContent("Read chunk size", - "While reading received messages, read this many bytes at a time."), - prefab.readChunkSize); + + prefab.keepaliveTime = EditorGUILayout.FloatField( + new GUIContent("KeepAlive time (seconds)", + "Send a keepalive message this often. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), + prefab.keepaliveTime); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 351a8edb..2666c6ac 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -24,20 +24,9 @@ public class ROSConnection : MonoBehaviour private int networkTimeout = 2000; - [Tooltip("While reading received messages, read this many bytes at a time.")] - public int readChunkSize = 2048; - [Tooltip("Send keepalive message if nothing has been sent for this long (seconds).")] public float keepaliveTime = 10; - // Remove? - [Tooltip("While waiting for a service to respond, check this many times before giving up.")] - public int awaitDataMaxRetries = 10; - - // Remove? - [Tooltip("While waiting for a service to respond, wait this many seconds between checks.")] - public float awaitDataSleepSeconds = 1.0f; - const string ERROR_TOPIC_NAME = "__error"; const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; const string HANDSHAKE_TOPIC_NAME = "__handshake"; @@ -55,18 +44,19 @@ public class ROSConnection : MonoBehaviour struct SubscriberCallback { - public ConstructorInfo messageConstructor; + public Func messageConstructor; public List> callbacks; } - struct UnityServiceCallback + Dictionary m_Subscribers = new Dictionary(); + + struct UnityServiceImplementation { - public ConstructorInfo messageConstructor; + public Func messageConstructor; public Func callback; } - Dictionary m_Subscribers = new Dictionary(); - Dictionary m_UnityServices = new Dictionary(); + Dictionary m_UnityServices = new Dictionary(); public void Subscribe(string topic, Action callback) where T : Message, new() { @@ -75,7 +65,7 @@ struct UnityServiceCallback { subCallbacks = new SubscriberCallback { - messageConstructor = typeof(T).GetConstructor(new Type[0]), + messageConstructor = ()=> new T(), callbacks = new List> { } }; m_Subscribers.Add(topic, subCallbacks); @@ -90,16 +80,11 @@ struct UnityServiceCallback public void ImplementService(string topic, Func callback) where T : Message, new() { - UnityServiceCallback srvCallbacks; - if (!m_UnityServices.TryGetValue(topic, out srvCallbacks)) + m_UnityServices[topic] = new UnityServiceImplementation { - srvCallbacks = new UnityServiceCallback - { - messageConstructor = typeof(T).GetConstructor(new Type[0]), - callback = (Message msg) => callback((T)msg) - }; - m_UnityServices.Add(topic, srvCallbacks); - } + messageConstructor = ()=> new T(), + callback = (Message msg) => callback((T)msg) + }; } readonly object m_ServiceRequestLock = new object(); @@ -248,9 +233,9 @@ private void Update() SubscriberCallback callback; if (m_Subscribers.TryGetValue(topic, out callback)) { - Message message = (Message)callback.messageConstructor.Invoke(new object[] { }); + Message message = callback.messageConstructor(); message.Deserialize(contents, 0); - callback.callbacks.ForEach(item => item.Invoke(message)); + callback.callbacks.ForEach(item => item(message)); } } } @@ -259,12 +244,12 @@ void ProcessServiceMessage(MRosUnitySrvMessage message) { if (message.is_request) { - UnityServiceCallback callback; - if(m_UnityServices.TryGetValue(message.topic, out callback)) + UnityServiceImplementation service; + if(m_UnityServices.TryGetValue(message.topic, out service)) { - Message requestMessage = (Message)callback.messageConstructor.Invoke(new object[] { }); + Message requestMessage = service.messageConstructor(); requestMessage.Deserialize(message.payload, 0); - Message responseMessage = callback.callback(requestMessage); + Message responseMessage = service.callback(requestMessage); byte[] responseBytes = responseMessage.Serialize(); Send(SERVICE_TOPIC_NAME, new MRosUnitySrvMessage(message.srv_id, false, message.topic, responseBytes)); } From c47404ac79433a483e7d9b87aa0492e7c97b79ef Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Mon, 26 Apr 2021 17:15:08 -0700 Subject: [PATCH 06/11] Coding standards --- .../Runtime/TcpConnector/ROSConnection.cs | 59 +++++++++---------- 1 file changed, 28 insertions(+), 31 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 2666c6ac..989f39bb 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -29,7 +29,6 @@ public class ROSConnection : MonoBehaviour const string ERROR_TOPIC_NAME = "__error"; const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; - const string HANDSHAKE_TOPIC_NAME = "__handshake"; const string SERVICE_TOPIC_NAME = "__srv"; const string SYSCOMMAND_SUBSCRIBE = "subscribe"; @@ -42,6 +41,15 @@ public class ROSConnection : MonoBehaviour public bool showHUD = true; + ConcurrentQueue> m_OutgoingMessages = new ConcurrentQueue>(); + ConcurrentQueue> m_IncomingMessages = new ConcurrentQueue>(); + CancellationTokenSource m_ConnectionThreadCancellation; + static float s_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here + + readonly object m_ServiceRequestLock = new object(); + int m_NextSrvID = 101; + Dictionary m_ServicesWaiting = new Dictionary(); + struct SubscriberCallback { public Func messageConstructor; @@ -87,10 +95,6 @@ public void ImplementService(string topic, Func callback) }; } - readonly object m_ServiceRequestLock = new object(); - int m_NextSrvID = 101; - Dictionary m_ServiceCallbacks = new Dictionary(); - public async void SendServiceMessage(string rosServiceName, Message serviceRequest, Action callback) where RESPONSE : Message, new() { RESPONSE response = await SendServiceMessage(rosServiceName, serviceRequest); @@ -113,7 +117,7 @@ public void ImplementService(string topic, Func callback) lock (m_ServiceRequestLock) { srvID = m_NextSrvID++; - m_ServiceCallbacks.Add(srvID, pauser); + m_ServicesWaiting.Add(srvID, pauser); } MRosUnitySrvMessage srvMessage = new MRosUnitySrvMessage(srvID, true, rosServiceName, requestBytes); @@ -181,9 +185,6 @@ void OnEnable() _instance = this; } - ConcurrentQueue> outgoingMessages = new ConcurrentQueue>(); - ConcurrentQueue> incomingMessages = new ConcurrentQueue>(); - private void Start() { if (!IPFormatIsCorrect(rosIPAddress)) @@ -192,8 +193,8 @@ private void Start() Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); Subscribe(SERVICE_TOPIC_NAME, ProcessServiceMessage); - connectionThreadCancellation = new CancellationTokenSource(); - Task.Run(() => ConnectionThread(rosIPAddress, rosPort, networkTimeout, keepaliveTime, outgoingMessages, incomingMessages, connectionThreadCancellation.Token)); + m_ConnectionThreadCancellation = new CancellationTokenSource(); + Task.Run(() => ConnectionThread(rosIPAddress, rosPort, networkTimeout, keepaliveTime, m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); } void OnValidate() @@ -225,7 +226,7 @@ private void Update() s_RealTimeSinceStartup = Time.realtimeSinceStartup; Tuple data; - while (incomingMessages.TryDequeue(out data)) + while (m_IncomingMessages.TryDequeue(out data)) { (string topic, byte[] contents) = data; @@ -259,22 +260,18 @@ void ProcessServiceMessage(MRosUnitySrvMessage message) TaskPauser resumer; lock (m_ServiceRequestLock) { - if (!m_ServiceCallbacks.TryGetValue(message.srv_id, out resumer)) + if (!m_ServicesWaiting.TryGetValue(message.srv_id, out resumer)) { Debug.LogError($"Unable to route service response on \"{message.topic}\"! SrvID {message.srv_id} does not exist."); return; } - m_ServiceCallbacks.Remove(message.srv_id); + m_ServicesWaiting.Remove(message.srv_id); } resumer.Resume(message.payload); } } - CancellationTokenSource connectionThreadCancellation; - static bool m_ShouldConnect; - static float s_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here - static void SendKeepalive(NetworkStream stream) { // 8 zeroes = a ros message with topic "" and no message data. @@ -290,7 +287,7 @@ static async Task ConnectionThread( ConcurrentQueue> incomingQueue, CancellationToken token) { - Debug.Log("ConnectionThread begins"); + //Debug.Log("ConnectionThread begins"); int nextReaderIdx = 101; int nextReconnectionDelay = 1000; @@ -395,25 +392,25 @@ static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int throw new SocketException(); // the connection has closed } - static byte[] fourBytes = new byte[4]; - static byte[] topicScratchSpace = new byte[64]; + static byte[] s_FourBytes = new byte[4]; + static byte[] s_TopicScratchSpace = new byte[64]; static async Task> ReadMessageContents(NetworkStream networkStream, CancellationToken token) { // Get first bytes to determine length of topic name - await ReadToByteArray(networkStream, fourBytes, 4, token); - int topicLength = BitConverter.ToInt32(fourBytes, 0); + await ReadToByteArray(networkStream, s_FourBytes, 4, token); + int topicLength = BitConverter.ToInt32(s_FourBytes, 0); // If our topic buffer isn't large enough, make a larger one (and keep it that size; assume that's the new standard) - if (topicLength > topicScratchSpace.Length) - topicScratchSpace = new byte[topicLength]; + if (topicLength > s_TopicScratchSpace.Length) + s_TopicScratchSpace = new byte[topicLength]; // Read and convert topic name - await ReadToByteArray(networkStream, topicScratchSpace, topicLength, token); - string topicName = Encoding.ASCII.GetString(topicScratchSpace, 0, topicLength); + await ReadToByteArray(networkStream, s_TopicScratchSpace, topicLength, token); + string topicName = Encoding.ASCII.GetString(s_TopicScratchSpace, 0, topicLength); - await ReadToByteArray(networkStream, fourBytes, 4, token); - int full_message_size = BitConverter.ToInt32(fourBytes, 0); + await ReadToByteArray(networkStream, s_FourBytes, 4, token); + int full_message_size = BitConverter.ToInt32(s_FourBytes, 0); byte[] readBuffer = new byte[full_message_size]; await ReadToByteArray(networkStream, readBuffer, full_message_size, token); @@ -423,7 +420,7 @@ static async Task> ReadMessageContents(NetworkStream netwo void OnApplicationQuit() { - connectionThreadCancellation.Cancel(); + m_ConnectionThreadCancellation.Cancel(); } /// @@ -495,7 +492,7 @@ void SendSysCommand(string command, object param) public void Send(string rosTopicName, Message message) { - outgoingMessages.Enqueue(new Tuple(rosTopicName, message)); + m_OutgoingMessages.Enqueue(new Tuple(rosTopicName, message)); } /// From 2c3bc9d6d50a588c51ab1cc914e4efb564bcea82 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Tue, 27 Apr 2021 11:21:01 -0700 Subject: [PATCH 07/11] Updates for coding standards --- .../Editor/ROSSettingsEditor.cs | 26 +++-- .../Runtime/TcpConnector/HUDPanel.cs | 12 +-- .../Runtime/TcpConnector/ROSConnection.cs | 99 ++++++++++++++----- .../Runtime/TcpConnector/TaskPauser.cs | 40 ++++---- 4 files changed, 118 insertions(+), 59 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 56f33540..a7943bc9 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -39,21 +39,33 @@ protected virtual void OnGUI() } } + prefab.ConnectOnStart = EditorGUILayout.Toggle("Connect on Startup", prefab.ConnectOnStart); + EditorGUILayout.LabelField("Settings for a new ROSConnection.instance", EditorStyles.boldLabel); - prefab.rosIPAddress = EditorGUILayout.TextField("ROS IP Address", prefab.rosIPAddress); - prefab.rosPort = EditorGUILayout.IntField("ROS Port", prefab.rosPort); + prefab.RosIPAddress = EditorGUILayout.TextField("ROS IP Address", prefab.RosIPAddress); + prefab.RosPort = EditorGUILayout.IntField("ROS Port", prefab.RosPort); EditorGUILayout.Space(); - if(!ROSConnection.IPFormatIsCorrect(prefab.rosIPAddress)) + if (!ROSConnection.IPFormatIsCorrect(prefab.RosIPAddress)) { EditorGUILayout.HelpBox("ROS IP is invalid", MessageType.Warning); } + + EditorGUILayout.Space(); + + prefab.ShowHud = EditorGUILayout.Toggle("Show HUD", prefab.ShowHud); + EditorGUILayout.Space(); - prefab.keepaliveTime = EditorGUILayout.FloatField( - new GUIContent("KeepAlive time (seconds)", - "Send a keepalive message this often. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), - prefab.keepaliveTime); + prefab.KeepaliveTime = EditorGUILayout.FloatField( + new GUIContent("KeepAlive time (secs)", + "If no other messages are being sent, test the connection this often. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), + prefab.KeepaliveTime); + + prefab.NetworkTimeoutSeconds = EditorGUILayout.FloatField( + new GUIContent("Network timeout (secs)", + "If a network message takes this long to send, assume the connection has failed. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), + prefab.NetworkTimeoutSeconds); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs index 0da97379..6c7df925 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs @@ -26,7 +26,7 @@ public class HUDPanel : MonoBehaviour public void SetLastMessageSent(string topic, Message message) { - lastMessageSent = new MessageViewState() {label = "Last Message Sent:", message = message}; + lastMessageSent = new MessageViewState() { label = "Last Message Sent:", message = message }; lastMessageSentMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; redrawGUI = true; } @@ -36,7 +36,7 @@ public void SetLastMessageSent(string topic, Message message) public void SetLastMessageReceived(string topic, Message message) { - lastMessageReceived = new MessageViewState() {label = "Last Message Received:", message = message}; + lastMessageReceived = new MessageViewState() { label = "Last Message Received:", message = message }; lastMessageReceivedMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; redrawGUI = true; } @@ -84,7 +84,7 @@ void Awake() labelStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fontStyle = FontStyle.Bold, fixedWidth = 250 }; @@ -93,7 +93,7 @@ void Awake() { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 0, 5), - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fixedWidth = 300 }; @@ -101,7 +101,7 @@ void Awake() { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 5, 5), - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fixedWidth = 300, wordWrap = true }; @@ -136,7 +136,7 @@ void OnGUI() GUILayout.Label($"{activeServices.Count} Active Service Requests:", labelStyle); if (activeServices.Count > 0) { - var dots = new String('.', (int) Time.time % 4); + var dots = new String('.', (int)Time.time % 4); GUILayout.Label($"Waiting for service response{dots}", contentStyle); } diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 989f39bb..a7e55563 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -19,13 +19,35 @@ namespace Unity.Robotics.ROSTCPConnector public class ROSConnection : MonoBehaviour { // Variables required for ROS communication - [FormerlySerializedAs("hostName")] public string rosIPAddress = "127.0.0.1"; - [FormerlySerializedAs("hostPort")] public int rosPort = 10000; + [SerializeField] + [FormerlySerializedAs("hostName")] + [FormerlySerializedAs("rosIPAddress")] + string m_RosIPAddress = "127.0.0.1"; + public string RosIPAddress { get => m_RosIPAddress; set => m_RosIPAddress = value; } + + [SerializeField] + [FormerlySerializedAs("hostPort")] + [FormerlySerializedAs("rosPort")] + int m_RosPort = 10000; + public int RosPort { get => m_RosPort; set => m_RosPort = value; } + + [SerializeField] + bool m_ConnectOnStart = true; + public bool ConnectOnStart { get => m_ConnectOnStart; set => m_ConnectOnStart = value; } + + [SerializeField] + [Tooltip("Send keepalive message if nothing has been sent for this long (seconds).")] + float m_KeepaliveTime = 10; + public float KeepaliveTime { get => m_KeepaliveTime; set => m_KeepaliveTime = value; } - private int networkTimeout = 2000; + [SerializeField] + float m_NetworkTimeoutSeconds = 2; + public float NetworkTimeoutSeconds { get => m_NetworkTimeoutSeconds; set => m_NetworkTimeoutSeconds = value; } - [Tooltip("Send keepalive message if nothing has been sent for this long (seconds).")] - public float keepaliveTime = 10; + [SerializeField] + [FormerlySerializedAs("showHUD")] + bool m_ShowHUD = true; + public bool ShowHud { get => m_ShowHUD; set => m_ShowHUD = value; } const string ERROR_TOPIC_NAME = "__error"; const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; @@ -39,11 +61,10 @@ public class ROSConnection : MonoBehaviour // GUI window variables internal HUDPanel hudPanel = null; - public bool showHUD = true; - ConcurrentQueue> m_OutgoingMessages = new ConcurrentQueue>(); ConcurrentQueue> m_IncomingMessages = new ConcurrentQueue>(); CancellationTokenSource m_ConnectionThreadCancellation; + static float s_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here readonly object m_ServiceRequestLock = new object(); @@ -73,7 +94,7 @@ struct UnityServiceImplementation { subCallbacks = new SubscriberCallback { - messageConstructor = ()=> new T(), + messageConstructor = () => new T(), callbacks = new List> { } }; m_Subscribers.Add(topic, subCallbacks); @@ -90,7 +111,7 @@ public void ImplementService(string topic, Func callback) { m_UnityServices[topic] = new UnityServiceImplementation { - messageConstructor = ()=> new T(), + messageConstructor = () => new T(), callback = (Message msg) => callback((T)msg) }; } @@ -187,14 +208,38 @@ void OnEnable() private void Start() { - if (!IPFormatIsCorrect(rosIPAddress)) - Debug.LogError("ROS IP address is not correct"); InitializeHUD(); Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); Subscribe(SERVICE_TOPIC_NAME, ProcessServiceMessage); + if (ConnectOnStart) + { + Connect(); + } + } + + public void Connect(string ipAddress, int port) + { + m_RosIPAddress = ipAddress; + m_RosPort = port; + if (hudPanel != null) + hudPanel.host = $"{ipAddress}:{port}"; + Connect(); + } + + public void Connect() + { + if (!IPFormatIsCorrect(m_RosIPAddress)) + Debug.LogError("ROS IP address is not correct"); + m_ConnectionThreadCancellation = new CancellationTokenSource(); - Task.Run(() => ConnectionThread(rosIPAddress, rosPort, networkTimeout, keepaliveTime, m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); + Task.Run(() => ConnectionThread(m_RosIPAddress, m_RosPort, m_NetworkTimeoutSeconds, m_KeepaliveTime, m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); + } + + public void Disconnect() + { + m_ConnectionThreadCancellation.Cancel(); + m_ConnectionThreadCancellation = null; } void OnValidate() @@ -204,16 +249,16 @@ void OnValidate() private void InitializeHUD() { - if (!Application.isPlaying || (!showHUD && hudPanel == null)) + if (!Application.isPlaying || (!m_ShowHUD && hudPanel == null)) return; if (hudPanel == null) { hudPanel = gameObject.AddComponent(); - hudPanel.host = $"{rosIPAddress}:{rosPort}"; + hudPanel.host = $"{RosIPAddress}:{RosPort}"; } - hudPanel.isEnabled = showHUD; + hudPanel.isEnabled = m_ShowHUD; } void RosUnityErrorCallback(MRosUnityError error) @@ -246,7 +291,7 @@ void ProcessServiceMessage(MRosUnitySrvMessage message) if (message.is_request) { UnityServiceImplementation service; - if(m_UnityServices.TryGetValue(message.topic, out service)) + if (m_UnityServices.TryGetValue(message.topic, out service)) { Message requestMessage = service.messageConstructor(); requestMessage.Deserialize(message.payload, 0); @@ -281,7 +326,7 @@ static void SendKeepalive(NetworkStream stream) static async Task ConnectionThread( string rosIPAddress, int rosPort, - int networkTimeout, + float networkTimeoutSeconds, float keepaliveTime, ConcurrentQueue> outgoingQueue, ConcurrentQueue> incomingQueue, @@ -302,7 +347,7 @@ static async Task ConnectionThread( client.Connect(rosIPAddress, rosPort); NetworkStream networkStream = client.GetStream(); - networkStream.ReadTimeout = networkTimeout; + networkStream.ReadTimeout = (int)(networkTimeoutSeconds * 1000); SendKeepalive(networkStream); @@ -371,7 +416,7 @@ static async Task ReaderThread(int readerIdx, NetworkStream networkStream, Concu } catch (Exception e) { - Debug.Log("Reader "+readerIdx+" exception! " + e); + Debug.Log("Reader " + readerIdx + " exception! " + e); } } } @@ -420,7 +465,7 @@ static async Task> ReadMessageContents(NetworkStream netwo void OnApplicationQuit() { - m_ConnectionThreadCancellation.Cancel(); + Disconnect(); } /// @@ -525,25 +570,25 @@ static void WriteDataStaggered(NetworkStream networkStream, string rosTopicName, public static bool IPFormatIsCorrect(string ipAddress) { - if(ipAddress == null || ipAddress == "") + if (ipAddress == null || ipAddress == "") return false; - + // If IP address is set using static lookup tables https://man7.org/linux/man-pages/man5/hosts.5.html - if(Char.IsLetter(ipAddress[0])) + if (Char.IsLetter(ipAddress[0])) { - foreach(Char subChar in ipAddress) + foreach (Char subChar in ipAddress) { - if(!(Char.IsLetterOrDigit(subChar) || subChar == '-'|| subChar == '.')) + if (!(Char.IsLetterOrDigit(subChar) || subChar == '-' || subChar == '.')) return false; } - if(!Char.IsLetterOrDigit(ipAddress[ipAddress.Length - 1])) + if (!Char.IsLetterOrDigit(ipAddress[ipAddress.Length - 1])) return false; return true; } string[] subAdds = ipAddress.Split('.'); - if(subAdds.Length != 4) + if (subAdds.Length != 4) { return false; } diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs index 07b71eee..adff2d09 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs @@ -4,31 +4,33 @@ using System.Threading.Tasks; using UnityEngine; -public class TaskPauser +namespace Unity.Robotics.ROSTCPConnector { - CancellationTokenSource m_Source = new CancellationTokenSource(); - object m_Result; - public object Result => m_Result; - - public async Task PauseUntilResumed() + public class TaskPauser { - try + CancellationTokenSource m_Source = new CancellationTokenSource(); + public object Result { get; private set; } + + public async Task PauseUntilResumed() { - while (!m_Source.Token.IsCancellationRequested) + try { - await Task.Delay(10000, m_Source.Token); + while (!m_Source.Token.IsCancellationRequested) + { + await Task.Delay(10000, m_Source.Token); + } } - } - catch(TaskCanceledException) - { + catch (TaskCanceledException) + { + } + return Result; } - return m_Result; - } - public void Resume(object result) - { - m_Result = result; - m_Source.Cancel(); + public void Resume(object result) + { + Result = result; + m_Source.Cancel(); + } } -} +} \ No newline at end of file From 58ac51471326cc448247a2b68dee0d84eef215ac Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Tue, 27 Apr 2021 12:57:58 -0700 Subject: [PATCH 08/11] Fix hudpanel --- .../Runtime/TcpConnector/ROSConnection.cs | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index a7e55563..7cb52895 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -59,7 +59,7 @@ public class ROSConnection : MonoBehaviour const string SYSCOMMAND_UNITYSERVICE = "unity_service"; // GUI window variables - internal HUDPanel hudPanel = null; + internal HUDPanel m_HudPanel = null; ConcurrentQueue> m_OutgoingMessages = new ConcurrentQueue>(); ConcurrentQueue> m_IncomingMessages = new ConcurrentQueue>(); @@ -222,8 +222,8 @@ public void Connect(string ipAddress, int port) { m_RosIPAddress = ipAddress; m_RosPort = port; - if (hudPanel != null) - hudPanel.host = $"{ipAddress}:{port}"; + if (m_HudPanel != null) + m_HudPanel.host = $"{ipAddress}:{port}"; Connect(); } @@ -249,16 +249,16 @@ void OnValidate() private void InitializeHUD() { - if (!Application.isPlaying || (!m_ShowHUD && hudPanel == null)) + if (!Application.isPlaying || (!m_ShowHUD && m_HudPanel == null)) return; - if (hudPanel == null) + if (m_HudPanel == null) { - hudPanel = gameObject.AddComponent(); - hudPanel.host = $"{RosIPAddress}:{RosPort}"; + m_HudPanel = gameObject.AddComponent(); + m_HudPanel.host = $"{RosIPAddress}:{RosPort}"; } - hudPanel.isEnabled = m_ShowHUD; + m_HudPanel.isEnabled = m_ShowHUD; } void RosUnityErrorCallback(MRosUnityError error) @@ -281,6 +281,10 @@ private void Update() { Message message = callback.messageConstructor(); message.Deserialize(contents, 0); + + if (m_HudPanel != null) + m_HudPanel.SetLastMessageReceived(topic, message); + callback.callbacks.ForEach(item => item(message)); } } @@ -538,6 +542,9 @@ void SendSysCommand(string command, object param) public void Send(string rosTopicName, Message message) { m_OutgoingMessages.Enqueue(new Tuple(rosTopicName, message)); + + if (m_HudPanel != null) + m_HudPanel.SetLastMessageSent(rosTopicName, message); } /// From 6d7da705481e70a0e245e05243a99eaf86f1668c Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Tue, 27 Apr 2021 14:32:49 -0700 Subject: [PATCH 09/11] Coding standards --- .../Runtime/TcpConnector/ROSConnection.cs | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 7cb52895..0e19cbb4 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -49,14 +49,14 @@ public class ROSConnection : MonoBehaviour bool m_ShowHUD = true; public bool ShowHud { get => m_ShowHUD; set => m_ShowHUD = value; } - const string ERROR_TOPIC_NAME = "__error"; - const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; - const string SERVICE_TOPIC_NAME = "__srv"; + const string k_Topic_Error = "__error"; + const string k_Topic_SysCommand = "__syscommand"; + const string k_Topic_Services = "__srv"; - const string SYSCOMMAND_SUBSCRIBE = "subscribe"; - const string SYSCOMMAND_PUBLISH = "publish"; - const string SYSCOMMAND_ROSSERVICE = "ros_service"; - const string SYSCOMMAND_UNITYSERVICE = "unity_service"; + const string k_SysCommand_Subscribe = "subscribe"; + const string k_SysCommand_Publish = "publish"; + const string k_SysCommand_RosService = "ros_service"; + const string k_SysCommand_UnityService = "unity_service"; // GUI window variables internal HUDPanel m_HudPanel = null; @@ -142,7 +142,7 @@ public void ImplementService(string topic, Func callback) } MRosUnitySrvMessage srvMessage = new MRosUnitySrvMessage(srvID, true, rosServiceName, requestBytes); - Send(SERVICE_TOPIC_NAME, srvMessage); + Send(k_Topic_Services, srvMessage); byte[] rawResponse = (byte[])await pauser.PauseUntilResumed(); RESPONSE result = new RESPONSE(); @@ -157,22 +157,22 @@ public void GetTopicList(Action callback) public void RegisterSubscriber(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_SUBSCRIBE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + SendSysCommand(k_SysCommand_Subscribe, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } public void RegisterPublisher(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_PUBLISH, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + SendSysCommand(k_SysCommand_Publish, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } public void RegisterRosService(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_ROSSERVICE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + SendSysCommand(k_SysCommand_RosService, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } public void RegisterUnityService(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_UNITYSERVICE, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + SendSysCommand(k_SysCommand_UnityService, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } private static ROSConnection _instance; @@ -209,8 +209,8 @@ void OnEnable() private void Start() { InitializeHUD(); - Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); - Subscribe(SERVICE_TOPIC_NAME, ProcessServiceMessage); + Subscribe(k_Topic_Error, RosUnityErrorCallback); + Subscribe(k_Topic_Services, ProcessServiceMessage); if (ConnectOnStart) { @@ -301,7 +301,7 @@ void ProcessServiceMessage(MRosUnitySrvMessage message) requestMessage.Deserialize(message.payload, 0); Message responseMessage = service.callback(requestMessage); byte[] responseBytes = responseMessage.Serialize(); - Send(SERVICE_TOPIC_NAME, new MRosUnitySrvMessage(message.srv_id, false, message.topic, responseBytes)); + Send(k_Topic_Services, new MRosUnitySrvMessage(message.srv_id, false, message.topic, responseBytes)); } } else @@ -536,7 +536,7 @@ struct SysCommand_TopicAndType void SendSysCommand(string command, object param) { - Send(SYSCOMMAND_TOPIC_NAME, new MRosUnitySysCommand(command, JsonUtility.ToJson(param))); + Send(k_Topic_SysCommand, new MRosUnitySysCommand(command, JsonUtility.ToJson(param))); } public void Send(string rosTopicName, Message message) From 318d174da8f87cc7f3f87fbe46b3ce4033f572ed Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Wed, 28 Apr 2021 11:44:31 -0700 Subject: [PATCH 10/11] Sleep instead of yield --- .../Editor/ROSSettingsEditor.cs | 5 ++++ .../Runtime/TcpConnector/ROSConnection.cs | 29 +++++++++++-------- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index a7943bc9..e63b81f1 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -67,6 +67,11 @@ protected virtual void OnGUI() "If a network message takes this long to send, assume the connection has failed. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), prefab.NetworkTimeoutSeconds); + prefab.SleepTimeSeconds = EditorGUILayout.FloatField( + new GUIContent("Sleep time (secs)", + "Sleep this long before checking for new network messages. (Decreasing this time will make it respond faster, but consume more CPU)."), + prefab.SleepTimeSeconds); + if (GUI.changed) { EditorUtility.SetDirty(prefabObj); diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 0e19cbb4..40a7e89f 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -44,6 +44,10 @@ public class ROSConnection : MonoBehaviour float m_NetworkTimeoutSeconds = 2; public float NetworkTimeoutSeconds { get => m_NetworkTimeoutSeconds; set => m_NetworkTimeoutSeconds = value; } + [SerializeField] + float m_SleepTimeSeconds = 0.01f; + public float SleepTimeSeconds { get => m_SleepTimeSeconds; set => m_SleepTimeSeconds = value; } + [SerializeField] [FormerlySerializedAs("showHUD")] bool m_ShowHUD = true; @@ -233,7 +237,7 @@ public void Connect() Debug.LogError("ROS IP address is not correct"); m_ConnectionThreadCancellation = new CancellationTokenSource(); - Task.Run(() => ConnectionThread(m_RosIPAddress, m_RosPort, m_NetworkTimeoutSeconds, m_KeepaliveTime, m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); + Task.Run(() => ConnectionThread(m_RosIPAddress, m_RosPort, m_NetworkTimeoutSeconds, m_KeepaliveTime, (int)(m_SleepTimeSeconds*1000.0f), m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); } public void Disconnect() @@ -332,6 +336,7 @@ static async Task ConnectionThread( int rosPort, float networkTimeoutSeconds, float keepaliveTime, + int sleepMilliseconds, ConcurrentQueue> outgoingQueue, ConcurrentQueue> incomingQueue, CancellationToken token) @@ -356,7 +361,7 @@ static async Task ConnectionThread( SendKeepalive(networkStream); readerCancellation = new CancellationTokenSource(); - _ = Task.Run(() => ReaderThread(nextReaderIdx, networkStream, incomingQueue, readerCancellation.Token)); + _ = Task.Run(() => ReaderThread(nextReaderIdx, networkStream, incomingQueue, sleepMilliseconds, readerCancellation.Token)); nextReaderIdx++; // connected, now just watch our queue for outgoing messages to send (or else send a keepalive message occasionally) @@ -367,7 +372,7 @@ static async Task ConnectionThread( token.ThrowIfCancellationRequested(); while (!outgoingQueue.TryDequeue(out data)) { - Thread.Yield(); + Thread.Sleep(sleepMilliseconds); if (s_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) { SendKeepalive(networkStream); @@ -405,13 +410,13 @@ static async Task ConnectionThread( } } - static async Task ReaderThread(int readerIdx, NetworkStream networkStream, ConcurrentQueue> queue, CancellationToken token) + static async Task ReaderThread(int readerIdx, NetworkStream networkStream, ConcurrentQueue> queue, int sleepMilliseconds, CancellationToken token) { while (!token.IsCancellationRequested) { try { - Tuple content = await ReadMessageContents(networkStream, token); + Tuple content = await ReadMessageContents(networkStream, sleepMilliseconds, token); //Debug.Log($"Message {content.Item1} received"); queue.Enqueue(content); } @@ -425,13 +430,13 @@ static async Task ReaderThread(int readerIdx, NetworkStream networkStream, Concu } } - static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int length, CancellationToken token) + static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int length, int sleepMilliseconds, CancellationToken token) { int read = 0; while (read < length && networkStream.CanRead) { while (!token.IsCancellationRequested && !networkStream.DataAvailable) - await Task.Yield(); + await Task.Delay(sleepMilliseconds); token.ThrowIfCancellationRequested(); read += await networkStream.ReadAsync(array, read, length - read, token); @@ -444,10 +449,10 @@ static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int static byte[] s_FourBytes = new byte[4]; static byte[] s_TopicScratchSpace = new byte[64]; - static async Task> ReadMessageContents(NetworkStream networkStream, CancellationToken token) + static async Task> ReadMessageContents(NetworkStream networkStream, int sleepMilliseconds, CancellationToken token) { // Get first bytes to determine length of topic name - await ReadToByteArray(networkStream, s_FourBytes, 4, token); + await ReadToByteArray(networkStream, s_FourBytes, 4, sleepMilliseconds, token); int topicLength = BitConverter.ToInt32(s_FourBytes, 0); // If our topic buffer isn't large enough, make a larger one (and keep it that size; assume that's the new standard) @@ -455,14 +460,14 @@ static async Task> ReadMessageContents(NetworkStream netwo s_TopicScratchSpace = new byte[topicLength]; // Read and convert topic name - await ReadToByteArray(networkStream, s_TopicScratchSpace, topicLength, token); + await ReadToByteArray(networkStream, s_TopicScratchSpace, topicLength, sleepMilliseconds, token); string topicName = Encoding.ASCII.GetString(s_TopicScratchSpace, 0, topicLength); - await ReadToByteArray(networkStream, s_FourBytes, 4, token); + await ReadToByteArray(networkStream, s_FourBytes, 4, sleepMilliseconds, token); int full_message_size = BitConverter.ToInt32(s_FourBytes, 0); byte[] readBuffer = new byte[full_message_size]; - await ReadToByteArray(networkStream, readBuffer, full_message_size, token); + await ReadToByteArray(networkStream, readBuffer, full_message_size, sleepMilliseconds, token); return Tuple.Create(topicName, readBuffer); } From 50990daf288c62eb32d3b0f7b315c5af58f4adb1 Mon Sep 17 00:00:00 2001 From: LaurieCheers <73140792+LaurieCheers-unity@users.noreply.github.com> Date: Wed, 5 May 2021 19:57:52 -0700 Subject: [PATCH 11/11] Local variable naming --- .../Runtime/TcpConnector/HUDPanel.cs | 87 +++++++++---------- 1 file changed, 42 insertions(+), 45 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs index 6c7df925..4fd1ce05 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs @@ -8,37 +8,34 @@ namespace Unity.Robotics.ROSTCPConnector public class HUDPanel : MonoBehaviour { // GUI variables - GUIStyle labelStyle; - GUIStyle contentStyle; - GUIStyle messageStyle; - bool viewSent = false; - bool viewRecv = false; - bool viewSrvs = false; - Rect scrollRect; - bool redrawGUI = false; + GUIStyle m_LabelStyle; + GUIStyle m_ContentStyle; + GUIStyle m_MessageStyle; + bool m_ViewSent = false; + bool m_ViewRecv = false; + bool m_ViewSrvs = false; + Rect m_ScrollRect; // ROS Message variables internal bool isEnabled; internal string host; - MessageViewState lastMessageSent; - string lastMessageSentMeta = "None"; + MessageViewState m_LastMessageSent; + string m_LastMessageSentMeta = "None"; public void SetLastMessageSent(string topic, Message message) { - lastMessageSent = new MessageViewState() { label = "Last Message Sent:", message = message }; - lastMessageSentMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; - redrawGUI = true; + m_LastMessageSent = new MessageViewState() { label = "Last Message Sent:", message = message }; + m_LastMessageSentMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; } - MessageViewState lastMessageReceived; - string lastMessageReceivedMeta = "None"; + MessageViewState m_LastMessageReceived; + string m_LastMessageReceivedMeta = "None"; public void SetLastMessageReceived(string topic, Message message) { - lastMessageReceived = new MessageViewState() { label = "Last Message Received:", message = message }; - lastMessageReceivedMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; - redrawGUI = true; + m_LastMessageReceived = new MessageViewState() { label = "Last Message Received:", message = message }; + m_LastMessageReceivedMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; } List activeServices = new List(); @@ -81,7 +78,7 @@ public void AddServiceResponse(int serviceID, Message response) void Awake() { // Define font styles - labelStyle = new GUIStyle + m_LabelStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, normal = { textColor = Color.white }, @@ -89,7 +86,7 @@ void Awake() fixedWidth = 250 }; - contentStyle = new GUIStyle + m_ContentStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 0, 5), @@ -97,7 +94,7 @@ void Awake() fixedWidth = 300 }; - messageStyle = new GUIStyle + m_MessageStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 5, 5), @@ -106,7 +103,7 @@ void Awake() wordWrap = true }; - scrollRect = new Rect(); + m_ScrollRect = new Rect(); } void OnGUI() @@ -118,49 +115,49 @@ void OnGUI() GUILayout.BeginVertical("box"); // ROS IP Setup - GUILayout.Label("ROS IP:", labelStyle); - GUILayout.Label(host, contentStyle); + GUILayout.Label("ROS IP:", m_LabelStyle); + GUILayout.Label(host, m_ContentStyle); // Last message sent - GUILayout.Label("Last Message Sent:", labelStyle); - GUILayout.Label(lastMessageSentMeta, contentStyle); - if (lastMessageSent != null) - viewSent = GUILayout.Toggle(viewSent, "View contents"); + GUILayout.Label("Last Message Sent:", m_LabelStyle); + GUILayout.Label(m_LastMessageSentMeta, m_ContentStyle); + if (m_LastMessageSent != null) + m_ViewSent = GUILayout.Toggle(m_ViewSent, "View contents"); // Last message received - GUILayout.Label("Last Message Received:", labelStyle); - GUILayout.Label(lastMessageReceivedMeta, contentStyle); - if (lastMessageReceived != null) - viewRecv = GUILayout.Toggle(viewRecv, "View contents"); + GUILayout.Label("Last Message Received:", m_LabelStyle); + GUILayout.Label(m_LastMessageReceivedMeta, m_ContentStyle); + if (m_LastMessageReceived != null) + m_ViewRecv = GUILayout.Toggle(m_ViewRecv, "View contents"); - GUILayout.Label($"{activeServices.Count} Active Service Requests:", labelStyle); + GUILayout.Label($"{activeServices.Count} Active Service Requests:", m_LabelStyle); if (activeServices.Count > 0) { var dots = new String('.', (int)Time.time % 4); - GUILayout.Label($"Waiting for service response{dots}", contentStyle); + GUILayout.Label($"Waiting for service response{dots}", m_ContentStyle); } - viewSrvs = GUILayout.Toggle(viewSrvs, "View services status"); + m_ViewSrvs = GUILayout.Toggle(m_ViewSrvs, "View services status"); GUILayout.EndVertical(); // Update length of scroll if (GUILayoutUtility.GetLastRect().height > 1 && GUILayoutUtility.GetLastRect().width > 1) - scrollRect = GUILayoutUtility.GetLastRect(); + m_ScrollRect = GUILayoutUtility.GetLastRect(); // Optionally show message contents - float y = scrollRect.yMax; - if (viewSent) + float y = m_ScrollRect.yMax; + if (m_ViewSent) { - y = ShowMessage(lastMessageSent, y); + y = ShowMessage(m_LastMessageSent, y); } - if (viewRecv) + if (m_ViewRecv) { - y = ShowMessage(lastMessageReceived, y); + y = ShowMessage(m_LastMessageReceived, y); } - if (viewSrvs) + if (m_ViewSrvs) { foreach (MessageViewState service in activeServices) { @@ -210,10 +207,10 @@ float ShowMessage(MessageViewState msgView, float y, bool showElapsedTime = fals // Paste contents of message if (showElapsedTime) - GUILayout.Label($"{msgView.label} ({Time.time - msgView.timestamp})", labelStyle); + GUILayout.Label($"{msgView.label} ({Time.time - msgView.timestamp})", m_LabelStyle); else - GUILayout.Label(msgView.label, labelStyle); - GUILayout.Label(msgView.message.ToString(), messageStyle); + GUILayout.Label(msgView.label, m_LabelStyle); + GUILayout.Label(msgView.message.ToString(), m_MessageStyle); GUILayout.EndVertical(); GUI.EndScrollView();