Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

[WIP] Expose/implement/test async iterator support in corefx #32890

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
734f9f8
Expose new types / members added in CoreLib
stephentoub Oct 17, 2018
b457d68
Add tests for CancellationTokenRegistration.DisposeAsync
stephentoub Oct 17, 2018
c185f9e
Add tests for Timer.DisposeAsync
stephentoub Oct 17, 2018
af2f5b6
Add tests for BinaryWriter.DisposeAsync
stephentoub Oct 17, 2018
9c47b38
Add tests for TextWriter.DisposeAsync
stephentoub Oct 17, 2018
1241006
Add tests for StreamWriter.DisposeAsync
stephentoub Oct 17, 2018
7899021
Add tests for FileStream.DisposeAsync
stephentoub Oct 17, 2018
a9874b2
Add IsolatedStorageFileStream.DisposeAsync override
stephentoub Oct 18, 2018
40fd972
Add tests for MemoryStream.DisposeAsync
stephentoub Oct 17, 2018
eba3f49
Add tests for Stream.DisposeAsync
stephentoub Oct 17, 2018
99f3c0b
Add tests for UnmanagedMemoryStream.DisposeAsync
stephentoub Oct 17, 2018
770f071
Add DeflateStream.DisposeAsync and GZipStream.DisposeAsync overrides
stephentoub Oct 17, 2018
b7e34ba
Add BrotliStream.DisposeAsync override
stephentoub Oct 17, 2018
f00d267
Add StringWriter.DisposeAsync override
stephentoub Oct 17, 2018
cd19800
Add BufferedStream.DisposeAsync override
stephentoub Oct 17, 2018
5ad0447
Add PipeStream.DisposeAsync override
stephentoub Oct 17, 2018
f5b33e0
Add NetworkStream.DisposeAsync override
stephentoub Oct 17, 2018
80fc110
Add SslStream/NegotiateStream/AuthenticatedStream.DisposeAsync overrides
stephentoub Oct 17, 2018
2a60e0f
Add CryptoStream.DisposeAsync override
stephentoub Oct 18, 2018
282b3cf
Add DisposeAsync overrides in System.Net.Http
stephentoub Oct 18, 2018
2c1828e
Use new ManualResetValueTaskSourceLogic in tests, and add tests
stephentoub Oct 18, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Common/src/System/IO/DelegatingStream.cs
Expand Up @@ -76,6 +76,11 @@ protected override void Dispose(bool disposing)
base.Dispose(disposing);
}

public override ValueTask DisposeAsync()
{
return _innerStream.DisposeAsync();
}

#region Read

public override long Seek(long offset, SeekOrigin origin)
Expand Down
2 changes: 2 additions & 0 deletions src/Common/src/System/IO/ReadOnlyMemoryStream.cs
Expand Up @@ -36,6 +36,8 @@ public override long Position
_position = (int)value;
}
}

public override ValueTask DisposeAsync() => default;

public override long Seek(long offset, SeekOrigin origin)
{
Expand Down
8 changes: 7 additions & 1 deletion src/Common/tests/System/IO/DelegateStream.cs
Expand Up @@ -24,6 +24,7 @@ internal sealed class DelegateStream : Stream
private readonly Action<long> _setLengthFunc;
private readonly Action<byte[], int, int> _writeFunc;
private readonly Func<byte[], int, int, CancellationToken, Task> _writeAsyncFunc;
private readonly Action<bool> _disposeFunc;

public DelegateStream(
Func<bool> canReadFunc = null,
Expand All @@ -39,7 +40,8 @@ internal sealed class DelegateStream : Stream
Func<long, SeekOrigin, long> seekFunc = null,
Action<long> setLengthFunc = null,
Action<byte[], int, int> writeFunc = null,
Func<byte[], int, int, CancellationToken, Task> writeAsyncFunc = null)
Func<byte[], int, int, CancellationToken, Task> writeAsyncFunc = null,
Action<bool> disposeFunc = null)
{
_canReadFunc = canReadFunc ?? (() => false);
_canSeekFunc = canSeekFunc ?? (() => false);
Expand All @@ -60,6 +62,8 @@ internal sealed class DelegateStream : Stream

_writeFunc = writeFunc;
_writeAsyncFunc = writeAsyncFunc ?? ((buffer, offset, count, token) => base.WriteAsync(buffer, offset, count, token));

_disposeFunc = disposeFunc;
}

