Proposal: Public APIs for C# 8 async streams #27547

Closed
stephentoub opened this issue Oct 5, 2018 · 76 comments
Labels: api-approved (API was approved in API review, it can be implemented), area-System.Runtime.CompilerServices

@stephentoub
Member

We're at a point where we should start exposing the necessary library support for C# 8 async streams, aka async enumerables, aka async iterators. While we may find ourselves wanting to make further changes to these APIs, we've locked on what we plan to deliver initially, and we can adapt as necessary based on feedback.

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }
}

namespace System.Runtime.CompilerServices
{
    public struct AsyncIteratorMethodBuilder
    {
        public static AsyncIteratorMethodBuilder Create();

        public void MoveNext<TStateMachine>(ref TStateMachine stateMachine)
            where TStateMachine : IAsyncStateMachine;

        public void AwaitOnCompleted<TAwaiter, TStateMachine>(
            ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : INotifyCompletion
            where TStateMachine : IAsyncStateMachine;

        public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
            ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : ICriticalNotifyCompletion
            where TStateMachine : IAsyncStateMachine;

        public void Complete();
    }
}

Assembly:
We need to decide where we put these. My expectation is that in .NET Core we'd have all of this in System.Private.CoreLib, but we could also put it in a new assembly (this originally said System.Threading.Tasks.Extensions). If we want to OOB this, it could be in both, with the .NET Core version of the new assembly just type forwarding to S.P.CoreLib, as happens today for ValueTask.

A few notes:

  • Alternative WaitForNextAsync+TryGetNext design. We spent a lot of time exploring this attractive alternative. It has perf advantages, but also non-trivial additional complexity when using the APIs directly rather than through compiler support (e.g. foreach await). We also have a design where we could light-up with the alternative API should we find that the perf benefits are desirable enough to introduce a second interface. As such, we're going with the simpler, more familiar, and easier to work with MoveNextAsync+Current design.
  • Thread safety. While these APIs are for asynchrony, that doesn't imply an instance of an enumerator can be used concurrently. Async enumerators are explicitly not thread safe, both in terms of there being no way to correctly use MoveNextAsync+Current in an atomic fashion (without higher-level locking that provides the thread safety), and in terms of calling MoveNextAsync again before having consumed a previous call's ValueTask; doing so is erroneous and has undefined behavior. So, too, is calling DisposeAsync after having called MoveNextAsync but without having consumed its ValueTask.
  • ValueTask instead of Task. The original design called for MoveNextAsync to return Task<bool> and DisposeAsync to return Task. That works well when MoveNextAsync and DisposeAsync complete synchronously, but when they complete asynchronously, it requires allocation of another object to represent the eventual completion of the async operation. By instead returning ValueTask<bool> and ValueTask, an implementation can choose to implement IValueTaskSource and reuse the same object repeatedly for one call after another; in this fashion, for example, the compiler-generated type that's returned from an iterator can serve as the enumerable, as the enumerator, and as the promise for every asynchronously-completing MoveNextAsync and DisposeAsync call made on that enumerator, such that the whole async enumerable mechanism can incur overhead of a single allocation.
  • Cancellation. Cancellation is provided external to the interfaces. If you want operations on the enumerable to be cancelable, you need to pass a CancellationToken to the thing creating the enumerable, such that the token can be embedded into the enumerable and used in its operation. You can of course choose to cancel awaits by awaiting something that itself represents both the MoveNextAsync ValueTask<bool> and a CancellationToken, but that would only cancel the await, not the underlying operation being awaited. As for IAsyncDisposable, while in theory it makes sense that anything async can be canceled, disposal is about cleanup, closing things out, freeing resources, etc., which is generally not something that should be canceled; cleanup is still important for work that's canceled. The same CancellationToken that caused the actual work to be canceled would typically be the same token passed to DisposeAsync, making DisposeAsync worthless because cancellation of the work would cause DisposeAsync to be a nop. If someone wants to avoid being blocked waiting for disposal, they can avoid waiting on the resulting ValueTask, or wait on it only for some period of time.
  • Naming. Async is used in the various method names (e.g. DisposeAsync) even though the type also includes Async so that a type might implement both the synchronous and asynchronous counterparts and easily differentiate them.
  • AsyncIteratorMethodBuilder. The compiler could get away with using the existing AsyncTaskMethodBuilder or AsyncVoidMethodBuilder types, but these both have negative impact that can be avoided by using a new, specially-designed type. AsyncTaskMethodBuilder allocates a Task to represent the async method, but that Task goes unused in an iterator. AsyncVoidMethodBuilder interacts with SynchronizationContext, because async void methods need to do so (e.g. calling OperationStarted and OperationCompleted on the current SynchronizationContext if there is one). And some of the methods are poorly named, e.g. Start makes sense when talking about starting an async method, but not when talking about iterating with an iterator. As such, we introduce a new type tailored to async iterators. Create just returns a builder that the compiler can use, which is likely just default(AsyncIteratorMethodBuilder), but the method might also do additional optional work, like tracing. MoveNext pushes the state machine forward, effectively just calling stateMachine.MoveNext(), but doing so with the appropriate handling of ExecutionContext. AwaitOnCompleted and AwaitUnsafeOnCompleted are exactly what they are on the existing builders. And Complete just serves to notify the builder that the iterator has finished iterating: technically this isn't necessary, and it may just be a nop, but it gives us a hook to be able to do things like tracing/logging should we choose to do so. (We could decide not to include this.)
  • Implementing sync and async interfaces. It's fine for a single type to implement both the sync and async counterparts, e.g. IEnumerable<T> and IAsyncEnumerable<T>. When consuming manually, the developer can easily distinguish which is being used based on naming (e.g. GetEnumerator vs GetAsyncEnumerator), and when consuming via the compiler, the compiler will provide syntax for differentiation (e.g. foreach vs foreach await).
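To make the thread-safety and disposal notes concrete, here is a minimal sketch of consuming the interfaces by hand. It assumes the interfaces and the async-iterator syntax as they eventually shipped with C# 8 and .NET Core 3.0 (where GetAsyncEnumerator also accepts an optional CancellationToken, unlike the parameterless shape proposed above); ManualConsumption, CountAsync, and SumAsync are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ManualConsumption
{
    // Hypothetical source, present only so the example is self-contained.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield(); // force an asynchronous hop before each element
            yield return i;
        }
    }

    public static async Task<int> SumAsync(IAsyncEnumerable<int> source)
    {
        int sum = 0;
        IAsyncEnumerator<int> e = source.GetAsyncEnumerator();
        try
        {
            // Each ValueTask<bool> is awaited exactly once and is fully consumed
            // before MoveNextAsync or DisposeAsync is called again.
            while (await e.MoveNextAsync())
            {
                sum += e.Current;
            }
        }
        finally
        {
            // Dispose only after the last MoveNextAsync has completed.
            await e.DisposeAsync();
        }
        return sum;
    }
}
```

This is roughly the shape the compiler-provided loop expands to, which is why the "one outstanding call at a time" rule rarely surfaces when consuming through language support.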

What else will we want?
I've opened several additional issues to cover related support we'll want to consider:

  • Extensions for ConfigureAwait (https://github.com/dotnet/corefx/issues/32684). The compiler will support pattern-based foreach await in addition to binding to the interface, and we can use that to enable the implicit awaits on MoveNextAsync and DisposeAsync to be done using ConfigureAwait(false) by having an extension method like public static ConfiguredAsyncEnumerable<T> ConfigureAwait<T>(this IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext), where the returned ConfiguredAsyncEnumerable<T> will propagate that through to a ConfiguredAsyncEnumerator<T>, and its MoveNextAsync and DisposeAsync will return ConfiguredValueTaskAwaitables.
  • ManualResetValueTaskSource (https://github.com/dotnet/corefx/issues/32664). If MoveNextAsync and DisposeAsync were defined to return Tasks, then the compiler could use TaskCompletionSource<T> to implement those async operations. But as it's returning ValueTask, and as we're doing so to enable object reuse, the compiler will be using its own implementation of IValueTaskSource. To greatly simplify that and to encapsulate all of the relevant logic, we should productize the ManualResetValueTaskSource/ManualResetValueTaskSourceLogic helper type from https://github.com/dotnet/corefx/blob/master/src/Common/tests/System/Threading/Tasks/Sources/ManualResetValueTaskSource.cs.
  • IAsyncDisposable implementations (https://github.com/dotnet/corefx/issues/32665). With IAsyncDisposable exposed, we'll want to implement on a variety of types in coreclr/corefx where the type could benefit from having an asynchronous disposable capability in addition to an existing synchronous ability.
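As a hedged illustration of the ConfigureAwait item: in the API that eventually shipped with .NET Core 3.0, the extension method lives in System.Threading.Tasks and returns ConfiguredCancelableAsyncEnumerable<T> rather than the ConfiguredAsyncEnumerable<T> name proposed here, and the loop is written await foreach. ConfigureAwaitDemo, RangeAsync, and CountAsync below are hypothetical helpers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConfigureAwaitDemo
{
    // Hypothetical source, present only so the example is self-contained.
    public static async IAsyncEnumerable<int> RangeAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<int> CountAsync<T>(IAsyncEnumerable<T> source)
    {
        int count = 0;
        // The implicit awaits on MoveNextAsync and DisposeAsync do not
        // resume on the captured context.
        await foreach (T item in source.ConfigureAwait(false))
        {
            count++;
        }
        return count;
    }
}
```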

cc: @jcouv, @MadsTorgersen, @jaredpar, @terrajobst, @tarekgh, @kouvel

@tarekgh
Member

tarekgh commented Oct 5, 2018

would be nice if you include some sample code for the usage.

@stephentoub
Member Author

stephentoub commented Oct 5, 2018

e.g. (subject to decisions around language syntax)

static async IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(
    IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (await e1.MoveNextAsync() && await e2.MoveNextAsync())
            yield return resultSelector(e1.Current, e2.Current);
    }
}

@jnm2
Contributor

jnm2 commented Oct 5, 2018

Too late to include these APIs in .NET Standard 2.1?

@clairernovotny
Member

@stephentoub Did we determine what was happening with Ix.NET Async from dotnet/reactive? Especially with the Impl here:

dotnet/reactive#423

Are you taking that code and putting it here in corefx? What about the existing package, etc? We should likely have a discussion on this?

@stephentoub
Member Author

stephentoub commented Oct 5, 2018

Are you taking that code and putting it here in corefx? What about the existing package, etc? We should likely have a discussion on this?

When we've (you, me, @MadsTorgersen, etc.) talked about this previously, my understanding of the plan was that it would continue to live in reactive, and it would be updated based on the actual APIs shipping in the platform. I don't believe anything has changed there. Has it?

@jaredpar
Member

jaredpar commented Oct 5, 2018

@jnm2

Too late to include these APIs in .NET Standard 2.1?

Yes. Particularly because the associated language feature isn't complete yet, and isn't intended to be complete until the .NET Core 3.0 time frame. Need to ship the API + language feature at the same time so they can evolve together.

@clairernovotny
Member

clairernovotny commented Oct 5, 2018

@stephentoub That's fine with me, it just wasn't clear if that was still the case since it has been a while. Once the core interfaces/compiler work is somewhere we can pull from a private feed, we can update that branch to use that interface and match the specs. Also happy to give direct commit access to any member on the team that needs it.

@stephentoub
Member Author

Thanks, @onovotny. @MadsTorgersen, this is all still correct, right?

@jcouv
Member

jcouv commented Oct 5, 2018

@stephentoub shared a code sample above using high-level primitives.
Here's a link to a lowered version of such an async-iterator method: https://gist.github.com/jcouv/ae7800985e3a8700bb84c6650d25bb69
This is still work-in-progress, but it shows how the new builder type would be called.

public class C
{
    public static async System.Collections.Generic.IAsyncEnumerable<int> M()
    {
        await System.Threading.Tasks.Task.CompletedTask;
        yield return 4;
    }
}

@akarnokd

akarnokd commented Oct 5, 2018

I have a couple of questions:

  1. Do I have to defend against reentrant MoveNextAsync invocations? I.e., when the ValueTask<bool> completes and the continuation would call MoveNextAsync again?

  2. DisposeAsync() must be called after MoveNextAsync's ValueTask<bool> completes and before the next MoveNextAsync would be invoked, right? If I wanted to implement a Timeout operator (with time) over this, how can I stop the main IAsyncEnumerator safely while the current ValueTask<bool> is not yet complete?

  3. Cancellation must be external (via CancellationToken for example). How can I inject it to a specific instance returned by IAsyncEnumerable.GetAsyncEnumerator? In other words, how can I cancel individual runs of the IAsyncEnumerable?

  4. If the cancellation has to be established on the source generator, how can I compose LINQ-style operators over such source and carry the cancellation support along with it?

  5. Is it mandatory to call (and await) DisposeAsync() when the MoveNextAsync's ValueTask<bool> completes with false, indicating no more values?

  6. If the ValueTask<bool> completes with false, should I defend against another invocation of MoveNextAsync and if so, what should I do: a) keep returning a false ValueTask, b) throw an invalid operation exception, c) have the ValueTask failing with the exception?

@jkotas
Member

jkotas commented Oct 5, 2018

we could also put it in System.Threading.Tasks.Extensions. If we want to OOB this, it could be in both, with the .NET Core S.T.T.Extensions just type forwarding to S.P.CoreLib as happens today for ValueTask.

System.Threading.Tasks.Extensions shipped inbox on at least one platform. IIRC, the rules for OOBing that we have agreed on some time ago say that you cannot add to the OOB assembly once it shipped inbox on at least one platform.

@stephentoub
Member Author

System.Threading.Tasks.Extensions shipped inbox on at least one platform. IIRC, the rules for OOBing that we have agreed on some time ago say that you cannot add to the OOB assembly once it shipped inbox on at least one platform.

Hmm. I had it in my head that you couldn't modify/augment APIs of any types that had shipped in such assemblies, but I hadn't internalized that you couldn't add additional unrelated types to them, either. What's the cause of that? @weshaggard, @terrajobst, is that actually the case such that if we wanted to go partial OOB here we'd need to introduce a new assembly?

@stephentoub
Member Author

Do I have to defend against reentrant MoveNextAsync invocations? I.e., when the ValueTask completes and the continuation would call MoveNextAsync again?

You need to expect that code will run synchronously as part of the completion of the ValueTask, so whatever code is completing the ValueTask needs to be ok with arbitrary code running as part of that, including another invocation of MoveNextAsync. If it's not ok with that, it needs to move the completion to a place where it is ok with that, which could include queueing it. For example, TaskCompletionSource<T> can be constructed to RunContinuationsAsynchronously, so if you were handing back a ValueTask that wrapped a TCS Task, you could set that option and then it wouldn't matter where you called {Try}Set{Result/Exception/Canceled}, at the expense of the continuation getting queued. I expect/hope that ManualResetValueTaskSource{Logic} will provide a similar option in some fashion.
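A minimal sketch of the TaskCompletionSource<T> option mentioned above: the source is constructed with TaskCreationOptions.RunContinuationsAsynchronously, so whoever calls TrySetResult does not run the awaiting continuation (and any reentrant MoveNextAsync it makes) inline. SignalSource, WaitAsync, and Complete are hypothetical names; a real enumerator would also need to create a fresh source for each pending call.

```csharp
using System;
using System.Threading.Tasks;

public sealed class SignalSource
{
    // With this option, continuations are queued instead of being invoked
    // synchronously inside TrySetResult/TrySetException/TrySetCanceled.
    private readonly TaskCompletionSource<bool> _tcs =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Hands back a ValueTask<bool> that wraps the TCS task.
    public ValueTask<bool> WaitAsync() => new ValueTask<bool>(_tcs.Task);

    // Completes the pending operation; safe to call from code that is not
    // prepared for arbitrary consumer code to run during completion.
    public void Complete(bool hasMore) => _tcs.TrySetResult(hasMore);
}
```

The cost is the one described above: the continuation is queued rather than run in place, and wrapping a Task gives up the allocation savings that motivated ValueTask in the first place.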

DisposeAsync() must be called after MoveNextAsync's ValueTask completes and before the next MoveNextAsync would be invoked, right?

Yes. An implementation of the interfaces is not guaranteed to be thread-safe, and most will not be. So you should not make a second asynchronous call on the object while a first is still in progress.

If I wanted to implement a Timeout operator (with time) over this, how can I stop the main IAsyncEnumerator safely while the current ValueTask is not yet complete?

You can't:
https://blogs.msdn.microsoft.com/pfxteam/2012/10/05/how-do-i-cancel-non-cancelable-async-operations/
And see my comments on Cancellation in my original post. You can cancel or timeout your action of await'ing, but you can't influence the implementation you're awaiting unless you own it as well or unless it provides a means for doing so, such as accepting a CancellationToken into whatever method/ctor created the async method. If you cancel/timeout your await, then you shouldn't interact with that object again, and just let it get GC'd and any objects appropriately finalized.
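A sketch of what "timeout your action of await'ing" can look like, using Task.WhenAny and Task.Delay. It only stops the waiting; the underlying operation keeps running. AwaitTimeout and CompletedWithinAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class AwaitTimeout
{
    // Returns true if the operation finished within the timeout,
    // false if we gave up waiting. The operation itself is not canceled.
    public static async Task<bool> CompletedWithinAsync(Task operation, TimeSpan timeout)
    {
        Task winner = await Task.WhenAny(operation, Task.Delay(timeout));
        return winner == operation;
    }
}
```

For a MoveNextAsync call, the ValueTask<bool> would be converted once with AsTask() and that Task passed in. If the helper returns false, the enumerator should not be touched again, as described above.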

Cancellation must be external (via CancellationToken for example). How can I inject it to a specific instance returned by IAsyncEnumerable.GetAsyncEnumerator? In other words, how can I cancel individual runs of the IAsyncEnumerable?

You can't. It'd be part of the enumerable, or else you can choose to cancel between awaits on your own. The alternative is to take a CancellationToken into MoveNextAsync, which does not compose well, in particular with language support, and which can add a lot of cost.

If the cancellation has to be established on the source generator, how can I compose LINQ-style operators over such source and carry the cancellation support along with it?

The cancellation would be built into the source. Any LINQ operations over it would implicitly inherit that as part of their awaiting on that source. If you wanted to further inject cancellation into individual query operators, then those query operators should accept a token as well, e.g. .FooAsync(..., token).BarAsync(..., token).

There's a whole discussion of cancellation at https://github.com/dotnet/csharplang/blob/master/proposals/async-streams.md#cancellation.
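A minimal sketch of cancellation being "built into the source": the token is passed to the method that creates the enumerable and captured by the iterator, so every consumer (including query operators layered on top) observes it through MoveNextAsync. This assumes the async-iterator syntax as shipped in C# 8; Ticker and TicksAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Ticker
{
    // The token is supplied when the enumerable is created,
    // not when it is enumerated.
    public static IAsyncEnumerable<int> TicksAsync(int count, CancellationToken token)
    {
        return Core();

        // Local async iterator that captures 'token' from the enclosing method.
        async IAsyncEnumerable<int> Core()
        {
            for (int i = 0; i < count; i++)
            {
                token.ThrowIfCancellationRequested();
                await Task.Yield();
                yield return i;
            }
        }
    }
}
```

Once the token is canceled, the next MoveNextAsync faults with OperationCanceledException, which propagates through anything awaiting the source.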

Is it mandatory to call (and await) DisposeAsync() when the MoveNextAsync's ValueTask completes with false, indicating no more values?

It's never mandatory to call Dispose, and it's never mandatory to call DisposeAsync. But it's a good idea to. And just as you'd want to call Dispose on an IEnumerator<T> today after its MoveNext returns false, you'd want to call DisposeAsync on an IAsyncEnumerator<T> after its MoveNextAsync hands back false.

If the ValueTask completes with false, should I defend against another invocation of MoveNextAsync and if so, what should I do: a) keep returning a false ValueTask, b) throw an invalid operation exception, c) have the ValueTask failing with the exception?

Yes, it's valid (though uncommon) to do so. With IEnumerator<T>, calling MoveNext after you've already reached the end of the iteration is documented to return false. The same will be true for MoveNextAsync.

@jkotas
Member

jkotas commented Oct 5, 2018

I hadn't internalized that you couldn't add additional unrelated types to them, either. What's the cause of that?

The OOB would need to target a netstandard. If it does not target a netstandard, it is platform specific and we do not need unified type identity. It can be one type identity for netfx and different one for netcoreapp, etc. You can ship OOB like that, but it has limited value.

If the OOB targets netstandard, it had better work on all platforms that support netstandard, including .NET Core 2.1 where this shipped inbox. The problem is which implementation you are going to pick when somebody references the OOB in .NET Core 2.1. It cannot be the inbox implementation because it is missing the new methods; and it cannot be the OOB implementation because it would introduce a second copy of the inbox types.

@stephentoub
Member Author

Thanks, @jkotas. That makes sense. We've made a bit of a mess, haven't we ;)

Ok, so if we decide we need to go OOB for platforms other than .NET Core 3, we'll need to introduce a new assembly.

@clairernovotny
Member

@stephentoub Shot in the dark, and subject to API/design reviews anyway, but @bartdesmet split up IxAsync into System.Linq.Async and System.Interactive.Async.

Perhaps that's a good place for those interfaces?
https://github.com/dotnet/reactive/tree/IxAsyncCSharp8/Ix.NET/Source/System.Linq.Async/System/Collections/Generic

@stephentoub
Member Author

Perhaps that's a good place for those interfaces?

Can you clarify what you mean? The interfaces are going to ship in .NET Core, not as part of an external library. If you mean if we decide to ship something OOB, sure, we could consider using such a name for an assembly. Though OOB isn't the main plan right now; I only mentioned it as an off-hand thing... the delivery vehicle here is .NET Core 3.

@clairernovotny
Member

@stephentoub I guess I mean that the assembly could/should be part of .NET Core. Ix Async is in a grey area already and EF Core depends on it today.

@akarnokd

akarnokd commented Oct 5, 2018

Thanks.

I have a question regarding the ValueTask. Is it more efficient to have a constant ValueTask of true/false than having a constant Task.FromResult of true/false in synchronous return cases of MoveNextAsync?

@stephentoub
Member Author

I have a question regarding the ValueTask. Is it more efficient to have a constant ValueTask of true/false than having a constant Task.FromResult of true/false in synchronous return cases of MoveNextAsync?

ValueTask<T> is just a struct, and constructing it from a T just initializes that struct's fields:
https://github.com/dotnet/coreclr/blob/27c848e37e9998142b60e776cf5b5d08a3543fe1/src/System.Private.CoreLib/shared/System/Threading/Tasks/ValueTask.cs#L474-L481
so it's very cheap to just do return new ValueTask<bool>(true) or return new ValueTask<bool>(false). I'd expect that to be as cheap (or cheaper) as reading/copying from a static containing a "cached" version, but you'd need to measure to see for sure.
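To illustrate the synchronous-completion case, here is a sketch of a hand-written enumerator over an array that returns new ValueTask<bool>(...) directly, with no Task allocation. It assumes the IAsyncEnumerator<T> shape as shipped in .NET Core 3.0; ArrayAsyncEnumerator<T> is a hypothetical type, and no timing claim is made here (as noted above, that needs measuring).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class ArrayAsyncEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly T[] _items;
    private int _index = -1;

    public ArrayAsyncEnumerator(T[] items) => _items = items;

    // Only meaningful after MoveNextAsync has completed with true.
    public T Current => _items[_index];

    public ValueTask<bool> MoveNextAsync()
    {
        // Advance until one past the end, then stay there so that further
        // calls keep completing with false.
        if (_index < _items.Length) _index++;
        return new ValueTask<bool>(_index < _items.Length);
    }

    // default(ValueTask) is an already-completed ValueTask.
    public ValueTask DisposeAsync() => default;
}
```

Calling MoveNextAsync again after the end simply yields false again, matching the IEnumerator<T> behavior described earlier in the thread.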

@slang25
Contributor

slang25 commented Oct 6, 2018

the delivery vehicle here is .NET Core 3.

Does that mean that the interfaces will be shipped in a separate package and then be shipped in .NET Core 3.0, a bit like how ValueTuple (and Span<T>) were introduced? Or am I confused?

@jkotas
Member

jkotas commented Oct 7, 2018

the interfaces will be shipped in a separate package and then be shipped in .NET Core 3.0, a bit like how ValueTuple (and Span) were introduced

We have found that trying to patch the existing runtimes with new core features via NuGet package like we have done with ValueTuple always results in sub-par experience. We have tried that many times. Based on the discussions about this, I believe we are going to prioritize the long-term sustainability of the platform over patching the existing runtimes with new core features in creative ways.

For this feature, it would mean:

  • Ship async streams in .NET Core 3.0 only at first, in its natural place, without introducing a special little assembly for it. You want to be on .NET Core 3.0 to get the first class async stream experience.
  • Once it ships in .NET Core, there is an option to ship a best-effort standalone NuGet package to provide async streams for earlier .NET versions. This package can be shipped from corefx or even shipped by the community (like https://www.nuget.org/packages/AsyncBridge/). You may use this NuGet package if you desperately need async streams on .NET Framework or earlier .NET Core versions and you can live with the sub-par experience that it provides.

@stephentoub stephentoub self-assigned this Oct 7, 2018
@stephentoub stephentoub changed the title Public APIs for C# 8 async streams Proposal: Public APIs for C# 8 async streams Oct 7, 2018
@slang25
Contributor

slang25 commented Oct 7, 2018

I remember when async was first introduced and there was the Microsoft.Bcl.Async package which did a similar job to AsyncBridge.

Perhaps the types could be shipped with Ix.Async for those lower "unsupported" TFMs? The community will want to fill the gap, so this seems like a sensible home for it.

@GSPP

GSPP commented Oct 9, 2018

Alternative WaitForNextAsync+TryGetNext

I have seen many hard-core optimizations around tasks and await on this issue tracker. It seems that over time performance has become a significant priority. Maybe it is prudent to directly skip to the fastest design possible.

This should be seen as low-level machinery that is only touched by experts. Normal users use library functions and language support. Similarly it is very rare that normal C# users must touch enumerators manually.

I think it's correct that performance was so much prioritized in all the other measures described.

@stephentoub
Member Author

stephentoub commented Oct 9, 2018

Maybe it is prudent to directly skip to the fastest design possible.

I understand the position; this was my original stance as well. I'm the one who suggested the alternative of WaitForNextAsync+TryGetNext, and then I'm also the one who recently re-raised the issue and suggested we might want to revert back to MoveNextAsync+Current, even after @jcouv had mostly implemented the WaitForNextAsync+TryGetNext approach in the compiler.

There are several concerns with the alternative. First, from a theoretical perspective, I have a general problem with APIs that launch asynchronous work but don't give you back a handle to it, yet that's exactly what TryGetNext can do. This means you invoke TryGetNext, it may return false, and if it does, it's possible there's asynchronous work now happening in the background, and thus you must call WaitForNextAsync again in order to get the promise that represents the eventual completion of that work. It adds non-trivially to the complexity.

Second, there's something very nice about maintaining direct correspondence with the existing synchronous interface, which is something that's been around for a long time and that many people are familiar with.

Third, it's easy to get wrong. Yes, we expect the majority of consumption to be via foreach await, but there are absolutely times when you need to drop down to the interfaces. Just as an example, I found one of the simplest LINQ implementations using MoveNext+Current and looked at what it would take to implement with both approaches. For reference, here's the synchronous implementation:

private static IEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using (IEnumerator<TFirst> e1 = first.GetEnumerator())
    using (IEnumerator<TSecond> e2 = second.GetEnumerator())
    {
        while (e1.MoveNext() && e2.MoveNext())
            yield return resultSelector(e1.Current, e2.Current);
    }
}

Converting that to use the MoveNextAsync+Current API is trivial (to the point where it could likely be easily automated):

private static async IAsyncEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (await e1.MoveNextAsync() && await e2.MoveNextAsync())
            yield return resultSelector(e1.Current, e2.Current);
    }
}

Not so much with the alternative. First, we can take an approach where we replace MoveNextAsync with WaitForNextAsync+TryGetNext, e.g.

private static async IAsyncEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (true)
        {
            if (!await e1.WaitForNextAsync()) break;
            TFirst currentFirst = e1.TryGetNext(out bool success);
            if (!success) break;

            if (!await e2.WaitForNextAsync()) break;
            TSecond currentSecond = e2.TryGetNext(out success);
            if (!success) break;

            yield return resultSelector(currentFirst, currentSecond);
        }
    }
}

This works, but it's more complicated, isn't as easily proven correct, and isn't any more efficient than the MoveNextAsync+Current approach, so the extra complexity isn't buying us anything. The benefit of WaitForNextAsync+TryGetNext is it allows us to have an inner loop using just TryGetNext so that we can have one interface call rather than two when items are yielded synchronously. So, what if we wanted to get those benefits? We could try what feels like a natural translation:

private static async IAsyncEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (true)
        {
            if (!await e1.WaitForNextAsync()) break;
            if (!await e2.WaitForNextAsync()) break;
            while (true)
            {
                TFirst currentFirst = e1.TryGetNext(out bool success);
                if (!success) break;

                TSecond currentSecond = e2.TryGetNext(out success);
                if (!success) break;

                yield return resultSelector(currentFirst, currentSecond);
            }
        }
    }
}

but this is actually buggy, which may or may not be immediately obvious: if e1.TryGetNext succeeds but then e2.TryGetNext fails, we will have consumed an element from e1 and not from e2, such that subsequent retrievals will be out of sync between the two enumerables. To fix that while still retaining the potential for one interface call per element, we have to get non-trivially more complex:

private static async IAsyncEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (true)
        {
            if (!await e1.WaitForNextAsync()) break;
            if (!await e2.WaitForNextAsync()) break;
            while (true)
            {
                TFirst currentFirst = e1.TryGetNext(out bool success);
                if (!success) break;

                // e1 has already produced an element, so keep waiting on e2
                // until it yields one or reports that it has ended.
                TSecond currentSecond;
                while (true)
                {
                    currentSecond = e2.TryGetNext(out success);
                    if (success) break;
                    if (!await e2.WaitForNextAsync()) return;
                }

                yield return resultSelector(currentFirst, currentSecond);
            }
        }
    }
}

So, doable, but really easy to get wrong, and difficult to prove correct. This relies on being able to call WaitForNextAsync multiple times without intervening TryGetNext calls and having it advance only once: that would be a documented guarantee, but it's yet another thing you need to understand and really internalize.

Fourth, similar complexity exists if you're manually implementing an IAsyncEnumerable<T>, and while that's even less common, it's still something to keep in mind. Such an implementation needs to keep track of additional state beyond that which is necessary for MoveNextAsync+Current, such as whether TryGetNext launched an asynchronous operation such that WaitForNextAsync should "simply" return a promise for the launched work rather than itself moving the state machine forward. This also means the compiler-generated state machine is larger and more complex than it already would be.

Finally, the perf benefits are there (at most saving one interface call per element), but they generally pale in comparison to any kind of I/O being done. You can demonstrate the perf benefits with microbenchmarks... it's much harder to demonstrate with anything real in an app.

I agree it's painful to leave any perf potential on the table. Part of me doesn't like the other part of me that's pushing for the simple approach. But I also believe the advantages of the simple approach outweigh the cons. And if we find that there really are important workloads that would benefit from the advanced interfaces, we've convinced ourselves that they can be introduced in a light-up fashion that would enable those workloads to retain 90% of the benefits.

cc: @vancem, @migueldeicaza, @MadsTorgersen

@buybackoff

It's fine for a single type to implement both the sync and async counterparts, e.g. IEnumerable and IAsyncEnumerable.

I have been using async streams in my library for a while, and I implement both IEnumerator and IAsyncEnumerator so that they can reuse Current. I had to change the contract for IEnumerator.MoveNext so that if it returns false the enumerator is still valid and Current is defined as the value as of the last successful move.

To the point of performance discussion, another contract of MoveNext is that:

when MoveNext() returns false it means that there are no more elements right now,
and a consumer must call MoveNextAsync() and await a new element, or spin
and repeatedly call IEnumerator.MoveNext when a new element is expected very soon. Repeated calls to MoveNext() could eventually return true.

The idea is that MoveNext could return data as fast as possible and all possible optimizations should be done for the synchronous case. Even wrapping a result in ValueTask, calling any unneeded method or checking any condition could be slower than just trying to get a value from an internal buffer and return it, preferably inlining everything along the way.

Finally, the perf benefits are there (at most saving one interface call per element), but they generally pale in comparison to any kind of I/O being done.

If there are no more values available instantly in memory, then some I/O is required and all micro-optimizations are already quite meaningless. The design proposed here with MoveNextAsync: void -> ValueTask<bool> is much more convenient, and it additionally allows writing most of the code as if it were synchronous, without any special way of handling async. Then adding await is trivial.

Composition of synchronous enumerators is also much simpler (LINQ vs Ix style) and availability of new data could be propagated via a side channel, while complex composed operations could be micro-optimized without touching async machinery. My MoveNextAsync implementation just awaits a signal that some new data is available in root data sources and then just calls any complex tree of sync calculations that use synchronous MoveNext. That would be possible as well with the alternative design with TryGetNext/WaitForNextAsync, which does more or less exactly what my contract with the combined IEnumerator/IAsyncEnumerator does, but it would be much more complex.

I believe async streams should not try to replace their sync counterparts when performance matters because they will always be slower. Max performance in terms of method inlining and similar micro-optimizations should be outside the scope of async streams. They should not allocate: that is a must, and it is already done. And they should have a quick path probing whether a value is already available, but mostly to avoid jumping to the thread pool and yielding, especially on Windows with its 15 msec thread time slice. Pragmatically there is almost no such new data that could arrive faster than a couple of virtual/interface calls.

@vancem
Contributor

vancem commented Oct 9, 2018

I do like the general approach ultimately taken. We chose the simplest design, with the fallback of adding an advanced version that can improve performance, but we don't do it until it is clear that the extra perf can actually be realized and is valuable in real scenarios. It is a low risk strategy, which is good.

@quinmars

quinmars commented Oct 9, 2018

First of all I really appreciate that you found a way to keep a simple interface with acceptable performance characteristics. Out of curiosity or nitpicking, shouldn't a Zip implementation await the next items in parallel?

private static async IAsyncEnumerable<TResult> ZipIterator<TFirst, TSecond, TResult>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
{
    using await (IAsyncEnumerator<TFirst> e1 = first.GetAsyncEnumerator())
    using await (IAsyncEnumerator<TSecond> e2 = second.GetAsyncEnumerator())
    {
        while (true)
        {
            var t1 = e1.MoveNextAsync();
            var t2 = e2.MoveNextAsync();
            if (!await t1 | !await t2)
                yield break;
            yield return resultSelector(e1.Current, e2.Current);
       }
    }
}

@stephentoub
Member Author

stephentoub commented Nov 6, 2018

Otherwise, I'm not understanding?

I'm talking about using iterator support, i.e. yield return.

If I implement the whole thing myself, then sure, I can do exactly what the compiler does and return this from GetAsyncEnumerator.

But if I implement my own IAsyncEnumerable<T> class:

internal sealed class MyAsyncEnumerable<T> : IAsyncEnumerable<T>
{
    ...
}

that then uses iterator support to implement GetAsyncEnumerator, e.g.

public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
    ...
    yield return await ...;
    ...
}

then I can't do that.

That was the point I was trying to make. If you want to implement your IAsyncEnumerable<T> with an iterator, and if we lack the ability to access the CancellationToken supplied to GetAsyncEnumerator from inside of the IAsyncEnumerable<T> iterator, then you can still achieve it with some more boilerplate by writing your own async enumerable class that has its GetAsyncEnumerator implemented with iterator support... but in doing so, you can no longer apply the return this optimization. The compiler is going to allocate a new instance for that GetAsyncEnumerator call: it has zero knowledge of the custom async enumerable instance, which also doesn't implement the -ator interface.
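
A minimal sketch of the boilerplate described above, assuming a hypothetical `RangeAsyncEnumerable` class (the class, its `count` field, and the demo names are illustrative and not part of the proposal). The iterator-based `GetAsyncEnumerator` can see the `CancellationToken` directly, but every call yields a fresh compiler-generated enumerator object rather than returning the enumerable itself:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example type: a hand-written enumerable whose
// GetAsyncEnumerator is implemented with iterator support.
public sealed class RangeAsyncEnumerable : IAsyncEnumerable<int>
{
    private readonly int _count;

    public RangeAsyncEnumerable(int count) => _count = count;

    public async IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
    {
        for (int i = 0; i < _count; i++)
        {
            // The token passed to GetAsyncEnumerator is directly in scope here.
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i;
        }
    }
}

public static class RangeDemo
{
    public static async Task Main()
    {
        var source = new RangeAsyncEnumerable(3);

        IAsyncEnumerator<int> first = source.GetAsyncEnumerator();
        IAsyncEnumerator<int> second = source.GetAsyncEnumerator();

        // The enumerator is a separate object from the enumerable,
        // so there is no "return this" reuse.
        Console.WriteLine(ReferenceEquals(first, source)); // False
        Console.WriteLine(ReferenceEquals(first, second)); // False

        await first.DisposeAsync();
        await second.DisposeAsync();

        int sum = 0;
        await foreach (int value in source)
        {
            sum += value;
        }
        Console.WriteLine(sum); // 3
    }
}
```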

@clairernovotny
Member

Ah, ok, got it, yes, I wasn't thinking of the generated iterator, thanks.

Is that something that future language or compiler support could optimize?

With that trade-off it still seems like a worthy trade-off to have a complete solution. The current POR (option 2) concerns me deeply as it prevents me from passing IAsyncEnumerable's around in a similar manner as I do IEnumerable. The reason comes down to scope and lifetime of the cancellation...all the reasons you articulated.

@stephentoub
Member Author

it prevents me from passing IAsyncEnumerable's around in a similar manner as I do IEnumerable

How so? You have the exact same expressivity between them. There's no way to cancel a MoveNext call on IEnumerator<T>, so how is it different?

@clairernovotny
Member

It's different because cancellation is a thing for async operations where it's not really for non-async. I don't expect to be able to cancel an IEnumerator, I would expect to control cancellation of an IAsyncEnumerator.

@jnm2
Contributor

jnm2 commented Nov 6, 2018

The ConcurrentZipAsync example in the Option 2 section doesn't guard against an exception in await completed. This could allow the other task to be finalized with an unobserved exception. This probably isn't important to the point being made about cancellation, other than that a common pattern I'd want when implementing these things is cancel-then-await.

@stephentoub
Member Author

The ConcurrentZipAsync example in the Option 2 section doesn't guard against an exception in await completed

Yes. This is all email-compiled and email-tested. I hope we don't focus on that :)

@akarnokd

akarnokd commented Nov 6, 2018

Just considering the IAsyncEnumerator API, an option would be to define a void Cancel() method, which should then propagate upwards to the source-like async enumerators. It is then up to the chain and sources to turn them into the relevant CancellationTokenSource.Cancel() calls when needed, otherwise it could be left as no-op or pass-through. This way, no registration/deregistration dance would be needed on an async enumerator level just to get the cancellation notification.

I have no idea how the foreach/using could work with it, or if it even should. The closest analogous problem I can remember is from Ix.NET where the Finally operator has to intercept the Dispose call ending the iteration which is inaccessible in a yield return style method implementation afaik. The operator had to be custom, hand-crafted IEnumerator.

@stephentoub
Member Author

stephentoub commented Nov 6, 2018

It's different because cancellation is a thing for async operations where it's not really for non-async. I don't expect to be able to cancel an IEnumerator, I would expect to control cancellation of an IAsyncEnumerator.

To play devil's advocate:

  • There are plenty of asynchronous things that aren't cancelable (or even if they are, that don't respond to a cancellation request, or that don't respond in a timely fashion... the nature of a cooperative model).
  • Even if the model is changed, there are zero guarantees that the implementation respects cancellation. In fact, one might argue it'll make it even less likely to respect it, at least for IAsyncEnumerable<T> implemented with iterator support, because the CancellationToken won't be in the signature, and you have to know to go looking for the CancellationToken somewhere else.
  • Plenty of MoveNext operations take a non-trivial amount of time, do networking operations, etc. So while you might expect it to be more likely to be able to cancel a MoveNextAsync equivalent, that doesn't necessarily impact the desire to be able to cancel MoveNext. I've even seen people make it asynchronous (from the caller's perspective) via a Task.Run call, at which point it's still not really any more cancelable.

Just considering the IAsyncEnumerator API, an option would be to define a void Cancel() method, which should then propagate upwards to the source-like async enumerators. It is then up to the chain and sources to turn them into the relevant CancellationTokenSource.Cancel() calls when needed, otherwise it could be left as no-op or pass-through. This way, no registration/deregistration dance would be needed on an async enumerator level just to get the cancellation notification.

Thanks, but I'm not seeing how that helps, and at least from my perspective it hurts:

  • At that point there's nothing to expose in the body of the iterator. At least with a CancellationToken you could come up with a language mechanism to expose the CancellationToken, as previously elaborated on.
  • The vast majority of the rest of the .NET ecosystem's cancellation model is implemented with CancellationToken, so to actually forward on the Cancel call to most anything else, you'd need to translate it into that model anyway.
  • Such implementations would then need to pay the cost of being able to support a Cancel call at some point in the future even if cancellation is never used. In contrast, with CancellationToken, an implementation can avoid all such costs by checking CanBeCanceled, which returns false for CancellationToken.None/default(CancellationToken).
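
To illustrate the last bullet, here is a small sketch, assuming a hypothetical `RegisterIfCancelable` helper (not an existing API), of how an implementation might skip cancellation bookkeeping entirely when `CanBeCanceled` is false:

```csharp
using System;
using System.Threading;

public static class CancellationCostDemo
{
    // Hypothetical helper: only sets up a callback when the token
    // could ever be canceled; otherwise it does no work at all.
    public static CancellationTokenRegistration RegisterIfCancelable(
        CancellationToken token, Action onCanceled)
    {
        return token.CanBeCanceled
            ? token.Register(onCanceled)
            : default; // disposing a default registration is a no-op
    }

    public static void Main()
    {
        Console.WriteLine(CancellationToken.None.CanBeCanceled);     // False
        Console.WriteLine(default(CancellationToken).CanBeCanceled); // False

        using var cts = new CancellationTokenSource();
        Console.WriteLine(cts.Token.CanBeCanceled);                  // True

        bool fired = false;
        using (RegisterIfCancelable(cts.Token, () => fired = true))
        {
            cts.Cancel(); // runs registered callbacks synchronously
        }
        Console.WriteLine(fired); // True
    }
}
```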

@akarnokd

akarnokd commented Nov 6, 2018

Indeed, it is not very .NET-like. All in all, I think option 2 is the least problematic from my perspective and I'll have to figure out an API to give the user a token in exchange for an async enumerable source:

zipMany(int count, Func<int, CancellationToken, IAsyncEnumerable<T>> sourceSupplier)

@stephentoub
Member Author

Since everything else here has been done and the only known open issue is around whether we want to change something for cancellation, I've opened a separate issue just for that.
https://github.com/dotnet/corefx/issues/33338

@stephentoub
Member Author

Closed by dotnet/corefx#33104.

@akarnokd

akarnokd commented Nov 8, 2018

One final question about the lifecycle of the enumerator. Is the following legal?

var source = asyncSource.GetAsyncEnumerator();
if (!shouldWeContinue) {
   await source.DisposeAsync();
   return;
}

I.e., not calling MoveNextAsync at all but just disposing right away?

@stephentoub
Member Author

Yup, that's legal. Just as it is for IEnumerator<T>, there's no requirement for IAsyncEnumerator<T> that you must call MoveNext{Async}.
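
A minimal sketch of that scenario, assuming a compiler-generated async iterator (the `ProduceAsync` method and `BodyStarted` flag are hypothetical names used only for illustration). The iterator body does not start running until the first `MoveNextAsync` call, so disposing right away completes without executing any of it:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DisposeWithoutMoveNextDemo
{
    // Hypothetical flag used to observe whether the iterator body ran.
    public static bool BodyStarted;

    public static async IAsyncEnumerable<int> ProduceAsync()
    {
        BodyStarted = true; // reached only once MoveNextAsync is called
        yield return 1;
        await Task.Yield();
        yield return 2;
    }

    public static async Task Main()
    {
        IAsyncEnumerator<int> source = ProduceAsync().GetAsyncEnumerator();

        // Dispose immediately, without ever calling MoveNextAsync.
        await source.DisposeAsync();

        Console.WriteLine(BodyStarted); // False
    }
}
```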

@tmds
Member

tmds commented Nov 9, 2018

@stephentoub I read your interesting blog post about ValueTask. It left me with the question: why does MoveNextAsync() return a ValueTask<bool>? Since Task<bool> is cached for completed results, what advantages does the ValueTask bring here?

@tmds
Member

tmds commented Nov 9, 2018

Answering my own question: this is to be able to reduce allocations when the MoveNextAsync() doesn't complete synchronously (re-using the IValueTaskSource).

@stephentoub
Member Author

this is to be able to reduce allocations when the MoveNextAsync() doesn't complete synchronously

Correct. In the normal case, we just allocate an object that is an IAsyncEnumerable<T>, IAsyncEnumerator<T>, IValueTaskSource<bool>, and IValueTaskSource, and that one object is then what's handed out when you invoke the iterator method, when you call its GetAsyncEnumerator method, when you invoke its MoveNextAsync and it completes asynchronously, and when you invoke its DisposeAsync and it completes asynchronously. Further, the runtime is generally able to use that same object as the continuation that's stored into an awaited Task or ValueTask. So you end up with essentially one (maybe two) allocations of overhead for the entire async iteration.

@SeanFarrow

I apologise if this is handled in another issue, but I'm unable to find anything!

Is there a list of existing BCL types that will get API's returning IAsyncEnumerables in .Net Core 3? I'm thinking of things such as File.ReadAllLines etc?

@stephentoub
Member Author

Is there a list of existing BCL types that will get API's returning IAsyncEnumerables in .Net Core 3? I'm thinking of things such as File.ReadAllLines etc?

Very few. Right now just a new method in the channels library was added, and I would not expect any others in 3.0.

@SeanFarrow

SeanFarrow commented Mar 29, 2019 via email

@stephentoub
Member Author

Questions that come to mind are things like should I use a StreamReader as per .Net Core 2.x and yield return, or should I derive from IAsyncEnumerable.

IAsyncEnumerable isn't any different in this regard from IEnumerable. If you would find the programming model of using an iterator to access lines from the stream valuable, then you'd expose it via I{Async}Enumerable.

Is there any public discussion around what will be added in future?

We're not planning to proactively add a ton of IAsyncEnumerable implementations, and will instead be demand driven here. If there's a particular API you'd like to see, please feel free to propose something by opening an issue.
https://github.com/dotnet/corefx/blob/master/Documentation/project-docs/api-review-process.md

Ok, are there going to be examples to show how to do things like read from streams using IAsyncEnumerable?

Like this?

static async IAsyncEnumerable<string> ReadLinesAsync(StreamReader reader)
{
    string line;
    while ((line = await reader.ReadLineAsync().ConfigureAwait(false)) != null)
    {
        yield return line;
    }
}
...
await foreach (string line in ReadLinesAsync(reader))
{
    ...
}

@SeanFarrow

SeanFarrow commented Mar 31, 2019 via email

@quinmars

Like this?

static async IAsyncEnumerable<string> ReadLinesAsync(StreamReader reader)
{

Will IAsyncEnumerable-returning methods also share the XxxxAsync naming convention although they are not awaitable?

@stephentoub
Member Author

Yes. The most common consumption will be via await foreach, and it serves as a similar visual indicator of where asynchrony is involved, allows for differentiation from sync enumerables, etc.

@atykhyy

atykhyy commented Apr 27, 2019

Converting my existing APIs to use async streams, quite a lot of places have cropped up where an IAsyncEnumerable<T> is a poor fit semantically because it has to represent the result of a one-time operation. I.e. once GetAsyncEnumerator() has been called on the new enumerable and used for enumeration, the underlying source of items is consumed and cannot be enumerated any more. Simple example: an asynchronous parser that parses a Stream into MIME parts. Nothing says that the stream has to be rewindable, and in fact usually it isn't. The choices seem to be either to have the method return an IAsyncEnumerable and document that it is not reusable, or return an IAsyncEnumerator which can be await foreached only by means of wrapping it in a dummy struct implementing the async enumerable pattern. This feels like a nasty kludge to express a simple intent.

This problem has knock-on effects on converters or processors that asynchronously enumerate items of one kind and convert/join/split them into items of a different kind. If they take an IAsyncEnumerable, the resulting enumerable may or may not be reusable depending on what is passed to the converter, which may not be obvious to the caller. If they take an IAsyncEnumerator, callers cannot use await foreach without a kludge.

The issue of how to supply the cancellation token that is discussed in dotnet/corefx#33338 and the LDM discussion linked from it also appears to be connected to the distinction between an asynchronous enumeration that can be repeated as many times as necessary (in this case supplying the cancellation token to each particular instance of the enumeration via GetAsyncEnumerator is logical, whereas supplying one to the method creating the enumerable doesn't make much sense) and a one-shot enumeration, where the situation is reversed. If users are allowed to use IAsyncEnumerator to express this intent, the issue will be mitigated and the two cancellation token problem can be avoided completely.

My ideal solution would be to allow await foreach on IAsyncEnumerators. The lifetime question brought up above is important, but it could be handled in two ways: (a) minor syntax modification, something like await foreach (var t in using EnumerateItemsAsync (...)) or await using foreach (var t in EnumerateItemsAsync (...)) with perhaps a warning message if await foreach without using is used on a return value rather than a local variable to prevent common error patterns; or (b) a ConfigureAwait-like generic extension method that would return a struct wrapping the original enumerator object and only exposing MoveNextAsync and Current, preventing await foreach from seeing the IAsyncDisposable and disposing the enumerator.

A potentially more lightweight alternative might be to allow the compiler to pick up extension GetAsyncEnumerator methods as is already done for GetAwaiter. This would permit use of await foreach on IAsyncEnumerators if a dummy extension method similar to

public static IAsyncEnumerator<T> GetAsyncEnumerator<T> (this IAsyncEnumerator<T> enumerator) => enumerator ;

is in scope. The lifetime question can be handled along the lines of option (b) above.
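
For reference, extension `GetAsyncEnumerator` lookup of this kind was later added to the language in C# 9. The sketch below assumes a C# 9 or later compiler; the `AsyncEnumeratorExtensions` class and the `CountAsync`/`SumAsync` helpers are hypothetical names for illustration. Note that `await foreach` still disposes the enumerator here, because `IAsyncEnumerator<T>` implements `IAsyncDisposable`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumeratorExtensions
{
    // Identity extension: lets await foreach accept an enumerator directly.
    public static IAsyncEnumerator<T> GetAsyncEnumerator<T>(this IAsyncEnumerator<T> enumerator)
        => enumerator;
}

public static class EnumeratorForeachDemo
{
    // Hypothetical one-shot source exposed as an enumerator, not an enumerable.
    public static async IAsyncEnumerator<int> CountAsync(int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<int> SumAsync(IAsyncEnumerator<int> enumerator)
    {
        int sum = 0;
        // Binds to the extension method above (requires C# 9 or later).
        await foreach (int value in enumerator)
        {
            sum += value;
        }
        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine(await SumAsync(CountAsync(3))); // 0 + 1 + 2 = 3
    }
}
```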

@atykhyy

atykhyy commented May 5, 2019

Having played for a week with the extension-method approach that I'd called kludgy above, it actually feels quite natural, and is not especially burdensome compared to IAsyncEnumerable because in library code every enumeration of an IAsyncEnumerable needs a .ConfigureAwait(false) tacked on, too. These same extension methods let me express explicitly whether a given await foreach has to dispose the enumerator or not. I have made a NuGet package with these extension methods, and retracted my syntax proposals above.

@msftgits msftgits transferred this issue from dotnet/corefx Jan 31, 2020
@msftgits msftgits added this to the 3.0 milestone Jan 31, 2020
@dotnet dotnet locked as resolved and limited conversation to collaborators Dec 15, 2020