Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DuplexStream and implement for NetworkStream and QuicStream #51434

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2938,6 +2938,72 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
}
}

/// <summary>Base class for a connected stream that can have writes completed.</summary>
public abstract class DuplexConnectedStreamConformanceTests : ConnectedStreamConformanceTests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about a test for calling Write after CompleteWrites?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, a test that CompleteWrites implicitly does a Flush? (i.e. call Write without Flush, then CompleteWrites and verify the unflushed data was sent)

{
[Theory]
[InlineData(true)]
[InlineData(false)]
public virtual async Task CompleteWrites_EndsReads(bool isAsync)
{
using StreamPair streams = await CreateConnectedStreamsAsync();
byte[] buffer = new byte[8];

foreach ((Stream writable, Stream readable) in GetReadWritePairs(streams))
{
await writable.WriteAsync(buffer);
await writable.FlushAsync();

int received = 0;
while (received != buffer.Length)
{
int len = await readable.ReadAsync(buffer.AsMemory(0, buffer.Length - received));
received += len;
}

ValueTask<int> readTask = readable.ReadAsync(buffer);

Assert.False(readTask.IsCompleted);
Assert.True(writable.CanWrite);

DuplexStream writableDuplexStream = Assert.IsAssignableFrom<DuplexStream>(writable);
if (isAsync) await writableDuplexStream.CompleteWritesAsync();
else writableDuplexStream.CompleteWrites();

Assert.Equal(0, await readTask);
Assert.False(writable.CanWrite);

await Assert.ThrowsAnyAsync<Exception>(async () => await writable.WriteAsync(buffer));
}
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public virtual async Task CompleteWrites_CalledTwice_Success(bool isAsync)
{
using StreamPair streams = await CreateConnectedStreamsAsync();
byte[] buffer = new byte[8];

foreach ((Stream writable, Stream readable) in GetReadWritePairs(streams))
{
DuplexStream writableDuplexStream = Assert.IsAssignableFrom<DuplexStream>(writable);

Assert.True(writable.CanWrite);

if (isAsync) await writableDuplexStream.CompleteWritesAsync();
else writableDuplexStream.CompleteWrites();

Assert.False(writable.CanWrite);

if (isAsync) await writableDuplexStream.CompleteWritesAsync();
else writableDuplexStream.CompleteWrites();

Assert.False(writable.CanWrite);
}
}
}

/// <summary>Provides a disposable, enumerable tuple of two streams.</summary>
public class StreamPair : IDisposable, IEnumerable<Stream>
{
Expand Down
24 changes: 19 additions & 5 deletions src/libraries/Common/tests/System/IO/ConnectedStreams.cs
Expand Up @@ -46,13 +46,13 @@ public static (Stream Writer, Stream Reader) CreateUnidirectional(int initialBuf

/// <summary>Creates a pair of streams that are connected for bidirectional communication.</summary>
/// <remarks>Writing to one stream produces data readable by the either, and vice versa.</remarks>
public static (Stream Stream1, Stream Stream2) CreateBidirectional() =>
public static (DuplexStream Stream1, DuplexStream Stream2) CreateBidirectional() =>
CreateBidirectional(StreamBuffer.DefaultInitialBufferSize, StreamBuffer.DefaultMaxBufferSize);

/// <summary>Creates a pair of streams that are connected for bidirectional communication.</summary>
/// <param name="initialBufferSize">The initial buffer size to use when storing data in the connection.</param>
/// <remarks>Writing to one stream produces data readable by the either, and vice versa.</remarks>
public static (Stream Stream1, Stream Stream2) CreateBidirectional(int initialBufferSize) =>
public static (DuplexStream Stream1, DuplexStream Stream2) CreateBidirectional(int initialBufferSize) =>
CreateBidirectional(initialBufferSize, StreamBuffer.DefaultMaxBufferSize);

/// <summary>Creates a pair of streams that are connected for bidirectional communication.</summary>
Expand All @@ -62,7 +62,7 @@ public static (Stream Writer, Stream Reader) CreateUnidirectional(int initialBuf
/// writes will block until additional space becomes available.
/// </param>
/// <remarks>Writing to one stream produces data readable by the either, and vice versa.</remarks>
public static (Stream Stream1, Stream Stream2) CreateBidirectional(int initialBufferSize, int maxBufferSize)
public static (DuplexStream Stream1, DuplexStream Stream2) CreateBidirectional(int initialBufferSize, int maxBufferSize)
{
// Each direction needs a buffer; one stream will use b1 for reading and b2 for writing,
// and the other stream will do the inverse.
Expand Down Expand Up @@ -275,7 +275,7 @@ private void ThrowIfWritingNotSupported()
throw new NotSupportedException();
}

private sealed class BidirectionalStreamBufferStream : Stream
private sealed class BidirectionalStreamBufferStream : DuplexStream
{
private readonly StreamBuffer _readBuffer;
private readonly StreamBuffer _writeBuffer;
Expand Down Expand Up @@ -309,7 +309,7 @@ protected override void Dispose(bool disposing)
}

public override bool CanRead => !_disposed;
public override bool CanWrite => !_disposed;
public override bool CanWrite => !_disposed && !_writeBuffer.IsComplete;
public override bool CanSeek => false;

public override void Flush() => ThrowIfDisposed();
Expand All @@ -325,6 +325,20 @@ public override Task FlushAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public override void CompleteWrites()
{
ThrowIfDisposed();
_writeBuffer.EndWrite();
}

public override ValueTask CompleteWritesAsync(CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken);
_writeBuffer.EndWrite();
return default;
}

public override int Read(byte[] buffer, int offset, int count)
{
ValidateBufferArguments(buffer, offset, count);
Expand Down
Expand Up @@ -118,7 +118,7 @@ public async Task SendFrameAsync(long frameType, ReadOnlyMemory<byte> framePaylo

public async Task ShutdownSendAsync()
{
_stream.Shutdown();
await _stream.CompleteWritesAsync().ConfigureAwait(false);
await _stream.ShutdownWriteCompleted().ConfigureAwait(false);
}

Expand Down
Expand Up @@ -14,7 +14,7 @@ public class UnidirectionalConnectedStreamsTests : ConnectedStreamConformanceTes
Task.FromResult<StreamPair>(ConnectedStreams.CreateUnidirectional());
}

public class BidirectionalConnectedStreamsTests : ConnectedStreamConformanceTests
public class BidirectionalConnectedStreamsTests : DuplexConnectedStreamConformanceTests
{
protected override int BufferedSize => StreamBuffer.DefaultMaxBufferSize;
protected override bool FlushRequiredToWriteData => false;
Expand Down
Expand Up @@ -152,7 +152,7 @@ public async Task<HttpResponseMessage> SendAsync(CancellationToken cancellationT
}
else
{
_stream.Shutdown();
await _stream.CompleteWritesAsync(cancellationToken).ConfigureAwait(false);
}
}

Expand Down Expand Up @@ -372,7 +372,7 @@ private async Task SendContentAsync(HttpContent content, CancellationToken cance
_sendBuffer.Discard(_sendBuffer.ActiveLength);
}

_stream.Shutdown();
await _stream.CompleteWritesAsync(cancellationToken).ConfigureAwait(false);
}

private async ValueTask WriteRequestContentAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
Expand Down
5 changes: 3 additions & 2 deletions src/libraries/System.Net.Quic/ref/System.Net.Quic.cs
Expand Up @@ -78,7 +78,7 @@ public partial class QuicOptions
public long MaxBidirectionalStreams { get { throw null; } set { } }
public long MaxUnidirectionalStreams { get { throw null; } set { } }
}
public sealed partial class QuicStream : System.IO.Stream
public sealed partial class QuicStream : System.IO.DuplexStream
{
internal QuicStream() { }
public override bool CanRead { get { throw null; } }
Expand All @@ -91,6 +91,8 @@ public sealed partial class QuicStream : System.IO.Stream
public void AbortWrite(long errorCode) { }
public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
public override System.IAsyncResult BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
public override void CompleteWrites() { }
public override System.Threading.Tasks.ValueTask CompleteWritesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected override void Dispose(bool disposing) { }
public override int EndRead(System.IAsyncResult asyncResult) { throw null; }
public override void EndWrite(System.IAsyncResult asyncResult) { }
Expand All @@ -102,7 +104,6 @@ public sealed partial class QuicStream : System.IO.Stream
public override System.Threading.Tasks.ValueTask<int> ReadAsync(System.Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override long Seek(long offset, System.IO.SeekOrigin origin) { throw null; }
public override void SetLength(long value) { }
public void Shutdown() { }
public System.Threading.Tasks.ValueTask ShutdownWriteCompleted(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public override void Write(byte[] buffer, int offset, int count) { }
public override void Write(System.ReadOnlySpan<byte> buffer) { }
Expand Down
Expand Up @@ -178,12 +178,19 @@ internal override ValueTask ShutdownWriteCompleted(CancellationToken cancellatio
return default;
}

internal override void Shutdown()
public override void CompleteWrites()
{
CheckDisposed();
WriteStreamBuffer?.EndWrite();
}

public override ValueTask CompleteWritesAsync(CancellationToken cancellationToken)
{
CheckDisposed();
if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken);

// This seems to mean shutdown send, in particular, not both.
WriteStreamBuffer?.EndWrite();
return default;
}

private void CheckDisposed()
Expand All @@ -198,7 +205,7 @@ public override void Dispose()
{
if (!_disposed)
{
Shutdown();
CompleteWrites();

_disposed = true;
}
Expand All @@ -208,7 +215,7 @@ public override ValueTask DisposeAsync()
{
if (!_disposed)
{
Shutdown();
CompleteWrites();

_disposed = true;
}
Expand Down
Expand Up @@ -4,7 +4,9 @@
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Quic.Implementations.MsQuic.Internal;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -396,10 +398,37 @@ internal override async ValueTask ShutdownWriteCompleted(CancellationToken cance
await _state.ShutdownWriteCompletionSource.Task.ConfigureAwait(false);
}

internal override void Shutdown()
public override void CompleteWrites()
{
ThrowIfDisposed();
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
try
{
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
}
catch (Exception ex) when (ex is not OutOfMemoryException)
{
throw new IOException("Unable to complete writes: " + ex.Message, ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll do a pass to move all resources into a resx subsequently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there's a QUIC issue for it.

}
}

// We don't wait for QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE event here,
// because it is only sent to us once the peer has acknowledged the shutdown.
// Instead, this method acts more like shutdown(SD_SEND) in that it only "queues"
// the shutdown packet to be sent without any waiting for completion.
public override ValueTask CompleteWritesAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
if (cancellationToken.IsCancellationRequested) return ValueTask.FromCanceled(cancellationToken);

try
{
StartShutdown(QUIC_STREAM_SHUTDOWN_FLAGS.GRACEFUL, errorCode: 0);
return default;
}
catch (Exception ex) when (ex is not OutOfMemoryException)
{
return ValueTask.FromException(ExceptionDispatchInfo.SetCurrentStackTrace(new IOException("Unable to complete writes: " + ex.Message, ex)));
}
}

// TODO consider removing sync-over-async with blocking calls.
Expand Down
Expand Up @@ -39,12 +39,14 @@ internal abstract class QuicStreamProvider : IDisposable, IAsyncDisposable

internal abstract ValueTask ShutdownWriteCompleted(CancellationToken cancellationToken = default);

internal abstract void Shutdown();

internal abstract void Flush();

internal abstract Task FlushAsync(CancellationToken cancellationToken);

public abstract void CompleteWrites();

public abstract ValueTask CompleteWritesAsync(CancellationToken cancellationToken);

public abstract void Dispose();

public abstract ValueTask DisposeAsync();
Expand Down
Expand Up @@ -9,7 +9,7 @@

namespace System.Net.Quic
{
public sealed class QuicStream : Stream
public sealed class QuicStream : DuplexStream
{
private readonly QuicStreamProvider _provider;

Expand Down Expand Up @@ -101,7 +101,9 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati

public ValueTask ShutdownWriteCompleted(CancellationToken cancellationToken = default) => _provider.ShutdownWriteCompleted(cancellationToken);

public void Shutdown() => _provider.Shutdown();
public override void CompleteWrites() => _provider.CompleteWrites();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we replacing Shutdown? I thought this is a replacement for ShutdownWriteCompleted? Or did I completely miss the whole point of this change somehow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShutdownWriteCompleted will become something like Task Completed { get; } to denote when both sides have finished using the stream (i.e. full graceful shutdown), which is what its current behavior is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that original ShutdownWriteCompleted means "I'm finished writing you can start sending response", which I thought is the meaning of the new CompleteWrites?
I also thought that original Shutdown is the final step of the communication, i.e.: after the response has been sent.

I don't know anything about Task Completed { get; }, I don't think we got that far with the QUIC API reviews, but if I understand the meaning that should correspond to Shutdown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shutdown is "shutdown writes, send EOF to peer".

ShutdownWriteCompleted was "wait for peer to acknowledge that shutdown" which we found to be mostly useless so I believe we changed it to mean "wait for peer to acknowledge that shutdown AND to shutdown their side too". It has a terrible name. This is what I'd like to see changed to a Task Completed { get; }.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ShutdownWriteCompleted actually remained for now, I've added new ShutdownCompleted alongside... But I agree ShutdownWriteCompleted doesn't have any use for us anymore, so it can just be removed anytime

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was completely bamboozled by the naming and made assumption based on that.


public override ValueTask CompleteWritesAsync(CancellationToken cancellationToken = default) => _provider.CompleteWritesAsync(cancellationToken);

protected override void Dispose(bool disposing)
{
Expand Down
Expand Up @@ -125,7 +125,7 @@ public async Task WriteTests(int[][] writes, WriteType writeType)
}
}

stream.Shutdown();
await stream.CompleteWritesAsync();
await stream.ShutdownWriteCompleted();
},
async serverConnection =>
Expand All @@ -143,7 +143,7 @@ public async Task WriteTests(int[][] writes, WriteType writeType)
int expectedTotalBytes = writes.SelectMany(x => x).Sum();
Assert.Equal(expectedTotalBytes, totalBytes);

stream.Shutdown();
await stream.CompleteWritesAsync();
await stream.ShutdownWriteCompleted();
});
}
Expand Down
Expand Up @@ -81,7 +81,7 @@ public sealed class MsQuicQuicStreamConformanceTests : QuicStreamConformanceTest

}

public abstract class QuicStreamConformanceTests : ConnectedStreamConformanceTests
public abstract class QuicStreamConformanceTests : DuplexConnectedStreamConformanceTests
{
public SslServerAuthenticationOptions GetSslServerAuthenticationOptions()
{
Expand Down
Expand Up @@ -450,7 +450,7 @@ public async Task ReadWrite_Random_Success(int readSize, int writeSize)
sendBuffer = sendBuffer.Slice(chunk.Length);
}

clientStream.Shutdown();
await clientStream.CompleteWritesAsync();
await clientStream.ShutdownWriteCompleted();
},
async serverConnection =>
Expand Down
4 changes: 3 additions & 1 deletion src/libraries/System.Net.Sockets/ref/System.Net.Sockets.cs
Expand Up @@ -114,7 +114,7 @@ public partial class MulticastOption
public int InterfaceIndex { get { throw null; } set { } }
public System.Net.IPAddress? LocalAddress { get { throw null; } set { } }
}
public partial class NetworkStream : System.IO.Stream
public partial class NetworkStream : System.IO.DuplexStream
{
public NetworkStream(System.Net.Sockets.Socket socket) { }
public NetworkStream(System.Net.Sockets.Socket socket, bool ownsSocket) { }
Expand All @@ -135,6 +135,8 @@ public partial class NetworkStream : System.IO.Stream
public override System.IAsyncResult BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
public override System.IAsyncResult BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback? callback, object? state) { throw null; }
public void Close(int timeout) { }
public override void CompleteWrites() { }
public override System.Threading.Tasks.ValueTask CompleteWritesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected override void Dispose(bool disposing) { }
public override int EndRead(System.IAsyncResult asyncResult) { throw null; }
public override void EndWrite(System.IAsyncResult asyncResult) { }
Expand Down
3 changes: 3 additions & 0 deletions src/libraries/System.Net.Sockets/src/Resources/Strings.resx
Expand Up @@ -330,4 +330,7 @@
<data name="net_sockets_handle_already_used" xml:space="preserve">
<value>Handle is already used by another Socket.</value>
</data>
<data name="net_io_shutdownfailure" xml:space="preserve">
<value>Unable to complete writes on the transport connection: {0}.</value>
</data>
</root>