[API Proposal]: Add an IAsyncEnumerable<T>.ToEnumerable extension method. #60106

Closed
Tracked by #64599
davidfowl opened this issue Oct 7, 2021 · 22 comments · Fixed by #60363
Labels
api-approved (API was approved in API review, it can be implemented)
api-suggestion (Early API idea and discussion, it is NOT ready for implementation)
area-System.Collections
in-pr (There is an active PR which will close this issue when it is merged)

Comments

@davidfowl
Member

davidfowl commented Oct 7, 2021

EDIT: See #60106 (comment) for an API proposal.

Background and motivation

When adapting an IAsyncEnumerable<T> to an IEnumerable<T>, BlockingCollection<T> seems to be the ideal type. It can be used to produce an enumerable that waits until the IAsyncEnumerable<T> yields items:

public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
    var list = new BlockingCollection<T>();

    async Task AsyncIterate()
    {
        await foreach (var item in asyncEnumerable)
        {
            list.Add(item);
        }

        list.CompleteAdding();
    }

    _ = AsyncIterate();

    return list.GetConsumingEnumerable();
}

This works well, but there's no way to propagate exceptions that occur while enumerating the IAsyncEnumerable<T> to the consuming enumerable. I'd like to write something like this:

public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
    var list = new BlockingCollection<T>();

    async Task AsyncIterate()
    {
        try
        {
            await foreach (var item in asyncEnumerable)
            {
                list.Add(item);
            }
            list.CompleteAdding();
        }
        catch(Exception ex)
        {
            list.CompleteAdding(ex);
        }
    }

    _ = AsyncIterate();

    return list.GetConsumingEnumerable();
}

Calling CompleteAdding with an exception would propagate it to the consumer, which would then throw it during synchronous enumeration. As far as I know, the only way to make the consumer throw today is to use a cancellation token, but that doesn't propagate the original exception.
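To make the limitation concrete, here is a minimal sketch of the cancellation-token workaround, assuming .NET 6 or later with top-level statements. FailingSource and ToEnumerableViaCancellation are hypothetical names. The consumer is unblocked, but it observes an OperationCanceledException instead of the producer's InvalidOperationException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical source that yields one item and then fails.
async IAsyncEnumerable<int> FailingSource()
{
    yield return 1;
    await Task.Yield();
    throw new InvalidOperationException("boom");
}

// Cancellation-based workaround (hypothetical helper): the producer cancels a
// token on failure, which unblocks the consumer but discards the original error.
IEnumerable<T> ToEnumerableViaCancellation<T>(IAsyncEnumerable<T> source)
{
    var list = new BlockingCollection<T>();
    var cts = new CancellationTokenSource();

    async Task AsyncIterate()
    {
        try
        {
            await foreach (var item in source)
            {
                list.Add(item);
            }
            list.CompleteAdding();
        }
        catch (Exception)
        {
            cts.Cancel(); // the consumer only ever sees an OperationCanceledException
        }
    }

    _ = AsyncIterate();
    return list.GetConsumingEnumerable(cts.Token);
}

Exception? observed = null;
try
{
    foreach (int value in ToEnumerableViaCancellation(FailingSource()))
    {
        Console.WriteLine(value);
    }
}
catch (Exception ex)
{
    observed = ex;
}

Console.WriteLine(observed is OperationCanceledException); // True
```

The "boom" message never reaches the consumer; that lost information is what the proposal is trying to recover.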

API Proposal

namespace System.Collections.Concurrent
{
    public class BlockingCollection<T> : IEnumerable<T>
    {
        public void CompleteAdding(Exception error);
    }
}

API Usage

public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
    var list = new BlockingCollection<T>();

    async Task AsyncIterate()
    {
        try
        {
            await foreach (var item in asyncEnumerable)
            {
                list.Add(item);
            }
            list.CompleteAdding();
        }
        catch(Exception ex)
        {
            list.CompleteAdding(ex);
        }
    }

    _ = AsyncIterate();

    return list.GetConsumingEnumerable();
}

Risks

None that I'm aware of.

@davidfowl davidfowl added the api-suggestion Early API idea and discussion, it is NOT ready for implementation label Oct 7, 2021
@dotnet-issue-labeler dotnet-issue-labeler bot added area-System.Collections untriaged New issue has not been triaged by the area owner labels Oct 7, 2021
@ghost

ghost commented Oct 7, 2021

Tagging subscribers to this area: @dotnet/area-system-collections
See info in area-owners.md if you want to be subscribed.


@davidfowl
Member Author

cc @stephentoub @tarekgh

@svick
Contributor

svick commented Oct 7, 2021

If some loss of performance was acceptable here, then I think a simple modification of the code would make this work:

public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> asyncEnumerable)
{
    var list = new BlockingCollection<T>();

    async Task AsyncIterate()
    {
        try
        {
            await foreach (var item in asyncEnumerable)
            {
                list.Add(item);
            }
        }
        finally
        {
            list.CompleteAdding();
        }
    }

    var task = AsyncIterate();

    foreach (var item in list.GetConsumingEnumerable())
    {
        yield return item;
    }
    
    task.GetAwaiter().GetResult();
}

Does this make the proposal less useful?


Also, I don't like the suggested naming: CompleteAdding(Exception) does something quite different from CompleteAdding(), so I think it should have a different name. Inspiration could be taken from TPL Dataflow, where the corresponding methods are called Complete() and Fault(Exception).
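The following sketch exercises svick's adapter end to end, assuming .NET 6 or later. The adapter is copied into a local function, and Source is a hypothetical test sequence. It checks two properties discussed in this thread. First, the async source does not start until blocking enumeration begins. Second, the producer's original exception surfaces from task.GetAwaiter().GetResult() once the buffered items have been drained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

bool started = false;

// Hypothetical source: two items, then a failure.
async IAsyncEnumerable<int> Source()
{
    started = true;
    yield return 1;
    await Task.Yield();
    yield return 2;
    throw new InvalidOperationException("boom");
}

// svick's adapter, written as a local function so the sample is self-contained.
IEnumerable<T> ToEnumerable<T>(IAsyncEnumerable<T> asyncEnumerable)
{
    var list = new BlockingCollection<T>();

    async Task AsyncIterate()
    {
        try
        {
            await foreach (var item in asyncEnumerable)
            {
                list.Add(item);
            }
        }
        finally
        {
            list.CompleteAdding(); // always unblock the consumer, even on failure
        }
    }

    var task = AsyncIterate();
    foreach (var item in list.GetConsumingEnumerable())
    {
        yield return item;
    }

    task.GetAwaiter().GetResult(); // rethrows the producer's original exception
}

IEnumerable<int> sequence = ToEnumerable(Source());
bool startedBeforeEnumeration = started; // false: iterator bodies are deferred

var received = new List<int>();
Exception? error = null;
try
{
    foreach (int value in sequence)
    {
        received.Add(value);
    }
}
catch (InvalidOperationException ex)
{
    error = ex;
}

Console.WriteLine($"{string.Join(",", received)} then {error?.Message}"); // 1,2 then boom
```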

@eiriktsarpalis
Member

I believe @svick's alternative has the added advantage of delaying async enumeration until blocking enumeration has started. Instead of retrofitting BlockingCollection<T>, wouldn't it perhaps make more sense to add bespoke extension methods for IAsyncEnumerable? For example:

namespace System.Collections.Generic
{

    public static class CollectionExtensions
    {
         public static IEnumerable<T> ToBlockingEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);
         public static ValueTask<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);    
    }
}
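A minimal sketch of what the suggested ToListAsync shape could look like, assuming .NET Core 3.0 or later for WithCancellation. It is written as a local function rather than an extension method, and Numbers is a hypothetical source. The API review later in this thread chose to leave ToListAsync to System.Linq.Async, so treat this as an illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the proposed ToListAsync; not a shipped BCL API.
async ValueTask<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    // WithCancellation forwards the token to GetAsyncEnumerator.
    await foreach (T item in source.WithCancellation(cancellationToken))
    {
        list.Add(item);
    }
    return list;
}

// Hypothetical asynchronous source.
async IAsyncEnumerable<int> Numbers()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

List<int> numbers = await ToListAsync(Numbers());
Console.WriteLine(string.Join(",", numbers)); // 1,2,3
```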

@davidfowl
Member Author

@eiriktsarpalis Sure. I wasn't certain about our tolerance for adding extensions to IAsyncEnumerable, since we hadn't added any to date 😄

@stephentoub
Member

stephentoub commented Oct 7, 2021

I'd be more inclined to add a ToEnumerable extension method than to add a CompleteAdding(Exception) to BlockingCollection. We've avoided adding a full set of LINQ methods for IAsyncEnumerable, instead deferring to https://github.com/dotnet/reactive for that, but for helpers for going between core types, e.g. IAsyncEnumerable to IEnumerable, I'm ok with it. (And, just to be pedantic, we actually have added IAsyncEnumerable extension methods 😄 : https://github.com/dotnet/runtime/blob/6195d98cad42bc7ab70729e6c970e2c420b29db1/src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/TaskAsyncEnumerableExtensions.cs)

I don't know, though, that BlockingCollection should be used in the implementation. If you really want these semantics, you can do what @svick has done, but that effectively changes the iteration semantics: once you start enumerating, background operations fill the collection concurrently with consumption of the data, and that's generally not how either enumerables or async enumerables work. On top of that, BlockingCollection, largely because it is designed to wrap arbitrary collections and delegate to them for a variety of functionality, isn't terribly efficient, particularly in terms of allocation when data isn't available. If we wanted to add a ToEnumerable method, I'd prefer to see us do something like:

public static IEnumerable<T> ToEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
{
    IAsyncEnumerator<T> e = source.GetAsyncEnumerator(cancellationToken);
    try
    {
        while (true)
        {
            ValueTask<bool> moveNext = e.MoveNextAsync();
            if (moveNext.IsCompletedSuccessfully ? moveNext.Result : moveNext.AsTask().GetAwaiter().GetResult())
            {
                yield return e.Current;
            }
            else break;            
        }
    }
    finally
    {
        e.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}

That keeps everything in the world of "nothing is in flight while not interacting with the enumerator", avoids needing to expose some kind of bounded buffering policy on how many items a backing collection would enable, etc.

We'd also want to consider whether we really want to encourage such blocking behaviors by exposing this as an extension method... might be worth forcing someone to call it via a static just to add a tiny bit more friction. Or something along those lines. Or maybe @eiriktsarpalis's naming suggestion would help.
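The pull-based behavior of stephentoub's version can be checked with a counting source. This sketch assumes .NET 6 or later, copies his method into a local function (so it is called without extension-method syntax), and uses a hypothetical Counter source. After the consumer takes two items and breaks out of the loop, the source has produced exactly two items, because nothing runs between calls to MoveNext.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// stephentoub's pull-based adapter, copied into a local function.
IEnumerable<T> ToEnumerable<T>(IAsyncEnumerable<T> source, CancellationToken cancellationToken = default)
{
    IAsyncEnumerator<T> e = source.GetAsyncEnumerator(cancellationToken);
    try
    {
        while (true)
        {
            ValueTask<bool> moveNext = e.MoveNextAsync();
            // Skip the Task allocation when MoveNextAsync already completed synchronously.
            if (moveNext.IsCompletedSuccessfully ? moveNext.Result : moveNext.AsTask().GetAwaiter().GetResult())
            {
                yield return e.Current;
            }
            else break;
        }
    }
    finally
    {
        e.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}

int produced = 0;

// Hypothetical source that counts how many items it has produced.
async IAsyncEnumerable<int> Counter()
{
    for (int i = 0; i < 100; i++)
    {
        await Task.Yield();
        produced++;
        yield return i;
    }
}

var firstTwo = new List<int>();
foreach (int value in ToEnumerable(Counter()))
{
    firstTwo.Add(value);
    if (firstTwo.Count == 2) break; // disposes the enumerator, which disposes the async source
}

Console.WriteLine($"{string.Join(",", firstTwo)}; produced = {produced}"); // 0,1; produced = 2
```

With the BlockingCollection-based adapter, by contrast, the background loop keeps filling the collection regardless of how far the consumer has read.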

@davidfowl
Member Author

> If some loss of performance was acceptable here, then I think a simple modification of the code would make this work:

Somewhat, but I can think of scenarios that need a synchronous version of Channel<T> and want some form of exception propagation to the consumer without wrapping the enumerator, which is a little cumbersome.

@davidfowl
Member Author

> That keeps everything in the world of "nothing is in flight while not interacting with the enumerator", avoids needing to expose some kind of bounded buffering policy on how many items a backing collection would enable, etc.

The most obvious implementation is usually the best one 😄.

I don't think it avoids the need to add CompleteAdding with an exception, or some other way to push the exception to the caller, unless we're saying that BlockingCollection isn't something we want to modify. If that's the case, what's the blocking version of a Channel<T>?

@stephentoub
Member

> If that's the case, what's the blocking version of a Channel<T>?

channelReader.ReadAllAsync().ToEnumerable()
😄
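Assuming .NET 7 or later, where the ToBlockingEnumerable method approved later in this thread is available, the channel scenario can be sketched as follows. ChannelWriter<T>.Complete(Exception) faults the channel. ReadAllAsync rethrows that exception once the buffered items are drained, and the blocking adapter surfaces the same InvalidOperationException to the synchronous consumer. The item values and the "boom" message are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

Channel<int> channel = Channel.CreateUnbounded<int>();

// Producer: writes two items, then completes the channel with an error.
Task producer = Task.Run(async () =>
{
    await channel.Writer.WriteAsync(1);
    await channel.Writer.WriteAsync(2);
    channel.Writer.Complete(new InvalidOperationException("boom"));
});

var received = new List<int>();
Exception? error = null;
try
{
    // Blocking consumer over the async reader (requires .NET 7+ for ToBlockingEnumerable).
    foreach (int item in channel.Reader.ReadAllAsync().ToBlockingEnumerable())
    {
        received.Add(item);
    }
}
catch (InvalidOperationException ex)
{
    error = ex; // the producer's exception, not a cancellation
}

await producer;
Console.WriteLine($"{string.Join(",", received)} then {error?.Message}"); // 1,2 then boom
```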

@davidfowl
Member Author

Hmmm, performance-wise, is it the same?

@theodorzoulias
Contributor

@davidfowl wouldn't the proposal have side effects on other members of the class? Like the BlockingCollection<T>.Take method for example. What if a consumer was blocked on the Take, while the CompleteAdding(ex) was invoked from another thread?

@davidfowl
Member Author

Then Take would throw an exception? What happens when CompleteAdding is called from another thread while Take is blocked?

@Clockwork-Muse
Contributor

> Then Take would throw an exception? What happens when CompleteAdding is called from another thread while Take is blocked?

OperationCanceledException
The BlockingCollection is empty and has been marked as complete with regards to additions.

... so attempting to have a different exception pop up would likely be considered a breaking change.

@svick
Contributor

svick commented Oct 8, 2021

> We've avoided adding a full set of LINQ methods for IAsyncEnumerable, instead deferring to https://github.com/dotnet/reactive for that, but for helpers for going between core types, e.g. IAsyncEnumerable to IEnumerable, I'm ok with it.

For the record, that method already exists in dotnet/reactive.

@stephentoub
Member

> Hmmm, performance-wise, is it the same?

We'd need to measure. My gut says serialized throughput is slightly lower but allocation is much better. We also haven't invested in making blocking throughput with tasks super fast; if it's something we wanted to improve, I expect we could. I admit I'm surprised you care ;-)

@eiriktsarpalis
Member

eiriktsarpalis commented Oct 8, 2021

I'd support adding a ToBlockingEnumerable extension method as listed in #60106 (comment). The term "Blocking" is technically redundant but should serve to emphasize behavior and discourage use.

@eiriktsarpalis eiriktsarpalis added api-ready-for-review API is ready for review, it is NOT ready for implementation and removed untriaged New issue has not been triaged by the area owner labels Oct 8, 2021
@eiriktsarpalis eiriktsarpalis added this to the 7.0.0 milestone Oct 8, 2021
@eiriktsarpalis eiriktsarpalis self-assigned this Oct 8, 2021
@eiriktsarpalis eiriktsarpalis changed the title [API Proposal]: Add BlockingCollection<T>.CompleteAdding(Exception) [API Proposal]: Add an IAsyncEnumerable<T>.ToEnumerable extension method. Oct 8, 2021
@davidfowl
Member Author

> I admit I'm surprised you care ;-)

Thought experiment 😄

@terrajobst
Member

terrajobst commented Oct 12, 2021

Video

  • We think ToListAsync goes down the path of System.Linq.Async. Once you want this functionality, you're better off adding a reference to that library.
  • System.Linq.Async already has a ToEnumerable method (even though it doesn't take a cancellation token), so we'd prefer adding Blocking to the name to avoid ambiguity errors in case they ever add an overload with a cancellation token. It also draws more attention to the potentially problematic blocking behavior.

namespace System.Threading.Tasks
{
    public static class TaskAsyncEnumerableExtensions
    {
        public static IEnumerable<T> ToBlockingEnumerable<T>(this IAsyncEnumerable<T> source, CancellationToken cancellationToken = default);
    }
}
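A usage sketch of the approved shape, assuming .NET 7 or later, where ToBlockingEnumerable shipped on System.Threading.Tasks.TaskAsyncEnumerableExtensions. The cancellationToken argument is forwarded to GetAsyncEnumerator, so an async iterator whose parameter is marked [EnumeratorCancellation] observes it. Ticks is a hypothetical source.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical infinite source that honors the enumerator's cancellation token.
async IAsyncEnumerable<int> Ticks([EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; ; i++)
    {
        await Task.Delay(10, token);
        yield return i;
    }
}

using var cts = new CancellationTokenSource();
var seen = new List<int>();
bool canceled = false;
try
{
    // Each iteration blocks the calling thread until the source produces the next item.
    foreach (int tick in Ticks().ToBlockingEnumerable(cts.Token))
    {
        seen.Add(tick);
        if (tick == 2) cts.Cancel();
    }
}
catch (OperationCanceledException)
{
    canceled = true; // Task.Delay observed the token inside the source
}

Console.WriteLine($"{string.Join(",", seen)} canceled={canceled}"); // 0,1,2 canceled=True
```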

@terrajobst terrajobst added api-approved API was approved in API review, it can be implemented and removed api-ready-for-review API is ready for review, it is NOT ready for implementation labels Oct 12, 2021
@davidfowl
Member Author

cc @ajcvickers about ToListAsync

@ajcvickers
Member

Agree with @terrajobst.

/cc @smitpatel @roji @AndriySvyryd for their thoughts.

@smitpatel
Member

Agree with @terrajobst above.
Since EF Core is a library about translating queries, we have only added extension methods over IQueryable. Users can convert an IQueryable to an IAsyncEnumerable and use the System.Linq.Async library to work with the async enumerable. We do have a sugar method, ToListAsync, which converts a queryable to a list asynchronously without having to call queryable.AsAsyncEnumerable().ToListAsync() and add a reference to System.Linq.Async.

@AndriySvyryd
Member

I also agree with @terrajobst.

If every MoveNextAsync call completes asynchronously and this extension is used in a perf-sensitive place, then I suspect it would be better to buffer the results in blocks of X size. But this should be relatively rare, so I don't think it's worth adding a specific method for this case to the framework.
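A rough sketch of the block-buffering idea, assuming .NET 6 or later and assuming the goal is one blocking wait per block rather than per item. ToBufferedBlockingEnumerable, FillBlockAsync, CountTo and the blockSize parameter are hypothetical names, not framework APIs. Each block is filled by a single async call, and the synchronous consumer blocks once per block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: fills one block of up to blockSize items with a single async call.
async Task<List<T>> FillBlockAsync<T>(IAsyncEnumerator<T> e, int blockSize)
{
    var block = new List<T>(blockSize);
    while (block.Count < blockSize && await e.MoveNextAsync())
    {
        block.Add(e.Current);
    }
    return block;
}

// Hypothetical adapter: blocks once per block instead of once per item.
IEnumerable<T> ToBufferedBlockingEnumerable<T>(IAsyncEnumerable<T> source, int blockSize)
{
    if (blockSize <= 0) throw new ArgumentOutOfRangeException(nameof(blockSize));

    IAsyncEnumerator<T> e = source.GetAsyncEnumerator();
    try
    {
        while (true)
        {
            List<T> block = FillBlockAsync(e, blockSize).GetAwaiter().GetResult();
            foreach (T item in block)
            {
                yield return item;
            }
            if (block.Count < blockSize)
            {
                yield break; // a short block means the source is exhausted
            }
        }
    }
    finally
    {
        e.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}

// Hypothetical source producing 0..count-1 asynchronously.
async IAsyncEnumerable<int> CountTo(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var items = new List<int>();
foreach (int value in ToBufferedBlockingEnumerable(CountTo(10), blockSize: 4))
{
    items.Add(value);
}

Console.WriteLine(string.Join(",", items)); // 0,1,2,3,4,5,6,7,8,9
```

The trade-off is the one stephentoub raised earlier: up to blockSize items are fetched before the consumer sees the first one of each block, so the source runs ahead of the consumer.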

@ghost ghost added the in-pr There is an active PR which will close this issue when it is merged label Oct 13, 2021
@ghost ghost locked as resolved and limited conversation to collaborators Nov 17, 2021
10 participants