Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.

Commit

Permalink
Better defaults for the Pipe (#35939)
Browse files Browse the repository at this point in the history
- Use a 4K buffer instead of 2K (4K is a very common buffer size and usually maps to system page size)
- Use a stack for the buffer segment pool and allow pooling more than 16 segments for large writes
  • Loading branch information
davidfowl committed Mar 11, 2019
1 parent 6eedd6f commit b49b512
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 17 deletions.
20 changes: 9 additions & 11 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information. // See the LICENSE file in the project root for more information.


using System.Buffers; using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics; using System.Diagnostics;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Threading; using System.Threading;
Expand All @@ -16,7 +17,8 @@ namespace System.IO.Pipelines
/// </summary> /// </summary>
public sealed partial class Pipe public sealed partial class Pipe
{ {
internal const int SegmentPoolSize = 16; internal const int InitialSegmentPoolSize = 16; // 65K
internal const int MaxPoolSize = 256; // 1MB


private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested(); private static readonly Action<object> s_signalReaderAwaitable = state => ((Pipe)state).ReaderCancellationRequested();
private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested(); private static readonly Action<object> s_signalWriterAwaitable = state => ((Pipe)state).WriterCancellationRequested();
Expand All @@ -42,7 +44,7 @@ public sealed partial class Pipe
private readonly PipeScheduler _readerScheduler; private readonly PipeScheduler _readerScheduler;
private readonly PipeScheduler _writerScheduler; private readonly PipeScheduler _writerScheduler;


private readonly BufferSegment[] _bufferSegmentPool; private readonly Stack<BufferSegment> _bufferSegmentPool;


private readonly DefaultPipeReader _reader; private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer; private readonly DefaultPipeWriter _writer;
Expand All @@ -52,8 +54,6 @@ public sealed partial class Pipe
private long _length; private long _length;
private long _currentWriteLength; private long _currentWriteLength;


private int _pooledSegmentCount;

private PipeAwaitable _readerAwaitable; private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable; private PipeAwaitable _writerAwaitable;


Expand Down Expand Up @@ -101,7 +101,7 @@ public Pipe(PipeOptions options)
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options); ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
} }


_bufferSegmentPool = new BufferSegment[SegmentPoolSize]; _bufferSegmentPool = new Stack<BufferSegment>(InitialSegmentPoolSize);


_operationState = default; _operationState = default;
_readerCompletion = default; _readerCompletion = default;
Expand Down Expand Up @@ -253,10 +253,9 @@ private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)


private BufferSegment CreateSegmentUnsynchronized() private BufferSegment CreateSegmentUnsynchronized()
{ {
if (_pooledSegmentCount > 0) if (_bufferSegmentPool.Count > 0)
{ {
_pooledSegmentCount--; return _bufferSegmentPool.Pop();
return _bufferSegmentPool[_pooledSegmentCount];
} }


return new BufferSegment(); return new BufferSegment();
Expand All @@ -269,10 +268,9 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment)
Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!"); Debug.Assert(segment != _writingHead, "Returning _writingHead segment that's in use!");
Debug.Assert(segment != _lastExamined, "Returning _lastExamined segment that's in use!"); Debug.Assert(segment != _lastExamined, "Returning _lastExamined segment that's in use!");


if (_pooledSegmentCount < _bufferSegmentPool.Length) if (_bufferSegmentPool.Count < MaxPoolSize)
{ {
_bufferSegmentPool[_pooledSegmentCount] = segment; _bufferSegmentPool.Push(segment);
_pooledSegmentCount++;
} }
} }


Expand Down
Expand Up @@ -12,11 +12,11 @@ namespace System.IO.Pipelines
/// </summary> /// </summary>
public class PipeOptions public class PipeOptions
{ {
private const int DefaultMinimumSegmentSize = 2048; private const int DefaultMinimumSegmentSize = 4096;


private const int DefaultResumeWriterThreshold = DefaultMinimumSegmentSize * Pipe.SegmentPoolSize / 2; private const int DefaultResumeWriterThreshold = DefaultMinimumSegmentSize * Pipe.InitialSegmentPoolSize / 2;


private const int DefaultPauseWriterThreshold = DefaultMinimumSegmentSize * Pipe.SegmentPoolSize; private const int DefaultPauseWriterThreshold = DefaultMinimumSegmentSize * Pipe.InitialSegmentPoolSize;


/// <summary> /// <summary>
/// Default instance of <see cref="PipeOptions"/> /// Default instance of <see cref="PipeOptions"/>
Expand Down
4 changes: 2 additions & 2 deletions src/System.IO.Pipelines/tests/PipeOptionsTests.cs
Expand Up @@ -12,13 +12,13 @@ public class PipeOptionsTests
[Fact] [Fact]
public void DefaultPauseWriterThresholdIsSet() public void DefaultPauseWriterThresholdIsSet()
{ {
Assert.Equal(32768, PipeOptions.Default.PauseWriterThreshold); Assert.Equal(65536, PipeOptions.Default.PauseWriterThreshold);
} }


[Fact] [Fact]
public void DefaultResumeWriterThresholdIsSet() public void DefaultResumeWriterThresholdIsSet()
{ {
Assert.Equal(16384, PipeOptions.Default.ResumeWriterThreshold); Assert.Equal(32768, PipeOptions.Default.ResumeWriterThreshold);
} }


[Fact] [Fact]
Expand Down
7 changes: 6 additions & 1 deletion src/System.IO.Pipelines/tests/PipePoolTests.cs
Expand Up @@ -206,7 +206,12 @@ public void ReturnsWriteHeadOnComplete()
public void ReturnsWriteHeadWhenRequestingLargerBlock() public void ReturnsWriteHeadWhenRequestingLargerBlock()
{ {
var pool = new DisposeTrackingBufferPool(); var pool = new DisposeTrackingBufferPool();
var pipe = new Pipe(CreatePipeWithInlineSchedulers(pool)); var options = new PipeOptions(pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.Inline,
minimumSegmentSize: 2048);

var pipe = new Pipe(options);
pipe.Writer.GetMemory(512); pipe.Writer.GetMemory(512);
pipe.Writer.GetMemory(4096); pipe.Writer.GetMemory(4096);


Expand Down

0 comments on commit b49b512

Please sign in to comment.