Champion "Async Streams" (including async disposable) #43

Open
gafter opened this Issue Feb 9, 2017 · 61 comments

@gafter
Member

gafter commented Feb 9, 2017

See also dotnet/roslyn#261

LDM notes:

@gafter

gafter Feb 15, 2017

Member

This would be something like

interface IAsyncEnumerable<T>
{
        IAsyncEnumerator<T> GetEnumerator();
}
interface IAsyncEnumerator<T>: IAsyncDisposable
{
        Task<bool> MoveNextAsync();
        T Current { get; }
}

With the following elements yet to be designed

  • IAsyncDisposable
  • async foreach loops
  • Declaring async iterator methods
  • Some kind of query expression support
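The interface shape above can be exercised with a small hand-written implementation. The sketch below is illustrative only. It assumes that IAsyncDisposable has a single Task DisposeAsync() member, since that interface was still to be designed at this point. DelayedArray, ProposalDemo and DrainAsync are hypothetical names, and the consuming loop is only a rough idea of what an async foreach might expand to.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical shapes following the comment above. The single DisposeAsync() member
// of IAsyncDisposable is an assumption, since that interface was not yet designed.
interface IAsyncDisposable { Task DisposeAsync(); }

interface IAsyncEnumerable<T> { IAsyncEnumerator<T> GetEnumerator(); }

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Yields each element of an array after a short simulated delay.
sealed class DelayedArray<T> : IAsyncEnumerable<T>
{
    private readonly T[] _items;
    public DelayedArray(params T[] items) => _items = items;
    public IAsyncEnumerator<T> GetEnumerator() => new Enumerator(_items);

    private sealed class Enumerator : IAsyncEnumerator<T>
    {
        private readonly T[] _items;
        private int _index = -1;
        public Enumerator(T[] items) => _items = items;
        public T Current => _items[_index];

        public async Task<bool> MoveNextAsync()
        {
            await Task.Delay(1); // stand-in for real I/O
            return ++_index < _items.Length;
        }

        public Task DisposeAsync() => Task.CompletedTask;
    }
}

static class ProposalDemo
{
    // Roughly what an "async foreach" over the source might expand to.
    public static async Task<List<T>> DrainAsync<T>(IAsyncEnumerable<T> source)
    {
        var results = new List<T>();
        IAsyncEnumerator<T> e = source.GetEnumerator();
        try
        {
            while (await e.MoveNextAsync())
                results.Add(e.Current);
        }
        finally
        {
            await e.DisposeAsync();
        }
        return results;
    }

    static async Task Main() =>
        Console.WriteLine(string.Join(", ", await DrainAsync(new DelayedArray<int>(1, 2, 3)))); // prints 1, 2, 3
}
```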

@gafter gafter changed the title from Champion "Async Streams" to Champion "Async Streams" (including async disposable) Feb 15, 2017

@gafter gafter added this to the 8.0 candidate milestone Feb 22, 2017

@onovotny

onovotny Feb 23, 2017

Member

@gafter I thought we settled on

interface IAsyncEnumerable<T>
{
        IAsyncEnumerator<T> GetEnumerator(CancellationToken cancellation);
}

And potentially

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerator<T> GetEnumerator<T>(this IAsyncEnumerable<T> source) => source.GetEnumerator(CancellationToken.None);
}
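Here is a sketch of how a token passed to GetEnumerator could be observed during enumeration, with the extension overload above covering callers that don't need cancellation. It makes a few assumptions. Disposal is omitted, and the token is checked at the start of each MoveNextAsync call. Counter and CancellationDemo are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shapes following the suggestion above: the token is supplied once,
// when enumeration starts. Disposal is left out to keep the sketch short.
interface IAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

interface IAsyncEnumerable<T>
{
    IAsyncEnumerator<T> GetEnumerator(CancellationToken cancellation);
}

static class AsyncEnumerableExtensions
{
    // Convenience overload for callers that do not need cancellation.
    public static IAsyncEnumerator<T> GetEnumerator<T>(this IAsyncEnumerable<T> source) =>
        source.GetEnumerator(CancellationToken.None);
}

// Produces 0 .. count-1 and checks the token on every step.
sealed class Counter : IAsyncEnumerable<int>
{
    private readonly int _count;
    public Counter(int count) => _count = count;

    public IAsyncEnumerator<int> GetEnumerator(CancellationToken cancellation) =>
        new Enumerator(_count, cancellation);

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int _count;
        private readonly CancellationToken _token;
        public Enumerator(int count, CancellationToken token) { _count = count; _token = token; }
        public int Current { get; private set; } = -1;

        public async Task<bool> MoveNextAsync()
        {
            _token.ThrowIfCancellationRequested();
            await Task.Yield(); // stand-in for real asynchronous work
            if (Current + 1 >= _count) return false;
            Current++;
            return true;
        }
    }
}

static class CancellationDemo
{
    public static async Task<int> CountAsync(IAsyncEnumerator<int> e)
    {
        int n = 0;
        while (await e.MoveNextAsync()) n++;
        return n;
    }

    // Cancels after 'stopAfter' items; reports the items seen and whether cancellation surfaced.
    public static async Task<(int Seen, bool Canceled)> RunAsync(int stopAfter)
    {
        using var cts = new CancellationTokenSource();
        var e = new Counter(10).GetEnumerator(cts.Token);
        int seen = 0;
        try
        {
            while (await e.MoveNextAsync())
            {
                if (++seen == stopAfter) cts.Cancel();
            }
            return (seen, false);
        }
        catch (OperationCanceledException)
        {
            return (seen, true);
        }
    }

    static async Task Main()
    {
        Console.WriteLine(await RunAsync(2));                                 // prints (2, True)
        Console.WriteLine(await CountAsync(new Counter(3).GetEnumerator())); // prints 3
    }
}
```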
@juepiezhongren

juepiezhongren May 11, 2017

After the appearance of "IAsyncEnumerable" in Azure Service Fabric, we want more!
We want an async implementation of the entire collections part of the .NET BCL!
AsyncLinq: SelectAsync... AsyncParallelLinq: AsAsyncParallel... await yield!

As ECMAScript is proposing async generators, this should be fulfilled in .NET too!
And with what I have mentioned implemented, .NET (C# and F#) would be the most developer-friendly platform for big data, and Service Fabric could foster a completely new ecosystem with a firm asynchronous programming foundation.

I wish these would come true sooner!


@oliverjanik

oliverjanik Jun 2, 2017

Any news on this? It seems this interface has already been defined in several places (IX, EF7).


@benaadams

benaadams Aug 22, 2017

As brought up in roslyn thread

interface IAsyncEnumerator<T>: IAsyncDisposable
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

Has issues with parallelization/concurrency/atomicity, which are desirable properties of an async system (e.g. double gets on Current or missed items on Current)
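Below is a deterministic simulation of one unlucky interleaving. Two readers share a single enumerator, and each one advances before reading Current. Because Current is separate state that the other reader can overwrite in the meantime, one item is skipped and another is read twice. RangeEnumerator and SharedCurrentDemo are hypothetical names, and the interleaving is written out by hand rather than produced by real concurrency.

```csharp
using System;
using System.Threading.Tasks;

interface IAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// A trivial enumerator over 0 .. count-1 whose MoveNextAsync completes synchronously.
sealed class RangeEnumerator : IAsyncEnumerator<int>
{
    private readonly int _count;
    public RangeEnumerator(int count) => _count = count;
    public int Current { get; private set; } = -1;
    public Task<bool> MoveNextAsync() => Task.FromResult(++Current < _count);
}

static class SharedCurrentDemo
{
    // Two readers share one enumerator; the steps below are one possible interleaving.
    public static async Task<(int A, int B)> InterleaveAsync()
    {
        var shared = new RangeEnumerator(10);
        await shared.MoveNextAsync(); // reader A advances: Current == 0
        await shared.MoveNextAsync(); // reader B advances: Current == 1
        int a = shared.Current;       // reader A reads 1, so item 0 is never seen
        int b = shared.Current;       // reader B also reads 1, a duplicate
        return (a, b);
    }

    static async Task Main() => Console.WriteLine(await InterleaveAsync()); // prints (1, 1)
}
```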


@IvanKonov

IvanKonov Aug 22, 2017

Maybe then:

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<bool> GetNextAsync(out T next);
    Task<T> Current { get; }
}

@HaloFour

HaloFour Aug 22, 2017

Contributor

An async method can't have out parameters.


@IvanKonov

IvanKonov Aug 22, 2017

Another version:

interface IAsyncEnumerator<T> : IAsyncDisposable
{
    Task<(bool has_value, T next)> GetNextAsync();
    Task<T> Current { get; }
}
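The sketch below uses the tuple-returning GetNextAsync from the version above, with the Current property left out. Each value is returned together with its has-value flag, so there is no separate Current state for another caller to overwrite. The cost, raised in a later comment, is that the interface can't be declared covariant: Task<...> is a class and therefore invariant, so the compiler rejects `out T` here. ListEnumerator and TupleDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical tuple-returning shape (Current omitted). Declaring this as
// IAsyncEnumerator<out T> would not compile, because Task<...> is invariant in its argument.
interface IAsyncEnumerator<T>
{
    Task<(bool HasValue, T Next)> GetNextAsync();
}

sealed class ListEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly IReadOnlyList<T> _items;
    private int _index;
    public ListEnumerator(IReadOnlyList<T> items) => _items = items;

    public async Task<(bool HasValue, T Next)> GetNextAsync()
    {
        await Task.Yield(); // stand-in for real asynchronous work
        // The value travels with the result, so there is no shared Current to race on.
        return _index < _items.Count ? (true, _items[_index++]) : (false, default(T));
    }
}

static class TupleDemo
{
    public static async Task<List<T>> DrainAsync<T>(IAsyncEnumerator<T> e)
    {
        var results = new List<T>();
        while (true)
        {
            var (hasValue, next) = await e.GetNextAsync();
            if (!hasValue) break;
            results.Add(next);
        }
        return results;
    }

    static async Task Main() =>
        Console.WriteLine(string.Join(", ", await DrainAsync(new ListEnumerator<string>(new[] { "a", "b" })))); // prints a, b
}
```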

@onovotny

onovotny Aug 22, 2017

Member

How about just adding out to async methods? That would make this far cleaner :trollface:


@vladd

vladd Aug 22, 2017

@IvanKonov This definition would make IAsyncEnumerator<T> not covariant in T.


@benaadams

benaadams Aug 22, 2017

An async method can't have out parameters.

but

Task<bool> GetNextAsync(out T next);

isn't async; it's just Task<bool>-returning, and you can await with out params, so no problems? 😉

async Task DoThing()
{
    var enumerator = new Enumerator<int>();
    while(await enumerator.GetNextAsync(out var next))
    {
        // Do thing with next
    }
}

struct Enumerator<T>
{
    public Task<bool> GetNextAsync(out T next)
    {
        next = default(T);
        return Task.FromResult(false);
    }
}

@yaakov-h

yaakov-h Aug 22, 2017

Contributor

Doesn't the out param have to be set before the task is returned? At that point, the task will most likely not be completed, so you won't have a "next" to pass as the out param. Then, by the time you have your next, it's too late to fill the out param.
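The timing problem can be reproduced directly. The out parameter has to be assigned before GetNextAsync returns its task, but the item only arrives later, and at that point nothing can reach the caller's variable. In this sketch, a TaskCompletionSource stands in for the pending asynchronous work. PendingEnumerator, Produce and OutParamDemo are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical enumerator shaped like the out-parameter proposal.
sealed class PendingEnumerator
{
    private TaskCompletionSource<bool> _pending;

    public Task<bool> GetNextAsync(out string next)
    {
        // The out parameter must be assigned before this method returns,
        // i.e. before the asynchronous work has produced the item.
        next = null;
        _pending = new TaskCompletionSource<bool>();
        return _pending.Task;
    }

    // Simulates the item arriving later; the caller's 'next' can no longer be written.
    public void Produce(string item) => _pending.SetResult(item != null);
}

static class OutParamDemo
{
    public static async Task<string> RunAsync()
    {
        var e = new PendingEnumerator();
        Task<bool> t = e.GetNextAsync(out var next);
        e.Produce("item-1");                          // the item arrives after the call returned
        bool hasItem = await t;                       // true...
        return hasItem ? next ?? "<null>" : "<end>";  // ...but 'next' is still null
    }

    static async Task Main() => Console.WriteLine(await RunAsync()); // prints <null>
}
```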


@benaadams

benaadams Aug 22, 2017

Doesn't the out param have to be set before the task is returned?

😢 I'll go with @onovotny's suggestion instead then #43 (comment)


@stephentoub

stephentoub Aug 22, 2017

Member

Has issues with parallelization/concurrency/atomicity, which are desirable properties of an async system

While true, if you have a source that supports multiple readers, you can also simply change the model to one where each reader is given its own enumerator. Then the concurrency issues evaporate / are left up to the coordinator that's giving the items out to the individual enumerators.
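Here is a sketch of the "one enumerator per reader" model. A coordinator owns the shared source and hands each reader its own enumerator. The only shared step, taking the next item, happens under a lock, so each enumerator's Current belongs to exactly one reader. Coordinator, CreateReader, TryTake and CoordinatorDemo are hypothetical names, and a lock-protected Queue<T> is just one possible way to do the coordination.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

interface IAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Hypothetical coordinator: owns the shared source and gives each reader its own enumerator.
sealed class Coordinator<T>
{
    private readonly Queue<T> _queue;
    private readonly object _gate = new object();
    public Coordinator(IEnumerable<T> items) => _queue = new Queue<T>(items);

    public IAsyncEnumerator<T> CreateReader() => new Reader(this);

    // The only shared operation; the lock makes "take the next item" atomic.
    private bool TryTake(out T item)
    {
        lock (_gate) return _queue.TryDequeue(out item);
    }

    private sealed class Reader : IAsyncEnumerator<T>
    {
        private readonly Coordinator<T> _owner;
        public Reader(Coordinator<T> owner) => _owner = owner;
        public T Current { get; private set; } // private to this reader

        public async Task<bool> MoveNextAsync()
        {
            await Task.Yield(); // stand-in for real asynchronous work
            if (!_owner.TryTake(out T item)) return false;
            Current = item;
            return true;
        }
    }
}

static class CoordinatorDemo
{
    static async Task<List<int>> ReadAllAsync(IAsyncEnumerator<int> reader)
    {
        var seen = new List<int>();
        while (await reader.MoveNextAsync()) seen.Add(reader.Current);
        return seen;
    }

    // Runs several readers concurrently and returns everything they saw, sorted.
    public static async Task<int[]> RunAsync(int itemCount, int readerCount)
    {
        var coordinator = new Coordinator<int>(Enumerable.Range(0, itemCount));
        var perReader = await Task.WhenAll(
            Enumerable.Range(0, readerCount).Select(_ => ReadAllAsync(coordinator.CreateReader())));
        return perReader.SelectMany(list => list).OrderBy(x => x).ToArray();
    }

    static async Task Main()
    {
        int[] all = await RunAsync(100, 3);
        Console.WriteLine($"{all.Length} items, {all.Distinct().Count()} distinct");
    }
}
```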

I'll go with @onovotny's suggestion instead

I don't understand the suggestion. When I do:

Task<bool> t = enumerator.GetNextAsync(out T result);
Use(result);
await t;

how is that intended to work?

covariant

Note that most of this was already discussed in dotnet/roslyn#261.


@onovotny

onovotny Aug 22, 2017

Member

I was trolling, but what if using out parameters with Task required use of await? Then the assignment order is guaranteed.


@stephentoub

stephentoub Aug 22, 2017

Member

I was trolling, but what if using out parameters with Task required use of await? Then the assignment order is guaranteed.

a) that would be a breaking change, b) there are actually scenarios where you want to access the out value after the synchronous return of the method, and c) anything can be made awaitable, so Task isn't special in this regard.


@yaakov-h

yaakov-h Aug 22, 2017

Contributor

@stephentoub is there a Cliffs Notes version of that thread?


@stephentoub

stephentoub Aug 22, 2017

Member

In the near future I'm planning to take some time to write down where we are, which would include summarizing salient points from that thread.


@scottt732

scottt732 Aug 22, 2017

Have you fleshed out how CancellationTokens would work here? Is the intent to explicitly call GetEnumerator... something like foreach (await var item in ae.GetEnumerator(cancellationToken)) { ... } ?

Task<bool> MoveNextAsync(CancellationToken cancellationToken) in the interface feels more consistent, but I gather that would require additional language changes to accommodate in foreach/while, LINQ query/method syntax.

Would it be possible to get the IEnumerable/LINQ and IObservable/Rx extension methods wired up to IAsyncEnumerable? Do IAsyncQueryable/IAsyncQbservable factor in to the plans at all?


@Igorbek

Igorbek Aug 23, 2017

discriminated unions would be useful here.


@Lukazoid

Lukazoid Sep 6, 2017

If Nullable reference types landed, could you not just have

Task<T?> GetNextAsync()

where null is returned when there is no next item?


@yaakov-h

yaakov-h Sep 6, 2017

Contributor

@Lukazoid but what if null is the next item?

e.g. new string[] { "a", "b", null, "d" } is a perfectly valid enumerable sequence, and I'd expect it to remain valid with asynchronous sequences too.
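This objection can be checked concretely. If null means "no more items", a sequence that legitimately contains null ends early at that element. INullTerminatedEnumerator, ArrayEnumerator and NullSentinelDemo are hypothetical names that model the Task<T?> suggestion. The sketch uses a plain class constraint, since nullable reference types were not available yet.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical shape from the suggestion above: a null result means "no more items".
interface INullTerminatedEnumerator<T> where T : class
{
    Task<T> GetNextAsync();
}

sealed class ArrayEnumerator<T> : INullTerminatedEnumerator<T> where T : class
{
    private readonly T[] _items;
    private int _index;
    public ArrayEnumerator(T[] items) => _items = items;

    public Task<T> GetNextAsync() =>
        Task.FromResult(_index < _items.Length ? _items[_index++] : null);
}

static class NullSentinelDemo
{
    public static async Task<List<string>> DrainAsync(INullTerminatedEnumerator<string> e)
    {
        var results = new List<string>();
        string next;
        while ((next = await e.GetNextAsync()) != null)
            results.Add(next);
        return results;
    }

    static async Task Main()
    {
        var items = await DrainAsync(new ArrayEnumerator<string>(new[] { "a", "b", null, "d" }));
        // The null element is indistinguishable from "end of sequence", so "d" is never read.
        Console.WriteLine(string.Join(", ", items)); // prints a, b
    }
}
```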


@Lukazoid

Lukazoid Sep 6, 2017

@yaakov-h Well, with nullable reference types your array would actually be new string?[] { .. }, so T would be string? and the result of GetNextAsync would be string??. Thinking about it, though, I guess that probably isn't allowed, in the same way int?? isn't allowed.

But if it were, you would use it like so:

var possibleNextItem = await asyncEnumerator.GetNextAsync();
if (possibleNextItem.HasValue) {

    // nextItem could be null if T were a nullable type
    var nextItem = possibleNextItem.Value;
}

It was just a thought which I personally feel is much cleaner than having two values, having it baked into the type makes more sense. As @Igorbek says, discriminated unions would also solve this cleanly.
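A user-defined option type gives roughly the behavior described above without relying on string??. HasValue is stored separately from Value, so a null element is still delivered. Optional, IOptionalEnumerator and OptionalDemo are hypothetical names. As noted later in the thread, Task<Optional<T>> still rules out a covariant interface.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical option type: HasValue is tracked separately from Value,
// so a null element can still be represented.
readonly struct Optional<T>
{
    public Optional(T value) { HasValue = true; Value = value; }
    public bool HasValue { get; }
    public T Value { get; }
    public static Optional<T> None => default;
}

interface IOptionalEnumerator<T>
{
    Task<Optional<T>> GetNextAsync();
}

sealed class ArrayEnumerator<T> : IOptionalEnumerator<T>
{
    private readonly T[] _items;
    private int _index;
    public ArrayEnumerator(T[] items) => _items = items;

    public Task<Optional<T>> GetNextAsync() =>
        Task.FromResult(_index < _items.Length
            ? new Optional<T>(_items[_index++])
            : Optional<T>.None);
}

static class OptionalDemo
{
    public static async Task<List<T>> DrainAsync<T>(IOptionalEnumerator<T> e)
    {
        var results = new List<T>();
        while (true)
        {
            var possibleNextItem = await e.GetNextAsync();
            if (!possibleNextItem.HasValue) break;
            results.Add(possibleNextItem.Value); // may legitimately be null
        }
        return results;
    }

    static async Task Main()
    {
        var items = await DrainAsync(new ArrayEnumerator<string>(new[] { "a", "b", null, "d" }));
        Console.WriteLine(items.Count); // prints 4
    }
}
```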


@yaakov-h

yaakov-h Sep 6, 2017

Contributor

@Lukazoid string? is not a separate CLR type from string under the nullable references proposal. string?? would be meaningless/impossible.


@stephentoub

stephentoub Sep 6, 2017

Member

If Nullable reference types landed, could you not just have

And not without losing covariance. This is discussed here:
https://github.com/dotnet/csharplang/blob/master/proposals/async-streams.md


@asyncawaitmvvm

asyncawaitmvvm Sep 19, 2017

Have you fleshed out how CancellationTokens would work here? Is the intent to explicitly call GetEnumerator... something like foreach (await var item in ae.GetEnumerator(cancellationToken)) { ... } ?

You have to abstract that operation at a higher level. The IAsyncEnumerable<T> is only the final piece.

foreach (await var item in ae.EnumThingsAsync(cancellationToken)) { ... }

public IAsyncEnumerable<Thing> EnumThingsAsync(
    CancellationToken ct = default(CancellationToken))
{
    return new ThingEnumerable(ct);
}

Has issues with parallelization/concurrency/atomicity, which are desirable properties of an async system (e.g. double gets on Current or missed items on Current)

I beg to differ. See above.

MoveNextAsync() is supposed to handle ALL of the off-thread loading. You're not even supposed to think about Current until MoveNextAsync() is done. You're not supposed to have multiple readers unless you're locking so well that the enumerable doesn't think there are multiple readers. Be patient and await, or abstract it at a higher level like you're supposed to and hand out multiple IAsyncEnumerables.


@yaakov-h

yaakov-h Sep 19, 2017

Contributor

Given that it's just an interface, MoveNextAsync doesn't have to actually start or control the enumeration. It simply returns a series of Tasks which represent the asynchronous operation to retrieve the next object.

It's up to the implementation as to whether loading begins when the IAsyncEnumerable<> is created, or when MoveNextAsync is called. Any intermediary item could be prefetched too. For example, you could enumerate a paginated remote call, so you get 50 items at a time, and only every 50th MoveNextAsync is actually asynchronous.
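Here is a sketch of the paginated case described above. One remote call fetches a whole page, and MoveNextAsync only awaits real I/O when the buffer runs out. Otherwise it completes synchronously from the buffered page. Several parts are hypothetical: the fetchPage delegate (page index to items, with an empty page meaning "done"), the FetchCount counter, and the fake remote source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

interface IAsyncEnumerator<T>
{
    Task<bool> MoveNextAsync();
    T Current { get; }
}

// Hypothetical paged enumerator: one remote call per page.
sealed class PagedEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly Func<int, Task<IReadOnlyList<T>>> _fetchPage; // page index -> items (empty = done)
    private IReadOnlyList<T> _page = Array.Empty<T>();
    private int _pageIndex = -1;
    private int _offset;

    public PagedEnumerator(Func<int, Task<IReadOnlyList<T>>> fetchPage) => _fetchPage = fetchPage;

    public T Current { get; private set; }
    public int FetchCount { get; private set; }

    public async Task<bool> MoveNextAsync()
    {
        if (_offset >= _page.Count)
        {
            // Buffer exhausted: this is the only path that awaits the remote call.
            FetchCount++;
            _page = await _fetchPage(++_pageIndex);
            _offset = 0;
            if (_page.Count == 0) return false;
        }
        Current = _page[_offset++]; // completes synchronously when the item is buffered
        return true;
    }
}

static class PagingDemo
{
    // Fake remote source: 'total' integers served in pages of 'pageSize'.
    static Func<int, Task<IReadOnlyList<int>>> FakeRemote(int total, int pageSize) =>
        async page =>
        {
            await Task.Delay(1); // pretend network latency
            var items = new List<int>();
            for (int i = page * pageSize; i < Math.Min(total, (page + 1) * pageSize); i++)
                items.Add(i);
            return items;
        };

    public static async Task<(int Items, int Fetches)> RunAsync()
    {
        var e = new PagedEnumerator<int>(FakeRemote(total: 120, pageSize: 50));
        int count = 0;
        while (await e.MoveNextAsync()) count++;
        return (count, e.FetchCount);
    }

    static async Task Main() => Console.WriteLine(await RunAsync()); // prints (120, 4)
}
```

In this sketch, 120 items need four fetches: three non-empty pages plus the empty page that signals the end.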


@asyncawaitmvvm

asyncawaitmvvm Sep 20, 2017

Please don't bake something as implementation-dependent as CancellationToken into this. Although it is a fundamentally important pattern, I can foresee replacements being written with internal optimizations. IAsyncEnumerable needs to be plain and simple because deep threading usually has custom situational requirements and IAE couldn't meet them all anyway. Are you going to ref-count my UI threads too? Didn't think so. Keep it simple.


@scalablecory

scalablecory Sep 20, 2017

While I'd prefer WaitForNextAsync / TryGetNext for manually written iterators, I'm not sure how you'd translate yield iterators into these two methods. If we're going for broke with performance, a way to avoid allocating a new async state for every WaitForNextAsync would be preferred but I don't know how that could be done without massively complicating things.
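For manually written iterators, the WaitForNextAsync / TryGetNext pattern might look like the sketch below. The async call only waits until data is available, and TryGetNext then drains buffered items synchronously, with no Task per item. The exact signatures are an assumption here, in particular `T TryGetNext(out bool success)`. One property of that shape is that T only appears as a return type, so the interface can be declared `out T`. BatchedEnumerator, WaitCount and WaitTryDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical WaitForNextAsync / TryGetNext shape. T appears only as a return type,
// so the interface can be covariant.
interface IAsyncEnumerator<out T>
{
    Task<bool> WaitForNextAsync();
    T TryGetNext(out bool success);
}

// Serves items in batches; each WaitForNextAsync that finds the buffer empty loads one batch.
sealed class BatchedEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly Queue<T[]> _batches;
    private readonly Queue<T> _buffer = new Queue<T>();
    public int WaitCount { get; private set; }

    public BatchedEnumerator(IEnumerable<T[]> batches) => _batches = new Queue<T[]>(batches);

    public async Task<bool> WaitForNextAsync()
    {
        WaitCount++;
        if (_buffer.Count > 0) return true;
        await Task.Yield(); // stand-in for fetching the next batch
        if (!_batches.TryDequeue(out var batch)) return false;
        foreach (var item in batch) _buffer.Enqueue(item);
        return true;
    }

    public T TryGetNext(out bool success)
    {
        success = _buffer.TryDequeue(out var item);
        return item;
    }
}

static class WaitTryDemo
{
    public static async Task<(List<int> Items, int Waits)> RunAsync()
    {
        var e = new BatchedEnumerator<int>(new[] { new[] { 1, 2, 3 }, new[] { 4, 5 } });
        var items = new List<int>();
        while (await e.WaitForNextAsync())
        {
            // Inner loop: no await and no Task per item while the buffer has data.
            while (true)
            {
                int item = e.TryGetNext(out bool success);
                if (!success) break;
                items.Add(item);
            }
        }
        return (items, e.WaitCount);
    }

    static async Task Main()
    {
        var (items, waits) = await RunAsync();
        Console.WriteLine($"{string.Join(", ", items)} after {waits} waits"); // prints 1, 2, 3, 4, 5 after 3 waits
    }
}
```

How a compiler would translate yield-based iterators into this pair of methods remains the open question raised above. The sketch only covers the hand-written case.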


@svick

svick Sep 21, 2017

Contributor

@asyncawaitmvvm

Please don't bake something as implementation-dependent as CancellationToken into this.

How is CancellationToken any more implementation-dependent than Task? It's part of the core library and used by many methods on core types.

I can foresee replacements being written with internal optimizations.

Could you be more specific? What optimizations would including CancellationToken in the interface prohibit? How would not including it help?


@mattwar

mattwar Sep 21, 2017

@scalablecory Maybe we could add a 'yield all' statement that would yield an entire array/list of values.


@stazz

stazz Sep 25, 2017

Hello all,

It seems anyone can comment on this issue, so I've decided to chime in and share some of my views on and experiences with async enumeration in C# so far. I hope this post will help in creating a good and versatile API for async enumeration.
I am not sure if this kind of feedback comes too late; I've only recently learned some of the more complex aspects of asynchronous enumeration myself.
I am working on the CBAM project, which provides an asynchronous task-based API to execute some kind of statement (e.g. an SQL command or an HTTP request/response sequence) remotely and then asynchronously iterate over the results.

From this point of view, asynchronous enumerables and asynchronous enumeration have the following concepts, requirements, and definitions:

  • If the underlying enumeration source is synchronous or buffering, the BCL API for asynchronous enumeration must not force the creation of heap objects such as Task<T>. This implies using struct ValueTask<T> instead of class Task<T> as the return type for any asynchronous methods in the API. The alternative would be some kind of mechanism to cache Task<T> objects, which would still cause some heap allocation.
  • The API must have an interface with an out (covariant) generic type parameter. This means that any asynchronous methods which use type parameter T in their result must also be covariant in T, which would imply creating some sort of ITask<out T> interface, since out can only be used on interfaces and delegates. However, if ValueTask<T> is used, none of the asynchronous methods of the interface can return a result of type parameter T.
  • In synchronous enumeration everything is (or at least should be) contained in the same process, structured in memory by language-implied mechanisms and logic, invisible to the actual author of the code. Asynchrony, however, typically implies IO, which usually implies protocols with various kinds of statefulness. Therefore it would be suitable to provide a callback-based skeleton implementation of the API, which could be used by 80-90% of the scenarios. The special snowflakes (10-20% of the scenarios) can implement the interfaces directly.
  • There are (at least) two kinds of enumeration that can be done on IAsyncEnumerator: sequential async (async foreach) and parallel async (parallel foreach). Parallel async usually implies a stateless protocol (e.g. HTTP), while sequential usually implies a stateful protocol (e.g. a connection to an SQL database). When performing parallel async enumeration, a Current property like that of the synchronous IEnumerator<out T> counterpart interface would not be feasible, as it would cause concurrency issues, as previously pointed out by @benaadams.

All of these points are put together in the UtilPack.AsyncEnumeration library, which the CBAM projects use to actually expose an asynchronous enumerator.

Points of observation of the UtilPack.AsyncEnumeration:

  • The gist is this interface (not all members are exposed in this comment)
   public interface AsyncEnumerator<out T>
   {
      ValueTask<Int64?> MoveNextAsync( CancellationToken token = default );
      T OneTimeRetrieve( Int64 retrievalToken );
   }
  • This is one more step in the direction of the WaitForNextAsync / TryGetNext pattern mentioned in the proposal. The retrieval token idea stems from the fact that we somehow need to separate the retrievals of items of an enumerable which is being enumerated in parallel. MoveNextAsync returns a one-time-use nullable token (this library chose Int64, but it could be any type, e.g. a struct), and the OneTimeRetrieve method synchronously fetches the item based on that token (and removes it from any private caches). The design is a trade-off: it allows out covariance on the interface and parallel enumeration, but if all parallel requests complete at the same time, memory usage can be high because the items are cached by the enumerator. When enumerating sequentially, there is no need for caching other than the previously seen retrieval token, so the potentially high memory consumption is not a problem there.
  • Sequential and parallel async enumeration are implemented by the E_UtilPack.EnumerateSequentiallyAsync and E_UtilPack.EnumerateInParallelAsync extension methods, respectively. Both methods accept synchronous and asynchronous callbacks. I am not fully familiar with explicit usage of the Task API (I only moved to asynchrony when await was available), so the parallel enumeration methods might have some errors in their implementation, but the core idea should still hold.
    • The parallel enumeration might end up doing a bit of useless work if the parallel AsyncEnumerator is "not nice", that is, if it cannot tell synchronously whether it has a next element.
    • The implementation of those methods is essentially what the compiler would emit when the user types async foreach or parallel foreach.
  • Only the enumerator type is provided by this library, but you can imagine an IAsyncEnumerable<out T> interface with an AsyncEnumerator<out T> GetEnumerator() method in it.
  • Two kinds of AsyncEnumerator implementations exist: sequential-only, and with parallel support. Both can be created via factory methods, which require callbacks to implement the actual functionality. Sample usage code is available in the unit test code.
  • Currently, AsyncEnumerator does not implement IAsyncDisposable, instead having an EnumerationEnded method, but that is mainly for historical reasons. I'll probably change that for the next release.
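To make the retrieval-token idea concrete, here is a much simplified sketch. It is not UtilPack's actual implementation. ITokenEnumerator, TokenEnumerator and TokenDemo are hypothetical names, and the ConcurrentDictionary cache and Interlocked token counter are assumptions chosen for brevity. MoveNextAsync hands out a one-time token (null means no more items), OneTimeRetrieve exchanges it for the item, and because T only appears as a return type, the interface can be covariant.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simplified, hypothetical take on the retrieval-token idea (not UtilPack's code).
interface ITokenEnumerator<out T>
{
    ValueTask<long?> MoveNextAsync(CancellationToken token = default);
    T OneTimeRetrieve(long retrievalToken);
}

sealed class TokenEnumerator<T> : ITokenEnumerator<T>
{
    private readonly ConcurrentQueue<T> _source;
    private readonly ConcurrentDictionary<long, T> _retrievable = new ConcurrentDictionary<long, T>();
    private long _nextToken;

    public TokenEnumerator(IEnumerable<T> items) => _source = new ConcurrentQueue<T>(items);

    public async ValueTask<long?> MoveNextAsync(CancellationToken token = default)
    {
        await Task.Yield(); // stand-in for real asynchronous work
        token.ThrowIfCancellationRequested();
        if (!_source.TryDequeue(out T item)) return null;
        long retrievalToken = Interlocked.Increment(ref _nextToken);
        _retrievable[retrievalToken] = item; // cached until retrieved
        return retrievalToken;
    }

    public T OneTimeRetrieve(long retrievalToken) =>
        _retrievable.TryRemove(retrievalToken, out T item)
            ? item
            : throw new InvalidOperationException("Unknown or already used token.");
}

static class TokenDemo
{
    static async Task<List<int>> ConsumeAsync(ITokenEnumerator<int> e)
    {
        var seen = new List<int>();
        long? t;
        while ((t = await e.MoveNextAsync()).HasValue)
            seen.Add(e.OneTimeRetrieve(t.Value));
        return seen;
    }

    // Three consumers share one enumerator in parallel; tokens keep their items apart.
    public static async Task<int[]> RunAsync()
    {
        var e = new TokenEnumerator<int>(Enumerable.Range(0, 50));
        var results = await Task.WhenAll(ConsumeAsync(e), ConsumeAsync(e), ConsumeAsync(e));
        return results.SelectMany(r => r).OrderBy(x => x).ToArray();
    }

    static async Task Main()
    {
        int[] all = await RunAsync();
        Console.WriteLine($"{all.Length} items, {all.Distinct().Count()} distinct");
    }
}
```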

So using this kind of enumeration interface, it is possible to enumerate sequentially asynchronously:

  // Enumerate ascending sequence of 10 integers (0..9) sequentially asynchronously:
  const Int32 MAX_ITEMS = 10;
  var start = MAX_ITEMS;
  MoveNextAsyncDelegate<Int32> moveNext = async token =>
  {
    var decremented = Interlocked.Decrement( ref start );
    // Pretend to do something asynchronously
    await Task.Delay( 100, token );
    // Return tuple of: 1. boolean indicating whether we still have more elements, and 2. the actual element
    return (decremented >= 0, MAX_ITEMS - decremented - 1);
  };

  // Use factory-pattern combined with callback-based skeleton implementation to get an enumerator
  var enumerator = AsyncEnumeratorFactory.CreateSequentialEnumerator<Int32>(
    async token =>
    {
      (var success, var item) = await moveNext( token );
      return (success, item, moveNext, null);
    });
    
  // This will print values from 0 to 9 to console output, and itemsEncountered will be 10 after awaiting
  var itemsEncountered = await enumerator.EnumerateSequentiallyAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

and also in parallel asynchronously:

  // Enumerate sequence of 10 integers (0..9) in parallel asynchronously:
  const Int32 MAX_ITEMS = 10;
  var start = MAX_ITEMS;

  // Use factory-pattern combined with callback-based skeleton implementation to get an enumerator
  var enumerator = AsyncEnumeratorFactory.CreateParallelEnumerator(
    () =>
    {
      // Notice that this callback is synchronous, so that the parallel enumerator is "nice" and doesn't cause extra calls when enumerated.
      var decremented = Interlocked.Decrement( ref start );
      // Return tuple of 1. boolean indicating whether we still have more elements, and 2. state information to pass to next callback ( in this case it is the decremented value)
      return (decremented >= 0, decremented);
    },
    async ( decremented, token ) =>
    {
      // Pretend to do something asynchronously
      await Task.Delay( 100, token );
      // Return the actual item
      return MAX_ITEMS - decremented - 1;
    },
    null
    );

  // This will print values from 0 to 9 to console output (but not necessarily in that order), and itemsEncountered will be 10 after awaiting
  var itemsEncountered = await enumerator.EnumerateInParallelAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

  // Sequential enumeration actually works too, just as one would expect (printing an ascending series of values from 0 to 9 to console output)
  itemsEncountered = await enumerator.EnumerateSequentiallyAsync( item =>
  {
    Console.WriteLine( "Current item: " + item );
  } );

The concept of parallel asynchronous enumeration seems to be missing from this discussion.
Is it intended to be implemented separately somewhere else?
What are your opinions about parallel asynchronous enumeration, and about the token-based coupling between MoveNextAsync and OneTimeRetrieve (or WaitForNextAsync / TryGetNext)?

@mattwar

mattwar Sep 25, 2017

I have a prototype of LINQ operators implemented using this proposal, in one of its current states.
https://github.com/mattwar/AsyncLinq

@scalablecory

scalablecory Sep 25, 2017

@stazz I believe this proposal should focus purely on async as a concept separate from parallelism, considering parallelism only to the extent of not making the two completely incompatible. There's no reason PLINQ couldn't be extended to handle any of the current IAsyncEnumerable proposals, so I think that is already accomplished. Libraries like TPL Dataflow are more appropriate for non-trivial parallelism anyway, imho.

@mattwar

mattwar Sep 25, 2017

What happens when MoveNextAsync is called before all the items have been exhausted via calling TryGetNext?

  1. Calling MoveNextAsync does nothing, returning true, requiring you to always iterate through all items via TryGetNext first before MoveNextAsync will move forward.
  2. Calling MoveNextAsync abandons any remaining items and fetches the next group/block/etc.

With IEnumerator<T>, calling MoveNext repeatedly always moves on, regardless of whether you've accessed the Current property. This makes it easy to think of IAsyncEnumerator<T> as being just like IEnumerator<T>, except that each "MoveNext" gets you multiple values instead of just one. That view implies that calling MoveNext before you've accessed all the items would simply move the iteration on to the next set: option 2.

On the other hand, if you tend to think of the two methods of the IAsyncEnumerator<T> type as IEnumerator<T>.MoveNext broken into two parts, then it may make more sense to choose option 1.

If you are trying to write a wrapper class around an existing IAsyncEnumerator<T> to apply additional logic, supporting option 1 makes that task more complicated, because you need to track more state. (This is from experience writing the async LINQ prototype.)

However, if you are trying to support parallel consumers via IAsyncEnumerator<T>, then option 2 would make that incredibly difficult, because you would need two different kinds of synchronization across the two methods, which would get tricky or expensive. Then again, it may be a lost cause to make this API work for parallel consumers at all, since the TryGet API could easily lead to consumer starvation: there is no way to queue the accesses.
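A minimal sketch of option 2, assuming the WaitForNextAsync / TryGetNext shape from the proposal. The batches are a hypothetical in-memory stand-in for pages fetched over I/O, and the enumerator state lives in top-level locals and local functions rather than a class, just to keep it short. Under option 2, WaitForNextAsync always moves on to the next batch and abandons any items the consumer has not read yet:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical batches standing in for pages fetched over I/O.
var batches = new Queue<int[]>(new[] { new[] { 1, 2, 3 }, new[] { 4, 5 }, new[] { 6 } });
int[] current = Array.Empty<int>();
int index = 0;

// Option 2: advancing always discards whatever is left of the current batch.
async Task<bool> WaitForNextAsync()
{
    await Task.Yield(); // pretend to perform I/O
    if (batches.Count == 0) return false;
    current = batches.Dequeue();
    index = 0;
    return true;
}

int TryGetNext(out bool success)
{
    success = index < current.Length;
    return success ? current[index++] : default;
}

// This consumer reads only the first item of each batch before advancing.
var firstItems = new List<int>();
while (await WaitForNextAsync())
{
    int item = TryGetNext(out bool ok);
    if (ok) firstItems.Add(item);
}
Console.WriteLine(string.Join(",", firstItems)); // 1,4,6
```

Under option 1, WaitForNextAsync would first check index < current.Length and return true without dequeuing. The same consumer would then see 1 through 6.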

@HaloFour

HaloFour Sep 26, 2017

Contributor

@stazz

"It seems that the concept of the parallel asynchronous enumeration is missing from this discussion."

The idea came up a couple of times on the Roslyn repo but it never gained traction. I think that the idea of a foreach-like construct where multiple threads could be executing simultaneously within the body was unpalatable.

This proposal is specifically about the consumer-requested single pull model, and the interfaces describe only that. But those interfaces could be used to bridge a parallel or reactive producer with whatever backpressure/buffering policy is required. That would be done through separate libraries, though, and is not explicitly captured as part of this proposal.
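As one illustration of such a bridge (not part of the proposal), System.Threading.Channels, available since .NET Core 3.0, can pair several concurrent producers with a single pull-based consumer. In this sketch the bounded capacity of 4 is the backpressure policy: WriteAsync waits while the buffer is full. The producer count, capacity and values are arbitrary assumptions:

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded buffer: WriteAsync waits (backpressure) while 4 items are unread.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4) { SingleReader = true });

// Three concurrent producers, each pushing 10 values.
Task[] producers = Enumerable.Range(0, 3).Select(p => Task.Run(async () =>
{
    for (int i = 0; i < 10; i++)
        await channel.Writer.WriteAsync(p * 100 + i);
})).ToArray();

// Complete the channel once all producers finish, so the consumer's loop can end.
Task completion = Task.WhenAll(producers).ContinueWith(
    t => channel.Writer.Complete(t.Exception), TaskScheduler.Default);

// The single consumer pulls one item at a time; interleaving across producers varies.
int count = 0, sum = 0;
await foreach (int item in channel.Reader.ReadAllAsync())
{
    count++;
    sum += item;
}
await completion;
Console.WriteLine($"{count} items, sum {sum}"); // 30 items, sum 3135
```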

@onovotny

onovotny Sep 26, 2017

Member

There is a fully working implementation of this proposal, with all of the operators, here: https://github.com/Reactive-Extensions/Rx.NET/tree/IxAsyncCSharp8/Ix.NET/Source

@stazz

stazz Sep 26, 2017

Hmm, I am not sure I used the term "parallel" correctly in my previous post. What I meant was that there are scenarios where we don't want to wait for MoveNextAsync to complete before calling it again. Maybe the term "concurrent", as in concurrent enumeration, is better?

@scalablecory : I understand the concern about keeping this proposal "on topic" and not derailing it too much. The main motivation for my scenario was this: since there will be a language construct for sequentially enumerating an async enumerable (foreach await), then for the sake of orthogonality and user-friendliness there should also be a language construct for concurrently enumerating an async enumerable (maybe foreach concurrent?). Using PLINQ and separate libraries manually tends to make code a bit "plumby". Unless... those libraries could provide their own foreach concurrent language construct via a code analyzer or some other way of hooking into the Roslyn compilation process. Is that possible? Should it be considered in this proposal?

@HaloFour : Thanks for the additional clarification! I wasn't aware that this proposal is only about the consumer-requested single pull model; that explains the lack of discussion about parallel/concurrent enumeration.

@mattwar Hmm, interesting point of view: writing an async LINQ library. I am not sure how the WaitForNextAsync / TryGetNext pattern would fare in such a test. I could write a small test though, since I am curious myself how one would write an async LINQ implementation for such an enumerable.

@asyncawaitmvvm

asyncawaitmvvm Sep 26, 2017

"Unless... those libraries could make their own foreach concurrent language construct by code analyzer or other way of hooking up to Roslyn compilation process. Is that possible? Should it be considered in this proposal?"

Parallel.ForEach() is completely useless and irrelevant for async methods, but ForEachAsync() has many possible competing algorithms which is why everyone writes their own. I maintain two different ones myself and can foresee a third soon. The easiest way is to create a work item for every core and have each item read-loop the enumerable inside an async lock (SemaphoreSlim?) until it's empty. The only problem is that the cores are idle if all the tasks are pending I/O, and there are ways to capitalize on that.
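The "easiest way" described above might look roughly like the following sketch. It is written against the IAsyncEnumerable<T> / IAsyncEnumerator<T> interfaces that eventually shipped with C# 8 and .NET Core 3.0. ForEachAsync, its degreeOfParallelism parameter and the RangeAsync source are hypothetical names for illustration. One worker per core pulls from a single shared enumerator, a SemaphoreSlim serves as the async lock around MoveNextAsync/Current, and the body runs outside the lock so that bodies can overlap:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: N workers share one enumerator; only MoveNextAsync/Current is serialized.
async Task ForEachAsync<T>(IAsyncEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
{
    await using IAsyncEnumerator<T> enumerator = source.GetAsyncEnumerator();
    using var gate = new SemaphoreSlim(1, 1); // async lock around the enumerator

    async Task Worker()
    {
        while (true)
        {
            T item;
            await gate.WaitAsync();
            try
            {
                if (!await enumerator.MoveNextAsync()) return; // sequence exhausted
                item = enumerator.Current;
            }
            finally
            {
                gate.Release();
            }
            await body(item); // bodies of different workers may overlap
        }
    }

    await Task.WhenAll(Enumerable.Range(0, degreeOfParallelism).Select(_ => Worker()));
}

// Sample async source: yields 1..n with a simulated asynchronous step.
async IAsyncEnumerable<int> RangeAsync(int n)
{
    for (int i = 1; i <= n; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

int total = 0;
await ForEachAsync(RangeAsync(100), Environment.ProcessorCount, async item =>
{
    await Task.Delay(1); // pretend I/O
    Interlocked.Add(ref total, item);
});
Console.WriteLine(total); // 5050
```

Because only the enumerator access is serialized, slow I/O-bound bodies overlap, but a slow MoveNextAsync still stalls every worker.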

@markusschaber

markusschaber Sep 26, 2017

I think the best way for parallel enumeration / iteration is to have one enumerator instance for each consumer, and the iterators internally know how to coordinate themselves.

So, we could define an interface IParallelEnumerable to get an IParallelEnumeratorSource, which itself produces an arbitrary number of IEnumerator<T> instances that internally know how to coordinate themselves; likewise, IAsyncParallelEnumerable creates IAsyncParallelEnumeratorSources, which produce IAsyncEnumerator<T> instances.

We could simplify the XXXParallelEnumeratorSource interfaces to IEnumerable<IXXXEnumerator<T>>, possibly.
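System.Threading.Channels is one existing API with roughly this shape. The comparison is only an analogy and is not the IParallelEnumerable design described above. Every consumer calls ChannelReader<T>.ReadAllAsync() and enumerates its own async sequence, and the channel coordinates the readers internally so that each item reaches exactly one consumer. The consumer count and items below are arbitrary:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>(); // allows multiple concurrent readers by default
for (int i = 0; i < 20; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

// Each consumer gets its own async sequence over the shared channel.
var seen = new ConcurrentBag<int>();
async Task ConsumeAsync()
{
    await foreach (int item in channel.Reader.ReadAllAsync())
        seen.Add(item);
}

await Task.WhenAll(Enumerable.Range(0, 3).Select(_ => Task.Run(() => ConsumeAsync())));
Console.WriteLine(seen.Count); // 20: every item was delivered to exactly one consumer
```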

@stazz

stazz Sep 28, 2017

@mattwar It took me a while to understand the exact point of your message. I had to convert my UtilPack.AsyncEnumeration project to use the WaitForNextAsync / TryGetNext pattern and write some LINQ extension methods for the modified IAsyncEnumerator (just to test, learn and investigate) before I started to see it. The LINQ operator with a synchronous callback (IAsyncEnumerable<T> Where<T>( this IAsyncEnumerable<T> enumerable, Func<T, Boolean> syncPredicate )) was easy, and I ended up with an enumerator class almost identical to the one in your implementation. However, the operator with an asynchronous callback (IAsyncEnumerable<T> Where<T>( this IAsyncEnumerable<T> enumerable, Func<T, Task<Boolean>> asyncPredicate )) was indeed more difficult, and that was when I realized what exactly your message was about.
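For contrast, consider the MoveNextAsync / Current shape that eventually shipped in C# 8. With that shape, both variants can be written as plain async iterators, because the asynchronous predicate is simply awaited between items. This is a minimal sketch: Where, WhereAsync and RangeAsync are hypothetical local functions, not the UtilPack or AsyncLinq APIs:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Synchronous predicate.
async IAsyncEnumerable<T> Where<T>(IAsyncEnumerable<T> source, Func<T, bool> predicate)
{
    await foreach (T item in source)
        if (predicate(item)) yield return item;
}

// Asynchronous predicate: awaited inside the iterator, no extra buffering or state needed.
async IAsyncEnumerable<T> WhereAsync<T>(IAsyncEnumerable<T> source, Func<T, Task<bool>> predicate)
{
    await foreach (T item in source)
        if (await predicate(item)) yield return item;
}

// Sample source: 0..n-1 with a simulated asynchronous step.
async IAsyncEnumerable<int> RangeAsync(int n)
{
    for (int i = 0; i < n; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

var large = new List<int>();
await foreach (int x in Where(RangeAsync(10), v => v >= 7))
    large.Add(x);

var evens = new List<int>();
await foreach (int x in WhereAsync(RangeAsync(10), async v => { await Task.Delay(1); return v % 2 == 0; }))
    evens.Add(x);

Console.WriteLine(string.Join(",", large)); // 7,8,9
Console.WriteLine(string.Join(",", evens)); // 0,2,4,6,8
```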

The root cause is that using the WaitForNextAsync / TryGetNext pattern in the proposal, the context is lost between the invocations:

Task<bool> WaitForNextAsync();
T TryGetNext(out bool success);

However, suppose the method signatures of that pattern were changed like this (very close to the ones in my original message):

// This example uses Int64, but can be any struct, really
// Return null to signal that enumerator has reached the end
Task<Int64?> WaitForNextAsync(); // Or maybe ValueTask<Int64?> here?
T TryGetNext( Int64 waitToken, out bool success );

Then the context could be preserved via the Int64 value. Obviously, this means that enumerators that intend to support multiple concurrent consumers need some sort of ConcurrentDictionary mapping Int64 tokens to T values. This does introduce some memory and complexity issues, but IMO supporting multiple concurrent consumers should be completely optional. Implementations that don't support concurrent consumers could have much simpler code: they wouldn't need a ConcurrentDictionary, since they can assume that only one wait token is in use at a time.
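A rough sketch of the concurrent case, assuming the token-based signatures above (Task<Int64?> WaitForNextAsync() and T TryGetNext( Int64 waitToken, out bool success )). The 10-item source, the item values and the three consumers are hypothetical. Each token maps to exactly one fetched item, which stays in a ConcurrentDictionary until TryGetNext removes it:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var pending = new ConcurrentDictionary<long, int>(); // wait token -> fetched item
long lastToken = 0;
int remaining = 10; // hypothetical source with 10 items

async Task<long?> WaitForNextAsync()
{
    if (Interlocked.Decrement(ref remaining) < 0) return null; // source exhausted
    long token = Interlocked.Increment(ref lastToken);
    await Task.Delay(1); // pretend to fetch the item over I/O
    pending[token] = (int)token * 10;
    return token;
}

int TryGetNext(long waitToken, out bool success)
{
    // Each token yields its item once; a second call with the same token fails.
    success = pending.TryRemove(waitToken, out int item);
    return item;
}

// Three consumers run the token-based loop concurrently.
var results = new ConcurrentBag<int>();
async Task ConsumeAsync()
{
    long? token;
    while ((token = await WaitForNextAsync()).HasValue)
    {
        while (true)
        {
            int item = TryGetNext(token.Value, out bool ok);
            if (!ok) break;
            results.Add(item);
        }
    }
}

await Task.WhenAll(ConsumeAsync(), ConsumeAsync(), ConsumeAsync());
Console.WriteLine(string.Join(",", results.OrderBy(x => x))); // 10,20,30,40,50,60,70,80,90,100
```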

I hope that if the WaitForNextAsync / TryGetNext pattern wins over the MoveNextAsync / Current pattern in the proposal, it will be modified so that TryGetNext is aware of the exact WaitForNextAsync invocation it belongs to (for example, as in the code sample above, an Int64 could identify the invocation). Otherwise, the concurrent consumers mentioned as a minor benefit in the proposal would not be a real benefit, as they don't scale well to e.g. writing a LINQ library.

@markusschaber

markusschaber Sep 28, 2017

@stazz I think that the price (in terms of complexity) is too high for those who don't need parallel enumeration. The proposal with multiple enumerators that I described has the advantage that only those users who need parallel enumeration pay any price at all.
(I don't claim any ownership on that proposal, I saw it somewhere else, but don't remember where)

@stazz

stazz Sep 28, 2017

Ah yes, I forgot to mention in my previous message that another option would indeed be to make this IAsyncEnumerator purely for sequential asynchronous enumeration, and to handle parallel/concurrent enumeration entirely via external libraries. That would be completely OK as well.

@markusschaber But I am curious: what exactly did you mean by complexity price? From the consumer's point of view, it would just mean changing this loop (copied directly from the proposal):

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
while (await enumerator.WaitForNextAsync())
{
    while (true)
    {
        T item = enumerator.TryGetNext(out bool success);
        if (!success) break;
        Use(item);
    }
}

into this loop:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
Int64? waitToken;
while ((waitToken = await enumerator.WaitForNextAsync()).HasValue)
{
    while (true)
    {
        T item = enumerator.TryGetNext(waitToken.Value, out bool success);
        if (!success) break;
        Use(item);
    }
}

From a sequential producer's point of view, all it needs is one extra Int64 field, which could simply be incremented with Interlocked.Increment on each successful WaitForNextAsync call; TryGetNext would just add one extra check that the given waitToken parameter matches the value stored in that field.
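A rough sketch of such a sequential producer, assuming the token-based signatures discussed above. The batches are a hypothetical in-memory stand-in for I/O, and the state is kept in top-level locals rather than a class for brevity. TryGetNext accepts only the most recent token:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var batches = new Queue<int[]>(new[] { new[] { 1, 2 }, new[] { 3 } }); // hypothetical I/O results
int[] current = Array.Empty<int>();
int index = 0;
long lastToken = 0; // the single extra Int64 field

async Task<long?> WaitForNextAsync()
{
    await Task.Yield(); // pretend to perform I/O
    if (batches.Count == 0) return null; // null signals the end
    current = batches.Dequeue();
    index = 0;
    return Interlocked.Increment(ref lastToken);
}

int TryGetNext(long waitToken, out bool success)
{
    // The one extra check: only the most recent token is accepted.
    success = waitToken == Interlocked.Read(ref lastToken) && index < current.Length;
    return success ? current[index++] : default;
}

// Consumer loop from the comment above, with T = int.
var read = new List<int>();
long? token;
while ((token = await WaitForNextAsync()).HasValue)
{
    while (true)
    {
        int item = TryGetNext(token.Value, out bool ok);
        if (!ok) break;
        read.Add(item);
    }
}
TryGetNext(1, out bool staleOk); // token 1 is stale by now, so staleOk is false
Console.WriteLine(string.Join(",", read)); // 1,2,3
```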

Or did you mean something else entirely? :)

@markusschaber

markusschaber Sep 28, 2017

Exactly that extra Int64 field is what I meant.

It requires that non-parallel consumers also handle a token, which is overhead.

Additionally, it forces implementors to have some indirection via the token and an internal lookup. When every consumer has its own IAsyncEnumerator instance, that indirection can be avoided, which is probably more efficient in some cases.

@jnm2

jnm2 Sep 28, 2017

Contributor

> Additionally, it forces implementors to have some indirection via the token and an internal lookup.

Yeah, no way. This isn't about parallelization. It's about a non-blocking version of the single-consumer IEnumerable idiom.

@stazz

stazz Sep 28, 2017

@markusschaber Ahh, I see. Fair point! I was thinking that one Int64 field is not that bad (especially since asynchrony usually involves I/O, which tends to be more of a bottleneck than CPU or memory), but I do agree that it is overhead. 👍 Oh well, let's see how this proposal develops. It might still end up with the MoveNextAsync / Current pattern anyway.

@benaadams

benaadams Dec 13, 2017

Async filesystem enumeration would be an example use case, as there are currently no async .NET APIs for enumerating the filesystem.

@HaloFour referenced this issue in electronicarts/ea-async on Dec 13, 2017: "ea-async with Java 9" #11 (Closed)

@jcouv referenced this issue in dotnet/roslyn on Jan 4, 2018: "[Umbrella] Work items and test plan for async streams" #24037 (Open, 14 of 59 tasks complete)

@NetMage

NetMage Jan 12, 2018

Personally, I would really prefer flipping TryGetNext around:

Task WaitForNextAsync();
bool TryGetNext(out T nextVal);
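A minimal C# sketch of the flipped shape follows. It assumes `WaitForNextAsync` returns `Task<bool>`, as in the proposal's discarded option, so the consumer can detect the end of the sequence. `IFlippedAsyncEnumerator<T>`, the batch-based implementation and `FlippedDemo` are all hypothetical. `T` is declared invariant because C# rejects a covariant (`out T`) type parameter used as the type of an `out` parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical "flipped" shape: TryGetNext returns the success flag and hands
// the item back through an out parameter, so T cannot be declared covariant.
public interface IFlippedAsyncEnumerator<T>
{
    Task<bool> WaitForNextAsync();
    bool TryGetNext(out T nextVal);
}

public sealed class FlippedBatchEnumerator<T> : IFlippedAsyncEnumerator<T>
{
    private readonly Queue<IReadOnlyList<T>> _pending;
    private IReadOnlyList<T> _batch = Array.Empty<T>();
    private int _index;

    public FlippedBatchEnumerator(IEnumerable<IReadOnlyList<T>> batches) =>
        _pending = new Queue<IReadOnlyList<T>>(batches);

    public async Task<bool> WaitForNextAsync()
    {
        await Task.Yield(); // stand-in for real asynchronous I/O
        if (_pending.Count == 0) return false;
        _batch = _pending.Dequeue();
        _index = 0;
        return true;
    }

    public bool TryGetNext(out T nextVal)
    {
        if (_index < _batch.Count)
        {
            nextVal = _batch[_index++];
            return true;
        }
        nextVal = default(T);
        return false;
    }
}

public static class FlippedDemo
{
    // With the flipped signature the inner loop becomes a single while condition.
    public static async Task<List<T>> DrainAsync<T>(IFlippedAsyncEnumerator<T> e)
    {
        var items = new List<T>();
        while (await e.WaitForNextAsync())
            while (e.TryGetNext(out T item))
                items.Add(item);
        return items;
    }
}
```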

@benaadams

benaadams Jan 12, 2018

> Personally I would really prefer flipping TryGetNext around:

> Discarded options considered:
>
> Task<bool> WaitForNextAsync(); bool TryGetNext(out T result);: out parameters can't be covariant. There's also a small impact here (an issue with the try pattern in general) that this likely incurs a runtime write barrier for reference type results.

Probably it should be When rather than Wait? When is normally preferred over Wait for async APIs (e.g. Task.WhenAll vs. Task.WaitAll).

Task<bool> WhenNextAsync();

@bbarry

bbarry Jan 12, 2018

Contributor

I like the "viable alternative":

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        Task<bool> WaitForNextAsync();
        T TryGetNext(out bool success);
    }
}
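A minimal C# sketch of why this "viable alternative" shape keeps covariance follows. `T` appears only as a return type, so the interface can be declared `out T`, and an enumerator of strings can be passed where an enumerator of objects is expected. The interface is renamed to the hypothetical `IDraftAsyncEnumerator<T>` to avoid clashing with `System.Collections.Generic.IAsyncEnumerator<T>`, and `IAsyncDisposable` is left out to keep the sketch self-contained. `DraftArrayEnumerator<T>` and `DraftDemo` are also illustrative.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical copy of the "viable alternative" enumerator (IAsyncDisposable omitted).
public interface IDraftAsyncEnumerator<out T>
{
    Task<bool> WaitForNextAsync();
    T TryGetNext(out bool success); // T only in output position, so `out T` compiles
}

// In-memory implementation: everything is available immediately.
public sealed class DraftArrayEnumerator<T> : IDraftAsyncEnumerator<T>
{
    private readonly T[] _items;
    private int _index;

    public DraftArrayEnumerator(params T[] items) => _items = items;

    public Task<bool> WaitForNextAsync() => Task.FromResult(_index < _items.Length);

    public T TryGetNext(out bool success)
    {
        success = _index < _items.Length;
        return success ? _items[_index++] : default(T);
    }
}

public static class DraftDemo
{
    // Accepts an enumerator of any reference type thanks to covariance.
    public static async Task<int> CountAsync(IDraftAsyncEnumerator<object> e)
    {
        int count = 0;
        while (await e.WaitForNextAsync())
        {
            while (true)
            {
                object item = e.TryGetNext(out bool success);
                if (!success) break;
                count++;
            }
        }
        return count;
    }
}
```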

Is a pattern-based version being considered, instead of explicitly settling on specific interfaces? Something like:

TAsyncEnumerable<out TItem>
{
    TAsyncEnumerator<TItem> GetAsyncEnumerator();
}

TAsyncEnumerator<out TItem>
{
    TTaskLike<bool> WaitForNextAsync();
    TItem TryGetNext(out bool success);
    TTaskLike DisposeAsync();
}
@alrz

alrz Jan 17, 2018

Contributor

Currently, a using statement loses the exception thrown in its body if Dispose also throws. Also, await does not surface aggregated exceptions: a faulted Task holds an AggregateException, but await rethrows only the first inner exception. Are we going to change that behavior for using await, or is DisposeAsync supposed to be error-free? I suspect async methods are more likely to fail, since there are more moving parts involved. That could make it harder to write an implementation safe enough to take advantage of using await, rather than falling back to a nested try/finally as a precaution in critical code.
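A small C# sketch of the two existing behaviors described above follows. It covers today's synchronous `using` and plain `await`, not the proposed `using await`. `FailingResource` and `ExceptionDemo` are illustrative names.

```csharp
using System;
using System.Threading.Tasks;

// Illustrative resource whose Dispose always fails.
public sealed class FailingResource : IDisposable
{
    public void Dispose() => throw new InvalidOperationException("from Dispose");
}

public static class ExceptionDemo
{
    // Both the using body and Dispose throw; only Dispose's exception escapes.
    public static string UsingMasksBodyException()
    {
        try
        {
            using (new FailingResource())
            {
                throw new InvalidOperationException("from body");
            }
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message; // the body's exception has been lost
        }
    }

    // Awaiting a task with two faults rethrows only the first one,
    // while task.Exception still holds both inner exceptions.
    public static async Task<(string awaited, int stored)> AwaitUnwrapsAggregate()
    {
        Task a = Task.FromException(new ArgumentException("first"));
        Task b = Task.FromException(new FormatException("second"));
        Task both = Task.WhenAll(a, b);
        try
        {
            await both;
            return ("none", 0);
        }
        catch (Exception ex)
        {
            return (ex.Message, both.Exception.InnerExceptions.Count);
        }
    }
}
```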


Another question for async streams: what if the iteration itself is not async, but I have an await somewhere?

async IAsyncEnumerable<T> IteratorAsync() {
  using (var reader = await ExecuteReader()) {
    while (reader.Read())
      yield return RowParser(reader);
  }
}

Currently I can accomplish that with Task<IEnumerable<T>> and an iterator local function. An async MoveNextAsync is unnecessary for this example. Will Task<IEnumerable<T>> be allowed as the return type in these situations?

@quinmars

quinmars Jan 19, 2018

@alrz I guess that would be very tricky. Who will dispose the reader? I think that will only work by buffering the complete list, or with IAsyncEnumerable<T>. The first MoveNextAsync will take some time because the reader needs to be awaited, but the following MoveNextAsync calls will be fast. For those, the state machine can simply return Task.FromResult(true) or Task.FromResult(false), or, even better, a cached or global reference to those tasks.
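A minimal C# sketch of that fast path follows. It assumes a hypothetical `CachedTaskEnumerator<T>` whose `MoveNextAsync()` returns `Task<bool>`, the shape of the original proposal. The `open` delegate stands in for `await ExecuteReader()`, and an ordinary `IEnumerator<T>` stands in for the reader. Only the first call goes through an async state machine. Later calls return one of two cached, already-completed tasks without allocating.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical enumerator: the first MoveNextAsync awaits an expensive "open" step,
// later calls complete synchronously and return cached tasks.
public sealed class CachedTaskEnumerator<T>
{
    private static readonly Task<bool> s_true = Task.FromResult(true);
    private static readonly Task<bool> s_false = Task.FromResult(false);

    private readonly Func<Task<IEnumerator<T>>> _open; // stand-in for "await ExecuteReader()"
    private IEnumerator<T> _rows;

    public CachedTaskEnumerator(Func<Task<IEnumerator<T>>> open) => _open = open;

    public T Current { get; private set; }

    public Task<bool> MoveNextAsync()
    {
        // Slow path: the source has not been opened yet.
        if (_rows == null) return OpenAndMoveAsync();
        // Fast path: no allocation, just one of the two cached tasks.
        return Advance() ? s_true : s_false;
    }

    private async Task<bool> OpenAndMoveAsync()
    {
        _rows = await _open();
        return Advance();
    }

    private bool Advance()
    {
        if (!_rows.MoveNext()) return false;
        Current = _rows.Current;
        return true;
    }
}
```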

@alrz

alrz Jan 19, 2018

Contributor

> Who will dispose the reader?

The iterator.

async Task<IEnumerable<T>> IteratorAsync() {
  var reader = await ExecuteReader();
  IEnumerable<T> Iterator() {
    using (reader)
      while (reader.Read())
        yield return RowParser(reader);
  }
  return Iterator();
}

It's not always easy to work around this. It would be nice if the compiler could see that the iterator does not await, and permit Task<IEnumerable<T>> as the return type.

@quinmars

quinmars Jan 19, 2018

@alrz I see. But it's easy to miss that you will leak the reader if the returned IEnumerable<T> is never enumerated.
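A small C# sketch of that leak follows, using the same structure as the workaround above. A hypothetical `FakeReader` stands in for a real data reader, and `LeakDemo` is an illustrative name. The `using` inside the local iterator only runs once the sequence is enumerated and its enumerator disposed, which `foreach` does. A result that is never enumerated therefore never disposes the reader.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a data reader that records whether it was disposed.
public sealed class FakeReader : IDisposable
{
    private int _remaining;
    public FakeReader(int rows) => _remaining = rows;
    public bool Disposed { get; private set; }
    public bool Read() => _remaining-- > 0;
    public void Dispose() => Disposed = true;
}

public static class LeakDemo
{
    // Reader is obtained eagerly; ownership is handed to a lazy iterator.
    public static async Task<IEnumerable<int>> IteratorAsync(FakeReader reader)
    {
        await Task.Yield(); // stand-in for "await ExecuteReader()"
        IEnumerable<int> Iterator()
        {
            using (reader)
                while (reader.Read())
                    yield return 42; // stand-in for RowParser(reader)
        }
        return Iterator();
    }
}
```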

@bbarry

bbarry Jan 21, 2018

Contributor

If the alternative interfaces are used, I don't think there is any benefit to permitting Task<IEnumerable<T>>. The saving is maybe one virtual call, and if the pattern form were used, those calls could potentially even be inlined.

@petroemil referenced this issue in dotnet/reactive on Feb 28, 2018: "Become an official dotnet repo?" #466 (Closed)

@divega referenced this issue in aspnet/EntityFrameworkCore on May 2, 2018: "Query: Add ForEachAsync operator with async lambda" #11866 (Open)

@TylerBrinkley

TylerBrinkley May 16, 2018

I was watching the Build demo of this feature and was wondering: how can we specify .ConfigureAwait(false) for async disposal and async enumeration?

@TylerBrinkley

TylerBrinkley May 16, 2018

Never mind, it appears this is being considered in the proposal.
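For reference, here is a small C# sketch of how this looks in the form that eventually shipped with C# 8 and .NET Core 3.0, not the draft under discussion in this thread. In the shipped API, `ConfigureAwait(false)` is applied through the `TaskAsyncEnumerableExtensions.ConfigureAwait` extension methods on `IAsyncEnumerable<T>` and `IAsyncDisposable`. `TrackingAsyncDisposable` and `ConfigureAwaitDemo` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative async disposable that records whether DisposeAsync ran.
public sealed class TrackingAsyncDisposable : IAsyncDisposable
{
    public bool Disposed { get; private set; }

    public ValueTask DisposeAsync()
    {
        Disposed = true;
        return default;
    }
}

public static class ConfigureAwaitDemo
{
    // Illustrative async iterator.
    public static async IAsyncEnumerable<int> NumbersAsync()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<int> SumAsync(TrackingAsyncDisposable resource)
    {
        int sum = 0;
        // ConfigureAwait extension on IAsyncDisposable: DisposeAsync is awaited without the captured context.
        await using (resource.ConfigureAwait(false))
        {
            // ConfigureAwait extension on IAsyncEnumerable<T>: each MoveNextAsync is awaited the same way.
            await foreach (int n in NumbersAsync().ConfigureAwait(false))
                sum += n;
        }
        return sum;
    }
}
```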

@svick

svick May 24, 2018

Contributor

From the May 21, 2018 LDM notes:

> foreach await over dynamic
>
> Block it. For synchronous foreach we resort to the nongeneric IEnumerable, but there is no nongeneric IAsyncEnumerable, and there won't be.

Since the IAsyncEnumerable interface is going to be covariant, would it make sense for foreach await over dynamic to work with IAsyncEnumerable<object>? It would only work for async enumerables of reference types, but maybe that's better than nothing?
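A small C# sketch of why this idea would only cover reference-type elements follows. It uses the covariant `System.Collections.Generic.IAsyncEnumerable<out T>` and async iterators as they eventually shipped in C# 8 and .NET Core 3.0, which is an assumption relative to this 2018 thread, and it does not involve `dynamic` itself. `VarianceDemo` is an illustrative name. An async stream of `string` converts to `IAsyncEnumerable<object>`. An async stream of `int` does not, because variance never applies to value-type arguments.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class VarianceDemo
{
    static async IAsyncEnumerable<string> WordsAsync()
    {
        await Task.Yield();
        yield return "a";
        yield return "b";
    }

    static async IAsyncEnumerable<int> NumbersAsync()
    {
        await Task.Yield();
        yield return 1;
    }

    // Reference-type elements: the implicit covariant conversion is legal.
    public static async Task<int> CountViaObjectAsync()
    {
        IAsyncEnumerable<object> items = WordsAsync();
        int count = 0;
        await foreach (object item in items)
            count++;
        return count;
    }

    // Value-type elements: no variance, so the runtime type test fails.
    public static bool IntsAreObjects() => NumbersAsync() is IAsyncEnumerable<object>;
}
```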
