
Proposal: BoundedConcurrentQueue<T> #23700

Closed
stephentoub opened this issue Oct 1, 2017 · 68 comments
Labels: api-needs-work (API needs work before it is approved; it is NOT ready for implementation), area-System.Collections

Comments

@stephentoub
Member

ConcurrentQueue<T> is an unbounded, thread-safe queue, where the primary operations are Enqueue and TryDequeue. It's one of the more valuable concurrent collection types. However, unbounded collections aren't always desirable. For example, consider wanting to use a concurrent queue for object pooling. If you want to ensure you don't store more than N objects, this is difficult or impossible to achieve efficiently with ConcurrentQueue<T>, which will automatically grow its storage to store the item being added if there's insufficient room.

ConcurrentQueue<T>, however, is actually implemented as a wrapper for a bounded queue, internally called ConcurrentQueue<T>.Segment.
https://github.com/dotnet/corefx/blob/9c468a08151402a68732c784b0502437b808df9f/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentQueue.cs#L820
In essence, ConcurrentQueue<T>.Segment provides the bounded queue behavior, and ConcurrentQueue<T> layers unbounded semantics on top of it.

We should clean up the Segment APIs and expose it as

namespace System.Collections.Concurrent
{
    public sealed class BoundedConcurrentQueue<T>
    {
        public BoundedConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }

        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}

The current implementation is fast, and is able to achieve that speed in part by eschewing some functionality that would weigh it down non-trivially, e.g. enumeration. That's why it doesn't implement any interfaces like IEnumerable<T> or IReadOnlyCollection<T> that would force us to add such behavior and slow it down. This collection would be very specialized and used purely for its ability to have items enqueued and dequeued quickly.

(Methods like TryPeek, ToArray, CopyTo, GetEnumerator, etc. all require the ability to look at data in the queue without removing it, and in the current implementation that requires marking the segment as "preserving for observation", which means nothing will ever be removed from the queue. This has the effect of continuing to allow enqueues until the segment is full, but since dequeues don't end up removing data, at that point nothing further can be enqueued, even if everything is dequeued. ConcurrentQueue<T> deals with this simply by creating a new segment, but that doesn't work for the segment itself.)


EDIT @stephentoub 7/6/2018: See alternate proposal at https://github.com/dotnet/corefx/issues/24365#issuecomment-403074379.

@stephentoub
Member Author

cc: @kouvel, @tarekgh, @benaadams

@benaadams
Member

For performance it would be desirable for boundedCapacity to be a power of 2 (to use & rather than %).

Would it be acceptable to have the capacity as:

(int minCapacity) // Will be rounded up to nearest power of two
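For illustration, here is a minimal sketch (hypothetical names, not part of the proposal) of why a power-of-two capacity matters: wrapping an index with & avoids the integer division that % performs.

using System;

class PowerOfTwoIndexing
{
    static void Main()
    {
        const int capacity = 8; // power of two
        for (int head = 0; head < 20; head++)
        {
            int viaModulo = head % capacity;      // integer division under the hood
            int viaMask = head & (capacity - 1);  // single AND; valid only for powers of two
            Console.WriteLine($"{head}: {viaModulo} == {viaMask}");
        }
    }
}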

@stephentoub
Member Author

stephentoub commented Oct 1, 2017

Yes, we would want either to require the value be a power of 2 or be allowed to round it up to one. And I think it's best to require it, as that keeps the doors open in the future to allow for other sizes to be supported.

@benaadams
Member

benaadams commented Oct 1, 2017

This is a very desirable type for object pooling (rather than array pooling)

One candidate use is "Improve performance of Immutable/SecureObjectPool/AllocFreeConcurrentStack" https://github.com/dotnet/corefx/issues/24337

However, testing-wise it would be better for this to be a more generic type with its own specific tests, rather than being part of Immutable.

Other candidates:

Also the ObjectPools in aspnet https://github.com/aspnet/Common/tree/dev/src/Microsoft.Extensions.ObjectPool

System.Reflection.Internal.ObjectPool
https://github.com/dotnet/corefx/blob/master/src/System.Reflection.Metadata/src/System/Reflection/Internal/Utilities/ObjectPool%601.cs

System.Net.Http.HttpConnectionPool
https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/Managed/HttpConnectionPool.cs

System.Data.ProviderBase.DbConnectionPool
https://github.com/dotnet/corefx/blob/master/src/System.Data.SqlClient/src/System/Data/ProviderBase/DbConnectionPool.cs

Also, for items like SocketAsyncEventArgs, the docs suggest using a pool for them:

Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool.
...
Reuse the context for another operation, put it back in the pool, or discard it.

However, there is no easy way to do a high-performance bounded pool in the framework (hence all the different implementations within the framework itself).

Note: the applications for a bounded non-blocking concurrent queue are much bigger than object pooling; it's just a common pattern, and it's very easy to put an object pool on top (with other semantics like auto-create when empty, clear, etc.), as sketched below.
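As a sketch of that layering, written against the proposed (not shipped) BoundedConcurrentQueue<T> surface area; QueueBackedPool is a hypothetical name:

using System;
using System.Collections.Concurrent;

public sealed class QueueBackedPool<T> where T : class
{
    private readonly BoundedConcurrentQueue<T> _items;
    private readonly Func<T> _factory;

    public QueueBackedPool(Func<T> factory, int capacity)
    {
        _factory = factory;
        _items = new BoundedConcurrentQueue<T>(capacity); // capacity: a power of 2
    }

    // Auto-create when empty.
    public T Rent() => _items.TryDequeue(out T item) ? item : _factory();

    // TryEnqueue makes "never pool more than N objects" a single atomic call;
    // if the pool is full, the instance is simply dropped.
    public void Return(T item) => _items.TryEnqueue(item);

    // Clear by draining.
    public void Clear()
    {
        while (_items.TryDequeue(out _)) { }
    }
}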

@svick
Contributor

svick commented Oct 2, 2017

What about implementing IProducerConsumerCollection<T>? This type would seemingly fit that interface, except then it would also have to implement IEnumerable<T> and ICollection.

@benaadams
Member

What would

void ICollection.Add(T item);

do? Throw when full?

Can an interface be inserted into an inheritance chain and be binary compat?

interface IBoundedCollection<T>
{
    bool TryAdd(T item);
    bool TryTake(out T item);
}

interface IProducerConsumerCollection<T> : IBoundedCollection<T>, IEnumerable<T>, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
}

I assume not; otherwise IReadOnlyList and IReadOnlyCollection were put in the wrong place.

@svick
Contributor

svick commented Oct 2, 2017

@benaadams

What would void ICollection.Add(T item); do?

IProducerConsumerCollection<T> implements System.Collections.ICollection, but not System.Collections.Generic.ICollection<T>, so there is no Add.

@stephentoub
Member Author

This type would seemingly fit that interface

Yes, but in addition to IEnumerable being problematic, CopyTo and ToArray are problematic for the same reasons.

@tarekgh
Member

tarekgh commented Oct 2, 2017

@stephentoub do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?

namespace System.Collections.Concurrent
{
    public sealed class ConcurrentQueue<T>
    {
        public ConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        ...
    }
}

@stephentoub
Member Author

do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?

A few concerns:

  1. ConcurrentQueue already exposes all of the APIs I mentioned that make it difficult to efficiently implement the core TryEnqueue/TryDequeue. For example, what happens if someone enumerates the queue when the sole bounded segment is 1/3 full? Today in the unbounded case, ConcurrentQueue will just create a new segment, such that subsequent enqueues will go to that. But if it did that for the bounded case, how would it efficiently manage the bound across data in two different segments?
  2. ConcurrentQueue exposes Enqueue rather than TryEnqueue... what's the behavior of Enqueue if the queue is full?
  3. ConcurrentQueue would need to support both the bounded and unbounded modes. That would make certain operations more expensive, as they'd need to check which mode they were in and act accordingly.

@tarekgh
Member

tarekgh commented Oct 2, 2017

@stephentoub I am not pushing back on the new type, but trying to evaluate whether it is really needed. Is it possible to have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) that just returns an internal instance of BoundedConcurrentQueue (considering it would be a subclass of ConcurrentQueue)? By doing that we'd provide the same implementation you were planning to have in BoundedConcurrentQueue anyway, without affecting the current ConcurrentQueue implementation, which should address the first and third points you mentioned.

ConcurrentQueue exposes Enqueue rather than TryEnqueue... what's the behavior of Enqueue if the queue is full?

I think we are defining a new behavior, so it is OK to throw in that case. This is similar to the pattern we have in other classes anyway.

My only concern with having a new type is that developers have to treat BoundedConcurrentQueue and ConcurrentQueue as two different things while they are really the same thing with different settings or constraints.

@stephentoub
Member Author

trying to evaluate if it is really needed

That's a good thing to do 😄

is it possible we can have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) and this method will just return an internal instance of BoundedConcurrentQueue (considering it is subclassed of ConcurrentQueue)

That would require the relevant methods to be made virtual, which can add non-trivial expense.

have to treat BoundedConcurrentQueue and ConcurrentQueue as 2 different things while they are really same thing but with different settings or constraints.

Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.

@tarekgh
Member

tarekgh commented Oct 2, 2017

That would require the relevant methods to be made virtual, which can add non-trivial expense.

Agreed. I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we could write the implementation with a pattern the JIT can optimize.

Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.

Usually, devs can avoid that by just using TryEnqueue, so it is up to the dev to decide what works better in their scenario.

@stephentoub
Member Author

I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we could write the implementation with a pattern the JIT can optimize.

If you have a method DoWork(ConcurrentQueue<T> queue) that operates on that queue, unless DoWork gets inlined and the call site was able to determine the concrete type of the queue, there's no way the JIT could help avoid the virtuals here.
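To illustrate the point (Enqueue is not actually virtual today; this sketch assumes the hypothetical subclass design):

using System.Collections.Concurrent;

static class Example
{
    // If Enqueue were virtual, the JIT could not devirtualize this call:
    // the static type here is ConcurrentQueue<T>, and the concrete runtime
    // type is only known at call sites, so devirtualization would require
    // inlining DoWork into such a call site.
    static void DoWork<T>(ConcurrentQueue<T> queue, T item)
    {
        queue.Enqueue(item);
    }
}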

@stephentoub
Member Author

I'm also not understanding why adding such a factory is better in this case. 90% of the surface area of the type would need to throw for the special case, code would need to know how the type it's handed was constructed / what its constraint is, none of the interface implementations would work, etc. What is the benefit over exposing a new type?

@tarekgh
Member

tarekgh commented Oct 2, 2017

What is the benefit over exposing a new type?

The only reason is what I mentioned: devs don't have to use 2 types which really do the exact same thing. Imagine someone wants to write a method which takes the queue as a parameter. Having 2 types means they need to provide 2 methods doing exactly the same thing.

If you think having a factory method on the ConcurrentQueue will complicate the implementation and the behavior, then I am fine with 2 types at that time.

@4creators
Contributor

Is it possible to make an educated guess at what performance gain could be achieved (or even a simple perf test using the internal ConcurrentQueue.Segment)?

@benaadams
Member

@4creators the current issue is you can't create a bounded queue with ConcurrentQueue; calling .Count rather than .IsEmpty causes an allocation; and you'd also have a race condition between .Count and .Enqueue.

To get around that you need to wrap it in another structure that does interlocked counting; but then you take a performance hit for the extra overhead, whereas the internal Segment already behaves in the desired way without a race or extra overhead.
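A sketch of that race when bounding from the outside (TryEnqueueBounded is a hypothetical wrapper):

using System.Collections.Concurrent;

static class BoundedWrapper
{
    static bool TryEnqueueBounded<T>(ConcurrentQueue<T> queue, T item, int capacity)
    {
        if (queue.Count < capacity)   // another producer can enqueue right here...
        {
            queue.Enqueue(item);      // ...so the bound can be overshot
            return true;
        }
        return false;
    }
}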

@kouvel
Member

kouvel commented Oct 3, 2017

One case for which I had to create a bounded concurrent queue before was when I had multiple producers and a single consumer that was transferring results from producers over the network. Producers had to be throttled such that they don't cause too much memory to be used while the sole consumer is lagging behind. Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

For that particular case it may still be beneficial to have a different implementation, as it may be possible to have a more efficient Dequeue in some cases since there is only one consumer.

@kouvel
Member

kouvel commented Oct 3, 2017

Another requirement for that particular scenario was to bound the queue based on an estimation of memory usage rather than number of items, throttling on some memory usage bound. Would it be useful to allow some way for the user to determine the bound?

@svick
Contributor

svick commented Oct 3, 2017

@kouvel

Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

Can't you use BlockingCollection<T> for that already?

@stephentoub
Member Author

Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

This is what the Channel.CreateBounded<T> API from the channels library on corefxlab (which is moving to corefx) provides, or at least the asynchronous version of it. BlockingCollection<T> already provides that for synchronous.

an estimation of memory usage rather than number of items

Would that really need to be built-in? I'd think such an estimation could just be accomplished by the user and translated by the user into a count. This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.
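For reference, a minimal sketch of the bounded-channel approach mentioned above, using the option names that eventually shipped in System.Threading.Channels (the corefxlab surface area at the time differed):

using System.Threading.Channels;
using System.Threading.Tasks;

static class ThrottledProducer
{
    static readonly Channel<byte[]> s_channel =
        Channel.CreateBounded<byte[]>(new BoundedChannelOptions(capacity: 64)
        {
            FullMode = BoundedChannelFullMode.Wait // producers wait while full
        });

    static async Task ProduceAsync(byte[] result)
    {
        // WriteAsync waits asynchronously (no blocked thread) while the
        // channel is at capacity, throttling producers when the consumer lags.
        await s_channel.Writer.WriteAsync(result);
    }
}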

@kouvel
Member

kouvel commented Oct 3, 2017

Can't you use BlockingCollection for that already?

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

Channel.CreateBounded

I see, I'll take a look

I'd think such an estimation could just be accomplished by the user and translated by the user into a count.

The capacity would have to be guessed up front; some items may be much larger than others, so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.

This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.

Yea probably not worth folding that into this implementation

@stephentoub
Member Author

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

It does support concurrent enqueues, and it does use a lock, though I think there are ways around that. One of the things I'd like help with in the channels lib is improving the perf of the implementation, including removing some of the locking that's currently being done, e.g. currently the channel implementations all lock on enqueue (most avoid that on dequeue), but we should be able to avoid that in many cases.

The capacity would have to be guessed up-front, some items may be much larger than others so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.

You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?

@svick
Contributor

svick commented Oct 3, 2017

@kouvel

Can't you use BlockingCollection for that already?

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

From the docs:

The Dispose method is not thread-safe. All other public and protected members of BlockingCollection<T> are thread-safe and may be used concurrently from multiple threads.

So, concurrent enqueues are definitely allowed.

@kouvel
Member

kouvel commented Oct 3, 2017

It does support concurrent enqueues

I see. BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.

You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?

The bound could be based on the sum of opaque values that are passed in to Enqueue, which could for instance represent the size of the item being enqueued.
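A sketch of what that could look like layered on top, with the caller supplying an opaque weight per item (all names here are illustrative, not a proposed API):

using System.Collections.Concurrent;
using System.Threading;

public sealed class WeightedBoundedQueue<T>
{
    private readonly ConcurrentQueue<(T Item, long Weight)> _queue = new ConcurrentQueue<(T, long)>();
    private readonly long _maxTotalWeight;
    private long _totalWeight;

    public WeightedBoundedQueue(long maxTotalWeight) => _maxTotalWeight = maxTotalWeight;

    public bool TryEnqueue(T item, long weight)
    {
        // Reserve the weight first; back out if it would exceed the bound.
        if (Interlocked.Add(ref _totalWeight, weight) > _maxTotalWeight)
        {
            Interlocked.Add(ref _totalWeight, -weight);
            return false;
        }
        _queue.Enqueue((item, weight));
        return true;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out var entry))
        {
            Interlocked.Add(ref _totalWeight, -entry.Weight);
            item = entry.Item;
            return true;
        }
        item = default;
        return false;
    }
}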

BoundedChannel appears to be something close to what I was looking for.

@stephentoub
Member Author

BoundedChannel appears to be something close to what I was looking for.

I think there's more flexibility in that API. Note that a bounded channel is created from a BoundedChannelOptions, so if there's something other than a count you think should be provided there, let's chat about it. I currently have dotnet/corefxlab#1805 out to revise the API surface area based on the API review we did.

@kouvel
Member

kouvel commented Oct 3, 2017

Another random thought: if the main use case for BoundedConcurrentQueue is object pooling, where I imagine the order in which items are removed from the queue doesn't matter, why not call it ObjectPool? If we feel that the queue behavior is beneficial, there could also be a BoundedConcurrentQueue, and if we further feel that they could benefit from the same implementation, that could be the case behind the scenes.

@kouvel
Member

kouvel commented Oct 3, 2017

Is it possible to implement an ObjectPool which, having fewer restrictions, could be more efficient than a BoundedConcurrentQueue?

@stephentoub
Member Author

stephentoub commented Oct 3, 2017

why not call it ObjectPool?

I think we do need to invest in an ObjectPool<T>, but I don't know that it has the same surface area / implementation as this BoundedConcurrentQueue<T> would. Object pooling often requires varying policies, one of which could be the super simple policy effectively implemented by this queue type, but it's not the only one. I don't think we want to burn the ObjectPool<T> name on this.

That said, it's certainly possible there's a better name than the one I proposed.

@karelz
Member

karelz commented Dec 5, 2017

FYI: The API review discussion was recorded - see https://youtu.be/BI3iXFT8H7E?t=6317 (5 min duration)

Conclusion: We want to wait on @stephentoub in the API review meeting (Feb).

@benaadams
Member

This isn't blocked anymore?

@karelz
Member

karelz commented Feb 14, 2018

Correct, @stephentoub is back, it is unblocked :)

@safern
Member

safern commented Mar 12, 2018

@karelz is this approved now?

@karelz
Member

karelz commented Mar 12, 2018

Nope, it was not critical for 2.1, so we didn't prioritize it.

@terrajobst
Member

@stephentoub: Can we make this a new ctor on the existing ConcurrentQueue<T> type?

@stephentoub
Member Author

stephentoub commented Jul 6, 2018

Can we make this a new ctor on the existing ConcurrentQueue<T>?

We could. Here's what the implementation would likely look like:
stephentoub/coreclr@186dc3e

If we went this route, the proposed additional surface area for the existing ConcurrentQueue<T> would be:

public class ConcurrentQueue<T>
{
    public ConcurrentQueue(int boundedCapacity);
    public int? BoundedCapacity { get; }
    public bool TryEnqueue(T item);
    ...
}

and there would be an impact on some existing surface area when constructed with this new constructor:

  • IProducerConsumerCollection<T>.TryAdd would return false if there was no space available. This is in contrast to today where it just calls Enqueue and thus always returns true.
  • Enqueue on a full queue would spin until the item could be added (the alternative would be to throw if we don't want to spin).
  • Most impactfully, ToArray, CopyTo, ICollection.CopyTo, GetEnumerator, and IEnumerable.GetEnumerator would all throw InvalidOperationException.

I'm fine with this approach if that's the preferred way to go.
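To make the behavioral impact concrete, a usage sketch (the bounded constructor and TryEnqueue are proposed here, not shipped):

using System;
using System.Collections.Concurrent;

class BoundedCtorSketch
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>(boundedCapacity: 4);

        for (int i = 0; i < 5; i++)
        {
            // TryEnqueue would return false on the fifth item.
            Console.WriteLine($"{i}: {queue.TryEnqueue(i)}");
        }

        // Enumeration (and ToArray/CopyTo) would throw for a bounded instance:
        // foreach (int x in queue) { } // InvalidOperationException
    }
}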

@stephentoub stephentoub removed their assignment Jul 6, 2018
@terrajobst
Member

terrajobst commented Jul 24, 2018

Video

Personally, I'm fine with the first two bullet points but I don't understand the last. Could you elaborate? Reading the issue description it sounds like peeking could result in marking all data as "preserve" which would block dequeues from removing data and thus could result in a state where enqueues would fail and the collection is stuck. Is copying not an option?

There are also some concerns about the second bullet point, although I find this acceptable as callers have to opt in.

@stephentoub
Member Author

stephentoub commented Jul 25, 2018

I don't understand the last. Could you elaborate?

You mean the bullet about ToArray/CopyTo/GetEnumerator/etc.? ConcurrentQueue is implemented as a linked list of circular buffers. If a segment fills up, a new segment is allocated and linked from the previous one, at which point no enqueueing happens into the previous one, only into the new one, and the old one will go away as soon as all items are dequeued from it and no one still has a reference to it.

The bounding feature is implemented by only having a single segment, ever, such that if the segment fills up, rather than creating a new one, it simply says "it's full, you can't enqueue", such that TryEnqueue returns false. If you could have multiple segments at the same time, then you don't have a good way to enforce the bound that sized the original segment. The moment the second segment is created, you've now got a bunch of items in the first segment, and any enqueuer can come along and enqueue to the second segment, so you've just increased the capacity / bound artificially and incorrectly.

Certain operations also cause a segment to no longer be viable for adds. For example, ConcurrentQueue supports enumerating the contents of the collection, and it does so with snapshot semantics, meaning the moment you call GetEnumerator, you're effectively creating a moment-in-time view of what was in the ConcurrentQueue... subsequent dequeues and enqueues will not affect what's returned from the enumerator. That means that a dequeue can't allow the element being dequeued to be overwritten if there's an active enumerator, because that enumerator will need to be able to get the data from the relevant slot at effectively any time in the future. Which means that once you start enumerating (which includes ToArray, CopyTo, etc.), the segments in the queue become blocked from any future enqueues. In a normal ConcurrentQueue, that's fine; subsequent enqueues will just create a new segment. But in a bounded queue where you're limited to a single segment and can't create additional segments, you're stuck. So, either:

  a) you completely change the implementation and make it much slower to be able to handle this (e.g. all enqueues/dequeues are blocked while any thread is enumerating the queue),
  b) you add additional synchronization to maintain a separate count for the number of items in the queue,
  c) once you enumerate the queue you say it can never have any more enqueues to it ever, or
  d) you say that enumerating operations are blocked.

And I'm suggesting that (d) is the best option if you want to build this bounding functionality into ConcurrentQueue rather than creating a new type. We could always fall back to (b) in the future if the constraints were prohibitive and we were willing to accept the additional cost (though it would likely add at least some cost even for the non-bounded case, which would be concerning).

@omariom
Contributor

omariom commented Jul 25, 2018

  • IProducerConsumerCollection.TryAdd would return false if there was no space available.
  • Enqueue on a full queue would spin until the item could be added.
  • Most impactfully, ToArray, CopyTo, ICollection.CopyTo, GetEnumerator, and IEnumerable.GetEnumerator would all throw InvalidOperationException.

Too many changes in behavior because of a ctor param.
This functionality deserves to be a separate type.

@benaadams
Member

Just occurred to me... as well as the "don't enqueue if full" behaviour, I have other scenarios where "overwrite oldest if full" is also desirable (temporal data discarding the oldest if behind, live video/data streaming, networking with packet loss)

With that in mind; would this work instead?

namespace System.Collections.Generic
{
    public sealed class CircularQueue<T>
    {
        public CircularQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }

        public bool Enqueue(T item); // Returns whether queue was full, so item overwritten
        public bool Enqueue(T item, out T discarded); // Additionally returns overwritten item
        
        public bool TryEnqueue(T item);
        
        public bool TryDequeue(out T item);
    }
}

namespace System.Collections.Concurrent
{
    public sealed class ConcurrentCircularQueue<T>
    {
        public ConcurrentCircularQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }

        public bool Enqueue(T item); // Returns whether queue was full, so item overwritten
        public bool Enqueue(T item, out T discarded); // Additionally returns overwritten item
        
        public bool TryEnqueue(T item);
        
        public bool TryDequeue(out T item);
    }
}

@benaadams
Member

benaadams commented Aug 19, 2018

Then more controversially....

public sealed partial class ConcurrentCircularQueue<T>
{
    // Completes when item queued (will wait if full)
    public Task EnqueueAsync(T item, CancellationToken token = default);
}

@stephentoub
Member Author

stephentoub commented Aug 19, 2018

as well as don't enqueue if full behaviour, I have other scenarios where overwrite oldest if full is also desirable
Then more controversially

Why isn't Channels sufficient for this, where this exact behavior is already supported? If you have a more efficient implementation that can be used for the bounded implementation in Channels, excellent, please submit that as a PR. I'm not seeing how this can be done efficiently in the current ConcurrentQueue implementation.

@benaadams
Member

Why isn't Channels sufficient for this,

It probably is :)

Are you just referring to the Async; or the overwrite oldest on Enqueue?

@benaadams
Member

Enqueue on a full queue would spin until the item could be added (the alternative would be to throw if we don't want to spin).

I was suggesting the alternative is to overwrite instead; however, then you'd want a return value saying whether it did overwrite, and possibly the item that was overwritten. So it still wouldn't fit with the existing API.

@stephentoub
Member Author

So it still wouldn't fit with the existing api.

Or the existing implementation... how would overwrite be implemented safely and efficiently without harming the rest of the implementation?

Are you just referring to the Async; or the overwrite oldest on Enqueue?

Primarily the former, but also the latter.

@benaadams
Member

benaadams commented Aug 20, 2018

In principle it would be

bool Enqueue(T item, out T discarded)
{
    while (true)
    {
        if (TryEnqueue(item))
        {
            discarded = default;
            return false; // nothing was overwritten
        }
        else if (TryEnqueueDequeue(item, out discarded))
        {
            return true; // oldest item was overwritten
        }
    }
}

Though I will admit TryEnqueueDequeue is close to step 2 in "How to draw an owl" 😄

[image: "How to draw an owl" meme]

Mostly I'm arguing for having a different data-structure rather than changing the behaviour of the existing ConcurrentQueue's methods based on a "mode" provided at construction.

A new data-structure would be more extensible for other methods that also fit its bounded mode of operation; whereas adding something like "overwrite when full" for Enqueue at a future date (working out Step 2) would just make ConcurrentQueue more confusing.

@stephentoub
Member Author

A new data-structure would be more extensible for other methods that also fit its bounded mode of operation; whereas adding something like "overwrite when full" for Enqueue at a future date (working out Step 2)

I'm just not understanding how this is different from Channel.CreateBounded<T>(...), which provides the exact support you're asking for.

I don't believe we should add any kind of asynchronous methods to ConcurrentQueue<T>. Changing the data structure to support a policy of overwriting some element would be complicated and very likely to not be pay-for-play. Changing the data structure to support a policy of dropping anything other than the head would similarly be problematic. It would be relatively simple to add a policy that supported dropping the oldest element, i.e. the next to be dequeued, but that could also be implemented on top just as easily (e.g. while (!TryEnqueue) TryDequeue), and since it would add at least some branching to the implementation, isn't warranted.
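Spelled out, that drop-oldest-on-top approach would look something like this sketch against the proposed (not shipped) surface area:

static void EnqueueDroppingOldest<T>(BoundedConcurrentQueue<T> queue, T item)
{
    // Not atomic: another producer may take the slot freed by TryDequeue,
    // in which case we simply loop and try again.
    while (!queue.TryEnqueue(item))
    {
        queue.TryDequeue(out _);
    }
}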

From my perspective, that means the only additional policy we might want to add would be the one expounded on in this issue, allowing a TryEnqueue to return false if the collection is full. But it does come with the additional constraints, where creating the queue in this mode would make some of the other APIs throw.

I don't have a strong preference about which way to go. I think we should add this functionality, it's just a question of whether to:

  • Add it to the existing ConcurrentQueue. No new type required, doesn't impact the performance of existing ConcurrentQueue scenarios. But some APIs on ConcurrentQueue will throw if used on a ConcurrentQueue created with certain parameters.
  • Expose a new type. New type required, but exposes only the surface area this functionality can support.

@benaadams
Member

I don't believe we should add any kind of asynchronous methods to ...

Fine with that

Add it to the existing ConcurrentQueue... some APIs on ConcurrentQueue will throw if used

Don't think that's a good option

Expose a new type

This is good

Having a data structure change mode based on the constructor isn't very good for a consumer of that data structure; it's runtime/convention enforcement/erroring rather than compile-time. In some situations it's hard to avoid, but this would be just to avoid adding a new type.

@terrajobst
Member

There is a trade-off decision here: how much do we care about the type behaving like a collection (i.e. being enumerable) vs how much we care about performance. @stephentoub added this item as we're using it internally for building object pools. It seems there is an idea to make object pooling more first class. Let's look at this type when we're productizing an object pool to make sure we're building a type that doesn't end up being the worst of all worlds (not quite fast enough, not quite usable enough).

@msftgits msftgits transferred this issue from dotnet/corefx Jan 31, 2020
@msftgits msftgits added this to the 3.0 milestone Jan 31, 2020
@ghost ghost locked as resolved and limited conversation to collaborators Dec 20, 2020
@davidfowl
Member

Can we revisit this issue again? This is a very useful type and is a core building block for something like an object pool (which has policy on top of this). I want to think of this like a synchronous bounded channel with 2 possible behaviors (drop, and drop oldest).

@stephentoub
Member Author

I want to think of this like a synchronous bounded channel with 2 possible behaviors (drop, and drop oldest)

What do you mean by this? Drop oldest would not be atomic.

@davidfowl
Member

I was commenting on the 2 modes expressed in this issue. It seems like the thing to build here is the equivalent of a bounded channel but synchronous (not blocking), with the possibility of adding more options for what to do when those bounds are reached.

var queue = new BoundedQueue<T>(new BoundedQueueOptions ...)

We would start with the current mode (DropWrite) and discuss the other modes (DropNewest, DropOldest)
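For comparison, those modes already exist on bounded channels as BoundedChannelFullMode; a sketch of the analogy (the BoundedQueue/BoundedQueueOptions shape above is hypothetical):

using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 16)
{
    // DropWrite rejects the incoming item; DropNewest/DropOldest evict,
    // and Wait corresponds to the asynchronous blocking behavior.
    FullMode = BoundedChannelFullMode.DropWrite
});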
