Merge with Dynamic Maximum Concurrency #48

Closed
mattpodwysocki opened this issue Oct 13, 2014 · 4 comments

Labels
[area] Rx feature request Request for new features or operators.

Comments

@mattpodwysocki
Contributor

Copied from https://rx.codeplex.com/workitem/43

Rx defines an overload of the Merge operator with a maxConcurrent parameter, which controls the level of concurrency by limiting the number of active subscriptions to inner observables. However, maxConcurrent is a fixed value, and in some cases it is impossible to write a query that changes it dynamically. Unsubscribing and re-subscribing to Merge does not work for a hot source because Merge uses an internal buffer: unsubscribing discards every inner observable that has been buffered but is not yet active, and since the source is hot, that data is lost permanently.
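To make the data-loss argument concrete, here is a minimal, library-free Python model of the bookkeeping described above. It is not Rx.NET code: `StaticMergeModel` and `resubscribe_with_new_limit` are hypothetical names invented for illustration, and inner observables are represented by plain strings. It only assumes what the paragraph states, namely that inner sources beyond the limit sit in an internal buffer that is discarded on unsubscription.

```python
from collections import deque


class StaticMergeModel:
    """Hypothetical model of Merge with a fixed limit (bookkeeping only)."""

    def __init__(self, max_concurrent):
        self.max_concurrent = max_concurrent
        self.active = []          # inner sources currently subscribed
        self.buffered = deque()   # inner sources waiting for a free slot

    def on_inner(self, source):
        # Subscribe immediately if there is room, otherwise buffer.
        if len(self.active) < self.max_concurrent:
            self.active.append(source)
        else:
            self.buffered.append(source)

    def dispose(self):
        # Unsubscribing throws away the buffer; return what was dropped.
        lost = list(self.buffered)
        self.active.clear()
        self.buffered.clear()
        return lost


def resubscribe_with_new_limit(old, new_limit):
    """Emulate 'changing' the limit by unsubscribing and subscribing again."""
    lost = old.dispose()
    # A hot source does not replay, so the new subscription starts empty.
    return StaticMergeModel(new_limit), lost


first = StaticMergeModel(2)
for name in ["a", "b", "c", "d"]:
    first.on_inner(name)

second, lost = resubscribe_with_new_limit(first, 4)
# lost is ["c", "d"]: they were buffered, never subscribed, and cannot be recovered.
```

In this model the new subscription has room for four inner sources, yet it starts with none, which is the permanent loss the issue describes.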

Consider adding a Merge overload such as the one shown in the following discussion:

http://social.msdn.microsoft.com/Forums/en-US/62743ffd-befd-474a-8f0a-19dbaec7a926/design-question-throttling-hits-on-external-systems

It has the following signature:

IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> source, IObservable<int> maxConcurrent)

@RxDave
Contributor

RxDave commented Nov 7, 2014

clairernovotny pushed a commit that referenced this issue Jun 16, 2016
@akarnokd
Collaborator

What should the operator do when a smaller maxConcurrent value is signalled after a larger one? Should it stop consuming the excess active sources, or let them finish and not subscribe to new ones until the active source count drops below the current limit?

@RxDave
Contributor

RxDave commented Jul 21, 2016

Here's the comment from the dynamic Merge operator in the linked discussion:

// When the maximum concurrency value is raised, try to dequeue and subscribe to as many 
// available observables as needed to fill the new bounds.
//
// When the maximum concurrency value is lowered, do not dispose of any active observables.
// 
// Lowering the maximum concurrency value does not imply cancelation; it only implies that 
// future inner observables will be deferred until the number of active observables drops
// sufficiently.  The number of active observables may currently be higher than the current 
// maximum concurrency value, though prematurely disposing of active observables may result 
// in irreversible data loss.
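The policy in that comment can be sketched as a small, library-free Python model. This is only an illustration of the raise/lower rules, not the operator from the linked discussion: `DynamicMergeGate` and its method names are hypothetical, and inner observables are represented by plain strings.

```python
from collections import deque


class DynamicMergeGate:
    """Hypothetical bookkeeping for a merge whose limit can change over time."""

    def __init__(self, max_concurrent):
        self.max_concurrent = max_concurrent
        self.active = set()      # inner sources currently subscribed
        self.pending = deque()   # inner sources deferred until a slot frees up

    def _drain(self):
        # Subscribe to queued sources while there is room under the limit.
        started = []
        while self.pending and len(self.active) < self.max_concurrent:
            source = self.pending.popleft()
            self.active.add(source)
            started.append(source)
        return started

    def on_inner(self, source):
        """A new inner source arrived; returns the sources started as a result."""
        self.pending.append(source)
        return self._drain()

    def on_inner_completed(self, source):
        """An active source finished; returns the sources started as a result."""
        self.active.discard(source)
        return self._drain()

    def on_limit(self, max_concurrent):
        """The limit changed. Raising it may start queued sources;
        lowering it never disposes of active ones."""
        self.max_concurrent = max_concurrent
        return self._drain()


gate = DynamicMergeGate(3)
for name in "abcde":
    gate.on_inner(name)          # a, b, c become active; d, e are deferred

gate.on_limit(1)                 # returns []; a, b, c all stay active
gate.on_inner_completed("a")     # returns []; 2 active is still over the limit
gate.on_inner_completed("b")     # returns []; 1 active already fills the limit
gate.on_inner_completed("c")     # returns ["d"]
gate.on_limit(2)                 # returns ["e"]
```

Because lowering the limit only changes the threshold checked in `_drain`, the active count can temporarily exceed the limit, as the comment notes, and nothing is disposed early.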

@akarnokd
Collaborator

This requires deep changes, possibly across a lot of merge variants. This is a really old issue; do you still want this, or would an implementation built from existing building blocks suffice?
