
API Proposal: Add PipeReader.ReadAsync overloads to allow waiting for a specific amount of data #25063

Closed
davidfowl opened this issue Feb 16, 2018 · 40 comments · Fixed by #51979
Labels
api-approved API was approved in API review, it can be implemented area-System.IO.Pipelines

@davidfowl
Member

davidfowl commented Feb 16, 2018

Often a protocol knows how much data it is waiting for (TLS frames being one example, size-headered binary protocols being another).

Today the pattern is: ReadAsync(), check the header, see you don't have enough, ReadAsync() again, check if you have enough, and keep looping until you have a complete frame. If you could do a "ReadAsync(xxxxx)" that wouldn't return unless there was an error/complete or at least that amount of data was available, it would remove the extra looping from every downstream protocol.

Proposed API

public abstract partial class PipeReader
{
+    public virtual ValueTask<ReadResult> ReadAsync(int minimumBytes, CancellationToken cancellationToken = default);
}
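For illustration, a sketch of how a length-prefixed protocol could consume this kind of overload. It uses `ReadAtLeastAsync`, the name the API was eventually approved under; the 4-byte big-endian length prefix and the helper names here are assumptions for the example, not part of the proposal.

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class LengthPrefixedReader
{
    // Reads one [4-byte big-endian length][payload] frame, waiting until the
    // whole frame is buffered instead of looping over plain ReadAsync calls.
    public static async ValueTask<byte[]> ReadFrameAsync(PipeReader reader)
    {
        // Wait until at least the fixed-size header is available.
        ReadResult result = await reader.ReadAtLeastAsync(4);
        int payloadLength = ReadHeader(result.Buffer);

        if (result.Buffer.Length < 4 + payloadLength)
        {
            // Consume nothing, mark everything examined, and wait for the rest.
            reader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
            result = await reader.ReadAtLeastAsync(4 + payloadLength);
        }

        byte[] payload = result.Buffer.Slice(4, payloadLength).ToArray();
        reader.AdvanceTo(result.Buffer.GetPosition(4 + payloadLength));
        return payload;
    }

    private static int ReadHeader(ReadOnlySequence<byte> buffer)
    {
        Span<byte> header = stackalloc byte[4];
        buffer.Slice(0, 4).CopyTo(header);
        return BinaryPrimitives.ReadInt32BigEndian(header);
    }
}
```

The second read only happens when the first one didn't already buffer the whole frame, so the happy path stays a single awaited call.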

/cc @Drawaes

@benaadams
Member

I like it

"ReadAsync(xxxxx)" and it wouldn't return unless there was an error/complete or at least that amount of data was available it

Should it be a suggestion/hint of size?

What happens if there will be a large delay before the next bit of data? (e.g. TLS may want to close the frame early and send anyway).

Often there are variable-sized protocols which have a preferred frame size (e.g. TCP at packet sizes, TLS at frame size), which can close the frame early and send to the next pipe stage/output; however, they are more efficient doing the send on frame boundaries.

@davidfowl
Member Author

What happens if there will be a large delay before the next bit of data? (e.g. TLS may want to close the frame early and send anyway).

Then you cancel the read to yield it.

@benaadams
Member

benaadams commented Feb 16, 2018

Then you cancel the read to yield it.

So the pattern would be?

public async Task Write15LinesAsync()
{
    PipeWriter buffer = Pipe.Writer;
    var length = 'z' - 'a' + 3;
    for (var i = 0; i < 15; i++)
    {
        var written = WriteAtoZ(buffer.GetMemory(length));
        buffer.Advance(written);
        await buffer.FlushAsync(); // Write data
    }
    Pipe.Reader.CancelPendingRead(); // Clear any buffered data (void, not awaitable)
}

public int WriteAtoZ(Memory<byte> memory)
{
    Span<byte> span = memory.Span;
    var i = 0;
    for (var c = 'a'; c <= 'z'; c++)
    {
        span[i] = (byte)c;
        i++;
    }
    span[i] = (byte)'\r';
    span[i + 1] = (byte)'\n';

    return i + 2;
}

@benaadams
Member

benaadams commented Feb 16, 2018

Or would cancel/wake up happen with Flush but not Write so it would be:

public async Task Write15LinesAsync()
{
    PipeWriter buffer = Pipe.Writer;
    var length = 'z' - 'a' + 3;
    for (var i = 0; i < 15; i++)
    {
        var written = WriteAtoZ(buffer.GetMemory(length));
        buffer.Advance(written);
        await buffer.WriteAsync(); // Write data
    }
    await buffer.FlushAsync(); // Clear any buffered data
}

e.g. how do you do the following?

Then you cancel the read to yield it.

@davidfowl
Member Author

The question is really about where the knowledge is (reader or writer). In this simple example, you can just flush at the end and not Write for each Advance (so it's a bit contrived).

@benaadams
Member

benaadams commented Feb 16, 2018

The question is really about where the knowledge is (reader or writer).

Both sides have knowledge: the Writer knows when it has no more pending data; the Reader knows its ideal chunk sizes, and whether it has enough data to parse.

In this simple example, you can just flush at the end and not Write for each Advance

It is simplified for ease of explanation; the point is you can't Advance forever or you'll run out of memory, and you get no backpressure feedback.

OK, a more complex example:

public async Task ExecuteAsync()
{
    try
    {
        var bytesWritten = 0u;
        while (true)
        {
            if (!_pipe.Reader.TryRead(out var result))
            {
                // No waiting input data
                if (bytesWritten > 0)
                {
                    // Flush any written data before waiting for more input
                    var flushResult = _pipe.Writer.FlushAsync();
                    if (!flushResult.IsCompleted)
                    {
                        await flushResult;
                    }
                    bytesWritten = 0u;
                }
                // Await for more input
                result = await _pipe.Reader.ReadAsync();
            }

            var inputBuffer = result.Buffer;
            var consumed = inputBuffer.Start;
            var examined = inputBuffer.End;

            try
            {
                if (inputBuffer.IsEmpty && result.IsCompleted)
                {
                    break;
                }

                ParseRequest(inputBuffer, out consumed, out examined);

                if (_state != State.Body && result.IsCompleted)
                {
                    // Bad request, finish request processing
                    break;
                }

                if (_state == State.Body)
                {
                    bytesWritten += await ProcessRequestAsync();

                    _state = State.StartLine;
                }
            }
            finally
            {
                _pipe.Reader.AdvanceTo(consumed, examined);
            }
        }

        // Flush any pending data
        await _pipe.Writer.FlushAsync();
        _pipe.Reader.Complete();
    }
    catch (Exception ex)
    {
        _pipe.Reader.Complete(ex);
    }
    finally
    {
        _pipe.Writer.Complete();
    }
}

private async ValueTask<uint> ProcessRequestAsync()
{
    // Processing a request Writes, but doesn't Flush, 
    // unless it needs to trigger immediate sending
    // like after a websocket message, server sent event or
    // at end of </head> to get early browser read of css, fonts and preload, prefetch links
    await _pipe.Writer.WriteAsync(_plainTextBytes);
    return (uint)_plainTextBytes.Length;
}

So the question is how do you differentiate between a forced early wake up, and an enough data wakeup on the Writer and Reader ends.

I'm suggesting using WriteAsync and FlushAsync for the Writer side as they are well-understood API tropes from Streams; new users of Pipelines will naturally start using them correctly that way, as it's already what they understand and there are no new concepts to learn.

@benaadams
Member

Also, you'd want to default the first parameter of WriteAsync:

public virtual PipeAwaiter<FlushResult> WriteAsync(ReadOnlyMemory<byte> source = default, CancellationToken cancellationToken = default)

So you can do

await WriteAsync();

When you are only writing using Span and Advance

@davidfowl
Member Author

It is simplified for ease of explanation; the point is you can't Advance forever or you'll run out of memory, and you get no backpressure feedback.

Then don't FlushAsync, use a reasonable threshold that won't run out of memory? What's the scenario where you will need to buffer so much that you could run out?

I'm suggesting using WriteAsync and FlushAsync for the Writer side as they are well-understood API tropes from Streams; new users of Pipelines will naturally start using them correctly that way, as it's already what they understand and there are no new concepts to learn.

WriteAsync without a buffer is not an easily understood concept that carries over to normal usage.

So the question is how do you differentiate between a forced early wake up, and an enough data wakeup on the Writer and Reader ends.

You need to describe how the other side of the writer wants to handle consuming the data written during request processing.

@benaadams
Member

benaadams commented Feb 16, 2018

Second scenario: a TextWriter without arrays as buffers (generally the encoding happens in Flush, so it needs to hold conversion buffers), encoding directly to the Pipe without intermediary copies.

Holding onto these arrays causes issues https://github.com/dotnet/coreclr/issues/16389 as well as contention getting them from the pool.

System.OutOfMemoryException:
   at System.Buffers.TlsOverPerCoreLockedStacksArrayPool`1.Rent
   at Microsoft.AspNetCore.WebUtilities.HttpResponseStreamWriter..ctor
   at Microsoft.AspNetCore.Mvc.Internal.MemoryPoolHttpResponseStreamWriterFactory.CreateWriter
   at Microsoft.AspNetCore.Mvc.Formatters.JsonOutputFormatter+<WriteResponseBodyAsync>d__11.MoveNext

With Pipes the arrays just aren't needed as the Pipe brings its own memory.

Stream that wraps Pipe

public class DuplexPipeStream : Stream, IDuplexPipe
{
    private Pipe _pipe;
    PipeReader IDuplexPipe.Input => _pipe.Reader;
    PipeWriter IDuplexPipe.Output => _pipe.Writer;

    public override async Task FlushAsync(CancellationToken cancellationToken)
        => await _pipe.Writer.FlushAsync(cancellationToken);

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default)
        => await _pipe.Writer.WriteAsync(source, cancellationToken);

    public override async ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            var result = await _pipe.Reader.ReadAsync();
            var readableBuffer = result.Buffer;
            try
            {
                if (!readableBuffer.IsEmpty)
                {
                    // ReadOnlySequence.Length is long; Memory.Length is int
                    var count = (int)Math.Min(readableBuffer.Length, destination.Length);
                    readableBuffer = readableBuffer.Slice(0, count);
                    readableBuffer.CopyTo(destination.Span);
                    return count;
                }
                else if (result.IsCompleted)
                {
                    return 0;
                }
            }
            finally
            {
                _pipe.Reader.AdvanceTo(readableBuffer.End, readableBuffer.End);
            }
        }
    }

    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
        => ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

    // Other stream overloads
}

Then a TextWriter that works with that Stream, encoding on the go in WriteAsync rather than waiting for FlushAsync, and holding no internal buffer; it would rely on WriteAsync only occasionally acting as a Flush (as StreamWriter does currently) to maintain performance.

public class HttpResponseStreamWriter : TextWriter
{
    internal const int DefaultBufferSize = 16 * 1024;

    private IDuplexPipe _pipe;
    private StreamWriter _streamWriter;
    private readonly Encoder _encoder;

    public HttpResponseStreamWriter(Stream stream, Encoding encoding)
        : this(stream, encoding, DefaultBufferSize) {}

    public HttpResponseStreamWriter(Stream stream, Encoding encoding, int bufferSize)
    {
        Encoding = encoding;
        if (stream is IDuplexPipe pipe)
        {
            _pipe = pipe;
        }
        else
        {
            _streamWriter = new StreamWriter(stream, encoding, bufferSize);
        }
    }

    public override Encoding Encoding { get; }

    public unsafe override void Write(char value)
    {
        if (_pipe != null)
        {
            Span<byte> bytes = _pipe.Output.GetSpan(Encoding.GetMaxByteCount(1));
            var encoded = Encoding.GetBytes(new ReadOnlySpan<char>(&value, 1), bytes);
            _pipe.Output.Advance(encoded);
        }
        else
        {
            _streamWriter.Write(value);
        }
    }


    public override async Task WriteAsync(ReadOnlyMemory<char> source)
    {
        if (_pipe != null)
        {
            Write(source, default(CancellationToken));
            await _pipe.Output.WriteAsync();
        }
        else
        {
            await _streamWriter.WriteAsync(source);
        }
    }

    private void Write(ReadOnlyMemory<char> source, CancellationToken token)
    {
        ReadOnlySpan<char> input = source.Span;
        int minBytes = Encoding.GetMaxByteCount(1);
        while (input.Length > 0)
        {
            Span<byte> bytes = _pipe.Output.GetSpan(minBytes);
            int totalEncoded = 0;
            while (bytes.Length > 0)
            {
                // Conservative estimate, so the encoded bytes always fit in the span
                int toEncode = Math.Min(bytes.Length / minBytes, input.Length);
                var encoded = Encoding.GetBytes(input.Slice(0, toEncode), bytes);
                input = input.Slice(toEncode);
                bytes = bytes.Slice(encoded);
                totalEncoded += encoded;
                if (input.IsEmpty || bytes.Length < minBytes)
                {
                    break;
                }
            }
            _pipe.Output.Advance(totalEncoded);
        }
    }

    public override Task WriteAsync(char[] values, int index, int count)
        => WriteAsync(new ReadOnlyMemory<char>(values, index, count));

    public override Task WriteAsync(string value)
        => WriteAsync(value.AsMemory());

    public override async Task FlushAsync()
    {
        if (_pipe != null)
        {
            await _pipe.Output.FlushAsync();
        }
        else
        {
            await _streamWriter.FlushAsync();
        }
    }

    // Other TextWriter overloads
}

@benaadams
Member

So the question is: if WriteAsync called FlushAsync as it does now, then TextWriter.FlushAsync wouldn't call PipeWriter.FlushAsync; instead it would need to cancel the reader of the next stage in the pipeline.

Then you cancel the read to yield it.

So the wrapping Stream would additionally need a reference to the next stage in the pipeline so it could call the void method CancelPendingRead?

    public override async Task FlushAsync()
    {
        if (_pipe != null)
        {
           // Where does _nextPipe come from?
           // Is it a problem CancelPendingRead is acting as a non-awaitable Flush?
            _nextPipe.Input.CancelPendingRead();
        }
        else
        {
            await _streamWriter.FlushAsync();
        }
    }

@benaadams
Member

benaadams commented Feb 16, 2018

btw I think "ReadAsync(xxxxx)" is a good API; I just want more detail on how to wake it up early, and how the Reader can tell it was woken up early (other than first checking how much data there is, seeing whether it agrees with the amount that was requested, and changing behaviour based on that).

@davidfowl
Member Author

You still haven't shown the consuming side. What's on the other end of the pipe that's flushing? I'd like to see the transport code basically and how it uses these 2 features. What I hear you asking for is a force flush from the writer side even though the reader is looking for a specific buffer size.

@benaadams
Member

benaadams commented Feb 16, 2018

What's on the other end of the pipe that's flushing? I'd like to see the transport code basically and how it uses these 2 features.

Something along the lines of:

const int PreferredFrameSize = 1024; // Varies based on Reader's framing size

public async Task WriteFramesAsync(CancellationToken cancellationToken)
{
    try
    {
        await WriteFramesAsync(_pipe.Reader, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _pipe.Reader.Complete(ex);
    }
    finally
    {
        _pipe.Writer.Complete();
    }
}

private async Task WriteFramesAsync(PipeReader reader, CancellationToken cancellationToken)
{
    while (true)
    {
        // Read specifying preferred readsize hint
        ReadResult result = await reader.ReadAsync(PreferredFrameSize, cancellationToken);

        ReadOnlySequence<byte> inputBuffer = result.Buffer;
        SequencePosition consumed = inputBuffer.Start;
        SequencePosition examined = inputBuffer.End;

        bool shouldConsumeAll = (result.IsFlush || result.IsCompleted);

        try
        {
            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();
                // Output frame, passing on Flush if last frame of current data
                // (async methods can't have out params, so return the positions)
                (consumed, examined) = await OutputFrameAsync(
                    inputBuffer,
                    flush: inputBuffer.Length <= PreferredFrameSize && shouldConsumeAll,
                    cancellationToken);

                inputBuffer = inputBuffer.Slice(consumed);
                if (inputBuffer.IsEmpty)
                {
                    // Frame completed and no more data, await more data 
                    break;
                }

                if (!result.IsCompleted && inputBuffer.Length < PreferredFrameSize)
                {
                    // Under preferred size, check if more input was made 
                    // available while writing Frames, Completed won't have more data
                    reader.AdvanceTo(consumed, examined);
                    if (reader.TryRead(out ReadResult newResult))
                    {
                        result = newResult;
                        inputBuffer = result.Buffer;
                        consumed = inputBuffer.Start;
                        examined = inputBuffer.End;

                        // Apply initial Flush if originally requested
                        shouldConsumeAll = (shouldConsumeAll || result.IsFlush || result.IsCompleted);
                    }
                }

                if (!shouldConsumeAll && inputBuffer.Length < PreferredFrameSize)
                {
                    // Not enough left to fill a frame and not Flush or Complete, await more data
                    break;
                }
            }

            if (inputBuffer.IsEmpty && result.IsCompleted)
            {
                // Finished, exit
                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    reader.Complete();
}

@davidfowl
Member Author

davidfowl commented Feb 16, 2018

I think the problem here is that you need to guarantee that the writer absolutely is going to flush, or you can end up in a situation where you don't have enough data for a frame and you haven't consumed enough to release backpressure. That's fundamentally what I think is broken about the reader using IsFlush to make decisions like this.

That's why I think you always need this timeout on the reader side that tries to consume data as a failsafe.

@benaadams
Member

benaadams commented Feb 16, 2018

If you are in back pressure you have ReadAsync immediately complete with IsFlush automatically set.

If the reader stops making progress while in back pressure; then it should throw
ThrowHelper.ThrowInvalidOperationException_BackpressureDeadlock();
on Advance or perhaps better next ReadAsync (as it's going to return the same data)?

Then it would tear everything down?

@davidfowl
Member Author

If you are in back pressure you have ReadAsync immediately complete with IsFlush automatically set.

So IsFlush is also true if you WriteAsync and cross the threshold?

If the reader stops making progress while in back pressure; then it should throw
ThrowHelper.ThrowInvalidOperationException_BackpressureDeadlock();
on Advance?

But how do you code around that?

@benaadams
Member

benaadams commented Feb 17, 2018

So IsFlush is also true if you WriteAsync and cross the threshold?

Yes; because the "buffer" is full. Much like a TextWriter or a BufferedStream adds a flush to a WriteAsync when its internal buffer gets full.

on Advance?

But how do you code around that?

Added

or perhaps better next ReadAsync (as it's going to return the same data)?

As you could call Advance on the Reader several times before going back to ReadAsync().

Thinking about it... perhaps the values are already there and no new ones are needed? Expose the thresholds on PipeReader (so the reader can use them) and define ReadAsync() in terms of ReadAsync(xxxxx):

public abstract partial class PipeReader
{
    public long PauseWriterThreshold => _pauseWriterThreshold;
    public long ResumeWriterThreshold => _resumeWriterThreshold;

    public PipeAwaiter<ReadResult> ReadAsync()
        => ReadAsync(Math.Max(ResumeWriterThreshold, inputBuffer.Length + 1)); // semantically

    public abstract PipeAwaiter<ReadResult> ReadAsync(long waitForBytes);
}

Then the reader has a chance to decide whether it wants to defer while the pressure is going up, but has to consume once it's hit the top.

Defaults for sockets could be something like:

new PipeOptions(
    pauseWriterThreshold: 65535, // 64kB
    resumeWriterThreshold: 1400 // or MTU - TCP/IP headers
);

But how do you code around that?

Basing it on the PipeOptions means it's config (which would make @tmds happy); also, using the same value for resumeWriterThreshold and PreferredFrameSize means they won't go out of sync.

Also, exposing the thresholds makes it easier for a wrapping stream, as it can pick up the wrapped Reader's limits and make decisions using them.

@benaadams
Member

benaadams commented Feb 17, 2018

Minimal implementation using IsFlush (as it needs to be passed on)

private async Task PassThroughAsync(PipeReader reader, PipeWriter writer, CancellationToken cancellationToken)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);

        ReadOnlySequence<byte> inputBuffer = result.Buffer;
        SequencePosition consumed = inputBuffer.Start;
        SequencePosition examined = inputBuffer.End;

        try
        {
            cancellationToken.ThrowIfCancellationRequested();

            if (inputBuffer.IsEmpty && result.IsCompleted)
            {
                await writer.FlushAsync(cancellationToken);
                break;
            }

            // While loop could be a WriteAsync(ReadOnlySequence<byte>) extension
            while (inputBuffer.TryGet(ref consumed, out var memory, advance: true))
            {
                await writer.WriteAsync(memory, cancellationToken);
            }

            if (result.IsFlush)
            {
                await writer.FlushAsync(cancellationToken);
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    reader.Complete();
}

public async Task PassThroughAsync(CancellationToken cancellationToken)
{
    try
    {
        await PassThroughAsync(_pipe.Reader, _pipe.Writer, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _pipe.Reader.Complete(ex);
    }
    finally
    {
        _pipe.Writer.Complete();
    }
}

@tmds
Member

tmds commented Feb 19, 2018

I think we need to distinguish between:

  • The reader prefers to work in larger chunks, but is happy to read less if no more data is expected soon.
  • The reader knows how much data it expects, so it wants to read at least that amount or complete with an error.

For the first, I think the reader needs to respect the writer's FlushAsync (as it is the way to indicate no more data is expected soon). The way of working in larger chunks is to buffer data from WriteAsync up to an amount (cf. dotnet/apireviews#59 (comment)). The reader can use the IsFlush flag to distinguish between an explicit flush by FlushAsync and an implicit flush by WriteAsync. I think we can use the low threshold as the WriteAsync buffering threshold.

For the second, the FlushAsyncs won't cause ReadAsync to complete until the amount is available (or an error occurs, e.g. timeout). ReadAsync(xxxxx) seems a good API for this.

@steji113
Contributor

I didn't read through the whole thread, but the initial summary of this issue describes something Boost.ASIO does fairly well in the C++ world: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/reference/read_until.html

This is very helpful when consuming protocols that have a header of what the total expected length is.

@Clockwork-Muse
Contributor

This is very helpful when consuming protocols that have a header of what the total expected length is.

One of my projects at work was interfacing with two (different) IoT devices that had custom application protocols over TCP, and were length-prefixed. The farther up the application/message stack you get, the harder it is to do something like partial parsing; you just want to pull the entire message off the stream and handle it.

Of course, I'm pretty sure most of the actual messages being passed fit inside a single TCP frame, so it's pretty likely my code only had to actually Read once anyway.

@davidfowl
Member Author

@halter73 showed interest in doing this

@Drawaes
Contributor

Drawaes commented Nov 1, 2019

I haven't read back through the whole issue, but... there is never a scenario (outside cancellation/error) where you say "I need 5k" but actually want 3k. In TLS, due to the encrypted nature, it's all or nothing.

Secondly, surely if your "asked for" amount is greater than the backpressure threshold you should throw straight away, as your requirement is already bigger than your backpressure limit?

@davidfowl
Member Author

@Drawaes the back pressure mechanic was changed completely in the last version so it's no longer a problem.

@halter73
Member

halter73 commented Nov 1, 2019

Secondly surely if your "asked for" amount is > the backpressure threshold shouldn't you throw straight away.... As your requirement is already bigger than your backpressure anyway?

Given the old behavior where backpressure was based on the number of unconsumed bytes, this makes a lot of sense. Given the new behavior where backpressure is based on the number of unexamined bytes, I think it's fair to say you are "examining" all the bytes until you get at least the number of bytes you expected.

Really this feature is just meant to be a more optimized version of the following:

public async ValueTask<ReadResult> ReadAtLeast(PipeReader pipeReader, long minBytes)
{
    var result = await pipeReader.ReadAsync();

    while (result.Buffer.Length < minBytes)
    {
        pipeReader.AdvanceTo(result.Buffer.Start, result.Buffer.End);
        result = await pipeReader.ReadAsync();
    }

    return result;
}

@davidfowl
Member Author

Also, that will be the implementation in the PipeReader abstract base class. I'll turn this into a formal API proposal.

@davidfowl davidfowl changed the title Ability to wait for a specific amount of data API Proposal: Add ReadAsync overloads to allow waiting for a specific amount of data Nov 2, 2019
@davidfowl davidfowl changed the title API Proposal: Add ReadAsync overloads to allow waiting for a specific amount of data API Proposal: Add PipeReader.ReadAsync overloads to allow waiting for a specific amount of data Nov 2, 2019
@msftgits msftgits transferred this issue from dotnet/corefx Jan 31, 2020
@halter73 halter73 modified the milestones: 5.0.0, Future Jul 8, 2020
@davidfowl davidfowl added api-ready-for-review API is ready for review, it is NOT ready for implementation and removed api-suggestion Early API idea and discussion, it is NOT ready for implementation labels Apr 19, 2021
@davidfowl davidfowl modified the milestones: Future, 6.0.0 Apr 19, 2021
@bartonjs
Member

bartonjs commented Apr 27, 2021

Video

  • Since there's a parameter that should be validated (if negative, throw), it should use the Template Method Pattern (ReadAsync => virtual ReadAsyncCore)
  • To enhance discoverability we feel like not being an overload is better: ReadAsync => ReadAtLeastAsync
namespace System.IO.Pipelines
{
    partial class PipeReader
    {
        public ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken cancellationToken = default);
        protected virtual ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken);
    }
}

@GrabYourPitchforks seems to have had some existential concerns with this. The shape is approved as long as we don't outright axe the feature.
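As a sketch of the Template Method Pattern the review asked for (the class name here is hypothetical, and the core is abstract for brevity where the real base class ships a virtual core with a default looping implementation):

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public abstract class SketchPipeReader
{
    // Template method: the non-virtual entry point validates the argument
    // once, so every derived implementation inherits the check.
    public ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken cancellationToken = default)
    {
        if (minimumBytes < 0)
            throw new ArgumentOutOfRangeException(nameof(minimumBytes));
        return ReadAtLeastAsyncCore(minimumBytes, cancellationToken);
    }

    // Derived readers implement only the unvalidated core.
    protected abstract ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken);
}
```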

@bartonjs bartonjs added api-approved API was approved in API review, it can be implemented and removed api-ready-for-review API is ready for review, it is NOT ready for implementation labels Apr 27, 2021
@GrabYourPitchforks
Member

My concern specifically was with this API being promoted as a way to deal with TLV-structured data. What I absolutely do not want to have happen is for this pattern to become commonplace:

int payloadLength = await reader.ReadInt32Async();
await reader.ReadAtLeastAsync(payloadLength); // buffer data
// consume buffer once await returns

If the untrusted payload contains a TLV header which says "2GB of data follows," then flowing this value as-is to ReadAtLeastAsync could result in a denial of service attack against the application.

I don't think this is an existential threat to the API as proposed, since we have lots of APIs which misbehave when given untrusted input. But we really need to be careful with documentation and sample code here. And if we don't think docs are sufficient then we should consider placing restrictions on how large a number this API is willing to accept by default.
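A hedged sketch of the kind of guard the docs could recommend; `MaxPayloadLength` and the helper name are illustrative assumptions, and the right limit really belongs to the protocol layer's own options:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BoundedFrameReader
{
    // Hypothetical cap for the example; a real protocol would make this configurable.
    private const int MaxPayloadLength = 1024 * 1024; // 1 MB

    public static async ValueTask<ReadResult> ReadPayloadAsync(PipeReader reader, int payloadLength)
    {
        // Reject the untrusted length before asking the pipe to buffer it;
        // the uint cast also rejects negative values.
        if ((uint)payloadLength > MaxPayloadLength)
            throw new InvalidOperationException(
                $"Frame of {payloadLength} bytes exceeds the {MaxPayloadLength}-byte limit.");

        return await reader.ReadAtLeastAsync(payloadLength);
    }
}
```

The check runs before any buffering happens, so a "2GB of data follows" header costs nothing but the exception.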

@danmoseley
Member

If we want to bound the size (4K or something?), the first release is the time to do it. I see length prefixed values were even called out in the top proposal as a scenario.

@davidfowl
Member Author

davidfowl commented Apr 27, 2021

We don't need to bound anything; the nice thing about pipelines is that it'll give you nicely split-up buffers without destroying the heap. Now we can talk about the fact that memory is being bloated, but that's not a concern here: let the upper layers deal with maximum message sizes before calling this API. It's what all serializers of this form do already.

@GrabYourPitchforks
Member

Now we can talk about the fact that memory is being bloated but that's not a concern here

That was my specific concern. Ultimately it doesn't matter whether the API allocates a single 100MB buffer or 10,000 x 10KB buffers. That's an implementation detail. The problem fundamentally is that we don't want the untrusted remote endpoint to coerce the application into doing significantly more work / consuming significantly more resources than the application intended.

let the upper layers deal with maximum message sizes before calling this API

If we put a blurb like this in the doc and sample code, I think that would address the concerns. Thanks!

@davidfowl
Member Author

davidfowl commented Apr 27, 2021

That was my specific concern. Ultimately it doesn't matter whether the API allocates a single 100MB buffer or 10,000 x 10KB buffers. That's an implementation detail. The problem fundamentally is that we don't want the untrusted remote endpoint to coerce the application into doing significantly more work / consuming significantly more resources than the application intended.

It should be one of your concerns though because there are 2 common "issues" here. Code like this:

// This is bad code, do not copy it
int payloadLength = await stream.ReadInt32Async();
var buffer = new byte[payloadLength]; // Untrusted length, did the client even send this much data? Why are we assuming they did before actually reading it
await stream.ReadAsync(buffer); // read the data

This pattern is one of the traps this API helps you avoid. Plus the other issue you mention (not having a maximum message size). e.g. https://github.com/protocolbuffers/protobuf/blob/3f5fc4df1de8e12b2235c3006593e22d6993c3f5/csharp/src/Google.Protobuf/ParsingPrimitives.cs#L469-L475

If we put a blurb like this in the doc and sample code, I think that would address the concerns. Thanks!

That's absolutely reasonable since it's already a concern today but I can see why this API invites you to do it (since the pipe helps you buffer).

@davidfowl
Member Author

davidfowl commented Apr 28, 2021

I notice we didn't define the behavior of ReadAtLeast(0). I think it should mean "wait until you have any data" (since we have TryRead for synchronous attempts). But I guess that naturally falls out 😄
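It does fall out of the reference implementation earlier in the thread: with a minimum of zero the length check never trips, so the call degenerates to a plain ReadAsync. A small sketch (the helper name is illustrative):

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class ZeroMinimumDemo
{
    // ReadAtLeastAsync(0) completes as soon as any data (or completion)
    // is observed, exactly like a parameterless ReadAsync.
    public static async Task<long> ReadAnyAsync(Pipe pipe)
    {
        ReadResult result = await pipe.Reader.ReadAtLeastAsync(0);
        long length = result.Buffer.Length;
        pipe.Reader.AdvanceTo(result.Buffer.End);
        return length;
    }
}
```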

@ghost ghost added the in-pr There is an active PR which will close this issue when it is merged label Apr 28, 2021
@tmds
Member

tmds commented Apr 28, 2021

let the upper layers deal with maximum message sizes before calling this API

When using this API it's important to have a timeout in case ReadAtLeast can't complete in a reasonable time.
Maybe that's also something upper layers will handle, and not something a user should do manually using the CancellationToken? It would be good to have something about it in the docs too.

@davidfowl
Member Author

How is that different from any other async API? That is what cancellation is for

@tmds
Member

tmds commented Apr 28, 2021

Some timeouts happen without the use of a CancellationToken.

It depends on the API how important the timeout case is. For this API (and related) it's important.

@davidfowl
Member Author

What API times out without a token?

@tmds
Member

tmds commented Apr 28, 2021

For the PipeWriter, KestrelServerLimits.MinResponseDataRate controls a timeout that works without the user specifying a CancellationToken.

@tmds
Member

tmds commented Apr 28, 2021

I'm fine if it's done through a CancellationToken for ReadAtLeast. My request is to mention how timeouts should be handled in the docs/example.

@davidfowl
Member Author

davidfowl commented Apr 28, 2021

Those aren't APIs in the same vein as this so not comparable (and the PipeWriter in ASP.NET Core is extremely specific and tied to Kestrel's implementation...). Timeouts are handled via cancellation for async APIs.
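For the docs angle, a sketch of the usual cancellation-as-timeout pattern applied to ReadAtLeastAsync; the helper name and the translation to TimeoutException are illustrative choices, not an official recommendation:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

public static class TimedRead
{
    // Bounds ReadAtLeastAsync with a timeout by linking a CancellationTokenSource
    // to the caller's token and cancelling it after the time budget elapses.
    public static async ValueTask<ReadResult> ReadAtLeastWithTimeoutAsync(
        PipeReader reader, int minimumBytes, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);
        try
        {
            return await reader.ReadAtLeastAsync(minimumBytes, cts.Token);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the internal timer fired, so report it as a timeout.
            throw new TimeoutException($"Did not receive {minimumBytes} bytes within {timeout}.");
        }
    }
}
```

The exception filter keeps a caller-initiated cancellation flowing out as OperationCanceledException and converts only the elapsed budget into a TimeoutException.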

@davidfowl davidfowl assigned davidfowl and unassigned halter73 Apr 30, 2021
@ghost ghost removed the in-pr There is an active PR which will close this issue when it is merged label May 1, 2021
@ghost ghost locked as resolved and limited conversation to collaborators May 31, 2021