Threadsafe library #95

@silasalves

Is your feature request related to a problem? Please describe.
My robot's sensors use a library that runs on a thread other than the main thread. This causes race conditions when calling void Send(), which throws the following exception:

TCPConnector Exception: System.Net.Sockets.SocketException (0x80004005): Only one usage of each socket address (protocol/network address/port) is normally permitted.

Describe the solution you'd like
It would be great if the ROSConnection class was threadsafe, or if it was possible to have multiple endpoints (one per thread).

Describe alternatives you've considered
I tried using lock() and a Semaphore, but neither helped. I am not sure how to proceed.

Update 1: To ensure that only one thread calls the methods I am using (i.e. Send and RegisterPublisher), I created a wrapper that lets all scripts enqueue their messages, with a single endpoint calling ProcessPublishers() to forward them. That did not work either.

I also noticed that the errors consistently start 75-80 seconds after the beginning of the simulation.

using Unity.Robotics.ROSTCPConnector;
using Unity.Robotics.ROSTCPConnector.MessageGeneration;
using System.Collections.Concurrent;

public class RosWrapper
{
    // Messages waiting to be published, shared by all threads.
    private static ConcurrentQueue<(string topicName, Message message)> msgBuffer = new ConcurrentQueue<(string, Message)>();

    // Publisher registrations waiting to be forwarded to ROSConnection.
    private static ConcurrentQueue<(string topicName, string messageName)> pubRegBuffer = new ConcurrentQueue<(string, string)>();

    // Not used in this snippet.
    private static readonly object msgBufferLock = new object();

    // Can be called from any thread: only enqueues, never touches ROSConnection.
    public void Send(string rosTopicName, Message message)
    {
        msgBuffer.Enqueue((rosTopicName, message));
    }

    // Can be called from any thread: only enqueues, never touches ROSConnection.
    public void RegisterPublisher(string rosTopicName, string rosMessageName)
    {
        pubRegBuffer.Enqueue((rosTopicName, rosMessageName));
    }

    // Called by a single endpoint: forwards registrations first, then messages.
    public void ProcessPublishers()
    {
        (string topicName, string messageName) regData;
        while(pubRegBuffer.TryDequeue(out regData))
            ROSConnection.instance.RegisterPublisher(regData.topicName, regData.messageName);

        (string topicName, Message message) pubData;
        while(msgBuffer.TryDequeue(out pubData))
            ROSConnection.instance.Send(pubData.topicName, pubData.message);
    }
}

Update 2: I removed all calls to ROSConnection.instance from the wrapper and reduced it to a very simple "communication class" that only provides access to the message queues. Only one script is now allowed to use ROSConnection.instance. This implementation still does not work: the error continues to show up after 75-80 seconds.

using System.Collections.Concurrent;
using Message = Unity.Robotics.ROSTCPConnector.MessageGeneration.Message;

public class RosWrapper
{
    // Messages waiting to be published, shared by all threads.
    private static ConcurrentQueue<(string topicName, Message message)> msgBuffer = new ConcurrentQueue<(string, Message)>();

    // Publisher registrations waiting to be forwarded.
    private static ConcurrentQueue<(string topicName, string messageName)> pubRegBuffer = new ConcurrentQueue<(string, string)>();

    // Not used in this snippet.
    private static readonly object msgBufferLock = new object();

    // Producer side: can be called from any thread.
    public static void Send(string rosTopicName, Message message)
    {
        msgBuffer.Enqueue((rosTopicName, message));
    }

    // Producer side: can be called from any thread.
    public static void RegisterPublisher(string rosTopicName, string rosMessageName)
    {
        pubRegBuffer.Enqueue((rosTopicName, rosMessageName));
    }

    // Consumer side: used only by the one script that owns ROSConnection.instance.
    public static bool TryDequeueSend(out (string topicName, Message message) data)
    {
        return msgBuffer.TryDequeue(out data);
    }

    // Consumer side: used only by the one script that owns ROSConnection.instance.
    public static bool TryDequeuePubReg(out (string topicName, string messageName) data)
    {
        return pubRegBuffer.TryDequeue(out data);
    }
}

Update 3: I tried to create a thread dedicated to processing the queued messages, but ROSConnection has to run on the main thread:

UnityException: Load can only be called from the main thread.
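
For reference, here is a minimal sketch of the pattern that Updates 2 and 3 point towards: worker threads only enqueue, and a single thread drains the queue and publishes. It is plain C# so that it runs without Unity or ROS-TCP-Connector. MainThreadPump, Drain and the publish delegate are hypothetical names, and the string payload stands in for Message. In Unity, Drain would presumably be called from a MonoBehaviour's Update() with publish wrapping ROSConnection.instance.Send, which is an assumption and not something verified here. The sketch only shows the threading structure; it does not explain or fix the SocketException, which per Update 2 still occurs with a single caller.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the RosWrapper queue from Update 2.
// The payload is a string so the sketch has no Unity or ROS dependency.
public static class MainThreadPump
{
    private static readonly ConcurrentQueue<(string topicName, string payload)> msgBuffer =
        new ConcurrentQueue<(string, string)>();

    // Safe to call from any thread: ConcurrentQueue.Enqueue is thread-safe.
    public static void Send(string topicName, string payload)
    {
        msgBuffer.Enqueue((topicName, payload));
    }

    // Call from exactly one thread (in Unity: the main thread, e.g. from Update()).
    // `publish` stands in for the real send call and is an assumption.
    // Returns the number of messages forwarded.
    public static int Drain(Action<string, string> publish)
    {
        int count = 0;
        while (msgBuffer.TryDequeue(out var item))
        {
            publish(item.topicName, item.payload);
            count++;
        }
        return count;
    }
}

public static class Demo
{
    public static void Main()
    {
        // Four simulated sensor threads enqueue 100 messages each.
        var workers = new List<Thread>();
        for (int i = 0; i < 4; i++)
        {
            int id = i; // copy the loop variable for the closure
            var thread = new Thread(() =>
            {
                for (int n = 0; n < 100; n++)
                    MainThreadPump.Send("sensor" + id, "reading " + n);
            });
            workers.Add(thread);
            thread.Start();
        }
        foreach (var worker in workers) worker.Join();

        // Single consumer: every publish call runs on the calling thread.
        int mainThreadId = Thread.CurrentThread.ManagedThreadId;
        int sent = MainThreadPump.Drain((topic, payload) =>
        {
            if (Thread.CurrentThread.ManagedThreadId != mainThreadId)
                throw new InvalidOperationException("publish ran off the main thread");
        });
        Console.WriteLine("Published " + sent + " messages from one thread");
    }
}
```

Because the worker threads never touch the connection object, any main-thread-only restriction (such as the UnityException above) applies only to the code inside Drain.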
