From 0eb65fd31b55351ab81bda5a1746053ccf03063c Mon Sep 17 00:00:00 2001 From: at669 Date: Thu, 4 Feb 2021 18:01:59 -0700 Subject: [PATCH 1/6] Read message in chunks min --- .../Runtime/TcpConnector/ROSConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index d0230306..5e338682 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -313,7 +313,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) while (networkStream.DataAvailable && bytesRemaining > 0) { - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, bytesRemaining); + int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(2048, bytesRemaining)); totalBytesRead += bytesRead; bytesRemaining -= bytesRead; } From 70c718d71b63728a4bf60be9b97205852d46ffbd Mon Sep 17 00:00:00 2001 From: at669 Date: Fri, 5 Feb 2021 10:37:54 -0700 Subject: [PATCH 2/6] Removing unpredictable DataAvailable check --- .../Runtime/TcpConnector/ROSConnection.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 5e338682..3f6f5b0d 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -311,7 +311,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) int bytesRemaining = full_message_size; int totalBytesRead = 0; - while (networkStream.DataAvailable && bytesRemaining > 0) + while (bytesRemaining > 0) { int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(2048, bytesRemaining)); totalBytesRead += bytesRead; From 4d2078321a225766b213b5abb4ec5e33385f03da Mon Sep 17 00:00:00 2001 From: at669 Date: Fri, 5 Feb 2021 12:41:02 -0700 Subject: [PATCH 3/6] Configurable byte chun k size --- .../Editor/ROSSettingsEditor.cs | 4 ++++ .../Runtime/TcpConnector/ROSConnection.cs | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 021c7e5d..78d17b94 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -58,6 +58,10 @@ protected virtual void OnGUI() new GUIContent("Sleep (seconds)", "While waiting for a service to respond, wait this many seconds between checks."), prefab.awaitDataSleepSeconds); + prefab.byteChunkToRead = EditorGUILayout.IntField( + new GUIContent("Read chunk size", + "While reading received messages, read this many bytes at a time."), + prefab.byteChunkToRead); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 3f6f5b0d..adcf53f2 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -33,6 +33,9 @@ public class ROSConnection : MonoBehaviour [Tooltip("While waiting for a service to respond, wait this many seconds between checks.")] public float awaitDataSleepSeconds = 1.0f; + [Tooltip("While reading received messages, read this many bytes at a time.")] + public int byteChunkToRead = 2048; + static object _lock = new object(); // sync lock static List activeConnectionTasks = new List(); // pending connections @@ -313,7 +316,8 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) while (bytesRemaining > 0) { - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(2048, bytesRemaining)); + // Read the minimum of the bytes remaining, or the designated byteChunkToRead in segments until none remain + int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(byteChunkToRead, bytesRemaining)); totalBytesRead += bytesRead; bytesRemaining -= bytesRead; } From b8c42bb7daac39ba3ade6be8c3e5e9136f3b830e Mon Sep 17 00:00:00 2001 From: at669 Date: Fri, 5 Feb 2021 13:03:59 -0700 Subject: [PATCH 4/6] Prevent infinite hang on reading message contents --- .../Editor/ROSSettingsEditor.cs | 4 ++-- .../Runtime/TcpConnector/ROSConnection.cs | 22 ++++++++++++++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 78d17b94..1192c227 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -58,10 +58,10 @@ protected virtual void OnGUI() new GUIContent("Sleep (seconds)", "While waiting for a service to respond, wait this many seconds between checks."), prefab.awaitDataSleepSeconds); - prefab.byteChunkToRead = EditorGUILayout.IntField( + prefab.readChunkSize = EditorGUILayout.IntField( new GUIContent("Read chunk size", "While reading received messages, read this many bytes at a time."), - prefab.byteChunkToRead); + prefab.readChunkSize); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index adcf53f2..247d5dbb 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -34,7 +34,7 @@ public class ROSConnection : MonoBehaviour public float awaitDataSleepSeconds = 1.0f; [Tooltip("While reading received messages, read this many bytes at a time.")] - public int byteChunkToRead = 2048; + public int readChunkSize = 2048; static object _lock = new object(); // sync lock static List activeConnectionTasks = new List(); // pending connections @@ -314,14 +314,26 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) int bytesRemaining = full_message_size; int totalBytesRead = 0; - while (bytesRemaining > 0) + int attempts = 0; + // Read in message contents until completion, or until attempts are maxed out + while (bytesRemaining > 0 && attempts <= this.awaitDataMaxRetries) { - // Read the minimum of the bytes remaining, or the designated byteChunkToRead in segments until none remain - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(byteChunkToRead, bytesRemaining)); + if (attempts == this.awaitDataMaxRetries) + { + Debug.LogError("No more data available on network stream after " + awaitDataMaxRetries + " attempts."); + return readBuffer; + } + + // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain + int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining)); totalBytesRead += bytesRead; bytesRemaining -= bytesRead; - } + if (!networkStream.DataAvailable) + { + attempts++; + } + } return readBuffer; } From fb965da124ed7d516caf4fa46fa4e914e697ba1b Mon Sep 17 00:00:00 2001 From: at669 Date: Fri, 5 Feb 2021 14:12:38 -0700 Subject: [PATCH 5/6] Wait for frame on data read retry --- .../Editor/ROSSettingsEditor.cs | 8 +++-- .../Runtime/TcpConnector/ROSConnection.cs | 31 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 1192c227..8d9ae9a5 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -11,7 +11,7 @@ public static void OpenWindow() { ROSSettingsEditor window = GetWindow(false, "ROS Settings", true); window.minSize = new Vector2(300, 65); - window.maxSize = new Vector2(600, 200); + window.maxSize = new Vector2(600, 250); window.Show(); } @@ -51,7 +51,7 @@ protected virtual void OnGUI() EditorGUILayout.Space(); EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel); prefab.awaitDataMaxRetries = EditorGUILayout.IntField( - new GUIContent("Max Retries", + new GUIContent("Max Service Retries", "While waiting for a service to respond, check this many times before giving up."), prefab.awaitDataMaxRetries); prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField( @@ -62,6 +62,10 @@ protected virtual void OnGUI() new GUIContent("Read chunk size", "While reading received messages, read this many bytes at a time."), prefab.readChunkSize); + prefab.awaitDataReadRetry = EditorGUILayout.IntField( + new GUIContent("Max Read retries", + "While waiting to read a full message, check this many times before giving up."), + prefab.awaitDataReadRetry); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index 247d5dbb..bb8208a2 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -36,6 +36,9 @@ public class ROSConnection : MonoBehaviour [Tooltip("While reading received messages, read this many bytes at a time.")] public int readChunkSize = 2048; + [Tooltip("While waiting to read a full message, check this many times before giving up.")] + public int awaitDataReadRetry = 10; + static object _lock = new object(); // sync lock static List activeConnectionTasks = new List(); // pending connections @@ -147,7 +150,9 @@ public async void SendServiceMessage(string rosServiceName, Message se try { string serviceName; - byte[] content = ReadMessageContents(networkStream, out serviceName); + var messageContents = await ReadMessageContents(networkStream); + var topicName = messageContents.Item1; + var content = messageContents.Item2; serviceResponse.Deserialize(content, 0); } catch (Exception e) @@ -254,18 +259,19 @@ protected async Task HandleConnectionAsync(TcpClient tcpClient) await Task.Yield(); // continue asynchronously on another thread - ReadMessage(tcpClient.GetStream()); + await ReadMessage(tcpClient.GetStream()); } - void ReadMessage(NetworkStream networkStream) + async Task ReadMessage(NetworkStream networkStream) { if (!networkStream.CanRead) return; SubscriberCallback subs; - string topicName; - byte[] content = ReadMessageContents(networkStream, out topicName); + var messageContents = await ReadMessageContents(networkStream); + var topicName = messageContents.Item1; + var content = messageContents.Item2; if (!subscribers.TryGetValue(topicName, out subs)) return; // not interested in this topic @@ -294,7 +300,7 @@ void ReadMessage(NetworkStream networkStream) } } - byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) + async Task> ReadMessageContents(NetworkStream networkStream) { // Get first bytes to determine length of topic name byte[] rawTopicBytes = new byte[4]; @@ -304,7 +310,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) // Read and convert topic name byte[] topicNameBytes = new byte[topicLength]; networkStream.Read(topicNameBytes, 0, topicNameBytes.Length); - topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); + string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); byte[] full_message_size_bytes = new byte[4]; networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length); @@ -316,12 +322,12 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) int attempts = 0; // Read in message contents until completion, or until attempts are maxed out - while (bytesRemaining > 0 && attempts <= this.awaitDataMaxRetries) + while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry) { - if (attempts == this.awaitDataMaxRetries) + if (attempts == this.awaitDataReadRetry) { - Debug.LogError("No more data available on network stream after " + awaitDataMaxRetries + " attempts."); - return readBuffer; + Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts."); + return Tuple.Create(topicName, readBuffer); } // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain @@ -332,9 +338,10 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) if (!networkStream.DataAvailable) { attempts++; + await Task.Yield(); } } - return readBuffer; + return Tuple.Create(topicName, readBuffer); } /// From f4550890fd473da7d8af24f4ef23e51f31df4830 Mon Sep 17 00:00:00 2001 From: at669 Date: Fri, 5 Feb 2021 15:50:51 -0700 Subject: [PATCH 6/6] PR Feedback --- .../Runtime/TcpConnector/ROSConnection.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index bb8208a2..e1d08144 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -150,9 +150,7 @@ public async void SendServiceMessage(string rosServiceName, Message se try { string serviceName; - var messageContents = await ReadMessageContents(networkStream); - var topicName = messageContents.Item1; - var content = messageContents.Item2; + (string topicName, byte[] content) = await ReadMessageContents(networkStream); serviceResponse.Deserialize(content, 0); } catch (Exception e) @@ -269,9 +267,7 @@ async Task ReadMessage(NetworkStream networkStream) SubscriberCallback subs; - var messageContents = await ReadMessageContents(networkStream); - var topicName = messageContents.Item1; - var content = messageContents.Item2; + (string topicName, byte[] content) = await ReadMessageContents(networkStream); if (!subscribers.TryGetValue(topicName, out subs)) return; // not interested in this topic