Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async.Ix: Some fixes and proposed changes to operators #843

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,228 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
using System.Linq;
using System.Threading.Tasks;
using System.Threading;

namespace Tests
{
public class Amb : AsyncEnumerableExTests
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was no unit test for Amb at all.

{
[Fact]
public void TwoArg_First_Wins_NonEmpty()
{
var en = AsyncEnumerableEx.Amb(
AsyncEnumerable.Range(1, 5),
AsyncEnumerable.Range(6, 5).Select(async v =>
{
if (v == 6)
{
await Task.Delay(200);
}
return v;
})
).GetAsyncEnumerator();

try
{
HasNext(en, 1);
HasNext(en, 2);
HasNext(en, 3);
HasNext(en, 4);
HasNext(en, 5);
NoNext(en);

}
finally
{
en.DisposeAsync().AsTask().Wait();
}
}

[Fact]
public void TwoArg_Second_Wins_NonEmpty()
{
var en = AsyncEnumerableEx.Amb(
AsyncEnumerable.Range(1, 5).Select(async v =>
{
if (v == 1)
{
await Task.Delay(200);
}
return v;
}),
AsyncEnumerable.Range(6, 5)
).GetAsyncEnumerator();

try
{
HasNext(en, 6);
HasNext(en, 7);
HasNext(en, 8);
HasNext(en, 9);
HasNext(en, 10);
NoNext(en);
}
finally
{
en.DisposeAsync().AsTask().Wait();
}
}

[Fact]
public void TwoArg_First_Wins_Empty()
{
var en = AsyncEnumerableEx.Amb(
AsyncEnumerable.Empty<int>(),
AsyncEnumerable.Range(6, 5).Select(async v =>
{
if (v == 6)
{
await Task.Delay(200);
}
return v;
})
).GetAsyncEnumerator();

try
{
NoNext(en);
}
finally
{
en.DisposeAsync().AsTask().Wait();
}
}

[Fact]
public void TwoArg_Second_Wins_Empty()
{
var en = AsyncEnumerableEx.Amb(
AsyncEnumerable.Range(1, 5).Select(async v =>
{
if (v == 1)
{
await Task.Delay(200);
}
return v;
}),
AsyncEnumerable.Empty<int>()
).GetAsyncEnumerator();

try
{
NoNext(en);
}
finally
{
en.DisposeAsync().AsTask().Wait();
}
}

[Fact]
public void TwoArg_Cancel()
{
var cts = new CancellationTokenSource();

var en = AsyncEnumerableEx.Amb(
AsyncEnumerable.Range(1, 5).Select(async (v, t) =>
{
if (v == 1)
{
await Task.Delay(2000, t);
}
return v;
}),
AsyncEnumerable.Range(6, 5).Select(async (v, t) =>
{
if (v == 6)
{
await Task.Delay(2000, t);
}
return v;
})
).GetAsyncEnumerator(cts.Token);

try
{
en.MoveNextAsync();

Task.Delay(200).Wait();

cts.Cancel();

Task.Delay(200).Wait();
}
finally
{
Assert.True(en.DisposeAsync().AsTask().Wait(1000));
}
}
}

internal static class AsyncTestEx
{
internal static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> mapper)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Select overload exposing a cancellation token is missing from the standard API. I've added one here so I can test Amb propagating cancellation properly.

{
return new SelectTokenTask<TSource, TResult>(source, mapper);
}

private sealed class SelectTokenTask<TSource, TResult> : IAsyncEnumerable<TResult>
{
private readonly IAsyncEnumerable<TSource> _source;

private readonly Func<TSource, CancellationToken, Task<TResult>> _mapper;

public SelectTokenTask(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> mapper)
{
_source = source;
_mapper = mapper;
}

public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return new SelectTokenTaskEnumerator(_source.GetAsyncEnumerator(cancellationToken), _mapper, cancellationToken);
}

private sealed class SelectTokenTaskEnumerator : IAsyncEnumerator<TResult>
{
private readonly IAsyncEnumerator<TSource> _source;

private readonly Func<TSource, CancellationToken, Task<TResult>> _mapper;

private readonly CancellationToken _token;

public SelectTokenTaskEnumerator(IAsyncEnumerator<TSource> source, Func<TSource, CancellationToken, Task<TResult>> mapper, CancellationToken token)
{
_source = source;
_mapper = mapper;
_token = token;
}

public TResult Current { get; private set; }

public ValueTask DisposeAsync()
{
return _source.DisposeAsync();
}

public async ValueTask<bool> MoveNextAsync()
{
if (await _source.MoveNextAsync())
{
Current = await _mapper(_source.Current, _token);
return true;
}
return false;
}
}
}
}

}
Expand Up @@ -65,7 +65,7 @@ public async Task Using2()
}

