Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit aca4676

Browse files
authored
TryGet for ReadOnlysequence (#27229)
1 parent cb00bee commit aca4676

20 files changed

+333
-120
lines changed

src/System.IO.Pipelines/ref/System.IO.Pipelines.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ protected PipeWriter() { }
8282
public abstract System.IO.Pipelines.PipeAwaiter<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
8383
public abstract System.Memory<byte> GetMemory(int minimumLength = 0);
8484
public abstract System.Span<byte> GetSpan(int minimumLength = 0);
85-
public abstract int MaxBufferSize { get; }
8685
public abstract void OnReaderCompleted(System.Action<System.Exception, object> callback, object state);
8786
public virtual System.IO.Pipelines.PipeAwaiter<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<byte> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
8887
}

src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.DefaultPipeWriter.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@ public DefaultPipeWriter(Pipe pipe)
3030

3131
public override void Advance(int bytes) => _pipe.Advance(bytes);
3232

33-
public override Memory<byte> GetMemory(int minimumLength = 0) => _pipe.GetMemory(minimumLength);
33+
public override Memory<byte> GetMemory(int sizeHint = 0) => _pipe.GetMemory(sizeHint);
3434

35-
public override Span<byte> GetSpan(int minimumLength = 0) => _pipe.GetMemory(minimumLength).Span;
36-
37-
public override int MaxBufferSize => _pipe._pool.MaxBufferSize;
35+
public override Span<byte> GetSpan(int sizeHint = 0) => _pipe.GetMemory(sizeHint).Span;
3836

3937
public bool IsCompleted => _pipe.IsFlushAsyncCompleted;
4038

src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -111,30 +111,29 @@ private void ResetState()
111111
_length = 0;
112112
}
113113

114-
internal Memory<byte> GetMemory(int minimumSize)
114+
internal Memory<byte> GetMemory(int sizeHint)
115115
{
116116
if (_writerCompletion.IsCompleted)
117117
{
118118
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
119119
}
120120

121-
if (minimumSize < 0)
121+
if (sizeHint < 0)
122122
{
123123
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
124124
}
125125

126126
lock (_sync)
127127
{
128-
BufferSegment segment = _writingHead ?? AllocateWriteHeadUnsynchronized(minimumSize);
128+
BufferSegment segment = _writingHead ?? AllocateWriteHeadUnsynchronized(sizeHint);
129129

130130
int bytesLeftInBuffer = segment.WritableBytes;
131131

132132
// If inadequate bytes left or if the segment is readonly
133-
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < minimumSize || segment.ReadOnly)
133+
if (bytesLeftInBuffer == 0 || bytesLeftInBuffer < sizeHint || segment.ReadOnly)
134134
{
135135
BufferSegment nextSegment = CreateSegmentUnsynchronized();
136-
137-
nextSegment.SetMemory(_pool.Rent(Math.Max(_minimumSegmentSize, minimumSize)));
136+
nextSegment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));
138137

139138
segment.SetNext(nextSegment);
140139

@@ -145,7 +144,7 @@ internal Memory<byte> GetMemory(int minimumSize)
145144
return _writingHead.AvailableMemory.Slice(_writingHead.End, _writingHead.WritableBytes);
146145
}
147146

148-
private BufferSegment AllocateWriteHeadUnsynchronized(int count)
147+
private BufferSegment AllocateWriteHeadUnsynchronized(int sizeHint)
149148
{
150149
BufferSegment segment = null;
151150

@@ -154,7 +153,7 @@ private BufferSegment AllocateWriteHeadUnsynchronized(int count)
154153
// Try to return the tail so the calling code can append to it
155154
int remaining = _commitHead.WritableBytes;
156155

157-
if (count <= remaining && remaining > 0)
156+
if (sizeHint <= remaining && remaining > 0)
158157
{
159158
// Free tail space of the right amount, use that
160159
segment = _commitHead;
@@ -165,7 +164,7 @@ private BufferSegment AllocateWriteHeadUnsynchronized(int count)
165164
{
166165
// No free tail space, allocate a new segment
167166
segment = CreateSegmentUnsynchronized();
168-
segment.SetMemory(_pool.Rent(Math.Max(_minimumSegmentSize, count)));
167+
segment.SetMemory(_pool.Rent(GetSegmentSize(sizeHint)));
169168
}
170169

171170
if (_commitHead == null)
@@ -186,6 +185,15 @@ private BufferSegment AllocateWriteHeadUnsynchronized(int count)
186185
return segment;
187186
}
188187

188+
private int GetSegmentSize(int sizeHint)
189+
{
190+
// First we need to handle case where hint is smaller than minimum segment size
191+
var adjustedToMinimumSize = Math.Max(_minimumSegmentSize, sizeHint);
192+
// After that adjust it to fit into pools max buffer size
193+
var adjustedToMaximumSize = Math.Min(_pool.MaxBufferSize, adjustedToMinimumSize);
194+
return adjustedToMaximumSize;
195+
}
196+
189197
private BufferSegment CreateSegmentUnsynchronized()
190198
{
191199
if (_pooledSegmentCount > 0)
@@ -338,26 +346,26 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
338346
lock (_sync)
339347
{
340348
var examinedEverything = false;
341-
if (examined.Segment == _commitHead)
349+
if (examined.GetObject() == _commitHead)
342350
{
343-
examinedEverything = _commitHead != null ? examined.Index == _commitHeadIndex - _commitHead.Start : examined.Index == 0;
351+
examinedEverything = _commitHead != null ? examined.GetInteger() == _commitHeadIndex - _commitHead.Start : examined.GetInteger() == 0;
344352
}
345353

346-
if (consumed.Segment != null)
354+
if (consumed.GetObject() != null)
347355
{
348356
if (_readHead == null)
349357
{
350358
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
351359
return;
352360
}
353361

354-
var consumedSegment = (BufferSegment)consumed.Segment;
362+
var consumedSegment = (BufferSegment)consumed.GetObject();
355363

356364
returnStart = _readHead;
357365
returnEnd = consumedSegment;
358366

359367
// Check if we crossed _maximumSizeLow and complete backpressure
360-
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumed.Index).Length;
368+
long consumedBytes = new ReadOnlySequence<byte>(returnStart, _readHeadIndex, consumedSegment, consumed.GetInteger()).Length;
361369
long oldLength = _length;
362370
_length -= consumedBytes;
363371

@@ -370,7 +378,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
370378
// Check if we consumed entire last segment
371379
// if we are going to return commit head we need to check that there is no writing operation that
372380
// might be using tailspace
373-
if (consumed.Index == returnEnd.Length && _writingHead != returnEnd)
381+
if (consumed.GetInteger() == returnEnd.Length && _writingHead != returnEnd)
374382
{
375383
BufferSegment nextBlock = returnEnd.NextSegment;
376384
if (_commitHead == returnEnd)
@@ -386,7 +394,7 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined
386394
else
387395
{
388396
_readHead = consumedSegment;
389-
_readHeadIndex = consumed.Index;
397+
_readHeadIndex = consumed.GetInteger();
390398
}
391399
}
392400

src/System.IO.Pipelines/src/System/IO/Pipelines/PipeWriter.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,10 @@ public abstract class PipeWriter : IBufferWriter<byte>
3737
public abstract void Advance(int bytes);
3838

3939
/// <inheritdoc />
40-
public abstract Memory<byte> GetMemory(int minimumLength = 0);
40+
public abstract Memory<byte> GetMemory(int sizeHint = 0);
4141

4242
/// <inheritdoc />
43-
public abstract Span<byte> GetSpan(int minimumLength = 0);
44-
45-
/// <inheritdoc />
46-
public abstract int MaxBufferSize { get; }
43+
public abstract Span<byte> GetSpan(int sizeHint = 0);
4744

4845
/// <summary>
4946
/// Writes <paramref name="source"/> to the pipe and makes data accessible to <see cref="PipeReader"/>

src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ public async Task HelloWorldAcrossTwoBlocks()
233233
// block 1 -> block2
234234
// [padding..hello] -> [ world ]
235235
PipeWriter writeBuffer = _pipe.Writer;
236-
var blockSize = _pipe.Writer.GetMemory(0).Length;
236+
var blockSize = _pipe.Writer.GetMemory().Length;
237237

238238
byte[] paddingBytes = Enumerable.Repeat((byte)'a', blockSize - 5).ToArray();
239239
byte[] bytes = Encoding.ASCII.GetBytes("Hello World");

src/System.IO.Pipelines/tests/PipeWriterTests.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,5 +197,13 @@ public void ThrowsOnAdvanceWithNoMemory()
197197
var exception = Assert.Throws<InvalidOperationException>(() => buffer.Advance(1));
198198
Assert.Equal("No writing operation. Make sure GetMemory() was called.", exception.Message);
199199
}
200+
201+
[Fact]
202+
public void GetMemory_AdjustsToPoolMaxBufferSize()
203+
{
204+
PipeWriter buffer = Pipe.Writer;
205+
var memory = buffer.GetMemory(int.MaxValue);
206+
Assert.True(memory.Length >= 4096);
207+
}
200208
}
201209
}

src/System.IO.Pipelines/tests/TestMemoryPool.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ protected override void Dispose(bool disposing)
2525
_disposed = true;
2626
}
2727

28-
public override int MaxBufferSize => _pool.MaxBufferSize;
28+
public override int MaxBufferSize => 4096;
2929

