Proposal: BoundedConcurrentQueue<T> #23700
cc: @kouvel, @tarekgh, @benaadams |
For performance it would be desirable for the capacity to be a power of two. Would it be acceptable to have the capacity as: `(int minCapacity) // Will be rounded up to nearest power of two` |
Yes, we would want either to require the value be a power of 2 or be allowed to round it up to one. And I think it's best to require it, as that keeps the doors open in the future to allow for other sizes to be supported. |
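For context, the power-of-two requirement is what lets a ring buffer wrap indices with a bitmask rather than a division. A minimal illustration (variable names are mine, not from the actual implementation):

```csharp
// Illustration of why a power-of-two capacity helps: with capacity == 2^k,
// wrapping an ever-increasing index into the ring buffer is a single AND
// instead of a (much more expensive) division/modulo.
int capacity = 8;            // must be a power of two
int mask = capacity - 1;

for (int index = 0; index < 20; index++)
{
    int slot = index & mask; // equivalent to index % capacity, but cheaper
    if (slot != index % capacity) throw new System.Exception("mismatch");
}
```

Requiring (rather than rounding) keeps the door open to supporting other sizes later without silently changing behavior for existing callers.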
This is a very desirable type for object pooling (rather than array pooling). One candidate use is "Improve performance of Immutable/SecureObjectPool/AllocFreeConcurrentStack" https://github.com/dotnet/corefx/issues/24337. However, testing-wise it would be better for it to be a more generic type with its own specific tests, rather than be part of Immutable. Other candidates:

- the ObjectPools in aspnet https://github.com/aspnet/Common/tree/dev/src/Microsoft.Extensions.ObjectPool
- System.Reflection.Internal.ObjectPool
- System.Net.Http.HttpConnectionPool
- System.Data.ProviderBase.DbConnectionPool

Also items like
However, there is no easy way to do a high-performance bounded pool in the framework (hence all the different implementations within the framework itself). Note: the applications for a bounded non-blocking concurrent queue are much bigger than object pooling; it's just a common pattern, and it's very easy to put an object pool on top (with other semantics like auto-create when empty, clear, etc.) |
What about implementing `IProducerConsumerCollection<T>`? |
What would `TryAdd` do? Throw when full? Can an interface be inserted into an inheritance chain and be binary compat?

```csharp
interface IBoundedCollection<T>
{
    bool TryAdd(T item);
    bool TryTake(out T item);
}

interface IProducerConsumerCollection<T> : IBoundedCollection<T>, IEnumerable<T>, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
}
```

I assume not; else the |
|
Yes, but in addition to IEnumerable being problematic, CopyTo and ToArray are problematic for the same reasons. |
@stephentoub do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?

```csharp
namespace System.Collections.Concurrent
{
    public sealed class ConcurrentQueue<T>
    {
        public ConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        ...
    }
}
```
|
A few concerns:
|
@stephentoub I am not pushing back on the new type, but trying to evaluate if it is really needed. Is it possible to have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) that just returns an internal instance of BoundedConcurrentQueue (considering it would be a subclass of ConcurrentQueue)? By doing that we'd provide the same implementation you were planning to have in BoundedConcurrentQueue anyway, without affecting the current ConcurrentQueue implementation, which should address the first and third points you mentioned.
I think we are defining a new behavior, so I think it is ok to throw in that case. This is similar to the pattern we have in other classes anyway. My only concern with having a new type is that developers have to treat BoundedConcurrentQueue and ConcurrentQueue as two different things, while they are really the same thing with different settings or constraints. |
That's a good thing to do 😄
That would require the relevant methods to be made virtual, which can add non-trivial expense.
Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things. |
Agreed. I've heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If that's true, we can write the implementation using a pattern the JIT can optimize.
Usually, devs can avoid that by just using TryEnqueue, so it is up to the dev to decide what works better in their scenario. |
If you have a method |
I'm also not understanding why adding such a factory is better in this case. 90% of the surface area of the type would need to throw for the special case, code would need to know how the type it's handed was constructed / what its constraint is, none of the interface implementations would work, etc. What is the benefit over exposing a new type? |
The only reason is what I mentioned: devs shouldn't have to use two types which really do the exact same thing. Imagine someone wants to write a method which takes the queue as a parameter; having two types means they need to provide two methods doing exactly the same thing. If you think having a factory method on ConcurrentQueue would complicate the implementation and the behavior, then I am fine with two types. |
Is it possible to make an educated guess about what performance gain could be achieved (or even run a simple perf test using the internal ConcurrentQueue.Segment)? |
@4creators the current issue is that you can't create a bounded queue with ConcurrentQueue. To get around that you need to wrap it in another structure that does interlocks; but then you are taking a performance hit for the extra overhead, whereas the internal ConcurrentQueue.Segment already behaves in the desired way. |
One case for which I had to create a bounded concurrent queue before was when I had multiple producers and a single consumer that was transferring results from producers over the network. Producers had to be throttled such that they don't cause too much memory to be used while the sole consumer is lagging behind. Would it be beneficial to add For that particular case it may still be beneficial to have a different implementation, as it may be possible to have more efficient |
Another requirement for that particular scenario was to bound the queue based on an estimation of memory usage rather than number of items, throttling on some memory usage bound. Would it be useful to allow some way for the user to determine the bound? |
Can't you use |
This is what the
Would that really need to be built-in? I'd think such an estimation could just be accomplished by the user and translated by the user into a count. This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing. |
I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock
I see, I'll take a look
The capacity would have to be guessed up-front, some items may be much larger than others so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.
Yea probably not worth folding that into this implementation |
It does support concurrent enqueues, and it does use a lock, though I think there are ways around that. One of the things I'd like help with in the channels lib is improving the perf of the implementation, including removing some of the locking that's currently being done, e.g. currently the channel implementations all lock on enqueue (most avoid that on dequeue), but we should be able to avoid that in many cases.
You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast? |
From the docs:
So, concurrent enqueues are definitely allowed. |
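For comparison, `BlockingCollection<T>` already exposes a non-blocking bounded add today, at the cost of the extra synchronization it layers over the underlying collection. A small sketch:

```csharp
using System.Collections.Concurrent;

class BlockingCollectionSketch
{
    static void Main()
    {
        // BlockingCollection wraps a ConcurrentQueue by default and
        // enforces the bound with additional synchronization on top.
        var queue = new BlockingCollection<int>(boundedCapacity: 2);

        bool a = queue.TryAdd(1); // true
        bool b = queue.TryAdd(2); // true
        bool c = queue.TryAdd(3); // false: the bound has been reached

        System.Console.WriteLine((a, b, c)); // (True, True, False)
    }
}
```

The parameterless `TryAdd(T)` overload returns immediately rather than blocking, which is the behavior being discussed here; the proposal's value is providing the same semantics without the wrapper's synchronization cost.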
I see. BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.
The bound could be based on the sum of opaque values that are passed in to Enqueue, which could for instance represent the size of the item being enqueued. BoundedChannel appears to be something close to what I was looking for. |
I think there's more flexibility in that API. Note that a bounded channel is created from a |
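For reference, a rough sketch of the bounded-channel shape being discussed, using the System.Threading.Channels API as it later shipped (details may differ from the channels version referenced above):

```csharp
using System.Threading.Channels;

class BoundedChannelSketch
{
    static void Main()
    {
        // With FullMode.Wait, TryWrite returns false when the channel is
        // full (the synchronous analogue of TryEnqueue returning false),
        // while WriteAsync would asynchronously wait for space instead.
        Channel<int> channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity: 2) { FullMode = BoundedChannelFullMode.Wait });

        bool a = channel.Writer.TryWrite(1); // true
        bool b = channel.Writer.TryWrite(2); // true
        bool c = channel.Writer.TryWrite(3); // false: bound reached

        System.Console.WriteLine((a, b, c)); // (True, True, False)
    }
}
```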
Another random thought, if the main use case for BoundedConcurrentQueue is object pooling, where I imagine the order in which items are removed from the queue doesn't matter, why not call it ObjectPool? If we feel that the queue behavior is beneficial there could also be BoundedConcurrentQueue and if we further feel that they could benefit from the same implementation, that could be the case behind the scenes. |
Is it possible to implement an ObjectPool which has less restrictions to be more efficient than a BoundedConcurrentQueue? |
I think we do need to invest in an object pool. That said, it's certainly possible there's a better name than the one I proposed. |
FYI: The API review discussion was recorded - see https://youtu.be/BI3iXFT8H7E?t=6317 (5 min duration) Conclusion: We want to wait on @stephentoub in the API review meeting (Feb). |
This isn't blocked anymore? |
Correct, @stephentoub is back, it is unblocked :) |
@karelz is this approved now? |
Nope, it was not critical for 2.1, so we didn't prioritize it. |
@stephentoub: Can we make this a new ctor on the existing `ConcurrentQueue<T>`? |
We could. Here's what the implementation would likely look like: If we went this route, the proposed additional surface area for the existing `ConcurrentQueue<T>` would be:

```csharp
public class ConcurrentQueue<T>
{
    public ConcurrentQueue(int boundedCapacity);
    public int? BoundedCapacity { get; }
    public bool TryEnqueue(T item);
    ...
}
```

and there would be an impact on some existing surface area when constructed with this new constructor:
I'm fine with this approach if that's the preferred way to go. |
Personally, I'm fine with the first two bullet points, but I don't understand the last. Could you elaborate? Reading the issue description, it sounds like peeking could result in marking all data as "preserve", which would block dequeues from removing data and thus could result in a state where enqueues would fail and the collection is stuck. Is copying not an option? There are also some concerns about the second bullet point, although I find this acceptable as callers have to opt in. |
You mean the bullet about ToArray/CopyTo/GetEnumerator/etc.? ConcurrentQueue is implemented as a linked list of circular buffers. If a segment fills up, a new segment is allocated and linked from the previous one, at which point no enqueueing happens into the previous one, only into the new one, and the old one will go away as soon as all items are dequeued from it and no one still has a reference to it.

The bounding feature is implemented by only ever having a single segment, such that if the segment fills up, rather than creating a new one, it simply says "it's full, you can't enqueue", and TryEnqueue returns false. If you could have multiple segments at the same time, you wouldn't have a good way to enforce the bound that sized the original segment: the moment the second segment is created, you've got a bunch of items in the first segment, and any enqueuer can come along and enqueue to the second segment, so you've just increased the capacity/bound artificially and incorrectly.

Certain operations also cause a segment to no longer be viable for adds. For example, ConcurrentQueue supports enumerating the contents of the collection, and it does so with snapshot semantics, meaning the moment you call GetEnumerator, you're effectively creating a moment-in-time view of what was in the ConcurrentQueue... subsequent dequeues and enqueues will not affect what's returned from the enumerator. That means a dequeue can't allow the element being dequeued to be overwritten if there's an active enumerator, because that enumerator will need to be able to get the data from the relevant slot at effectively any time in the future. Which means that once you start enumerating (which includes ToArray, CopyTo, etc.), the segments in the queue become blocked from any future enqueues. In a normal ConcurrentQueue, that's fine: subsequent enqueues will just create a new segment.

But in a bounded queue where you're limited to a single segment and can't create additional segments, you're stuck. So, either a) you completely change the implementation and make it much slower to be able to handle this (e.g. all enqueues/dequeues are blocked while any thread is enumerating the queue), b) you add additional synchronization to maintain a separate count of the number of items in the queue, c) once you enumerate the queue you say it can never have any more enqueues ever, or d) you say that enumerating operations are blocked. And I'm suggesting that (d) is the best option if you want to build this bounding functionality into ConcurrentQueue rather than creating a new type. We could always fall back to (b) in the future if the constraints proved prohibitive and we were willing to accept the additional cost (though it would likely add at least some cost even for the non-bounded case, which would be concerning). |
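The single-segment design described above can be sketched as a Vyukov-style bounded MPMC ring buffer. This is a heavily simplified illustration (names and details are mine; the real `ConcurrentQueue<T>.Segment` is considerably more involved, e.g. it handles observation/preservation and index overflow):

```csharp
using System.Threading;

// Simplified sketch of a fixed-capacity MPMC queue in the spirit of
// ConcurrentQueue<T>.Segment. Each slot carries a sequence number that
// encodes whose turn it is to use the slot; a full queue is detected
// when the slot at the current tail hasn't been freed yet.
public sealed class BoundedQueueSketch<T>
{
    private struct Slot { public T Item; public int Sequence; }

    private readonly Slot[] _slots;
    private readonly int _mask;
    private int _head; // next position to dequeue
    private int _tail; // next position to enqueue

    public BoundedQueueSketch(int capacity) // must be a power of two
    {
        _slots = new Slot[capacity];
        _mask = capacity - 1;
        for (int i = 0; i < capacity; i++) _slots[i].Sequence = i;
    }

    public bool TryEnqueue(T item)
    {
        while (true)
        {
            int tail = Volatile.Read(ref _tail);
            ref Slot slot = ref _slots[tail & _mask];
            int seq = Volatile.Read(ref slot.Sequence);

            if (seq == tail)
            {
                // Slot is free for this position; try to claim it.
                if (Interlocked.CompareExchange(ref _tail, tail + 1, tail) == tail)
                {
                    slot.Item = item;
                    Volatile.Write(ref slot.Sequence, tail + 1); // publish to dequeuers
                    return true;
                }
            }
            else if (seq < tail)
            {
                return false; // slot still occupied from the previous lap: queue is full
            }
            // Otherwise another thread advanced the queue; loop and retry.
        }
    }

    public bool TryDequeue(out T item)
    {
        while (true)
        {
            int head = Volatile.Read(ref _head);
            ref Slot slot = ref _slots[head & _mask];
            int seq = Volatile.Read(ref slot.Sequence);

            if (seq == head + 1)
            {
                // Slot has a published item; try to claim it.
                if (Interlocked.CompareExchange(ref _head, head + 1, head) == head)
                {
                    item = slot.Item;
                    slot.Item = default;
                    Volatile.Write(ref slot.Sequence, head + _mask + 1); // free for next lap
                    return true;
                }
            }
            else if (seq < head + 1)
            {
                item = default;
                return false; // nothing published at this position: queue is empty
            }
        }
    }
}
```

Note how the bound is enforced inside TryEnqueue itself with no separate count, which is why TryEnqueue can simply return false rather than throw, and why snapshot-style enumeration doesn't fit this structure without extra machinery.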
Too many changes in behavior because of a ctor param. |
Just occurred to me... as well as "don't enqueue if full" behaviour, I have other scenarios where "overwrite oldest if full" is also desirable (temporal data discarding the oldest if behind, live video/data streaming, networking with packet loss). With that in mind, would this work instead?

```csharp
namespace System.Collections.Generic
{
    public sealed class CircularQueue<T>
    {
        public CircularQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }
        public bool Enqueue(T item); // Returns whether queue was full, so item overwritten
        public bool Enqueue(T item, out T discarded); // Additionally returns overwritten item
        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}

namespace System.Collections.Concurrent
{
    public sealed class ConcurrentCircularQueue<T>
    {
        public ConcurrentCircularQueue(int capacity); // capacity must be a power of 2 that's >= 2
        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }
        public bool Enqueue(T item); // Returns whether queue was full, so item overwritten
        public bool Enqueue(T item, out T discarded); // Additionally returns overwritten item
        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}
```
|
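A quick usage sketch of the overwrite semantics for one of those scenarios (hypothetical: `ConcurrentCircularQueue<T>` is only the proposed type above and does not exist, and `Frame` with its `Release` method is an illustrative stand-in):

```csharp
// Hypothetical usage: keep only the most recent `capacity` video frames,
// discarding the oldest whenever the producer outpaces the consumer.
var recent = new ConcurrentCircularQueue<Frame>(capacity: 64);

void OnFrameArrived(Frame frame)
{
    // Enqueue(item, out discarded) returns true when the queue was full
    // and an old item was overwritten to make room.
    if (recent.Enqueue(frame, out Frame discarded))
    {
        discarded.Release(); // e.g. return the dropped frame's buffers to a pool
    }
}
```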
Then more controversially....

```csharp
public sealed partial class ConcurrentCircularQueue<T>
{
    // Completes when item queued (will wait if full)
    public Task EnqueueAsync(T item, CancellationToken token = default);
}
```
|
Why isn't Channels sufficient for this, where this exact behavior is already supported? If you have a more efficient implementation that can be used for the bounded implementation in Channels, excellent, please submit that as a PR. I'm not seeing how this can be done efficiently in the current ConcurrentQueue implementation. |
It probably is :) Are you just referring to the Async; or the overwrite oldest on Enqueue? |
I was suggesting the alternative of overwriting instead; however, then you'd want a return value saying whether it did overwrite, and possibly the item that was overwritten. So it still wouldn't fit with the existing API. |
Or the existing implementation... how would overwrite be implemented safely and efficiently without harming the rest of the implementation?
Primarily the former, but also the latter. |
In principle it would be:

```csharp
bool Enqueue(T item, out T discarded)
{
    while (true)
    {
        if (TryEnqueue(item))
        {
            // Enqueued without overwriting anything.
            discarded = default;
            return false;
        }
        if (TryEnqueueDequeue(item, out discarded))
        {
            // Queue was full: the oldest item was dequeued to make room.
            return true;
        }
        // Lost a race with other producers/consumers; retry.
    }
}
```

Though I will admit

Mostly I'm arguing for having a different data structure rather than changing the behaviour of the existing `ConcurrentQueue`. A new data structure would be more extensible for other methods that also fit its bounded mode of operation; whereas adding something like "overwrite when full" for |
I'm just not understanding how this is different from the bounded channel. I don't believe we should add any kind of asynchronous methods to these collections. From my perspective, that means the only additional policy we might want to add would be the one expounded on in this issue, allowing a TryEnqueue to return false if the collection is full. But it does come with the additional constraints, where creating the queue in this mode would make some of the other APIs throw. I don't have a strong preference about which way to go. I think we should add this functionality; it's just a question of whether to:
|
Fine with that
Don't think that's a good option
This is good. Having a data structure change mode based on a constructor argument isn't very good for a consumer of that data structure; it's runtime/convention enforcement and erroring rather than compile-time. In some situations it's hard to avoid, but this would be just to avoid adding a new type. |
There is a trade-off decision here: how much do we care about the type behaving like a collection (i.e. being enumerable) vs how much we care about performance. @stephentoub added this item as we're using it internally for building object pools. It seems there is an idea to make object pooling more first class. Let's look at this type when we're productizing an object pool to make sure we're building a type that doesn't end up being the worst of all worlds (not quite fast enough, not quite usable enough). |
Can we revisit this issue again? This is a very useful type and is a core building block for something like an object pool (which has policy on top of this). I want to think of this like a synchronous bounded channel with 2 possible behaviors (drop, and drop oldest). |
What do you mean by this? Drop oldest would not be atomic. |
I was commenting on 2 modes expressed on this issue. It seems like the thing to build here is the equivalent of a bounded channel but synchronous (not blocking) with the possibility of adding more options for what to do when those bounds are reached.
We would start with the current mode (DropWrite) and discuss the other modes (DropNewest, DropOldest) |
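For naming precedent, the bounded-channel options that later shipped in System.Threading.Channels enumerate essentially this set of policies. A sketch of the DropOldest behavior:

```csharp
using System.Threading.Channels;

class DropModesSketch
{
    static void Main()
    {
        // BoundedChannelFullMode already covers these policies: Wait
        // (TryWrite fails when full), DropWrite (discard the incoming
        // item), DropOldest, and DropNewest.
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(capacity: 2) { FullMode = BoundedChannelFullMode.DropOldest });

        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        channel.Writer.TryWrite(3); // full: 1 is dropped to make room for 3

        channel.Reader.TryRead(out int first);
        System.Console.WriteLine(first); // 2 (the oldest surviving item)
    }
}
```

A synchronous bounded queue could reuse these mode names so the two APIs read consistently.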
`ConcurrentQueue<T>` is an unbounded, thread-safe queue, where the primary operations are Enqueue and TryDequeue. It's one of the more valuable concurrent collection types. However, unbounded collections aren't always desirable. For example, consider wanting to use a concurrent queue for object pooling. If you want to ensure you don't store more than N objects, this is difficult or impossible to achieve efficiently with `ConcurrentQueue<T>`, which will automatically grow its storage to store the item being added if there's insufficient room.

`ConcurrentQueue<T>`, however, is actually implemented as a wrapper for a bounded queue, internally called `ConcurrentQueue<T>.Segment`:
https://github.com/dotnet/corefx/blob/9c468a08151402a68732c784b0502437b808df9f/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentQueue.cs#L820

In essence, `ConcurrentQueue<T>.Segment` provides the bounded queue behavior, and `ConcurrentQueue<T>` layers unbounded semantics on top of that. We should clean up the Segment APIs and expose it as `BoundedConcurrentQueue<T>`.

The current implementation is fast, and is able to achieve that speed in part by eschewing some functionality that would weigh it down non-trivially, e.g. enumeration. That's why it doesn't implement any interfaces like `IEnumerable<T>` or `IReadOnlyCollection<T>` that would force us to add such behavior and slow it down. This collection would be very specialized and used purely for its ability to have items enqueued and dequeued quickly. (Methods like TryPeek, ToArray, CopyTo, GetEnumerator, etc., all require the ability to look at data in the queue without removing it, and in the current implementation, that requires marking the segment as "preserving for observation", which means nothing will ever be removed from the queue; this has the effect of continuing to allow enqueues until the segment is full, but since dequeues don't end up removing data, at that point nothing further can be enqueued, even if everything is dequeued. `ConcurrentQueue<T>` deals with this simply by creating a new segment, but that doesn't work for the segment itself.)

EDIT @stephentoub 7/6/2018: See alternate proposal at https://github.com/dotnet/corefx/issues/24365#issuecomment-403074379.