This will help with adoption of pipelines. The usual adapter code involves creating 2 pipes and creating 2 async loops that read from the stream and write into the pipe and reading from the pipe and writing into the Stream.
There are 2 directions: Stream -> Pipe and Pipe -> Stream.
public class PipeReader
{
public static PipeReader Create(Stream stream, StreamPipeReaderOptions readerOptions = null);
public virtual Stream AsStream();
public virtual Task CopyToAsync(Stream stream, CancellationToken cancellationToken = default);
}
public class PipeWriter
{
public static PipeWriter Create(Stream stream, StreamPipeWriterOptions writerOptions = null);
public virtual Stream AsStream();
protected internal virtual Task CopyFromAsync(Stream stream, CancellationToken cancellationToken = default);
}
public class StreamPipeWriterOptions
{
private const int DefaultMinimumBufferSize = 4096;
public StreamPipeWriterOptions(MemoryPool<byte> pool = null, int minimumBufferSize = DefaultMinimumBufferSize)
{
Pool = pool;
MinimumBufferSize = minimumBufferSize;
}
public int MinimumBufferSize { get; }
public MemoryPool<byte> Pool { get; }
}
public class StreamPipeReaderOptions
{
private const int DefaultBufferSize = 4096;
private const int DefaultMinimumReadSize = 1024;
public StreamPipeReaderOptions(MemoryPool<byte> memoryPool = null, int bufferSize = DefaultBufferSize, int minimumReadSize = DefaultMinimumReadSize)
{
Pool = pool;
BufferSize = bufferSize;
MinimumReadSize = minimumReadSize;
}
public int BufferSize { get; }
public int MinimumReadSize { get; }
public MemoryPool<byte> Pool { get; }
}
public static class StreamPipeExtensions
{
public static Task CopyToAsync(this Stream stream, PipeWriter pipeWriter, CancellationToken cancellationToken = default)
{
return pipeWriter.CopyFromAsync(stream, cancellationToken);
}
}
NOTES:
- Some streams buffer internally and we may end up copying from the Stream's internal buffer into the pipe's buffers.
- Even if we can avoid that, we'll end up allocating a Task per read and write operation (though some streams cache the result of the previous operation)
- We end up paying per read/write costs in general (for e.g. in FileStream allocating via ThreadPoolBoundHandle.AllocateNativeOverlapped per read/write pair).
Most of the per read/write costs can be mitigated by using CopyToAsync (if overridden by the Stream) but there are some downsides there as well.
- Using the default implementations CopyToAsync allocates an internal buffer if the Stream doesn't have one already and passes that buffer to the other stream. Using the default pipe implementation, we end up copying the Stream's buffer into the pipe's buffer which might be fine but is a bit unfortunate.
We can avoid some of these overheads if we implement a PipeReader on top of CopyToAsync that doesn't use the Pipe internally. The idea here is that we call CopyToAsync on a fake stream that forwards WriteAsync calls to the PipeReader consumer. This implementation would pass buffers directly from the Stream to the consumer. If the consumer doesn't process the entire buffer, only the unconsumed buffer is copied into an internal buffer for the next read.
The write side isn't as problematic because we need to be able to allocate memory to write into the Stream so reusing the pipe isn't so bad here. The implementation here would likely be using a Pipe internally, then writing to the Stream on FlushAsync.
Other implementations
This will help with adoption of pipelines. The usual adapter code involves creating 2 pipes and creating 2 async loops that read from the stream and write into the pipe and reading from the pipe and writing into the Stream.
There are 2 directions: Stream -> Pipe and Pipe -> Stream.
NOTES:
Most of the per read/write costs can be mitigated by using CopyToAsync (if overridden by the Stream) but there are some downsides there as well.
We can avoid some of these overheads if we implement a PipeReader on top of CopyToAsync that doesn't use the Pipe internally. The idea here is that we call CopyToAsync on a fake stream that forwards WriteAsync calls to the PipeReader consumer. This implementation would pass buffers directly from the Stream to the consumer. If the consumer doesn't process the entire buffer, only the unconsumed buffer is copied into an internal buffer for the next read.
The write side isn't as problematic because we need to be able to allocate memory to write into the Stream so reusing the pipe isn't so bad here. The implementation here would likely be using a Pipe internally, then writing to the Stream on FlushAsync.
Other implementations