3030
internal void CheckDisposed()
3131
{

src/System.Memory/ref/System.Memory.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,10 @@ public ref partial struct Enumerator
165165
{
166166
private readonly object _dummy;
167167
public SequencePosition(object segment, int index) { throw null; }
168-
public int Index { get { throw null; } }
169-
public object Segment { get { throw null; } }
168+
[System.ComponentModel.EditorBrowsableAttribute((System.ComponentModel.EditorBrowsableState)(1))]
169+
public int GetInteger() { throw null; }
170+
[System.ComponentModel.EditorBrowsableAttribute((System.ComponentModel.EditorBrowsableState)(1))]
171+
public object GetObject() { throw null; }
170172
[System.ComponentModel.EditorBrowsableAttribute((System.ComponentModel.EditorBrowsableState)(1))]
171173
public override bool Equals(object obj) { throw null; }
172174
public bool Equals(System.SequencePosition position) { throw null; }
@@ -219,9 +221,8 @@ namespace System.Buffers
219221
public partial interface IBufferWriter<T>
220222
{
221223
void Advance(int count);
222-
System.Memory<T> GetMemory(int minimumLength = 0);
223-
System.Span<T> GetSpan(int minimumLength = 0);
224-
int MaxBufferSize { get; }
224+
System.Memory<T> GetMemory(int sizeHint = 0);
225+
System.Span<T> GetSpan(int sizeHint = 0);
225226
}
226227
public partial interface IMemoryList<T>
227228
{
@@ -512,4 +513,12 @@ public static bool TryGetOwnedMemory<T, TOwner>(ReadOnlyMemory<T> readOnlyMemory
512513
public static bool TryGetOwnedMemory<T, TOwner>(ReadOnlyMemory<T> readOnlyMemory, out TOwner ownedMemory, out int index, out int length)
513514
where TOwner : System.Buffers.OwnedMemory<T> { throw null; }
514515
}
516+
517+
public static partial class SequenceMarshal
518+
{
519+
public static bool TryGetArray<T>(System.Buffers.ReadOnlySequence<T> sequence, out System.ArraySegment<T> arraySegment) { throw null; }
520+
public static bool TryGetMemoryList<T>(System.Buffers.ReadOnlySequence<T> sequence, out System.Buffers.IMemoryList<T> startSegment, out int startIndex, out System.Buffers.IMemoryList<T> endSegment, out int endIndex) { throw null; }
521+
public static bool TryGetOwnedMemory<T>(System.Buffers.ReadOnlySequence<T> sequence, out System.Buffers.OwnedMemory<T> ownedMemory, out int start, out int length) { throw null; }
522+
public static bool TryGetReadOnlyMemory<T>(System.Buffers.ReadOnlySequence<T> sequence, out System.ReadOnlyMemory<T> readOnlyMemory) { throw null; }
523+
}
515524
}

src/System.Memory/src/System.Memory.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
<Compile Include="System\Buffers\Binary\WriterLittleEndian.cs" />
4646
<Compile Include="System\Buffers\Text\Base64Decoder.cs" />
4747
<Compile Include="System\Buffers\Text\Base64Encoder.cs" />
48+
<Compile Include="System\Runtime\InteropServices\SequenceMarshal.cs" />
4849
</ItemGroup>
4950
<!-- Utf8 Formatter/Parser -->
5051
<ItemGroup>
@@ -159,4 +160,4 @@
159160
<ProjectReference Include="..\..\System.Numerics.Vectors\src\System.Numerics.Vectors.csproj" />
160161
</ItemGroup>
161162
<Import Project="$([MSBuild]::GetDirectoryNameOfFileAbove($(MSBuildThisFileDirectory), dir.targets))\dir.targets" />
162-
</Project>
163+
</Project>

src/System.Memory/src/System/Buffers/BuffersExtensions.cs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,24 @@ public static void Write<T>(this IBufferWriter<T> bufferWriter, ReadOnlySpan<T>
7070
return;
7171
}
7272

73-
while (source.Length > 0)
74-
{
75-
int writeSize = destination.Length;
76-
77-
if (destination.Length == 0)
78-
{
79-
writeSize = Math.Min(source.Length, bufferWriter.MaxBufferSize);
80-
destination = bufferWriter.GetSpan(writeSize);
81-
}
73+
WriteMultiSegment(bufferWriter, source, destination);
74+
}
8275

76+
private static void WriteMultiSegment<T>(IBufferWriter<T> bufferWriter, ReadOnlySpan<T> source, Span<T> destination)
77+
{
78+
while (true)
79+
{
80+
int writeSize = Math.Min(destination.Length, source.Length);
8381
source.Slice(0, writeSize).CopyTo(destination);
8482
bufferWriter.Advance(writeSize);
8583
source = source.Slice(writeSize);
86-
destination = default;
84+
if (source.Length > 0)
85+
{
86+
destination = bufferWriter.GetSpan(source.Length);
87+
continue;
88+
}
89+
90+
return;
8791
}
8892
}
8993
}

0 commit comments

Comments
 (0)