public override bool CanRead { get { return _canReadFunc(); } }
Expand All @@ -80,5 +84,7 @@ internal sealed class DelegateStream : Stream

public override void Write(byte[] buffer, int offset, int count) { _writeFunc(buffer, offset, count); }
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { return _writeAsyncFunc(buffer, offset, count, cancellationToken); }

protected override void Dispose(bool disposing) { _disposeFunc?.Invoke(disposing); }
}
}
Expand Up @@ -23,6 +23,8 @@ public VirtualNetworkStream(VirtualNetwork network, bool isServer)
_isServer = isServer;
}

public bool Disposed { get; private set; }

public override bool CanRead
{
get
Expand Down Expand Up @@ -161,6 +163,7 @@ protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposed = true;
_network.BreakConnection();
}

Expand Down
Expand Up @@ -2,235 +2,21 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

namespace System.Runtime.CompilerServices
{
public interface IStrongBox<T>
{
ref T Value { get; }
}
}

namespace System.Threading.Tasks.Sources
{
public sealed class ManualResetValueTaskSource<T> : IStrongBox<ManualResetValueTaskSourceLogic<T>>, IValueTaskSource<T>, IValueTaskSource
public sealed class ManualResetValueTaskSource<T> : IValueTaskSource<T>, IValueTaskSource
{
private ManualResetValueTaskSourceLogic<T> _logic; // mutable struct; do not make this readonly

public ManualResetValueTaskSource() => _logic = new ManualResetValueTaskSourceLogic<T>(this);

public bool RunContinuationsAsynchronously { get => _logic.RunContinuationsAsynchronously; set => _logic.RunContinuationsAsynchronously = value; }
public short Version => _logic.Version;

public void Reset() => _logic.Reset();

public void SetResult(T result) => _logic.SetResult(result);

public void SetException(Exception error) => _logic.SetException(error);

public T GetResult(short token) => _logic.GetResult(token);
void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);

public ValueTaskSourceStatus GetStatus(short token) => _logic.GetStatus(token);

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags) => _logic.OnCompleted(continuation, state, token, flags);

ref ManualResetValueTaskSourceLogic<T> IStrongBox<ManualResetValueTaskSourceLogic<T>>.Value => ref _logic;
}

public struct ManualResetValueTaskSourceLogic<TResult>
{
private static readonly Action<object> s_sentinel = new Action<object>(s => throw new InvalidOperationException());

private readonly IStrongBox<ManualResetValueTaskSourceLogic<TResult>> _parent;
private Action<object> _continuation;
private object _continuationState;
private object _capturedContext;
private ExecutionContext _executionContext;
private bool _completed;
private TResult _result;
private ExceptionDispatchInfo _error;
private short _version;

public ManualResetValueTaskSourceLogic(IStrongBox<ManualResetValueTaskSourceLogic<TResult>> parent)
{
_parent = parent ?? throw new ArgumentNullException(nameof(parent));
_continuation = null;
_continuationState = null;
_capturedContext = null;
_executionContext = null;
_completed = false;
_result = default;
_error = null;
_version = 0;
}

public short Version => _version;

private void ValidateToken(short token)
{
if (token != _version)
{
throw new InvalidOperationException();
}
}

public ValueTaskSourceStatus GetStatus(short token)
{
ValidateToken(token);

return
!_completed ? ValueTaskSourceStatus.Pending :
_error == null ? ValueTaskSourceStatus.Succeeded :
_error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
ValueTaskSourceStatus.Faulted;
}

public TResult GetResult(short token)
{
ValidateToken(token);

if (!_completed)
{
throw new InvalidOperationException();
}

_error?.Throw();
return _result;
}

public void Reset()
{
_version++;

_completed = false;
_continuation = null;
_continuationState = null;
_result = default;
_error = null;
_executionContext = null;
_capturedContext = null;
}

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (continuation == null)
{
throw new ArgumentNullException(nameof(continuation));
}
ValidateToken(token);

if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
_executionContext = ExecutionContext.Capture();
}

if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
{
SynchronizationContext sc = SynchronizationContext.Current;
if (sc != null && sc.GetType() != typeof(SynchronizationContext))
{
_capturedContext = sc;
}
else
{
TaskScheduler ts = TaskScheduler.Current;
if (ts != TaskScheduler.Default)
{
_capturedContext = ts;
}
}
}

_continuationState = state;
if (Interlocked.CompareExchange(ref _continuation, continuation, null) != null)
{
_executionContext = null;

object cc = _capturedContext;
_capturedContext = null;

switch (cc)
{
case null:
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
break;

case SynchronizationContext sc:
sc.Post(s =>
{
var tuple = (Tuple<Action<object>, object>)s;
tuple.Item1(tuple.Item2);
}, Tuple.Create(continuation, state));
break;

case TaskScheduler ts:
Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
break;
}
}
}

public void SetResult(TResult result)
{
_result = result;
SignalCompletion();
}

public void SetException(Exception error)
{
_error = ExceptionDispatchInfo.Capture(error);
SignalCompletion();
}

private void SignalCompletion()
{
if (_completed)
{
throw new InvalidOperationException();
}
_completed = true;

if (Interlocked.CompareExchange(ref _continuation, s_sentinel, null) != null)
{
if (_executionContext != null)
{
ExecutionContext.Run(
_executionContext,
s => ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value.InvokeContinuation(),
_parent ?? throw new InvalidOperationException());
}
else
{
InvokeContinuation();
}
}
}

private void InvokeContinuation()
{
object cc = _capturedContext;
_capturedContext = null;

switch (cc)
{
case null:
_continuation(_continuationState);
break;

case SynchronizationContext sc:
sc.Post(s =>
{
ref ManualResetValueTaskSourceLogic<TResult> logicRef = ref ((IStrongBox<ManualResetValueTaskSourceLogic<TResult>>)s).Value;
logicRef._continuation(logicRef._continuationState);
}, _parent ?? throw new InvalidOperationException());
break;

case TaskScheduler ts:
Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
break;
}
}
}
}
Expand Up @@ -3,9 +3,8 @@
// See the LICENSE file in the project root for more information.

using System.Runtime.ExceptionServices;
using System.Threading.Tasks.Sources;

namespace System.Threading.Tasks.Tests
namespace System.Threading.Tasks.Sources.Tests
{
internal static class ManualResetValueTaskSourceFactory
{
Expand Down
Expand Up @@ -36,6 +36,7 @@ public sealed partial class BrotliStream : System.IO.Stream
public override long Length { get { throw null; } }
public override long Position { get { throw null; } set { } }
protected override void Dispose(bool disposing) { }
public override System.Threading.Tasks.ValueTask DisposeAsync() { throw null; }
public override void Flush() { }
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback asyncCallback, object asyncState) { throw null; }
public override int EndRead(IAsyncResult asyncResult) { throw null; }
Expand Down
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO.Compression
{
Expand Down Expand Up @@ -74,6 +75,31 @@ protected override void Dispose(bool disposing)
}
}

public override async ValueTask DisposeAsync()
{
try
{
if (_stream != null)
{
if (_mode == CompressionMode.Compress)
{
await WriteAsyncMemoryCore(ReadOnlyMemory<byte>.Empty, CancellationToken.None, isFinalBlock: true).ConfigureAwait(false);
}

if (!_leaveOpen)
{
await _stream.DisposeAsync().ConfigureAwait(false);
}
}
}
finally
{
_stream = null;
_encoder.Dispose();
_decoder.Dispose();
}
}

private static void ValidateParameters(byte[] array, int offset, int count)
{
if (array == null)
Expand Down
Expand Up @@ -81,7 +81,7 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
WriteAsyncMemoryCore(buffer, cancellationToken));
}

private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken, bool isFinalBlock = false)
{
AsyncOperationStarting();
try
Expand All @@ -92,7 +92,7 @@ private async Task WriteAsyncMemoryCore(ReadOnlyMemory<byte> buffer, Cancellatio
Memory<byte> output = new Memory<byte>(_buffer);
int bytesConsumed = 0;
int bytesWritten = 0;
lastResult = _encoder.Compress(buffer, output, out bytesConsumed, out bytesWritten, isFinalBlock: false);
lastResult = _encoder.Compress(buffer, output, out bytesConsumed, out bytesWritten, isFinalBlock);
if (lastResult == OperationStatus.InvalidData)
throw new InvalidOperationException(SR.BrotliStream_Compress_InvalidData);
if (bytesConsumed > 0)
Expand Down