[Fact]
public void Using3()
public async void Using3()
{
var ex = new Exception("Bang!");
var i = 0;
Expand All @@ -82,7 +82,22 @@ public void Using3()

Assert.Equal(0, i);

AssertThrows<Exception>(() => xs.GetAsyncEnumerator(), ex_ => ex_ == ex);
var enumerator = xs.GetAsyncEnumerator();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DisposeAsync failures are somewhat peculiar as the chain may not follow through upon an intermediate crash.


while (await enumerator.MoveNextAsync())
{
// ignore any items
}

try
{
await enumerator.DisposeAsync();
Assert.False(true, "Should have thrown!");
}
catch (Exception exc)
{
Assert.Same(exc, ex);
}

Assert.Equal(1, d);
}
Expand Down
36 changes: 33 additions & 3 deletions Ix.NET/Source/System.Interactive.Async/AsyncIterator.cs
Expand Up @@ -36,10 +36,10 @@ public IAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken token)
{
enumerator.OnGetEnumerator(token);
}
catch
catch (Exception ex)
{
enumerator.DisposeAsync(); // REVIEW: fire-and-forget?
throw;
// GetAsyncEnumerator should not throw but rather return a failing enumerator
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply return a special IAsyncEnumerator that fails after the source DisposeAsync is called with the original exception.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the rationale here? Why can't a call to GetAsyncEnumerator throw?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My undestanding is that GetAsyncEnumerator on an arbitrary IAsyncEnumerable should be trusted, otherwise every such call should be defended with try-catches and also make sure the resulting cleanup does everything correctly and at the right time. Besides, the main point here is to avoid "fire-and-forget".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if that's a fair assumption, but it's a good question @stephentoub, any thoughts on what the API contract is for GetAsyncEnumerator?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't be any different from IEnumerable<T>.GetEnumerator, and to my knowledge there's no guidance about GetEnumerator not throwing, though @MadsTorgersen can correct me if he knows otherwise. That said, it's super rare for GetEnumerator to throw.

Regardless, I agree with avoiding the fire-and-forget DisposeAsync call... having such a fire-and-forget is problematic because the caller doesn't know whether it can immediately use the resources referenced by the thing that should have been disposed.

return new FailingEnumerator(enumerator, ex);
}

return enumerator;
Expand Down Expand Up @@ -98,6 +98,35 @@ public async ValueTask<bool> MoveNextAsync()
protected virtual void OnGetEnumerator(CancellationToken cancellationToken)
{
}

/// <summary>
/// Disposes a source enumerator then fails with an exception in DisposeAsync.
/// </summary>
private sealed class FailingEnumerator : IAsyncEnumerator<TSource>
{
private readonly IAsyncEnumerator<TSource> _source;

private readonly Exception _error;

public FailingEnumerator(IAsyncEnumerator<TSource> source, Exception error)
{
_source = source;
_error = error;
}

public TSource Current => throw new InvalidOperationException("Enumerator is in an invalid state");

public async ValueTask DisposeAsync()
{
await _source.DisposeAsync();
throw _error;
akarnokd marked this conversation as resolved.
Show resolved Hide resolved
}

public ValueTask<bool> MoveNextAsync()
{
return TaskExt.False;
}
}
}

internal enum AsyncIteratorState
Expand All @@ -107,4 +136,5 @@ internal enum AsyncIteratorState
Iterating = 2,
Disposed = -1,
}

}