
best way for multi-threading/concurrency in callee using wampsharp #286

Open
CapAdv opened this issue Feb 11, 2019 · 17 comments

CapAdv commented Feb 11, 2019

Hi,
I originally posted this on the crossbario GitHub site, but it seems this is the correct place to post since it concerns WampSharp primarily:

I am quite new to WampSharp and am having some problems with the parallelism/concurrency aspect. I created a WampSharp router console app under Windows 10/C# (.NET Core 2.2) and ran it. Works fine. Then I ran another console app which acts as the callee (the callee RPC functions are registered using C# reflection). Also fine. Finally I made a third console app which is the caller. It uses C# Parallel.Invoke to fire 3 calls to the callee on different threads. There's one 'long' call to function 'LongRunner', which just loops for 5 seconds and then returns the length of a string parameter, and a small call 'Add2', which adds 2 ints and returns the result. In the Parallel.Invoke, I sleep the two instances of the call to Add2 for 20 and 10 ms so that 'LongRunner' starts first. On the callee, we see that all callee functions run on the same thread and run sequentially, which is also easy to see from the fact that the whole call group returns no data until 'LongRunner' finishes, after which all results come back very quickly. I understand that WampSharp's router can use multiple cores (correct?), but obviously that doesn't help if the callee serializes everything.

So, what is the right way to ensure that the short calls don't wait for 'LongRunner'? All my RPC functions will be CPU intensive (it's computing some financials, so not many connections but heavy CPU processing per RPC call), so I'm not sure about the benefits of the async approach, or whether my use of the reflection approach causes the issue. I actually want the RPC calls to fire up different threads somehow so I can utilize the server's multi-core architecture. I have read other posts here on similar issues but still don't exactly see the answer. Anyone got any pointers here? (Code below.)

// ROUTER
class Program
{
    static void Main(string[] args)
    {
        const string location = "ws://127.0.0.1:8080/";

        using (IWampHost host = new DefaultWampHost(location))
        {
            IWampHostedRealm realm = host.RealmContainer.GetRealmByName("realm1");

            // Host WAMP application components

            host.Open();

            Console.WriteLine("Server is running on " + location);
            Console.ReadLine();
        }
    }
}

// CALLEE

namespace WampSharpCallee
{
    public interface IArgumentsService
    {
        [WampProcedure("com.arguments.ping")]
        void Ping();

        [WampProcedure("com.arguments.add2")]
        int Add2(int a, int b);

        [WampProcedure("com.arguments.longrunner")]
        string LongRunner(string big, int waitMs);
    }

    public class ArgumentsService : IArgumentsService
    {
        public void Ping()
        {
        }

        public int Add2(int a, int b)
        {
            Console.WriteLine($"Thread:{Thread.CurrentThread.ManagedThreadId}");
            return a + b;
        }

        public string LongRunner(string str, int waitMs)
        {
            Console.WriteLine($"Thread:{Thread.CurrentThread.ManagedThreadId}");

            // Busy-wait for waitMs milliseconds to simulate CPU-bound work.
            DateTime d = DateTime.Now;
            while (DateTime.Now < d.Add(TimeSpan.FromMilliseconds(waitMs)))
            {
                int x = 1;
            }

            return str.Length.ToString();
        }
    }

    internal class Program
    {
        public static void Main(string[] args)
        {
            const string location = "ws://127.0.0.1:8080/";

            DefaultWampChannelFactory channelFactory = new DefaultWampChannelFactory();

            IWampChannel channel = channelFactory.CreateJsonChannel(location, "realm1");

            Task openTask = channel.Open();

            // await openTask;
            openTask.Wait();

            IArgumentsService instance = new ArgumentsService();

            IWampRealmProxy realm = channel.RealmProxy;

            Task<IAsyncDisposable> registrationTask = realm.Services.RegisterCallee(instance);
            // await registrationTask;
            registrationTask.Wait();

            Console.ReadLine();
        }
    }
}

// CALLER

namespace MyNamespace
{
    public interface IArgumentsService
    {
        [WampProcedure("com.arguments.ping")]
        void Ping();

        [WampProcedure("com.arguments.add2")]
        int Add2(int a, int b);

        [WampProcedure("com.arguments.longrunner")]
        string LongRunner(string big, int waitMs);
    }

    internal class Program
    {
        public static void Main(string[] args)
        {
            DefaultWampChannelFactory factory =
                new DefaultWampChannelFactory();

            const string serverAddress = "ws://127.0.0.1:8080/ws";

            IWampChannel channel =
                factory.CreateJsonChannel(serverAddress, "realm1");

            channel.Open().Wait(5000);

            IArgumentsService proxy =
                channel.RealmProxy.Services.GetCalleeProxy<IArgumentsService>();

            Parallel.Invoke(
                () =>
                {
                    Console.WriteLine($"Thread:{Thread.CurrentThread.ManagedThreadId}");
                    Console.WriteLine(proxy.LongRunner(@"sdfsghsf fs f s fs df sf", 5000));
                },
                () =>
                {
                    Console.WriteLine($"Thread:{Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(20);
                    Console.WriteLine(proxy.Add2(3, 2).ToString());
                },
                () =>
                {
                    Console.WriteLine($"Thread:{Thread.CurrentThread.ManagedThreadId}");
                    Thread.Sleep(10);
                    Console.WriteLine(proxy.Add2(3, 12).ToString());
                });
        }
    }
}

darkl (Member) commented Feb 11, 2019 via email

CapAdv (Author) commented Feb 11, 2019 via email

darkl (Member) commented Feb 11, 2019 via email

StevenBonePgh (Contributor) commented Feb 11, 2019

Hello @CapAdv! In this case, async/await is your friend, and it must be done on both the callee and caller sides. I am oversimplifying this and therefore am less than 100% technically accurate. Think of the WebSocket as something that blocks. When you call proxy.Add2 you are blocking the WebSocket until it gets the response for that call. Other calls will likewise be waiting for their responses, which can't be delivered while the first one is still being processed. Again, an oversimplification.

The best way to visualize what is happening is to use two executables and look at the WebSocket traffic using Wireshark (with the Wamp Protocol addin) or simply adding a configured logging framework and looking at the messages. You can see what is happening.

Fix the callees first, by creating a new interface, making your callees return a Task<>, and using the async keyword. If we assume that your callee is doing something that is CPU bound, then after input validation you can await Task.Yield() to return the thread the WebSocket notified you on so it can be reused, then perform your calculations and return the result (this is NOT general advice; use standard async calls for I/O-bound work, which 99.9% of the time is the case). Your caller can continue to use the same call without Task<> so long as the WampProcedure URI is the same. Now look at the trace and note the differences.
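
As a rough illustration of the callee side of this advice, a minimal sketch (the async interface name, the method names and the Task.Yield() placement are assumptions based on this comment, not the one required way to do it; the procedure URIs are the ones from the code above):

// Hypothetical async version of the callee contract (sketch only).
public interface IAsyncArgumentsService
{
    [WampProcedure("com.arguments.add2")]
    Task<int> Add2Async(int a, int b);

    [WampProcedure("com.arguments.longrunner")]
    Task<string> LongRunnerAsync(string big, int waitMs);
}

public class AsyncArgumentsService : IAsyncArgumentsService
{
    public async Task<int> Add2Async(int a, int b)
    {
        await Task.Yield();   // hand the WebSocket's thread back before doing any work
        return a + b;
    }

    public async Task<string> LongRunnerAsync(string big, int waitMs)
    {
        await Task.Yield();   // continue on a thread-pool thread

        // Busy-wait to simulate CPU-bound work, as in the original example.
        DateTime start = DateTime.Now;
        while (DateTime.Now < start.Add(TimeSpan.FromMilliseconds(waitMs)))
        {
        }

        return big.Length.ToString();
    }
}

Registration is unchanged: realm.Services.RegisterCallee(new AsyncArgumentsService()).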

Then fix the callers by using the async interface and awaiting the calls. I see that @darkl just pointed you to a good resource. Note the behavior (and the thread ids) in your traces - it will look much different.
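
A corresponding sketch for the caller side, assuming the hypothetical IAsyncArgumentsService interface above and an async Main (the point is only that the calls are started and awaited rather than blocked on):

// Hypothetical async caller (sketch only).
IAsyncArgumentsService proxy =
    channel.RealmProxy.Services.GetCalleeProxy<IAsyncArgumentsService>();

Task<string> longCall = proxy.LongRunnerAsync("sdfsghsf fs f s fs df sf", 5000);
Task<int> shortCall1 = proxy.Add2Async(3, 2);
Task<int> shortCall2 = proxy.Add2Async(3, 12);

// The short calls can now complete while the long one is still running.
await Task.WhenAll(longCall, shortCall1, shortCall2);

Console.WriteLine(longCall.Result);
Console.WriteLine(shortCall1.Result);
Console.WriteLine(shortCall2.Result);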

CapAdv (Author) commented Feb 11, 2019

Hi Steven/darkl. Thanks for the responses, very helpful. I have switched the callee RPC functions to async/await and that does allow the function calls to execute on separate threads, as I wanted. Nice. Reading a couple of other issues here I see mention of the 'websocket loop' getting blocked under certain conditions, so we use async to allow it to continue. That's fine, but it made me ask what happens when, for example, 2 callees complete at the same time and now need to send large messages/text blocks back to the caller. The callees have run asynchronously, but will one block the other whilst the router is passing one of the large messages to the caller, or is there no 'serializing' of messages passed back? I guess I am asking if the WAMP router has a 'single-threaded' or blocking aspect to it that will persist even when all callees and callers run async?

darkl (Member) commented Feb 11, 2019

Once the callee is done with an invocation, it creates a YIELD message which is queued to a message queue. Once this message is processed, it is serialized to the required payload and sent to the router. On the router side, each client has its own message queue and messages are processed sequentially per client (simultaneously for all clients).

Elad

CapAdv (Author) commented Feb 12, 2019

I took some time to implement the suggestions of @StevenBonePgh to make both the caller and the callees async (I think it's correct?), and there was no noticeable blocking any more. That worked, so I think the callees can run on separate threads whilst freeing the initial thread to service other calls. Now I modified the callee RPC functions to return either a short (1 MB) or long (500 MB) string. When firing these as parallel tasks (without Parallel.Invoke), if the long one starts returning its string first (controlled by making the short one wait a few milliseconds before returning the string), the short one seems to get stuck behind the long one, which results in a similar blocking scenario to the one I had previously. If I start two caller processes doing the same thing, they also seem to get stuck behind each other. Could anyone comment as to whether my code (attached) is incorrect or whether the WampSharp router does indeed bottleneck on its payload:

ProgramCallee.txt
ProgramCaller.txt

Jopie64 commented Feb 12, 2019

Here's my take on this:
WAMP is able to multiplex multiple outstanding commands over a single TCP session. It is not able to multiplex the payload from multiple command responses; it does this sequentially.

If you want to return massive payloads, I suggest you do this by using the progressive RPC interface. You can divide your payload in multiple smaller parts and return them one by one. This way other command responses have a chance to be sent in between them.
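
To make that concrete, a minimal sketch of what a progressive callee might look like in WampSharp (the procedure URI, the chunk size and the BuildBigPayload helper are hypothetical; the attribute/IProgress<T> pattern follows WampSharp's documented progressive-call-results API):

// Hypothetical progressive callee (sketch): streams a large string in chunks.
public interface IBigPayloadService
{
    [WampProcedure("com.example.bigpayload")]      // hypothetical URI
    [WampProgressiveResultProcedure]
    Task<string> GetBigPayload(IProgress<string> progress);
}

public class BigPayloadService : IBigPayloadService
{
    public async Task<string> GetBigPayload(IProgress<string> progress)
    {
        await Task.Yield();

        string payload = BuildBigPayload();        // hypothetical helper producing the big string
        const int chunkSize = 64 * 1024;           // illustrative chunk size

        for (int i = 0; i < payload.Length; i += chunkSize)
        {
            int length = Math.Min(chunkSize, payload.Length - i);
            // Each Report sends a partial result, so messages from other calls
            // can be interleaved between the chunks.
            progress.Report(payload.Substring(i, length));
        }

        return string.Empty;                       // final result completes the call
    }

    private static string BuildBigPayload() => new string('x', 10_000_000);
}

On the caller side the same interface is obtained via GetCalleeProxy, and a Progress<string> is passed in to collect the chunks as they arrive.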

CapAdv (Author) commented Feb 12, 2019

Hi @Jopie64. Thanks. It certainly looks like WAMP doesn't multiplex the payload; my large payload example was a test case, but I guess many smaller payloads would also eventually cause the same effect. I just checked out the progressive RPC interface, which I suppose will allow smaller requests to get completed in less elapsed time. Still, based on watching CPU utilization of the router under load, it didn't seem to use more than one core, which concurs with your statement. That means progressive RPC will let multiple payloads share the sequential payload processing, but total throughput would be the same. Needs testing, will get on it.

darkl (Member) commented Feb 12, 2019

@CapAdv, as I explained, the responses are serialized and sent to each client sequentially, i.e. one at a time (per client; it does happen simultaneously for all clients). This is the expected behavior, otherwise the client would receive messages out of order.

You might want to check with other router implementations, such as crossbar.io. These might have a different behavior.

CapAdv (Author) commented Feb 13, 2019

Hi @darkl. Ok, but what is a 'client' exactly? If I run 2 console applications from a workstation, do I have 2 clients (the apps) or 1 (the machine/network interface)? I ask because if I start 2 .NET Core console apps on the same workstation (router and callee console apps also on the same workstation), then the requests get executed in series, even when the callee and caller are written as async, which doesn't match what you wrote above.

(FYI - I looked at crossbar.io previously. It exhibited similar behaviour, which is what brought me here wondering if making my own WAMP router would get around the issue. Actually, even on Linux/Debian I noticed that the router never used more than one core, so there's a bottleneck somewhere.)

darkl (Member) commented Feb 13, 2019 via email

darkl (Member) commented Feb 13, 2019

I looked at your code and debugged it a bit. What's happening there is that you call a procedure that runs briefly but returns a long string, and then call a procedure that runs longer but returns a short string. The callee sends the large string to the router; this is the first string processed in the queue. Then the callee sends the shorter string. It takes time to send the long string to the router, and therefore the short string waits behind the long string in the queue. The long string reaches the router first, is therefore processed first, and its corresponding RESULT message is sent to the caller first. The short string is processed after that, since it arrives later in the queue. This is consistent with what I explained about messages being processed sequentially.

I think that a scenario that demonstrates this situation better is the following: call a long running procedure that returns a long string, and then call a short running procedure that returns a short string. Then the short string will be returned to the client first.

BTW, your caller is not async at all. I edited it, you can see the modified file here.

Elad

CapAdv (Author) commented Feb 13, 2019

Hi @darkl. Yes, the caller wasn't async. I updated it according to your changes and have tested the long/short combination that you mentioned (on 2 separate caller processes, so certainly separate WAMP channels). The short one does return before the long one, even though it was started afterwards, so now the behavior is as expected/desired. I do see, though, that the short (100-byte) string, which returns more or less instantly when run alone, takes several seconds to return whilst the long (500 MB) one is running, but I have not investigated further yet. Thanks so much for the input.

golukas commented Apr 3, 2019

Hi @CapAdv, @darkl. I experimented with your examples and noticed that the short string returns instantly, but the long one takes ages - I reduced it to 50 MB and it took ~3 minutes to execute. What could be wrong?

StevenBonePgh (Contributor) commented Apr 3, 2019

@golukas, I think you need to reevaluate what it is you are trying to do with this and reconcile it with what is happening under the hood.

WebSockets in general are message based, and WAMP running over them essentially means that the entire WAMP message, plus the data you are sending in its payload, must be in RAM in order to parse the data out of the message. I think you will find much better performance by breaking your data down into smaller messages and ensuring you are using msgpack to minimize the data further.
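
On the msgpack point, the client-side change is just the channel factory call; a sketch, assuming a WampSharp msgpack serialization package is referenced and the router has the msgpack binding enabled (address and realm as used earlier in the thread):

// Binary (msgpack) serialization instead of JSON - sketch only.
DefaultWampChannelFactory factory = new DefaultWampChannelFactory();

IWampChannel channel =
    factory.CreateMsgpackChannel("ws://127.0.0.1:8080/", "realm1");

channel.Open().Wait(5000);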

In WampSharp, progressive calling may be an option to make it more palatable, but keep in mind that you stand a non-zero chance of getting the last progress message following the completion of the progressive call. I use this technique in a diagnostic capacity to remotely pull a log file from a Xamarin device while still being able to make/receive other calls.

If you don't need to use WAMP at all, take a look at some of the RFCs related to WebSockets and message fragmentation, or perhaps other means of communication that are better suited to transferring large amounts of data.

golukas commented Apr 3, 2019

Thanks @StevenBonePgh, that makes sense. My goal is to have a similar approach to the WAMP Callee-Router-Caller pattern, but only for console apps on distributed clients (messages max 10 MB). I'm also considering ZeroMQ, but it has the weakness that it needs a special port to be opened on the client machines. Any other suggestions would be appreciated.
