diff --git a/src/System.IO.Pipelines/src/Properties/InternalsVisibleTo.cs b/src/System.IO.Pipelines/src/Properties/InternalsVisibleTo.cs new file mode 100644 index 000000000000..76ab3f9ae612 --- /dev/null +++ b/src/System.IO.Pipelines/src/Properties/InternalsVisibleTo.cs @@ -0,0 +1,9 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +//------------------------------------------------------------ + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("System.IO.Pipelines.Tests, PublicKey=002400000480000094000000060200000024000052534131000400000100010015c01ae1f50e8cc09ba9eac9147cf8fd9fce2cfe9f8dce4f7301c4132ca9fb50ce8cbf1df4dc18dd4d210e4345c744ecb3365ed327efdbc52603faa5e21daa11234c8c4a73e51f03bf192544581ebe107adee3a34928e39d04e524a9ce729d5090bfd7dad9d10c722c0def9ccc08ff0a03790e48bcd1f9b6c476063e1966a1c4")] diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs index 5a844e79d87e..7cf3ad719472 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/Pipe.cs @@ -419,14 +419,14 @@ internal void AdvanceReader(SequencePosition consumed, SequencePosition examined _readerAwaitable.Reset(); } - _readingState.End(); - while (returnStart != null && returnStart != returnEnd) { returnStart.ResetMemory(); ReturnSegmentUnsynchronized(returnStart); returnStart = returnStart.NextSegment; } + + _readingState.End(); } TrySchedule(_writerScheduler, continuation); @@ -467,7 +467,7 @@ internal void OnWriterCompleted(Action callback, object state { if (callback == null) { - ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callback); + ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback); } PipeCompletionCallbacks completionCallbacks; diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs index b0fcb397e8e6..3bdcacbd7285 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeAwaitable.cs @@ -2,11 +2,13 @@ // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; namespace System.IO.Pipelines { + [DebuggerDisplay("CancelledState: {_canceledState}, IsCompleted: {IsCompleted}")] internal struct PipeAwaitable { private static readonly Action _awaitableIsCompleted = () => { }; @@ -134,11 +136,6 @@ public bool ObserveCancelation() return false; } - public override string ToString() - { - return $"CancelledState: {_canceledState}, {nameof(IsCompleted)}: {IsCompleted}"; - } - private enum CanceledState { NotCanceled = 0, diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs index 321554bb765b..6e9600be95df 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeCompletion.cs @@ -9,6 +9,7 @@ namespace System.IO.Pipelines { + [DebuggerDisplay("IsCompleted: {" + nameof(IsCompleted) + "}")] internal struct PipeCompletion { private static readonly ArrayPool CompletionCallbackPool = ArrayPool.Shared; diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderState.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderState.cs index 2a8e195be70c..223a1e58a387 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderState.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/PipeReaderState.cs @@ -1,10 +1,12 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Diagnostics; using System.Runtime.CompilerServices; namespace System.IO.Pipelines { + [DebuggerDisplay("State: {_state}")] internal struct PipeReaderState { private State _state; @@ -38,18 +40,13 @@ public void End() { if (_state == State.Inactive) { - ThrowHelper.CreateInvalidOperationException_NoReadToComplete(); + ThrowHelper.ThrowInvalidOperationException_NoReadToComplete(); } _state = State.Inactive; } public bool IsActive => _state == State.Active; - - public override string ToString() - { - return $"State: {_state}"; - } } internal enum State: byte diff --git a/src/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs b/src/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs index a33fd8fe3c1f..fa43618d45b8 100644 --- a/src/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs +++ b/src/System.IO.Pipelines/src/System/IO/Pipelines/ThrowHelper.cs @@ -14,7 +14,7 @@ internal class ThrowHelper internal static void ThrowArgumentNullException(ExceptionArgument argument) { throw CreateArgumentNullException(argument); } [MethodImpl(MethodImplOptions.NoInlining)] - private static Exception CreateArgumentNullException(ExceptionArgument argument) { return new ArgumentOutOfRangeException(argument.ToString()); } + private static Exception CreateArgumentNullException(ExceptionArgument argument) { return new ArgumentNullException(argument.ToString()); } public static void ThrowInvalidOperationException_NotWritingNoAlloc() { @@ -157,7 +157,7 @@ public static void ThrowInvalidOperationException_ResetIncompleteReaderWriter() [MethodImpl(MethodImplOptions.NoInlining)] public static Exception CreateInvalidOperationException_ResetIncompleteReaderWriter() { - return new InvalidOperationException(SR.AdvanceToInvalidCursor); + return new InvalidOperationException(SR.ReaderAndWriterHasToBeCompleted); } } diff --git a/src/System.IO.Pipelines/tests/PipeLengthTests.cs b/src/System.IO.Pipelines/tests/PipeLengthTests.cs index 9901da9d5583..fe413dae817e 100644 --- a/src/System.IO.Pipelines/tests/PipeLengthTests.cs +++ b/src/System.IO.Pipelines/tests/PipeLengthTests.cs @@ -1,102 +1,102 @@ -//// Licensed to the .NET Foundation under one or more agreements. -//// The .NET Foundation licenses this file to you under the MIT license. -//// See the LICENSE file in the project root for more information. - -//using Xunit; - -//namespace System.IO.Pipelines.Tests -//{ -// public class PipeLengthTests : IDisposable -// { -// public PipeLengthTests() -// { -// _pool = new TestMemoryPool(); -// _pipe = new Pipe(new PipeOptions(_pool)); -// } - -// public void Dispose() -// { -// _pipe.Writer.Complete(); -// _pipe.Reader.Complete(); -// _pool?.Dispose(); -// } - -// private readonly TestMemoryPool _pool; - -// private readonly Pipe _pipe; - -// [Fact] -// public void ByteByByteTest() -// { -// for (var i = 1; i <= 1024 * 1024; i++) -// { -// _pipe.Writer.GetMemory(100); -// _pipe.Writer.Advance(1); -// _pipe.Writer.Commit(); - -// Assert.Equal(i, _pipe.Length); -// } - -// _pipe.Writer.FlushAsync(); - -// for (int i = 1024 * 1024 - 1; i >= 0; i--) -// { -// ReadResult result = _pipe.Reader.ReadAsync().GetResult(); -// SequencePosition consumed = result.Buffer.Slice(1).Start; - -// Assert.Equal(i + 1, result.Buffer.Length); - -// _pipe.Reader.AdvanceTo(consumed, consumed); - -// Assert.Equal(i, _pipe.Length); -// } -// } - -// [Fact] -// public void LengthCorrectAfterAlloc0AdvanceCommit() -// { -// _pipe.Writer.GetMemory(0); -// _pipe.Writer.WriteEmpty(10); -// _pipe.Writer.Commit(); - -// Assert.Equal(10, _pipe.Length); -// } - -// [Fact] -// public void LengthCorrectAfterAllocAdvanceCommit() -// { -// PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(10); -// writableBuffer.Commit(); - -// Assert.Equal(10, _pipe.Length); -// } - -// [Fact] -// public void LengthDecreasedAfterReadAdvanceConsume() -// { -// _pipe.Writer.GetMemory(100); -// _pipe.Writer.Advance(10); -// _pipe.Writer.Commit(); -// _pipe.Writer.FlushAsync(); - -// ReadResult result = _pipe.Reader.ReadAsync().GetResult(); -// SequencePosition consumed = result.Buffer.Slice(5).Start; -// _pipe.Reader.AdvanceTo(consumed, consumed); - -// Assert.Equal(5, _pipe.Length); -// } - -// [Fact] -// public void LengthNotChangeAfterReadAdvanceExamine() -// { -// PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(10); -// writableBuffer.Commit(); -// writableBuffer.FlushAsync(); - -// ReadResult result = _pipe.Reader.ReadAsync().GetResult(); -// _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End); - -// Assert.Equal(10, _pipe.Length); -// } -// } -//} +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using Xunit; + +namespace System.IO.Pipelines.Tests +{ + public class PipeLengthTests : IDisposable + { + public PipeLengthTests() + { + _pool = new TestMemoryPool(); + _pipe = new Pipe(new PipeOptions(_pool)); + } + + public void Dispose() + { + _pipe.Writer.Complete(); + _pipe.Reader.Complete(); + _pool?.Dispose(); + } + + private readonly TestMemoryPool _pool; + + private readonly Pipe _pipe; + + [Fact] + public void ByteByByteTest() + { + for (var i = 1; i <= 1024 * 1024; i++) + { + _pipe.Writer.GetMemory(100); + _pipe.Writer.Advance(1); + _pipe.Writer.Commit(); + + Assert.Equal(i, _pipe.Length); + } + + _pipe.Writer.FlushAsync(); + + for (int i = 1024 * 1024 - 1; i >= 0; i--) + { + ReadResult result = _pipe.Reader.ReadAsync().GetResult(); + SequencePosition consumed = result.Buffer.Slice(1).Start; + + Assert.Equal(i + 1, result.Buffer.Length); + + _pipe.Reader.AdvanceTo(consumed, consumed); + + Assert.Equal(i, _pipe.Length); + } + } + + [Fact] + public void LengthCorrectAfterAlloc0AdvanceCommit() + { + _pipe.Writer.GetMemory(0); + _pipe.Writer.WriteEmpty(10); + _pipe.Writer.Commit(); + + Assert.Equal(10, _pipe.Length); + } + + [Fact] + public void LengthCorrectAfterAllocAdvanceCommit() + { + PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(10); + writableBuffer.Commit(); + + Assert.Equal(10, _pipe.Length); + } + + [Fact] + public void LengthDecreasedAfterReadAdvanceConsume() + { + _pipe.Writer.GetMemory(100); + _pipe.Writer.Advance(10); + _pipe.Writer.Commit(); + _pipe.Writer.FlushAsync(); + + ReadResult result = _pipe.Reader.ReadAsync().GetResult(); + SequencePosition consumed = result.Buffer.Slice(5).Start; + _pipe.Reader.AdvanceTo(consumed, consumed); + + Assert.Equal(5, _pipe.Length); + } + + [Fact] + public void LengthNotChangeAfterReadAdvanceExamine() + { + PipeWriter writableBuffer = _pipe.Writer.WriteEmpty(10); + writableBuffer.Commit(); + writableBuffer.FlushAsync(); + + ReadResult result = _pipe.Reader.ReadAsync().GetResult(); + _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.End); + + Assert.Equal(10, _pipe.Length); + } + } +} diff --git a/src/System.IO.Pipelines/tests/PipelineReaderWriterFacts.cs b/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs similarity index 91% rename from src/System.IO.Pipelines/tests/PipelineReaderWriterFacts.cs rename to src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs index e069e856af7c..88603a7519bd 100644 --- a/src/System.IO.Pipelines/tests/PipelineReaderWriterFacts.cs +++ b/src/System.IO.Pipelines/tests/PipeReaderWriterFacts.cs @@ -557,5 +557,56 @@ public async Task WritingDataMakesDataReadableViaPipeline() _pipe.Reader.AdvanceTo(buffer.Start, buffer.Start); } + + [Fact] + public async Task DoubleReadThrows() + { + await _pipe.Writer.WriteAsync(new byte[1]); + PipeAwaiter awaiter = _pipe.Reader.ReadAsync(); + ReadResult result = awaiter.GetAwaiter().GetResult(); + + Assert.Throws(() => awaiter.GetAwaiter().GetResult()); + + _pipe.Reader.AdvanceTo(result.Buffer.Start, result.Buffer.Start); + } + + [Fact] + public void GetResultBeforeCompletedThrows() + { + PipeAwaiter awaiter = _pipe.Reader.ReadAsync(); + + Assert.Throws(() => awaiter.GetAwaiter().GetResult()); + } + + [Fact] + public void CompleteAfterAdvanceThrows() + { + _pipe.Writer.WriteEmpty(4); + + Assert.Throws(() => _pipe.Writer.Complete()); + + _pipe.Commit(); + } + + [Fact] + public async Task AdvanceWithoutReadThrows() + { + await _pipe.Writer.WriteAsync(new byte[3]); + ReadResult readResult = await _pipe.Reader.ReadAsync(); + _pipe.Reader.AdvanceTo(readResult.Buffer.Start); + + InvalidOperationException exception = Assert.Throws(() => _pipe.Reader.AdvanceTo(readResult.Buffer.End)); + Assert.Equal("No reading operation to complete.", exception.Message); + } + + [Fact] + public async Task TryReadAfterReadAsyncThrows() + { + await _pipe.Writer.WriteAsync(new byte[3]); + ReadResult readResult = await _pipe.Reader.ReadAsync(); + + Assert.Throws(() => _pipe.Reader.TryRead(out _)); + _pipe.Reader.AdvanceTo(readResult.Buffer.Start); + } } } diff --git a/src/System.IO.Pipelines/tests/PipeResetTests.cs b/src/System.IO.Pipelines/tests/PipeResetTests.cs index 83c0019ca29c..40659e3c6f91 100644 --- a/src/System.IO.Pipelines/tests/PipeResetTests.cs +++ b/src/System.IO.Pipelines/tests/PipeResetTests.cs @@ -39,7 +39,7 @@ public async Task LengthIsReseted() _pipe.Reset(); - //Assert.Equal(0, _pipe.Length); + Assert.Equal(0, _pipe.Length); } [Fact] diff --git a/src/System.IO.Pipelines/tests/PipeWriterTests.cs b/src/System.IO.Pipelines/tests/PipeWriterTests.cs index 825c7ddce450..edad163479ad 100644 --- a/src/System.IO.Pipelines/tests/PipeWriterTests.cs +++ b/src/System.IO.Pipelines/tests/PipeWriterTests.cs @@ -187,7 +187,7 @@ public void ThrowsOnAdvanceOverMemorySize() { Memory buffer = Pipe.Writer.GetMemory(1); var exception = Assert.Throws(() => Pipe.Writer.Advance(buffer.Length + 1)); - Assert.Equal("Can't advance past buffer size", exception.Message); + Assert.Equal("Can't advance past buffer size.", exception.Message); } [Fact] diff --git a/src/System.IO.Pipelines/tests/SchedulerFacts.cs b/src/System.IO.Pipelines/tests/SchedulerFacts.cs index c3bd3f12e9b1..6cc350742913 100644 --- a/src/System.IO.Pipelines/tests/SchedulerFacts.cs +++ b/src/System.IO.Pipelines/tests/SchedulerFacts.cs @@ -52,34 +52,31 @@ private void Work(object state) [Fact] public async Task DefaultReaderSchedulerRunsInline() { - using (var pool = new TestMemoryPool()) - { - var pipe = new Pipe(new PipeOptions(pool)); + var pipe = new Pipe(); - var id = 0; + var id = 0; - Func doRead = async () => { - ReadResult result = await pipe.Reader.ReadAsync(); + Func doRead = async () => { + ReadResult result = await pipe.Reader.ReadAsync(); - Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); + Assert.Equal(Thread.CurrentThread.ManagedThreadId, id); - pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End); + pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End); - pipe.Reader.Complete(); - }; + pipe.Reader.Complete(); + }; - Task reading = doRead(); + Task reading = doRead(); - id = Thread.CurrentThread.ManagedThreadId; + id = Thread.CurrentThread.ManagedThreadId; - PipeWriter buffer = pipe.Writer; - buffer.Write(Encoding.UTF8.GetBytes("Hello World")); - await buffer.FlushAsync(); + PipeWriter buffer = pipe.Writer; + buffer.Write(Encoding.UTF8.GetBytes("Hello World")); + await buffer.FlushAsync(); - pipe.Writer.Complete(); + pipe.Writer.Complete(); - await reading; - } + await reading; } [Fact] @@ -200,5 +197,39 @@ public async Task ReadAsyncCallbackRunsOnReaderScheduler() } } } + + [Fact] + public async Task ThreadPoolScheduler_SchedulesOnThreadPool() + { + var pipe = new Pipe(new PipeOptions(readerScheduler: PipeScheduler.ThreadPool)); + + async Task DoRead() + { + int oid = Thread.CurrentThread.ManagedThreadId; + + ReadResult result = await pipe.Reader.ReadAsync(); + + Assert.NotEqual(oid, Thread.CurrentThread.ManagedThreadId); + Assert.True(Thread.CurrentThread.IsThreadPoolThread); + pipe.Reader.AdvanceTo(result.Buffer.End, result.Buffer.End); + pipe.Reader.Complete(); + } + + bool callbackRan = false; + Task reading = DoRead(); + + PipeWriter buffer = pipe.Writer; + pipe.Writer.OnReaderCompleted((state, exception) => + { + callbackRan = true; + Assert.True(Thread.CurrentThread.IsThreadPoolThread); + }, null); + buffer.Write(Encoding.UTF8.GetBytes("Hello World")); + await buffer.FlushAsync(); + + await reading; + + Assert.True(callbackRan); + } } } diff --git a/src/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj b/src/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj index 2a7524e707c1..ffbfb1305576 100644 --- a/src/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj +++ b/src/System.IO.Pipelines/tests/System.IO.Pipelines.Tests.csproj @@ -13,7 +13,7 @@ - + @@ -27,5 +27,10 @@ + + + + + \ No newline at end of file