Parallel Request/Reply? #450

Closed
mattpminer opened this issue Jun 1, 2015 · 4 comments

Comments

@mattpminer

I am attempting to evaluate EasyNetQ to replace something that has concurrency issues, but so far no luck. When it gets to the parallel part, the first request times out. Would you expect this to work? What am I missing?

    static void Main(string[] args)
    {
        IEasyNetQLogger logger = new ConsoleLogger();
        bus = RabbitHutch.CreateBus("host=localhost;virtualHost=test;username=guest;password=guest;persistentMessages=false", x => x.Register<IEasyNetQLogger>(_ => logger));
        busResponse = RabbitHutch.CreateBus("host=localhost;virtualHost=test;username=guest;password=guest;persistentMessages=false", x => x.Register<IEasyNetQLogger>(_ => logger));
        busResponse.Respond<EasyNetRequest, EasyNetReply>(request => new EasyNetReply(Guid.NewGuid()));

        // Works
        for (int i = 0; i < 50; i++)
        {
            Method1(i);
        }

        // Doesn't work
        Parallel.For(0, 50, Method1);

    }


    static void Method1(int i)
    {
        busResponse.Request<EasyNetRequest, EasyNetReply>(new EasyNetRequest());
        COMPLETED++;
        Console.WriteLine(i + " " + COMPLETED + "/" + NUM_REQUESTS);
    }

@Pliner
Member

Pliner commented Jun 2, 2015

Hi,

I ran your snippet and yes, I can reproduce it :) I will investigate this critical issue...

@Pliner
Member

Pliner commented Jun 2, 2015

    ThreadPool.SetMinThreads(100, 100);

This ugly hack makes the situation better, but it's not a solution... I will investigate.
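
For reference, a minimal sketch of applying that workaround while leaving the I/O completion thread floor untouched (the one-liner above raises both values). The helper name `RaiseMinWorkerThreads` is hypothetical, and 100 is simply the value from the line above, not a tuned number.

```csharp
using System;
using System.Threading;

static class MinThreadsSketch
{
    // Hypothetical helper: raises only the worker-thread floor and never lowers it.
    public static bool RaiseMinWorkerThreads(int target)
    {
        int worker, io;
        ThreadPool.GetMinThreads(out worker, out io);
        // SetMinThreads returns false when the request is rejected,
        // for example when the value is above the pool's maximum.
        return ThreadPool.SetMinThreads(Math.Max(worker, target), io);
    }

    static void Main()
    {
        bool ok = RaiseMinWorkerThreads(100);
        int worker, io;
        ThreadPool.GetMinThreads(out worker, out io);
        Console.WriteLine("applied: " + ok + ", min worker threads: " + worker + ", min I/O threads: " + io);
    }
}
```

Checking the return value matters here because a rejected call leaves the pool unchanged without throwing.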

@micdenny
Member

micdenny commented Jun 2, 2015

This is the normal behavior of the thread pool; it all comes down to this:

When all thread pool threads have been assigned to tasks, the thread pool does not immediately begin creating new idle threads. To avoid unnecessarily allocating stack space for threads, it creates new idle threads at intervals. The interval is currently half a second, although it could change in future versions of the .NET Framework.

(Source: ThreadPool.SetMinThreads Method documentation)
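
The throttling described in that quote can be observed without RabbitMQ. The following is a rough sketch, not taken from EasyNetQ: it queues more blocking jobs than the pool's minimum worker count and measures how long they take. The helper name `RunBlockingJobs` and the job count and sleep duration are illustrative assumptions.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class StarvationSketch
{
    // Hypothetical helper: runs `jobs` pool tasks that each block for `blockMs`
    // and returns the total elapsed time in milliseconds.
    public static long RunBlockingJobs(int jobs, int blockMs)
    {
        var sw = Stopwatch.StartNew();
        var tasks = new Task[jobs];
        for (int i = 0; i < jobs; i++)
        {
            // Each job holds a pool thread for the whole sleep, like a blocking Request call.
            tasks[i] = Task.Run(() => Thread.Sleep(blockMs));
        }
        Task.WaitAll(tasks);
        return sw.ElapsedMilliseconds;
    }

    static void Main()
    {
        int minWorker, minIo;
        ThreadPool.GetMinThreads(out minWorker, out minIo);
        int jobs = minWorker * 4; // deliberately more jobs than the pool's floor
        long elapsed = RunBlockingJobs(jobs, 1000);
        Console.WriteLine("min worker threads: " + minWorker + ", jobs: " + jobs + ", elapsed: " + elapsed + " ms");
    }
}
```

If every job had its own thread immediately, the elapsed time would be close to one second; anything well above that is time spent waiting for the pool to inject threads. The exact figure depends on the machine and runtime version.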

@micdenny
Member

I've tested the snippet and found that this is probably related to the thread pool throttling thread creation. I also saw that Parallel.For is optimized for this scenario: it is not like simply calling Task.Run for each iteration, but it can still suffer from the thread pool limiting itself.

This is how the thread count looks when using Parallel.For(0, 1000, Method1):

[image: thread count over time, Parallel.For]

At the very beginning there was some blocking; after that the thread count grew quickly, with minor slowdowns.

And this is with for (int i = 0; i < 1000; i++) { Task.Run(Method1)... }:

[image: thread count over time, Task.Run loop]

In this case the process was completely stuck while the thread pool kept escalating. The RabbitMQ connection can also drop in that situation, because the keepalive timer runs on the thread pool.
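
The keepalive point can be illustrated with a plain System.Threading.Timer, whose callbacks also run on thread pool threads. This is only a sketch under assumptions: the timer is a stand-in for a keepalive, not the RabbitMQ client's actual implementation, and `WorstTimerGapMs` and all the numbers are hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

static class TimerDelaySketch
{
    // Hypothetical helper: saturates the pool with blocking jobs while a periodic
    // timer runs, and returns the largest gap (ms) seen between two callbacks.
    public static long WorstTimerGapMs(int jobs, int blockMs, int periodMs)
    {
        var clock = Stopwatch.StartNew();
        var gate = new object();
        long lastTick = 0;
        long worstGap = 0;

        // Stand-in for a keepalive that should fire every periodMs.
        using (var timer = new Timer(_ =>
        {
            lock (gate)
            {
                long now = clock.ElapsedMilliseconds;
                worstGap = Math.Max(worstGap, now - lastTick);
                lastTick = now;
            }
        }, null, 0, periodMs))
        {
            var tasks = new Task[jobs];
            for (int i = 0; i < jobs; i++)
            {
                tasks[i] = Task.Run(() => Thread.Sleep(blockMs)); // blocks a pool thread
            }
            Task.WaitAll(tasks);
        }

        lock (gate)
        {
            return worstGap;
        }
    }

    static void Main()
    {
        long gap = WorstTimerGapMs(100, 1000, 100);
        Console.WriteLine("timer period: 100 ms, worst observed gap: " + gap + " ms");
    }
}
```

A worst gap far above the timer period suggests the callbacks were waiting for a free pool thread, which is the same pressure a heartbeat would be under.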

Does Parallel.For use one Task per iteration?

Under the covers, Parallel.For tries to use the minimum number of tasks necessary to complete the loop as fast as possible
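
One way to see this is to count how many distinct threads a loop actually runs on. The sketch below is an illustration only; the helper name `CountThreads` is hypothetical, and the cap of 8 passed through `ParallelOptions.MaxDegreeOfParallelism` is an arbitrary value, not a recommendation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class ParallelForSketch
{
    // Hypothetical helper: runs a Parallel.For loop and returns the number of
    // distinct managed threads that executed at least one iteration.
    public static int CountThreads(int iterations, int maxParallelism, out int completed)
    {
        var threadIds = new ConcurrentDictionary<int, byte>();
        int done = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallelism };

        Parallel.For(0, iterations, options, i =>
        {
            threadIds.TryAdd(Environment.CurrentManagedThreadId, 0);
            Interlocked.Increment(ref done); // atomic, unlike done++
        });

        completed = done;
        return threadIds.Count;
    }

    static void Main()
    {
        int completed;
        int threads = CountThreads(1000, 8, out completed);
        Console.WriteLine("iterations: " + completed + ", distinct threads: " + threads);
    }
}
```

The thread count is normally far smaller than the iteration count, because iterations are batched onto a small number of tasks rather than scheduled one task each.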

I'm going to close this issue; if the problem persists, feel free to re-open it.

I've attached the complete source:

using System;
using System.Threading;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Loggers;

namespace ConsoleApplication1
{
    class Program
    {
        private static int COMPLETED = 0;
        private static int NUM_REQUESTS = 1000; // matches the Parallel.For bound in Main
        private static IBus bus;
        private static IBus busResponse;

        static void Main(string[] args)
        {
            IEasyNetQLogger logger = new ConsoleLogger();
            bus = RabbitHutch.CreateBus("host=localhost;username=guest;password=guest;persistentMessages=false", x => x.Register<IEasyNetQLogger>(_ => logger));
            busResponse = RabbitHutch.CreateBus("host=localhost;username=guest;password=guest;persistentMessages=false", x => x.Register<IEasyNetQLogger>(_ => logger));
            busResponse.Respond<EasyNetRequest, EasyNetReply>(request => new EasyNetReply(Guid.NewGuid()));

            // Works
            //for (int i = 0; i < 100; i++)
            //{
            //    Method1(i);
            //}

            // Doesn't work
            Parallel.For(0, 1000, Method1);

            //for (int i = 0; i < 1000; i++)
            //{
            //    int n = i; // copy: the lambda would otherwise capture the shared loop variable
            //    Task.Run(() => Method1(n));
            //}

            Console.ReadLine();
        }

        static void Method1(int i)
        {
            try
            {
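                bus.Request<EasyNetRequest, EasyNetReply>(new EasyNetRequest());
                // COMPLETED++ is not atomic, and Parallel.For calls this method from several threads.
                int completed = Interlocked.Increment(ref COMPLETED);
                Console.WriteLine(i + " " + completed + "/" + NUM_REQUESTS);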
            }
            catch (AggregateException ex)
            {
                Console.WriteLine(ex.InnerException.Message);
            }
        }
    }

    public class EasyNetRequest
    {
    }

    public class EasyNetReply
    {
        public Guid Guid { get; set; }

        public EasyNetReply()
        {

        }

        public EasyNetReply(Guid guid)
        {
            Guid = guid;
        }
    }
}
