diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 021c7e5d..8d9ae9a5 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -11,7 +11,7 @@ public static void OpenWindow() { ROSSettingsEditor window = GetWindow(false, "ROS Settings", true); window.minSize = new Vector2(300, 65); - window.maxSize = new Vector2(600, 200); + window.maxSize = new Vector2(600, 250); window.Show(); } @@ -51,13 +51,21 @@ protected virtual void OnGUI() EditorGUILayout.Space(); EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel); prefab.awaitDataMaxRetries = EditorGUILayout.IntField( - new GUIContent("Max Retries", + new GUIContent("Max Service Retries", "While waiting for a service to respond, check this many times before giving up."), prefab.awaitDataMaxRetries); prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField( new GUIContent("Sleep (seconds)", "While waiting for a service to respond, wait this many seconds between checks."), prefab.awaitDataSleepSeconds); + prefab.readChunkSize = EditorGUILayout.IntField( + new GUIContent("Read chunk size", + "While reading received messages, read this many bytes at a time."), + prefab.readChunkSize); + prefab.awaitDataReadRetry = EditorGUILayout.IntField( + new GUIContent("Max Read retries", + "While waiting to read a full message, check this many times before giving up."), + prefab.awaitDataReadRetry); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index d0230306..e1d08144 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -33,6 +33,12 @@ public class ROSConnection : MonoBehaviour [Tooltip("While waiting for a service to respond, wait this many seconds between checks.")] public float awaitDataSleepSeconds = 1.0f; + [Tooltip("While reading received messages, read this many bytes at a time.")] + public int readChunkSize = 2048; + + [Tooltip("While waiting to read a full message, check this many times before giving up.")] + public int awaitDataReadRetry = 10; + static object _lock = new object(); // sync lock static List activeConnectionTasks = new List(); // pending connections @@ -144,7 +150,7 @@ public async void SendServiceMessage(string rosServiceName, Message se try { string serviceName; - byte[] content = ReadMessageContents(networkStream, out serviceName); + (string topicName, byte[] content) = await ReadMessageContents(networkStream); serviceResponse.Deserialize(content, 0); } catch (Exception e) @@ -251,18 +257,17 @@ protected async Task HandleConnectionAsync(TcpClient tcpClient) await Task.Yield(); // continue asynchronously on another thread - ReadMessage(tcpClient.GetStream()); + await ReadMessage(tcpClient.GetStream()); } - void ReadMessage(NetworkStream networkStream) + async Task ReadMessage(NetworkStream networkStream) { if (!networkStream.CanRead) return; SubscriberCallback subs; - string topicName; - byte[] content = ReadMessageContents(networkStream, out topicName); + (string topicName, byte[] content) = await ReadMessageContents(networkStream); if (!subscribers.TryGetValue(topicName, out subs)) return; // not interested in this topic @@ -291,7 +296,7 @@ void ReadMessage(NetworkStream networkStream) } } - byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) + async Task> ReadMessageContents(NetworkStream networkStream) { // Get first bytes to determine length of topic name byte[] rawTopicBytes = new byte[4]; @@ -301,7 +306,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) // Read and convert topic name byte[] topicNameBytes = new byte[topicLength]; networkStream.Read(topicNameBytes, 0, topicNameBytes.Length); - topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); + string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); byte[] full_message_size_bytes = new byte[4]; networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length); @@ -311,14 +316,28 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName) int bytesRemaining = full_message_size; int totalBytesRead = 0; - while (networkStream.DataAvailable && bytesRemaining > 0) + int attempts = 0; + // Read in message contents until completion, or until attempts are maxed out + while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry) { - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, bytesRemaining); + if (attempts == this.awaitDataReadRetry) + { + Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts."); + return Tuple.Create(topicName, readBuffer); + } + + // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain + int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining)); totalBytesRead += bytesRead; bytesRemaining -= bytesRead; - } - return readBuffer; + if (!networkStream.DataAvailable) + { + attempts++; + await Task.Yield(); + } + } + return Tuple.Create(topicName, readBuffer); } ///