diff --git a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs index 7fd13302..e63b81f1 100644 --- a/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs +++ b/com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs @@ -39,45 +39,38 @@ protected virtual void OnGUI() } } + prefab.ConnectOnStart = EditorGUILayout.Toggle("Connect on Startup", prefab.ConnectOnStart); + EditorGUILayout.LabelField("Settings for a new ROSConnection.instance", EditorStyles.boldLabel); - prefab.rosIPAddress = EditorGUILayout.TextField("ROS IP Address", prefab.rosIPAddress); - prefab.rosPort = EditorGUILayout.IntField("ROS Port", prefab.rosPort); + prefab.RosIPAddress = EditorGUILayout.TextField("ROS IP Address", prefab.RosIPAddress); + prefab.RosPort = EditorGUILayout.IntField("ROS Port", prefab.RosPort); EditorGUILayout.Space(); - prefab.overrideUnityIP = EditorGUILayout.TextField( - new GUIContent("Override Unity IP Address", "If blank, determine IP automatically."), - prefab.overrideUnityIP); - prefab.unityPort = EditorGUILayout.IntField("Unity Port", prefab.unityPort); - if ((prefab.overrideUnityIP != "" && !ROSConnection.IPFormatIsCorrect(prefab.overrideUnityIP))) - { - EditorGUILayout.HelpBox("Unity Override IP invalid", MessageType.Warning); - } - if(!ROSConnection.IPFormatIsCorrect(prefab.rosIPAddress)) + if (!ROSConnection.IPFormatIsCorrect(prefab.RosIPAddress)) { EditorGUILayout.HelpBox("ROS IP is invalid", MessageType.Warning); } + EditorGUILayout.Space(); - EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel); - prefab.awaitDataMaxRetries = EditorGUILayout.IntField( - new GUIContent("Max Service Retries", - "While waiting for a service to respond, check this many times before giving up."), - prefab.awaitDataMaxRetries); - prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField( - new GUIContent("Sleep (seconds)", - "While waiting for a service to respond, wait this many seconds between checks."), - prefab.awaitDataSleepSeconds); - prefab.readChunkSize = EditorGUILayout.IntField( - new GUIContent("Read chunk size", - "While reading received messages, read this many bytes at a time."), - prefab.readChunkSize); - prefab.awaitDataReadRetry = EditorGUILayout.IntField( - new GUIContent("Max Read retries", - "While waiting to read a full message, check this many times before giving up."), - prefab.awaitDataReadRetry); - prefab.timeoutOnIdle = EditorGUILayout.FloatField( - new GUIContent("Timeout on idle (seconds)", - "If no messages have been sent for this long, close the connection."), - prefab.timeoutOnIdle); + + prefab.ShowHud = EditorGUILayout.Toggle("Show HUD", prefab.ShowHud); + + EditorGUILayout.Space(); + + prefab.KeepaliveTime = EditorGUILayout.FloatField( + new GUIContent("KeepAlive time (secs)", + "If no other messages are being sent, test the connection this often. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), + prefab.KeepaliveTime); + + prefab.NetworkTimeoutSeconds = EditorGUILayout.FloatField( + new GUIContent("Network timeout (secs)", + "If a network message takes this long to send, assume the connection has failed. (The longer this time is, the longer it will take for ROSConnection to notice the Endpoint has stopped responding)."), + prefab.NetworkTimeoutSeconds); + + prefab.SleepTimeSeconds = EditorGUILayout.FloatField( + new GUIContent("Sleep time (secs)", + "Sleep this long before checking for new network messages. (Decreasing this time will make it respond faster, but consume more CPU)."), + prefab.SleepTimeSeconds); if (GUI.changed) { diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs new file mode 100644 index 00000000..163288fc --- /dev/null +++ b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs @@ -0,0 +1,79 @@ +//Do not edit! This file was generated by Unity-ROS MessageGeneration. +using System; +using System.Linq; +using System.Collections.Generic; +using System.Text; +using Unity.Robotics.ROSTCPConnector.MessageGeneration; + +namespace RosMessageTypes.RosTcpEndpoint +{ + public class MRosUnitySrvMessage : Message + { + public const string RosMessageName = "ros_tcp_endpoint/RosUnitySrvMessage"; + + public int srv_id; + public bool is_request; + public string topic; + public byte[] payload; + + public MRosUnitySrvMessage() + { + this.srv_id = 0; + this.is_request = false; + this.topic = ""; + this.payload = new byte[0]; + } + + public MRosUnitySrvMessage(int srv_id, bool is_request, string topic, byte[] payload) + { + this.srv_id = srv_id; + this.is_request = is_request; + this.topic = topic; + this.payload = payload; + } + public override List SerializationStatements() + { + var listOfSerializations = new List(); + listOfSerializations.Add(BitConverter.GetBytes(this.srv_id)); + listOfSerializations.Add(BitConverter.GetBytes(this.is_request)); + listOfSerializations.Add(SerializeString(this.topic)); + + listOfSerializations.Add(BitConverter.GetBytes(payload.Length)); + listOfSerializations.Add(this.payload); + + return listOfSerializations; + } + + public override int Deserialize(byte[] data, int offset) + { + this.srv_id = BitConverter.ToInt32(data, offset); + offset += 4; + this.is_request = BitConverter.ToBoolean(data, offset); + offset += 1; + var topicStringBytesLength = DeserializeLength(data, offset); + offset += 4; + this.topic = DeserializeString(data, offset, topicStringBytesLength); + offset += topicStringBytesLength; + + var payloadArrayLength = DeserializeLength(data, offset); + offset += 4; + this.payload= new byte[payloadArrayLength]; + for(var i = 0; i < payloadArrayLength; i++) + { + this.payload[i] = data[offset]; + offset += 1; + } + + return offset; + } + + public override string ToString() + { + return "MRosUnitySrvMessage: " + + "\nsrv_id: " + srv_id.ToString() + + "\nis_request: " + is_request.ToString() + + "\ntopic: " + topic.ToString() + + "\npayload: " + System.String.Join(", ", payload.ToList()); + } + } +} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta similarity index 83% rename from com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta rename to com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta index 866160a0..f1b1f543 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs.meta +++ b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/msg/MRosUnitySrvMessage.cs.meta @@ -1,5 +1,5 @@ fileFormatVersion: 2 -guid: 5e9ad0d71580f0a439cdd07e18f4fe0e +guid: 948d826c167dec548b1ac5e0c8393d86 MonoImporter: externalObjects: {} serializedVersion: 2 diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs deleted file mode 100644 index 0f8e3c35..00000000 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeRequest.cs +++ /dev/null @@ -1,56 +0,0 @@ -//Do not edit! This file was generated by Unity-ROS MessageGeneration. -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text; -using Unity.Robotics.ROSTCPConnector.MessageGeneration; - -namespace RosMessageTypes.RosTcpEndpoint -{ - public class MUnityHandshakeRequest : Message - { - public const string RosMessageName = "Ros_Tcp_Endpoint/UnityHandshake"; - - public string ip; - public ushort port; - - public MUnityHandshakeRequest() - { - this.ip = ""; - this.port = 0; - } - - public MUnityHandshakeRequest(string ip, ushort port) - { - this.ip = ip; - this.port = port; - } - public override List SerializationStatements() - { - var listOfSerializations = new List(); - listOfSerializations.Add(SerializeString(this.ip)); - listOfSerializations.Add(BitConverter.GetBytes(this.port)); - - return listOfSerializations; - } - - public override int Deserialize(byte[] data, int offset) - { - var ipStringBytesLength = DeserializeLength(data, offset); - offset += 4; - this.ip = DeserializeString(data, offset, ipStringBytesLength); - offset += ipStringBytesLength; - this.port = BitConverter.ToUInt16(data, offset); - offset += 2; - - return offset; - } - - public override string ToString() - { - return "MUnityHandshakeRequest: " + - "\nip: " + ip.ToString() + - "\nport: " + port.ToString(); - } - } -} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs b/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs deleted file mode 100644 index e2191fec..00000000 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs +++ /dev/null @@ -1,49 +0,0 @@ -//Do not edit! This file was generated by Unity-ROS MessageGeneration. -using System; -using System.Linq; -using System.Collections.Generic; -using System.Text; -using Unity.Robotics.ROSTCPConnector.MessageGeneration; - -namespace RosMessageTypes.RosTcpEndpoint -{ - public class MUnityHandshakeResponse : Message - { - public const string RosMessageName = "Ros_Tcp_Endpoint/UnityHandshake"; - - public string ip; - - public MUnityHandshakeResponse() - { - this.ip = ""; - } - - public MUnityHandshakeResponse(string ip) - { - this.ip = ip; - } - public override List SerializationStatements() - { - var listOfSerializations = new List(); - listOfSerializations.Add(SerializeString(this.ip)); - - return listOfSerializations; - } - - public override int Deserialize(byte[] data, int offset) - { - var ipStringBytesLength = DeserializeLength(data, offset); - offset += 4; - this.ip = DeserializeString(data, offset, ipStringBytesLength); - offset += ipStringBytesLength; - - return offset; - } - - public override string ToString() - { - return "MUnityHandshakeResponse: " + - "\nip: " + ip.ToString(); - } - } -} diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs index 0da97379..4fd1ce05 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/HUDPanel.cs @@ -8,37 +8,34 @@ namespace Unity.Robotics.ROSTCPConnector public class HUDPanel : MonoBehaviour { // GUI variables - GUIStyle labelStyle; - GUIStyle contentStyle; - GUIStyle messageStyle; - bool viewSent = false; - bool viewRecv = false; - bool viewSrvs = false; - Rect scrollRect; - bool redrawGUI = false; + GUIStyle m_LabelStyle; + GUIStyle m_ContentStyle; + GUIStyle m_MessageStyle; + bool m_ViewSent = false; + bool m_ViewRecv = false; + bool m_ViewSrvs = false; + Rect m_ScrollRect; // ROS Message variables internal bool isEnabled; internal string host; - MessageViewState lastMessageSent; - string lastMessageSentMeta = "None"; + MessageViewState m_LastMessageSent; + string m_LastMessageSentMeta = "None"; public void SetLastMessageSent(string topic, Message message) { - lastMessageSent = new MessageViewState() {label = "Last Message Sent:", message = message}; - lastMessageSentMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; - redrawGUI = true; + m_LastMessageSent = new MessageViewState() { label = "Last Message Sent:", message = message }; + m_LastMessageSentMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; } - MessageViewState lastMessageReceived; - string lastMessageReceivedMeta = "None"; + MessageViewState m_LastMessageReceived; + string m_LastMessageReceivedMeta = "None"; public void SetLastMessageReceived(string topic, Message message) { - lastMessageReceived = new MessageViewState() {label = "Last Message Received:", message = message}; - lastMessageReceivedMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; - redrawGUI = true; + m_LastMessageReceived = new MessageViewState() { label = "Last Message Received:", message = message }; + m_LastMessageReceivedMeta = $"{topic} (time: {System.DateTime.Now.TimeOfDay})"; } List activeServices = new List(); @@ -81,32 +78,32 @@ public void AddServiceResponse(int serviceID, Message response) void Awake() { // Define font styles - labelStyle = new GUIStyle + m_LabelStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fontStyle = FontStyle.Bold, fixedWidth = 250 }; - contentStyle = new GUIStyle + m_ContentStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 0, 5), - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fixedWidth = 300 }; - messageStyle = new GUIStyle + m_MessageStyle = new GUIStyle { alignment = TextAnchor.MiddleLeft, padding = new RectOffset(10, 0, 5, 5), - normal = {textColor = Color.white}, + normal = { textColor = Color.white }, fixedWidth = 300, wordWrap = true }; - scrollRect = new Rect(); + m_ScrollRect = new Rect(); } void OnGUI() @@ -118,49 +115,49 @@ void OnGUI() GUILayout.BeginVertical("box"); // ROS IP Setup - GUILayout.Label("ROS IP:", labelStyle); - GUILayout.Label(host, contentStyle); + GUILayout.Label("ROS IP:", m_LabelStyle); + GUILayout.Label(host, m_ContentStyle); // Last message sent - GUILayout.Label("Last Message Sent:", labelStyle); - GUILayout.Label(lastMessageSentMeta, contentStyle); - if (lastMessageSent != null) - viewSent = GUILayout.Toggle(viewSent, "View contents"); + GUILayout.Label("Last Message Sent:", m_LabelStyle); + GUILayout.Label(m_LastMessageSentMeta, m_ContentStyle); + if (m_LastMessageSent != null) + m_ViewSent = GUILayout.Toggle(m_ViewSent, "View contents"); // Last message received - GUILayout.Label("Last Message Received:", labelStyle); - GUILayout.Label(lastMessageReceivedMeta, contentStyle); - if (lastMessageReceived != null) - viewRecv = GUILayout.Toggle(viewRecv, "View contents"); + GUILayout.Label("Last Message Received:", m_LabelStyle); + GUILayout.Label(m_LastMessageReceivedMeta, m_ContentStyle); + if (m_LastMessageReceived != null) + m_ViewRecv = GUILayout.Toggle(m_ViewRecv, "View contents"); - GUILayout.Label($"{activeServices.Count} Active Service Requests:", labelStyle); + GUILayout.Label($"{activeServices.Count} Active Service Requests:", m_LabelStyle); if (activeServices.Count > 0) { - var dots = new String('.', (int) Time.time % 4); - GUILayout.Label($"Waiting for service response{dots}", contentStyle); + var dots = new String('.', (int)Time.time % 4); + GUILayout.Label($"Waiting for service response{dots}", m_ContentStyle); } - viewSrvs = GUILayout.Toggle(viewSrvs, "View services status"); + m_ViewSrvs = GUILayout.Toggle(m_ViewSrvs, "View services status"); GUILayout.EndVertical(); // Update length of scroll if (GUILayoutUtility.GetLastRect().height > 1 && GUILayoutUtility.GetLastRect().width > 1) - scrollRect = GUILayoutUtility.GetLastRect(); + m_ScrollRect = GUILayoutUtility.GetLastRect(); // Optionally show message contents - float y = scrollRect.yMax; - if (viewSent) + float y = m_ScrollRect.yMax; + if (m_ViewSent) { - y = ShowMessage(lastMessageSent, y); + y = ShowMessage(m_LastMessageSent, y); } - if (viewRecv) + if (m_ViewRecv) { - y = ShowMessage(lastMessageReceived, y); + y = ShowMessage(m_LastMessageReceived, y); } - if (viewSrvs) + if (m_ViewSrvs) { foreach (MessageViewState service in activeServices) { @@ -210,10 +207,10 @@ float ShowMessage(MessageViewState msgView, float y, bool showElapsedTime = fals // Paste contents of message if (showElapsedTime) - GUILayout.Label($"{msgView.label} ({Time.time - msgView.timestamp})", labelStyle); + GUILayout.Label($"{msgView.label} ({Time.time - msgView.timestamp})", m_LabelStyle); else - GUILayout.Label(msgView.label, labelStyle); - GUILayout.Label(msgView.message.ToString(), messageStyle); + GUILayout.Label(msgView.label, m_LabelStyle); + GUILayout.Label(msgView.message.ToString(), m_MessageStyle); GUILayout.EndVertical(); GUI.EndScrollView(); diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs index a7546b08..40a7e89f 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/ROSConnection.cs @@ -11,171 +11,147 @@ using System.Globalization; using UnityEngine; using UnityEngine.Serialization; +using System.Collections.Concurrent; +using System.Threading; namespace Unity.Robotics.ROSTCPConnector { public class ROSConnection : MonoBehaviour { // Variables required for ROS communication - [FormerlySerializedAs("hostName")] public string rosIPAddress = "127.0.0.1"; - [FormerlySerializedAs("hostPort")] public int rosPort = 10000; + [SerializeField] + [FormerlySerializedAs("hostName")] + [FormerlySerializedAs("rosIPAddress")] + string m_RosIPAddress = "127.0.0.1"; + public string RosIPAddress { get => m_RosIPAddress; set => m_RosIPAddress = value; } + + [SerializeField] + [FormerlySerializedAs("hostPort")] + [FormerlySerializedAs("rosPort")] + int m_RosPort = 10000; + public int RosPort { get => m_RosPort; set => m_RosPort = value; } + + [SerializeField] + bool m_ConnectOnStart = true; + public bool ConnectOnStart { get => m_ConnectOnStart; set => m_ConnectOnStart = value; } + + [SerializeField] + [Tooltip("Send keepalive message if nothing has been sent for this long (seconds).")] + float m_KeepaliveTime = 10; + public float KeepaliveTime { get => m_KeepaliveTime; set => m_KeepaliveTime = value; } + + [SerializeField] + float m_NetworkTimeoutSeconds = 2; + public float NetworkTimeoutSeconds { get => m_NetworkTimeoutSeconds; set => m_NetworkTimeoutSeconds = value; } + + [SerializeField] + float m_SleepTimeSeconds = 0.01f; + public float SleepTimeSeconds { get => m_SleepTimeSeconds; set => m_SleepTimeSeconds = value; } + + [SerializeField] + [FormerlySerializedAs("showHUD")] + bool m_ShowHUD = true; + public bool ShowHud { get => m_ShowHUD; set => m_ShowHUD = value; } + + const string k_Topic_Error = "__error"; + const string k_Topic_SysCommand = "__syscommand"; + const string k_Topic_Services = "__srv"; + + const string k_SysCommand_Subscribe = "subscribe"; + const string k_SysCommand_Publish = "publish"; + const string k_SysCommand_RosService = "ros_service"; + const string k_SysCommand_UnityService = "unity_service"; - [Tooltip("If blank, determine IP automatically.")] - public string overrideUnityIP = ""; - - public int unityPort = 5005; - bool alreadyStartedServer = false; - - private int networkTimeout = 2000; - - [Tooltip("While waiting for a service to respond, check this many times before giving up.")] - public int awaitDataMaxRetries = 10; - - [Tooltip("While waiting for a service to respond, wait this many seconds between checks.")] - public float awaitDataSleepSeconds = 1.0f; - - [Tooltip("While reading received messages, read this many bytes at a time.")] - public int readChunkSize = 2048; - - [Tooltip("While waiting to read a full message, check this many times before giving up.")] - public int awaitDataReadRetry = 10; - - [Tooltip("Close connection if nothing has been sent for this long (seconds).")] - public float timeoutOnIdle = 10; + // GUI window variables + internal HUDPanel m_HudPanel = null; - static object _lock = new object(); // sync lock - static List activeConnectionTasks = new List(); // pending connections + ConcurrentQueue> m_OutgoingMessages = new ConcurrentQueue>(); + ConcurrentQueue> m_IncomingMessages = new ConcurrentQueue>(); + CancellationTokenSource m_ConnectionThreadCancellation; - const string ERROR_TOPIC_NAME = "__error"; - const string SYSCOMMAND_TOPIC_NAME = "__syscommand"; - const string HANDSHAKE_TOPIC_NAME = "__handshake"; + static float s_RealTimeSinceStartup = 0.0f;// only the main thread can access Time.realTimeSinceStartup, so make a copy here - const string SYSCOMMAND_SUBSCRIBE = "subscribe"; - const string SYSCOMMAND_PUBLISH = "publish"; + readonly object m_ServiceRequestLock = new object(); + int m_NextSrvID = 101; + Dictionary m_ServicesWaiting = new Dictionary(); - // GUI window variables - internal HUDPanel hudPanel = null; + struct SubscriberCallback + { + public Func messageConstructor; + public List> callbacks; + } - public bool showHUD = true; + Dictionary m_Subscribers = new Dictionary(); - struct SubscriberCallback + struct UnityServiceImplementation { - public ConstructorInfo messageConstructor; - public List> callbacks; + public Func messageConstructor; + public Func callback; } - Dictionary subscribers = new Dictionary(); + Dictionary m_UnityServices = new Dictionary(); public void Subscribe(string topic, Action callback) where T : Message, new() { SubscriberCallback subCallbacks; - if (!subscribers.TryGetValue(topic, out subCallbacks)) + if (!m_Subscribers.TryGetValue(topic, out subCallbacks)) { subCallbacks = new SubscriberCallback { - messageConstructor = typeof(T).GetConstructor(new Type[0]), - callbacks = new List> { } + messageConstructor = () => new T(), + callbacks = new List> { } }; - subscribers.Add(topic, subCallbacks); + m_Subscribers.Add(topic, subCallbacks); } subCallbacks.callbacks.Add((Message msg) => { - callback((T) msg); - return null; + callback((T)msg); }); } public void ImplementService(string topic, Func callback) where T : Message, new() { - SubscriberCallback subCallbacks; - if (!subscribers.TryGetValue(topic, out subCallbacks)) + m_UnityServices[topic] = new UnityServiceImplementation { - subCallbacks = new SubscriberCallback - { - messageConstructor = typeof(T).GetConstructor(new Type[0]), - callbacks = new List> { } - }; - subscribers.Add(topic, subCallbacks); - } - - subCallbacks.callbacks.Add((Message msg) => { return callback((T) msg); }); + messageConstructor = () => new T(), + callback = (Message msg) => callback((T)msg) + }; } - public async void SendServiceMessage(string rosServiceName, Message serviceRequest, - Action callback) where RESPONSE : Message, new() + public async void SendServiceMessage(string rosServiceName, Message serviceRequest, Action callback) where RESPONSE : Message, new() { - // Serialize the message in service name, message size, and message bytes format - byte[] messageBytes = GetMessageBytes(rosServiceName, serviceRequest); - - TcpClient client = new TcpClient(); - await client.ConnectAsync(rosIPAddress, rosPort); - - NetworkStream networkStream = client.GetStream(); - networkStream.ReadTimeout = networkTimeout; - - RESPONSE serviceResponse = new RESPONSE(); - - int serviceID = 0; - - // Send the message + RESPONSE response = await SendServiceMessage(rosServiceName, serviceRequest); try { - if (hudPanel != null) serviceID = hudPanel.AddServiceRequest(rosServiceName, serviceRequest); - networkStream.Write(messageBytes, 0, messageBytes.Length); + callback(response); } catch (Exception e) { - Debug.LogError("SocketException: " + e); - goto finish; - } - - if (!networkStream.CanRead) - { - Debug.LogError("Sorry, you cannot read from this NetworkStream."); - goto finish; - } - - // Poll every 1 second(s) for available data on the stream - int attempts = 0; - while (!networkStream.DataAvailable && attempts <= this.awaitDataMaxRetries) - { - if (attempts == this.awaitDataMaxRetries) - { - Debug.LogError("No data available on network stream after " + awaitDataMaxRetries + " attempts."); - goto finish; - } - - attempts++; - await Task.Delay((int) (awaitDataSleepSeconds * 1000)); + Debug.LogError("Exception in service callback: " + e); } - - try - { - string serviceName; - (string topicName, byte[] content) = await ReadMessageContents(networkStream); - serviceResponse.Deserialize(content, 0); - } - catch (Exception e) - { - Debug.LogError("Exception raised!! " + e); - } - - finish: - callback(serviceResponse); - if (hudPanel != null) hudPanel.AddServiceResponse(serviceID, serviceResponse); - if (client.Connected) - client.Close(); } public async Task SendServiceMessage(string rosServiceName, Message serviceRequest) where RESPONSE : Message, new() { - var t = new TaskCompletionSource(); + byte[] requestBytes = serviceRequest.Serialize(); + TaskPauser pauser = new TaskPauser(); - SendServiceMessage(rosServiceName, serviceRequest, s => t.TrySetResult(s)); + int srvID; + lock (m_ServiceRequestLock) + { + srvID = m_NextSrvID++; + m_ServicesWaiting.Add(srvID, pauser); + } + + MRosUnitySrvMessage srvMessage = new MRosUnitySrvMessage(srvID, true, rosServiceName, requestBytes); + Send(k_Topic_Services, srvMessage); - return await t.Task; + byte[] rawResponse = (byte[])await pauser.PauseUntilResumed(); + RESPONSE result = new RESPONSE(); + result.Deserialize(rawResponse, 0); + return result; } public void GetTopicList(Action callback) @@ -185,13 +161,22 @@ public void GetTopicList(Action callback) public void RegisterSubscriber(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_SUBSCRIBE, - new SysCommand_Subscribe {topic = topic, message_name = rosMessageName}); + SendSysCommand(k_SysCommand_Subscribe, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } public void RegisterPublisher(string topic, string rosMessageName) { - SendSysCommand(SYSCOMMAND_PUBLISH, new SysCommand_Publish {topic = topic, message_name = rosMessageName}); + SendSysCommand(k_SysCommand_Publish, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + } + + public void RegisterRosService(string topic, string rosMessageName) + { + SendSysCommand(k_SysCommand_RosService, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); + } + + public void RegisterUnityService(string topic, string rosMessageName) + { + SendSysCommand(k_SysCommand_UnityService, new SysCommand_TopicAndType { topic = topic, message_name = rosMessageName }); } private static ROSConnection _instance; @@ -227,20 +212,38 @@ void OnEnable() private void Start() { - if(!IPFormatIsCorrect(rosIPAddress)) - Debug.LogError("ROS IP address is not correct"); InitializeHUD(); - Subscribe(ERROR_TOPIC_NAME, RosUnityErrorCallback); + Subscribe(k_Topic_Error, RosUnityErrorCallback); + Subscribe(k_Topic_Services, ProcessServiceMessage); - if (overrideUnityIP != "") + if (ConnectOnStart) { - if(!IPFormatIsCorrect(overrideUnityIP)) - Debug.LogError("Override Unity IP address is not correct"); - StartMessageServer(overrideUnityIP, unityPort); // no reason to wait, if we already know the IP + Connect(); } + } + + public void Connect(string ipAddress, int port) + { + m_RosIPAddress = ipAddress; + m_RosPort = port; + if (m_HudPanel != null) + m_HudPanel.host = $"{ipAddress}:{port}"; + Connect(); + } + + public void Connect() + { + if (!IPFormatIsCorrect(m_RosIPAddress)) + Debug.LogError("ROS IP address is not correct"); - SendServiceMessage(HANDSHAKE_TOPIC_NAME, - new MUnityHandshakeRequest(overrideUnityIP, (ushort) unityPort), RosUnityHandshakeCallback); + m_ConnectionThreadCancellation = new CancellationTokenSource(); + Task.Run(() => ConnectionThread(m_RosIPAddress, m_RosPort, m_NetworkTimeoutSeconds, m_KeepaliveTime, (int)(m_SleepTimeSeconds*1000.0f), m_OutgoingMessages, m_IncomingMessages, m_ConnectionThreadCancellation.Token)); + } + + public void Disconnect() + { + m_ConnectionThreadCancellation.Cancel(); + m_ConnectionThreadCancellation = null; } void OnValidate() @@ -250,21 +253,16 @@ void OnValidate() private void InitializeHUD() { - if (!Application.isPlaying || (!showHUD && hudPanel == null)) + if (!Application.isPlaying || (!m_ShowHUD && m_HudPanel == null)) return; - if (hudPanel == null) + if (m_HudPanel == null) { - hudPanel = gameObject.AddComponent(); - hudPanel.host = $"{rosIPAddress}:{rosPort}"; + m_HudPanel = gameObject.AddComponent(); + m_HudPanel.host = $"{RosIPAddress}:{RosPort}"; } - hudPanel.isEnabled = showHUD; - } - - void RosUnityHandshakeCallback(MUnityHandshakeResponse response) - { - StartMessageServer(response.ip, unityPort); + m_HudPanel.isEnabled = m_ShowHUD; } void RosUnityErrorCallback(MRosUnityError error) @@ -272,211 +270,213 @@ void RosUnityErrorCallback(MRosUnityError error) Debug.LogError("ROS-Unity error: " + error.message); } - /// TcpClient to read byte stream from. - protected async Task HandleConnectionAsync(TcpClient tcpClient) + private void Update() { - await Task.Yield(); + s_RealTimeSinceStartup = Time.realtimeSinceStartup; - // continue asynchronously on another thread - await ReadFromStream(tcpClient.GetStream()); - } + Tuple data; + while (m_IncomingMessages.TryDequeue(out data)) + { + (string topic, byte[] contents) = data; - async Task ReadFromStream(NetworkStream networkStream) - { - if (!networkStream.CanRead) - return; + // notify whatever is interested in this incoming message + SubscriberCallback callback; + if (m_Subscribers.TryGetValue(topic, out callback)) + { + Message message = callback.messageConstructor(); + message.Deserialize(contents, 0); - SubscriberCallback subs; + if (m_HudPanel != null) + m_HudPanel.SetLastMessageReceived(topic, message); - float lastDataReceivedRealTimestamp = 0; - do + callback.callbacks.ForEach(item => item(message)); + } + } + } + + void ProcessServiceMessage(MRosUnitySrvMessage message) + { + if (message.is_request) + { + UnityServiceImplementation service; + if (m_UnityServices.TryGetValue(message.topic, out service)) + { + Message requestMessage = service.messageConstructor(); + requestMessage.Deserialize(message.payload, 0); + Message responseMessage = service.callback(requestMessage); + byte[] responseBytes = responseMessage.Serialize(); + Send(k_Topic_Services, new MRosUnitySrvMessage(message.srv_id, false, message.topic, responseBytes)); + } + } + else { - // try to keep reading messages as long as the networkstream has data. - // But if it's taking too long, don't freeze forever. - float frameLimitRealTimestamp = Time.realtimeSinceStartup + 0.1f; - while (networkStream.DataAvailable && Time.realtimeSinceStartup < frameLimitRealTimestamp) + TaskPauser resumer; + lock (m_ServiceRequestLock) { - if (!Application.isPlaying) + if (!m_ServicesWaiting.TryGetValue(message.srv_id, out resumer)) { - networkStream.Close(); + Debug.LogError($"Unable to route service response on \"{message.topic}\"! SrvID {message.srv_id} does not exist."); return; } - (string topicName, byte[] content) = await ReadMessageContents(networkStream); - lastDataReceivedRealTimestamp = Time.realtimeSinceStartup; + m_ServicesWaiting.Remove(message.srv_id); + } + resumer.Resume(message.payload); + } + } - if (!subscribers.TryGetValue(topicName, out subs)) - continue; // not interested in this topic + static void SendKeepalive(NetworkStream stream) + { + // 8 zeroes = a ros message with topic "" and no message data. + stream.Write(new byte[] { 0, 0, 0, 0, 0, 0, 0, 0 }, 0, 8); + } - Message msg = (Message)subs.messageConstructor.Invoke(new object[0]); - msg.Deserialize(content, 0); + static async Task ConnectionThread( + string rosIPAddress, + int rosPort, + float networkTimeoutSeconds, + float keepaliveTime, + int sleepMilliseconds, + ConcurrentQueue> outgoingQueue, + ConcurrentQueue> incomingQueue, + CancellationToken token) + { + //Debug.Log("ConnectionThread begins"); + int nextReaderIdx = 101; + int nextReconnectionDelay = 1000; - if (hudPanel != null) - hudPanel.SetLastMessageReceived(topicName, msg); + while (!token.IsCancellationRequested) + { + TcpClient client = null; + CancellationTokenSource readerCancellation = null; - foreach (Func callback in subs.callbacks) + try + { + client = new TcpClient(); + client.Connect(rosIPAddress, rosPort); + + NetworkStream networkStream = client.GetStream(); + networkStream.ReadTimeout = (int)(networkTimeoutSeconds * 1000); + + SendKeepalive(networkStream); + + readerCancellation = new CancellationTokenSource(); + _ = Task.Run(() => ReaderThread(nextReaderIdx, networkStream, incomingQueue, sleepMilliseconds, readerCancellation.Token)); + nextReaderIdx++; + + // connected, now just watch our queue for outgoing messages to send (or else send a keepalive message occasionally) + while (true) { - try + Tuple data; + float waitingSinceRealTime = s_RealTimeSinceStartup; + token.ThrowIfCancellationRequested(); + while (!outgoingQueue.TryDequeue(out data)) { - Message response = callback(msg); - if (response != null) + Thread.Sleep(sleepMilliseconds); + if (s_RealTimeSinceStartup > waitingSinceRealTime + keepaliveTime) { - // if the callback has a response, it's implementing a service - WriteDataStaggered(networkStream, topicName, response); + SendKeepalive(networkStream); + waitingSinceRealTime = s_RealTimeSinceStartup; } + token.ThrowIfCancellationRequested(); } - catch (Exception e) - { - Debug.LogError("Subscriber callback problem: " + e); - } + + WriteDataStaggered(networkStream, data.Item1, data.Item2); + } + } + catch (OperationCanceledException) + { + } + catch (Exception e) + { + Debug.Log($"Connection to {rosIPAddress}:{rosPort} failed - " + e); + await Task.Delay(nextReconnectionDelay); + } + finally + { + if (readerCancellation != null) + readerCancellation.Cancel(); + + if (client != null) + client.Close(); + + // clear the message queue + Tuple unused; + while (outgoingQueue.TryDequeue(out unused)) + { } } await Task.Yield(); - } - while (Time.realtimeSinceStartup < lastDataReceivedRealTimestamp + timeoutOnIdle); // time out if idle too long. - networkStream.Close(); + } } - async Task> ReadMessageContents(NetworkStream networkStream) + static async Task ReaderThread(int readerIdx, NetworkStream networkStream, ConcurrentQueue> queue, int sleepMilliseconds, CancellationToken token) { - // Get first bytes to determine length of topic name - byte[] rawTopicBytes = new byte[4]; - networkStream.Read(rawTopicBytes, 0, rawTopicBytes.Length); - int topicLength = BitConverter.ToInt32(rawTopicBytes, 0); - - // Read and convert topic name - byte[] topicNameBytes = new byte[topicLength]; - networkStream.Read(topicNameBytes, 0, topicNameBytes.Length); - string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength); - - byte[] full_message_size_bytes = new byte[4]; - networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length); - int full_message_size = BitConverter.ToInt32(full_message_size_bytes, 0); - - byte[] readBuffer = new byte[full_message_size]; - int bytesRemaining = full_message_size; - int totalBytesRead = 0; - - int attempts = 0; - // Read in message contents until completion, or until attempts are maxed out - while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry) + while (!token.IsCancellationRequested) { - if (attempts == this.awaitDataReadRetry) + try { - Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts."); - return Tuple.Create(topicName, readBuffer); + Tuple content = await ReadMessageContents(networkStream, sleepMilliseconds, token); + //Debug.Log($"Message {content.Item1} received"); + queue.Enqueue(content); } - - // Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain - int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining)); - totalBytesRead += bytesRead; - bytesRemaining -= bytesRead; - - if (!networkStream.DataAvailable) + catch (OperationCanceledException) + { + } + catch (Exception e) { - attempts++; - await Task.Yield(); + Debug.Log("Reader " + readerIdx + " exception! " + e); } } - return Tuple.Create(topicName, readBuffer); } - /// - /// Handles multiple connections and locks. - /// - /// TcpClient to read byte stream from. - private async Task StartHandleConnectionAsync(TcpClient tcpClient) + static async Task ReadToByteArray(NetworkStream networkStream, byte[] array, int length, int sleepMilliseconds, CancellationToken token) { - var connectionTask = HandleConnectionAsync(tcpClient); - - lock (_lock) - activeConnectionTasks.Add(connectionTask); - - try + int read = 0; + while (read < length && networkStream.CanRead) { - await connectionTask; - // we may be on another thread after "await" - } - catch (Exception ex) - { - Debug.LogError(ex.ToString()); - } - finally - { - lock (_lock) - activeConnectionTasks.Remove(connectionTask); + while (!token.IsCancellationRequested && !networkStream.DataAvailable) + await Task.Delay(sleepMilliseconds); + + token.ThrowIfCancellationRequested(); + read += await networkStream.ReadAsync(array, read, length - read, token); } + + if (read < length) + throw new SocketException(); // the connection has closed } - TcpListener tcpListener; + static byte[] s_FourBytes = new byte[4]; + static byte[] s_TopicScratchSpace = new byte[64]; - protected async void StartMessageServer(string ip, int port) + static async Task> ReadMessageContents(NetworkStream networkStream, int sleepMilliseconds, CancellationToken token) { - if (alreadyStartedServer) - return; - - alreadyStartedServer = true; - while (true) - { - try - { - if (!Application.isPlaying) - break; - tcpListener = new TcpListener(IPAddress.Parse(ip), port); - tcpListener.Start(); + // Get first bytes to determine length of topic name + await ReadToByteArray(networkStream, s_FourBytes, 4, sleepMilliseconds, token); + int topicLength = BitConverter.ToInt32(s_FourBytes, 0); - Debug.Log("ROS-Unity server listening on " + ip + ":" + port); + // If our topic buffer isn't large enough, make a larger one (and keep it that size; assume that's the new standard) + if (topicLength > s_TopicScratchSpace.Length) + s_TopicScratchSpace = new byte[topicLength]; - while (true) //we wait for a connection - { - var tcpClient = await tcpListener.AcceptTcpClientAsync(); + // Read and convert topic name + await ReadToByteArray(networkStream, s_TopicScratchSpace, topicLength, sleepMilliseconds, token); + string topicName = Encoding.ASCII.GetString(s_TopicScratchSpace, 0, topicLength); - var task = StartHandleConnectionAsync(tcpClient); - // if already faulted, re-throw any error on the calling context - if (task.IsFaulted) - await task; + await ReadToByteArray(networkStream, s_FourBytes, 4, sleepMilliseconds, token); + int full_message_size = BitConverter.ToInt32(s_FourBytes, 0); - // try to get through the message queue before doing another await - // but if messages are arriving faster than we can process them, don't freeze up - float abortAtRealtime = Time.realtimeSinceStartup + 0.1f; - while (tcpListener.Pending() && Time.realtimeSinceStartup < abortAtRealtime) - { - tcpClient = tcpListener.AcceptTcpClient(); - task = StartHandleConnectionAsync(tcpClient); - if (task.IsFaulted) - await task; - } - } - } - catch (ObjectDisposedException e) - { - if (!Application.isPlaying) - { - // This only happened because we're shutting down. Not a problem. - } - else - { - Debug.LogError("Exception raised!! " + e); - } - } - catch (Exception e) - { - Debug.LogError("Exception raised!! " + e); - } + byte[] readBuffer = new byte[full_message_size]; + await ReadToByteArray(networkStream, readBuffer, full_message_size, sleepMilliseconds, token); - // to avoid infinite loops, wait a frame before trying to restart the server - await Task.Yield(); - } + return Tuple.Create(topicName, readBuffer); } - private void OnApplicationQuit() + void OnApplicationQuit() { - if (tcpListener != null) - tcpListener.Stop(); - tcpListener = null; + Disconnect(); } - /// /// Given some input values, fill a byte array in the desired format to use with /// https://github.com/Unity-Technologies/Robotics-Tutorials/tree/master/catkin_ws/src/tcp_endpoint @@ -533,13 +533,7 @@ public byte[] GetMessageBytes(string topicServiceName, Message message) return messageBuffer; } - struct SysCommand_Subscribe - { - public string topic; - public string message_name; - } - - struct SysCommand_Publish + struct SysCommand_TopicAndType { public string topic; public string message_name; @@ -547,45 +541,15 @@ struct SysCommand_Publish void SendSysCommand(string command, object param) { - Send(SYSCOMMAND_TOPIC_NAME, new MRosUnitySysCommand(command, JsonUtility.ToJson(param))); + Send(k_Topic_SysCommand, new MRosUnitySysCommand(command, JsonUtility.ToJson(param))); } - public async void Send(string rosTopicName, Message message) + public void Send(string rosTopicName, Message message) { - TcpClient client = null; - try - { - client = new TcpClient(); - await client.ConnectAsync(rosIPAddress, rosPort); - - NetworkStream networkStream = client.GetStream(); - networkStream.ReadTimeout = networkTimeout; + m_OutgoingMessages.Enqueue(new Tuple(rosTopicName, message)); - WriteDataStaggered(networkStream, rosTopicName, message); - } - catch (NullReferenceException e) - { - Debug.LogError("TCPConnector.SendMessage Null Reference Exception: " + e); - } - catch (Exception e) - { - Debug.LogError("TCPConnector Exception: " + e); - } - finally - { - if (client != null && client.Connected) - { - try - { - if (hudPanel != null) hudPanel.SetLastMessageSent(rosTopicName, message); - client.Close(); - } - catch (Exception) - { - //Ignored. - } - } - } + if (m_HudPanel != null) + m_HudPanel.SetLastMessageSent(rosTopicName, message); } /// @@ -601,7 +565,7 @@ public async void Send(string rosTopicName, Message message) /// The network stream that is transmitting the messsage /// The ROS topic or service name that is receiving the messsage /// The ROS message to send to a ROS publisher or service - private void WriteDataStaggered(NetworkStream networkStream, string rosTopicName, Message message) + static void WriteDataStaggered(NetworkStream networkStream, string rosTopicName, Message message) { byte[] topicName = message.SerializeString(rosTopicName); List segments = message.SerializationStatements(); @@ -618,25 +582,25 @@ private void WriteDataStaggered(NetworkStream networkStream, string rosTopicName public static bool IPFormatIsCorrect(string ipAddress) { - if(ipAddress == null || ipAddress == "") + if (ipAddress == null || ipAddress == "") return false; - + // If IP address is set using static lookup tables https://man7.org/linux/man-pages/man5/hosts.5.html - if(Char.IsLetter(ipAddress[0])) + if (Char.IsLetter(ipAddress[0])) { - foreach(Char subChar in ipAddress) + foreach (Char subChar in ipAddress) { - if(!(Char.IsLetterOrDigit(subChar) || subChar == '-'|| subChar == '.')) + if (!(Char.IsLetterOrDigit(subChar) || subChar == '-' || subChar == '.')) return false; } - if(!Char.IsLetterOrDigit(ipAddress[ipAddress.Length - 1])) + if (!Char.IsLetterOrDigit(ipAddress[ipAddress.Length - 1])) return false; return true; } string[] subAdds = ipAddress.Split('.'); - if(subAdds.Length != 4) + if (subAdds.Length != 4) { return false; } diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs new file mode 100644 index 00000000..adff2d09 --- /dev/null +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs @@ -0,0 +1,36 @@ +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using UnityEngine; + +namespace Unity.Robotics.ROSTCPConnector +{ + public class TaskPauser + { + CancellationTokenSource m_Source = new CancellationTokenSource(); + public object Result { get; private set; } + + public async Task PauseUntilResumed() + { + try + { + while (!m_Source.Token.IsCancellationRequested) + { + await Task.Delay(10000, m_Source.Token); + } + } + catch (TaskCanceledException) + { + + } + return Result; + } + + public void Resume(object result) + { + Result = result; + m_Source.Cancel(); + } + } +} \ No newline at end of file diff --git a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta similarity index 83% rename from com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta rename to com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta index 71d81647..f1b0404a 100644 --- a/com.unity.robotics.ros-tcp-connector/Runtime/MessageGeneration/PregeneratedMessages/RosTcpEndpoint/srv/MUnityHandshakeResponse.cs.meta +++ b/com.unity.robotics.ros-tcp-connector/Runtime/TcpConnector/TaskPauser.cs.meta @@ -1,5 +1,5 @@ fileFormatVersion: 2 -guid: 800428bac01226e4185243abce6a9e34 +guid: d0e0d908abf7adb4c8100dfeebafd18c MonoImporter: externalObjects: {} serializedVersion: 2