Limit number of concurrent requests #6

Closed
ameoba32 opened this Issue Oct 18, 2017 · 2 comments

ameoba32 commented Oct 18, 2017

Here is the thing: RxPHP allows making concurrent requests, which is good, so the following code will create all connections in parallel. What would be the way to limit the number of concurrent connections?

I am trying to think of an RxPHP way of doing it, but have no idea.

It seems that this needs to be a feature of RxPHP itself, so that it keeps an internal counter and does not create a new connection while the pool is full.

\Rx\Observable::fromArray(
    [
        'https://www.example.com/',
        'https://www.example.com/',
        'https://www.example.com/',
    ]
)->flatMap(
    function ($url) {
        return \Rx\React\Http::get($url);
    }
);

mbonneau (Member) commented Oct 18, 2017

@ameoba32 - You are wading into the waters of Rx backpressure.

Because Rx is a "push" model, it is conceivable that the producer could outrun the capacity of the system (or consume more resources than you wish).

There are two possible problems that need to be solved:

  1. There is simply too much information coming in to possibly process it all.
  2. There is a temporary spike on the producer side (such as a large array input).

Problem 1 is probably not what you are interested in (you would just need a strategy for dropping things - like throttle or something).
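
To make that drop strategy concrete, here is a rough sketch using throttle (assuming RxPHP 2 with react/event-loop installed; the interval source and the 1000 ms window are only illustrative values):

use React\EventLoop\Factory;
use Rx\Observable;
use Rx\Scheduler;
use Rx\Scheduler\EventLoopScheduler;

require __DIR__ . '/vendor/autoload.php';

$loop = Factory::create();

// RxPHP 2 needs a default scheduler tied to the event loop
Scheduler::setDefaultFactory(function () use ($loop) {
    return new EventLoopScheduler($loop);
});

Observable::interval(100)      // a producer emitting every 100 ms - too fast
    ->throttle(1000)           // keep one value per second, drop the rest
    ->take(5)                  // stop after a few values for the demo
    ->subscribe(function ($x) {
        echo "kept: $x\n";
    });

$loop->run();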

For problem 2, we can use the following approach to limit concurrent subscriptions:

use Rx\Observable;
use Rx\Observable\GroupedObservable;

Observable::fromArray(
    [
        'http://www.example.com/',
        // .... lots more urls in here
        'http://www.example.com/',
    ]
)
    // turn each url into a request Observable (nothing is sent until subscribed)
    ->map(function ($url) {
        return \Rx\React\Http::get($url);
    })

    // this takes the incoming Observables and splits them into 4
    // concat streams
    ->groupBy(function () {
        static $index = 0;
        return $index++ % 4;
    })
    ->flatMap(function (GroupedObservable $go) {
        return $go->concatAll();
    })

    ->subscribe(function ($x) {
        echo "$x\n";
    });

This solution works reasonably well for its simplicity.

One issue with this solution is that items get "queued" into their assigned groups and wait there. If an observable is waiting in one of the grouped streams while another group becomes idle, it cannot switch over to the idle stream. Everything will still process fine, it just may not always be running at "full capacity".
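
To make that queueing behaviour concrete, here is a plain-PHP illustration (no RxPHP involved, just the same "% 4" key selector used in groupBy() above, with made-up urls):

$urls = ['u0', 'u1', 'u2', 'u3', 'u4', 'u5'];
$groups = [];

foreach ($urls as $index => $url) {
    // identical round-robin assignment to the groupBy() key selector
    $groups[$index % 4][] = $url;
}

print_r($groups);
// group 0 holds u0 and u4, group 1 holds u1 and u5, groups 2 and 3 hold one
// url each - u4 always waits behind u0 in group 0, even if groups 2 and 3
// have already drained.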

Let me know if this answers your question.

ameoba32 commented Oct 20, 2017

It works like a charm! Thank you Matt. Never thought of groupBy here!

ameoba32 closed this Oct 20, 2017
