Merge with Dynamic Maximum Concurrency #48

Open
mattpodwysocki opened this Issue Oct 13, 2014 · 4 comments

Contributor

mattpodwysocki commented Oct 13, 2014

Copied from https://rx.codeplex.com/workitem/43

Rx defines an overload of the Merge operator with a maxConcurrent parameter that controls the level of concurrency by limiting the number of active subscriptions to inner observables. However, the maxConcurrent parameter is static, and in some cases it's impossible to write a query that changes it dynamically. Unsubscribing from and re-subscribing to Merge won't work for a hot source because Merge uses an internal buffer: unsubscribing discards all of the inner observables that have been buffered but aren't yet active, and since the source is hot, they are lost permanently.

Consider adding a Merge overload such as the one shown in the following discussion:

http://social.msdn.microsoft.com/Forums/en-US/62743ffd-befd-474a-8f0a-19dbaec7a926/design-question-throttling-hits-on-external-systems

It has the following signature:

IObservable<TSource> Merge<TSource>(this IObservable<IObservable<TSource>> source, IObservable<int> maxConcurrent)

onovotny added a commit that referenced this issue Jun 16, 2016

onovotny added the [area] Rx label Jun 17, 2016

Contributor

akarnokd commented Jul 20, 2016

What should the operator do when a smaller maxConcurrent value is signalled after a larger one? Stop consuming the excess active ones or let them finish and don't subscribe to new ones until the active source count goes below the current limit?

Contributor

RxDave commented Jul 21, 2016

Here's the comment from the dynamic Merge operator in the linked discussion:

// When the maximum concurrency value is raised, try to dequeue and subscribe to as many 
// available observables as needed to fill the new bounds.
//
// When the maximum concurrency value is lowered, do not dispose of any active observables.
// 
// Lowering the maximum concurrency value does not imply cancelation; it only implies that 
// future inner observables will be deferred until the number of active observables drops
// sufficiently.  The number of active observables may currently be higher than the current 
// maximum concurrency value, though prematurely disposing of active observables may result 
// in irreversible data loss.
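
For illustration only, the raise/lower semantics described in that comment could be sketched roughly as follows. This is not the code from the linked discussion and not part of Rx; the names MergeDynamic and DynamicMergeSketch are hypothetical. It assumes the limit starts at 0 until maxConcurrent produces a value, and that the last limit stays in effect if maxConcurrent completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class DynamicMergeSketch
{
    // Hypothetical operator name; a sketch under the assumptions stated above.
    public static IObservable<T> MergeDynamic<T>(
        this IObservable<IObservable<T>> source,
        IObservable<int> maxConcurrent)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();                      // serializes all state changes and output
            var pending = new Queue<IObservable<T>>();    // buffered inner observables, not yet active
            var group = new CompositeDisposable();        // outer, limit, and inner subscriptions
            var limit = 0;                                // assumption: nothing runs until a limit arrives
            var active = 0;
            var outerDone = false;

            Action drain = null;
            drain = () =>
            {
                // Subscribe to buffered inners while there is room under the current limit.
                // If the limit was lowered below 'active', this loop simply does nothing,
                // so active inners are never disposed early.
                while (active < limit && pending.Count > 0)
                {
                    var inner = pending.Dequeue();
                    var slot = new SingleAssignmentDisposable();
                    group.Add(slot);
                    active++;
                    slot.Disposable = inner.Subscribe(
                        x => { lock (gate) observer.OnNext(x); },
                        ex => { lock (gate) observer.OnError(ex); },
                        () =>
                        {
                            lock (gate)
                            {
                                group.Remove(slot);       // also disposes the slot
                                active--;
                                drain();
                                // A repeated OnCompleted is ignored by the observer
                                // that Observable.Create hands out.
                                if (outerDone && active == 0 && pending.Count == 0)
                                    observer.OnCompleted();
                            }
                        });
                }
            };

            // A new limit only triggers a drain; raising it fills the new bounds.
            group.Add(maxConcurrent.Subscribe(
                n => { lock (gate) { limit = n; drain(); } },
                ex => { lock (gate) observer.OnError(ex); }));

            group.Add(source.Subscribe(
                inner => { lock (gate) { pending.Enqueue(inner); drain(); } },
                ex => { lock (gate) observer.OnError(ex); },
                () =>
                {
                    lock (gate)
                    {
                        outerDone = true;
                        if (active == 0 && pending.Count == 0)
                            observer.OnCompleted();
                    }
                }));

            return group;
        });
    }
}
```

With this shape, a BehaviorSubject<int> would be a natural choice for maxConcurrent, since it supplies a starting limit on subscription and lets the caller push a new limit later. Note that the sketch never completes if the outer source finishes while inners are still buffered and no positive limit ever arrives.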

Contributor

akarnokd commented May 31, 2018

This requires deep changes, possibly across a lot of merge variants. This is a really old issue; do you still want this, or would an implementation built from existing building blocks suffice?
