New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async.Ix: Some fixes and proposed changes to operators #843
Conversation
|
||
namespace Tests | ||
{ | ||
public class Amb : AsyncEnumerableExTests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was no unit test for Amb
at all.
|
||
internal static class AsyncTestEx | ||
{ | ||
internal static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> mapper) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A Select
overload exposing a cancellation token is missing from the standard API. I've added one here so I can test Amb
propagating cancellation properly.
@@ -82,7 +82,22 @@ public void Using3() | |||
|
|||
Assert.Equal(0, i); | |||
|
|||
AssertThrows<Exception>(() => xs.GetAsyncEnumerator(), ex_ => ex_ == ex); | |||
var enumerator = xs.GetAsyncEnumerator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DisposeAsync failures are somewhat peculiar as the chain may not follow through upon an intermediate crash.
{ | ||
enumerator.DisposeAsync(); // REVIEW: fire-and-forget? | ||
throw; | ||
// GetAsyncEnumerator should not throw but rather return a failing enumerator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simply return a special IAsyncEnumerator that fails after the source DisposeAsync
is called with the original exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the rationale here? Why can't a call to GetAsyncEnumerator throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My undestanding is that GetAsyncEnumerator
on an arbitrary IAsyncEnumerable
should be trusted, otherwise every such call should be defended with try-catches and also make sure the resulting cleanup does everything correctly and at the right time. Besides, the main point here is to avoid "fire-and-forget".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if that's a fair assumption, but it's a good question @stephentoub, any thoughts on what the API contract is for GetAsyncEnumerator
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't be any different from IEnumerable<T>.GetEnumerator
, and to my knowledge there's no guidance about GetEnumerator not throwing, though @MadsTorgersen can correct me if he knows otherwise. That said, it's super rare for GetEnumerator to throw.
Regardless, I agree with avoiding the fire-and-forget DisposeAsync call... having such a fire-and-forget is problematic because the caller doesn't know whether it can immediately use the resources referenced by the thing that should have been disposed.
/// all source items until they are requested via MoveNextAsync. | ||
/// </summary> | ||
/// <typeparam name="TSource">The element type of the source and result.</typeparam> | ||
private sealed class ObservableToAsyncEnumerable<TSource> : IAsyncEnumerable<TSource> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Complete rewrite. Uses atomics instead of locks.
|
||
private void DisposeSource() | ||
{ | ||
var old = Interlocked.Exchange(ref _disposable, this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deferred disposing, the usual Rx stuff.
return enumerator; | ||
} | ||
|
||
private sealed class ToObservableEnumerator : IDisposable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Complete rewrite. Now features all the dance around possible reentrant MoveNextAsync
s and the need for an anytime dispose to wait out a MoveNextAsync
.
/// <summary> | ||
/// A constant TaskCompletionSource already completed with true. | ||
/// </summary> | ||
public static readonly TaskCompletionSource<bool> ResumeTrue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the atomic resumption mechanism used in ToAsyncEnumerable
and in many more coordinating operators for the future.
@bartdesmet any thoughts? |
|
||
public AmbAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second) | ||
public AmbEnumerable(IAsyncEnumerable<TSource> source1, IAsyncEnumerable<TSource> source2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, but should the parameter names still be first
, and second
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be renamed, but this is internal stuff so doesn't matter.
_enumerator2 = _source2.GetAsyncEnumerator(_token2.Token); | ||
|
||
#pragma warning disable CS4014 // ContinueWith used | ||
_enumerator1.MoveNextAsync() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can an await with a local function be used instead of a ContinueWith?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with await
is that only one thing can be awaited in a logical flow, so which one to await? The first or the second source? By awaiting one before the other, we give a guaranteed pereference to one of them thus makes Amb
obsolete. Also await Task.WhenAny
is no good either because if any of the source fails, await crashes and without an explicit winner, the operator can break.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davidfowl what would you suggest here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also await Task.WhenAny is no good either because if any of the source fails, await crashes and without an explicit winner, the operator can break.
What do you mean by await crashes
? Task.WhenAny
will return a Task<Task<bool>>
here, so awaiting it will give you back the task that completed, at which point you can choose to do with it whatever you want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I overlooked the definition of Task.WhenAny
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to use Task.WhenAny
, but I can't see it scale to the N-ary variant as the DisposeAsync()
has to wait for all non-winners.
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs
Outdated
Show resolved
Hide resolved
@bartdesmet Given that this is mostly your work in this branch, I'll defer merging until you're ready (or just merge whenever you're ready). |
There were a lot of recent commits conflicting with this one. I'll have to do a re-review anyway so I'm closing this. |
I did a more detailed run-through of the code and these are my proposed changes/fixes. See additional comments among the code changes below.