Skip to content

Commit

Permalink
Fix Create livelock
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Oct 30, 2018
1 parent b6cf679 commit 0125fe0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 26 deletions.
17 changes: 11 additions & 6 deletions async-enumerable-dotnet-benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ class Program
// ReSharper disable once ArrangeTypeMemberModifiers
static void Main(string[] args)
{
for (var i = 0; i < 100000; i++)
for (var j = 0; j < 100000; j++)
{
if (i % 10 == 0)
if (j % 10 == 0)
{
Console.WriteLine(i);
Console.WriteLine(j);
}
var list = AsyncEnumerable.Range(1, 100_000)
.SwitchMap(v => AsyncEnumerable.Range(v, 2))
var list = AsyncEnumerable.Create<int>(async e =>
{
for (var i = 0; i < 10 && !e.DisposeAsyncRequested; i++)
{
await e.Next(i);
}
})
.Last()
.GetAsyncEnumerator();

Expand All @@ -39,7 +44,7 @@ static void Main(string[] args)
Console.WriteLine("Empty?");
}

if (list.Current != 100_001)
if (list.Current != 9)
{
Console.WriteLine(list.Current);
Console.ReadLine();
Expand Down
10 changes: 9 additions & 1 deletion async-enumerable-dotnet-test/CreateTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public async void Empty()
}

[Fact]
public async void Range()
public async ValueTask Range()
{
var result = AsyncEnumerable.Create<int>(async e =>
{
Expand All @@ -35,5 +35,13 @@ public async void Range()
await result.AssertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
}

[Fact]
public async void Range_Loop()
{
for (int j = 0; j < 1000; j++)
{
await Range();
}
}
}
}
37 changes: 18 additions & 19 deletions async-enumerable-dotnet/impl/CreateEmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ private sealed class CreateEmitterEnumerator : IAsyncEnumerator<T>, IAsyncEmitte

private volatile bool _disposeRequested;

private volatile bool _taskComplete;

public bool DisposeAsyncRequested => _disposeRequested;

private bool _hasValue;

public T Current { get; private set; }

private TaskCompletionSource<bool> _valueReady;
Expand All @@ -42,9 +42,19 @@ private sealed class CreateEmitterEnumerator : IAsyncEnumerator<T>, IAsyncEmitte
internal void SetTask(Task task)
{
_task = task;
task.ContinueWith(t =>
task.ContinueWith(async t =>
{
_taskComplete = true;
if (_disposeRequested)
{
return;
}
await ResumeHelper.Await(ref _consumed);
ResumeHelper.Clear(ref _consumed);
if (_disposeRequested)
{
return;
}
ResumeHelper.Resume(ref _valueReady);
});
}
Expand All @@ -58,28 +68,16 @@ public ValueTask DisposeAsync()

public async ValueTask<bool> MoveNextAsync()
{
if (_taskComplete)
{
if (_task.IsFaulted)
{
throw _task.Exception;
}
return false;
}
ResumeHelper.Resume(ref _consumed);

await ResumeHelper.Await(ref _valueReady);
ResumeHelper.Clear(ref _valueReady);

if (!_taskComplete)
if (_hasValue)
{
_hasValue = false;
return true;
}

if (_task.IsFaulted)
{
throw _task.Exception;
}
Current = default;
return false;
}

Expand All @@ -97,6 +95,7 @@ public async ValueTask Next(T value)
}

Current = value;
_hasValue = true;

ResumeHelper.Resume(ref _valueReady);
}
Expand Down

0 comments on commit 0125fe0

Please sign in to comment.