Skip to content
This repository was archived by the owner on Oct 10, 2018. It is now read-only.
This repository was archived by the owner on Oct 10, 2018. It is now read-only.

Possible performance improvement when dealing with HTTP Servers with small buffers #57

@mkosieradzki

Description

@mkosieradzki

I have discovered following issue in the Service Fabric Reverse Proxy.

Please see: microsoft/service-fabric-issues#210 for details.

However there is a possible performance improvement in ASP.NET Core Proxy to utilize the full buffer in case its underlying http server provides data slowly.

I have created an alternative CopyAsync implementation which uses buffer cyclically and more actively drains the source stream. It performs much better than original CopyAsync in scenario where source stream provides data in small chunks e.g. 4KiB and destination stream has Write with non-zero latency.

In that case CopyAsync has its throughput artificially decreased to: chunksize / latency. For example 4KiB / 50ms = 640kbps.

Below is the code of my proposed GreedyCopyToAsync based on the original Stream.CopyToAsync:

        public static async Task GreedyCopyToAsync(this Stream source, Stream destination, int bufferSize, CancellationToken cancellationToken)
        {
            var buffer = ArrayPool<byte>.Shared.Rent(bufferSize);

            long readPos = 0;
            long readReservedTo = 0;
            long writePos = 0;
            int bytesRead = -1;
            int bytesWritten = -1;

            var currentReadTask = Task.CompletedTask;
            var currentWriteTask = Task.CompletedTask;

            try
            {
                while (true)
                {
                    var writeCompleted = currentWriteTask.IsCompleted;
                    var readCompleted = currentReadTask.IsCompleted;

                    if (bytesWritten > 0 && writeCompleted)
                    {
                        writePos += bytesWritten;
                        bytesWritten = -1;
                    }

                    if (bytesRead > 0 && readCompleted)
                    {
                        readPos += bytesRead;
                        readReservedTo = readPos;
                        bytesRead = -1;
                    }

                    if (readCompleted)
                    {
                        if (bytesRead == 0) //End of stream reached
                        {
                            currentReadTask = currentWriteTask;
                        }
                        else
                        {
                            var readBuffBytesAvailable = buffer.Length - (int)(readPos - writePos);
                            if (readBuffBytesAvailable > 0)
                            {
                                var buffPos = (int)(readPos % buffer.Length);
                                var toRead = Math.Min(buffer.Length - buffPos, readBuffBytesAvailable);
                                currentReadTask = source.ReadAsync(buffer, buffPos, toRead, cancellationToken).ContinueWith(res => bytesRead = res.Result);
                                readReservedTo = readPos + toRead;
                            }
                            else
                            {
                                currentReadTask = currentWriteTask;
                            }
                        }
                    }

                    if (writeCompleted)
                    {
                        var writeBuffBytesAvailable = (int)(readPos - writePos);

                        if (writeBuffBytesAvailable > 0)
                        {
                            var buffPos = (int)(writePos % buffer.Length);
                            var toWrite = Math.Min(buffer.Length - buffPos, writeBuffBytesAvailable);
                            currentWriteTask = destination.WriteAsync(buffer, buffPos, toWrite, cancellationToken);
                            bytesWritten = toWrite;
                        }
                        else if (bytesRead == 0) //End of stream reached
                        {
                            return;
                        }
                        else
                        {
                            currentWriteTask = currentReadTask;
                        }
                    }

                    await Task.WhenAny(currentReadTask, currentWriteTask).ConfigureAwait(false);
                }
            }
            finally
            {
                Array.Clear(buffer, 0, readPos > bufferSize ? bufferSize : (int)readPos); // clear only the most we used
                ArrayPool<byte>.Shared.Return(buffer, clearArray: false);
            }
        }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions