From 0aaadbc2f5bd3d18b0bc67c94f609023cafecf87 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 06:54:40 +0000 Subject: [PATCH 1/7] Add ProcessOutputLine struct and Process.ReadAllLinesAsync method - Add ProcessOutputLine readonly struct with Content and StandardError properties - Implement ReadAllLinesAsync as async IAsyncEnumerable in Process.Multiplexing.cs - Add types to reference assembly - Add ProcessStreamingTests.cs with comprehensive tests - Remove unused usings from edited files Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/b28836b3-abff-4763-8aff-2c77ebe0e022 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../ref/System.Diagnostics.Process.cs | 7 + .../src/System.Diagnostics.Process.csproj | 1 + .../Diagnostics/Process.Multiplexing.cs | 103 +++++- .../System/Diagnostics/ProcessOutputLine.cs | 39 ++ .../tests/ProcessStreamingTests.cs | 339 ++++++++++++++++++ .../System.Diagnostics.Process.Tests.csproj | 1 + 6 files changed, 489 insertions(+), 1 deletion(-) create mode 100644 src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs create mode 100644 src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs diff --git a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs index 7cd0b3015d5f69..ea8b034805b912 100644 --- a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs +++ b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs @@ -161,6 +161,7 @@ protected void OnExited() { } public System.Threading.Tasks.Task<(byte[] StandardOutput, byte[] StandardError)> ReadAllBytesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public (string StandardOutput, string StandardError) ReadAllText(System.TimeSpan? timeout = default(System.TimeSpan?)) { throw null; } public System.Threading.Tasks.Task<(string StandardOutput, string StandardError)> ReadAllTextAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Collections.Generic.IAsyncEnumerable ReadAllLinesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public void Refresh() { } [System.Runtime.Versioning.UnsupportedOSPlatformAttribute("ios")] [System.Runtime.Versioning.UnsupportedOSPlatformAttribute("tvos")] @@ -225,6 +226,12 @@ public sealed partial class ProcessExitStatus public int ExitCode { get { throw null; } } public System.Runtime.InteropServices.PosixSignal? Signal { get { throw null; } } } + public readonly partial struct ProcessOutputLine + { + public ProcessOutputLine(string content, bool standardError) { throw null; } + public string Content { get { throw null; } } + public bool StandardError { get { throw null; } } + } public enum ProcessPriorityClass { Normal = 32, diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj index 6db2a72aeb08fa..d315d0a36f6064 100644 --- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj +++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj @@ -22,6 +22,7 @@ + diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 71ec5a984a1096..4b9636751ac0a6 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -2,8 +2,9 @@ // The .NET Foundation licenses this file to you under the MIT license. using System.Buffers; +using System.Collections.Generic; using System.IO; -using System.Runtime.InteropServices; +using System.Runtime.CompilerServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -256,6 +257,106 @@ private static async Task> ReadPipeToBufferAsync(Stream strea } } + /// + /// Asynchronously reads all standard output and standard error of the process as lines of text, + /// interleaving them as they become available. + /// + /// + /// A token to cancel the asynchronous operation. + /// + /// + /// An async enumerable of instances representing the lines + /// read from standard output and standard error. + /// + /// + /// Standard output or standard error has not been redirected. + /// -or- + /// A redirected stream has already been used for synchronous or asynchronous reading. + /// + /// + /// The was canceled. + /// + /// + /// The process has been disposed. + /// + public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ValidateReadAllState(); + + StreamReader outputReader = _standardOutput!; + StreamReader errorReader = _standardError!; + + Task readOutput = outputReader.ReadLineAsync(cancellationToken).AsTask(); + Task readError = errorReader.ReadLineAsync(cancellationToken).AsTask(); + + while (true) + { + Task completedTask = await Task.WhenAny(readOutput, readError).ConfigureAwait(false); + + // When there is data available in both, handle error first. + bool isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); + + string? line = isError + ? await readError.ConfigureAwait(false) + : await readOutput.ConfigureAwait(false); + + if (line is not null) + { + yield return new ProcessOutputLine(line, isError); + + // Continue reading from the same stream while data is immediately available. + StreamReader activeReader = isError ? errorReader : outputReader; + while (true) + { + ValueTask nextRead = activeReader.ReadLineAsync(cancellationToken); + + if (nextRead.IsCompleted) + { + line = await nextRead.ConfigureAwait(false); + if (line is null) + { + break; + } + + yield return new ProcessOutputLine(line, isError); + } + else + { + if (isError) + { + readError = nextRead.AsTask(); + } + else + { + readOutput = nextRead.AsTask(); + } + + break; + } + } + } + + if (line is null) + { + break; + } + } + + // One stream ended. Drain the remaining data from the other stream. + bool isErrorDone = readError.IsCompleted && await readError.ConfigureAwait(false) is null; + StreamReader remainingReader = isErrorDone ? outputReader : errorReader; + bool remainingIsError = !isErrorDone; + + Task remainingTask = isErrorDone ? readOutput : readError; + string? moreData = await remainingTask.ConfigureAwait(false); + + while (moreData is not null) + { + yield return new ProcessOutputLine(moreData, remainingIsError); + moreData = await remainingReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); + } + } + /// /// Validates that the process is not disposed, both stdout and stderr are redirected, /// and neither stream has been used (mode must be Undefined). Sets both streams to sync mode. diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs new file mode 100644 index 00000000000000..1ba47cf8307b0f --- /dev/null +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs @@ -0,0 +1,39 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +namespace System.Diagnostics +{ + /// + /// Represents a single line of text read from a process's standard output or standard error stream. + /// + public readonly struct ProcessOutputLine + { + /// + /// Initializes a new instance of the struct. + /// + /// The text content of the output line. + /// + /// if the line was read from standard error; + /// otherwise, . + /// + public ProcessOutputLine(string content, bool standardError) + { + Content = content; + StandardError = standardError; + } + + /// + /// Gets the text content of the output line. + /// + public string Content { get; } + + /// + /// Gets a value that indicates whether the line was read from standard error. + /// + /// + /// if the line was read from standard error; + /// otherwise, . + /// + public bool StandardError { get; } + } +} diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs new file mode 100644 index 00000000000000..7bb9a185865af5 --- /dev/null +++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs @@ -0,0 +1,339 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.DotNet.RemoteExecutor; +using Xunit; + +namespace System.Diagnostics.Tests +{ + public class ProcessStreamingTests : ProcessTestBase + { + private const string DontPrintAnything = "DO_NOT_PRINT"; + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsAfterDispose() + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.Start(); + Assert.True(process.WaitForExit(WaitInMS)); + + process.Dispose(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsWhenNoStreamsRedirected() + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.Start(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOnlyOutputOrErrorIsRedirected(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.StartInfo.RedirectStandardOutput = standardOutput; + process.StartInfo.RedirectStandardError = !standardOutput; + process.Start(); + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOutputOrErrorIsInSyncMode(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.Dummy); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + // Access the StreamReader property to set the stream to sync mode + _ = standardOutput ? process.StandardOutput : process.StandardError; + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData(true)] + [InlineData(false)] + public async Task ReadAllLinesAsync_ThrowsWhenOutputOrErrorIsInAsyncMode(bool standardOutput) + { + Process process = CreateProcess(RemotelyInvokable.StreamBody); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + if (standardOutput) + { + process.BeginOutputReadLine(); + } + else + { + process.BeginErrorReadLine(); + } + + await Assert.ThrowsAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + } + }); + + if (standardOutput) + { + process.CancelOutputRead(); + } + else + { + process.CancelErrorRead(); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalTheory(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + [InlineData("hello", "world")] + [InlineData("just output", "")] + [InlineData("", "just error")] + [InlineData("", "")] + public async Task ReadAllLinesAsync_ReadsBothOutputAndError(string standardOutput, string standardError) + { + using Process process = StartLinePrintingProcess( + string.IsNullOrEmpty(standardOutput) ? DontPrintAnything : standardOutput, + string.IsNullOrEmpty(standardError) ? DontPrintAnything : standardError); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + if (string.IsNullOrEmpty(standardOutput)) + { + Assert.Empty(capturedOutput); + } + else + { + Assert.Equal(new[] { standardOutput }, capturedOutput); + } + + if (string.IsNullOrEmpty(standardError)) + { + Assert.Empty(capturedError); + } + else + { + Assert.Equal(new[] { standardError }, capturedError); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ReadsInterleavedOutput() + { + const int iterations = 100; + using Process process = CreateProcess(() => + { + for (int i = 0; i < iterations; i++) + { + Console.Out.WriteLine($"out{i}"); + Console.Out.Flush(); + Console.Error.WriteLine($"err{i}"); + Console.Error.Flush(); + } + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + List expectedOutput = new(); + List expectedError = new(); + for (int i = 0; i < iterations; i++) + { + expectedOutput.Add($"out{i}"); + expectedError.Add($"err{i}"); + } + + Assert.Equal(expectedOutput, capturedOutput); + Assert.Equal(expectedError, capturedError); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ReadsLargeOutput() + { + const int lineCount = 1000; + using Process process = CreateProcess(() => + { + for (int i = 0; i < lineCount; i++) + { + Console.Out.WriteLine($"line{i}"); + } + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + List capturedOutput = new(); + List capturedError = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + if (line.StandardError) + { + capturedError.Add(line.Content); + } + else + { + capturedOutput.Add(line.Content); + } + } + + for (int i = 0; i < lineCount; i++) + { + Assert.Equal($"line{i}", capturedOutput[i]); + } + + Assert.Empty(capturedError); + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ThrowsOperationCanceledOnCancellation() + { + Process process = CreateProcess(RemotelyInvokable.ReadLine); + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.StartInfo.RedirectStandardInput = true; + process.Start(); + + try + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + await Assert.ThrowsAnyAsync(async () => + { + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync(cts.Token)) + { + } + }); + } + finally + { + process.Kill(); + } + + Assert.True(process.WaitForExit(WaitInMS)); + } + + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_ProcessOutputLineProperties() + { + using Process process = StartLinePrintingProcess("stdout_line", "stderr_line"); + + List allLines = new(); + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + allLines.Add(line); + } + + Assert.Contains(allLines, line => line.Content == "stdout_line" && !line.StandardError); + Assert.Contains(allLines, line => line.Content == "stderr_line" && line.StandardError); + + Assert.True(process.WaitForExit(WaitInMS)); + } + + private Process StartLinePrintingProcess(string stdOutText, string stdErrText) + { + Process process = CreateProcess((stdOut, stdErr) => + { + if (stdOut != DontPrintAnything) + { + Console.Out.WriteLine(stdOut); + } + + if (stdErr != DontPrintAnything) + { + Console.Error.WriteLine(stdErr); + } + + return RemoteExecutor.SuccessExitCode; + }, stdOutText, stdErrText); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + return process; + } + } +} diff --git a/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj b/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj index dd73a7baa7f533..3cb2d575046965 100644 --- a/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj +++ b/src/libraries/System.Diagnostics.Process/tests/System.Diagnostics.Process.Tests.csproj @@ -32,6 +32,7 @@ + From 0c86843073985452fa1d0ccef04e3a5fce2c4f52 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 06:57:51 +0000 Subject: [PATCH 2/7] Fix drain logic to use isError tracking instead of stale task variables Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/b28836b3-abff-4763-8aff-2c77ebe0e022 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../src/System/Diagnostics/Process.Multiplexing.cs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 4b9636751ac0a6..3d5197b7e2b4f4 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -288,13 +288,14 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa Task readOutput = outputReader.ReadLineAsync(cancellationToken).AsTask(); Task readError = errorReader.ReadLineAsync(cancellationToken).AsTask(); + bool isError = false; while (true) { Task completedTask = await Task.WhenAny(readOutput, readError).ConfigureAwait(false); // When there is data available in both, handle error first. - bool isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); + isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); string? line = isError ? await readError.ConfigureAwait(false) @@ -343,12 +344,10 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa } // One stream ended. Drain the remaining data from the other stream. - bool isErrorDone = readError.IsCompleted && await readError.ConfigureAwait(false) is null; - StreamReader remainingReader = isErrorDone ? outputReader : errorReader; - bool remainingIsError = !isErrorDone; - - Task remainingTask = isErrorDone ? readOutput : readError; - string? moreData = await remainingTask.ConfigureAwait(false); + // isError tells us which stream returned null, so we drain the opposite stream. + string? moreData = await (isError ? readOutput : readError).ConfigureAwait(false); + StreamReader remainingReader = isError ? outputReader : errorReader; + bool remainingIsError = !isError; while (moreData is not null) { From a29272a5a0822c9e9d49f4bb9b7060fabaf889c7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 08:45:15 +0000 Subject: [PATCH 3/7] Address review feedback: sort ReadAllLinesAsync alphabetically in ref, use Assert.Single in test, fix IDE0059 Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/253bb871-3466-4fd3-bb3d-7f5fb4ece627 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../ref/System.Diagnostics.Process.cs | 2 +- .../src/System/Diagnostics/Process.Multiplexing.cs | 2 +- .../System.Diagnostics.Process/tests/ProcessStreamingTests.cs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs index ea8b034805b912..8f4b1e534ccb3d 100644 --- a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs +++ b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs @@ -159,9 +159,9 @@ public static void LeaveDebugMode() { } protected void OnExited() { } public (byte[] StandardOutput, byte[] StandardError) ReadAllBytes(System.TimeSpan? timeout = default(System.TimeSpan?)) { throw null; } public System.Threading.Tasks.Task<(byte[] StandardOutput, byte[] StandardError)> ReadAllBytesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Collections.Generic.IAsyncEnumerable ReadAllLinesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public (string StandardOutput, string StandardError) ReadAllText(System.TimeSpan? timeout = default(System.TimeSpan?)) { throw null; } public System.Threading.Tasks.Task<(string StandardOutput, string StandardError)> ReadAllTextAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public System.Collections.Generic.IAsyncEnumerable ReadAllLinesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public void Refresh() { } [System.Runtime.Versioning.UnsupportedOSPlatformAttribute("ios")] [System.Runtime.Versioning.UnsupportedOSPlatformAttribute("tvos")] diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 3d5197b7e2b4f4..4e88dfb38f2939 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -288,7 +288,7 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa Task readOutput = outputReader.ReadLineAsync(cancellationToken).AsTask(); Task readError = errorReader.ReadLineAsync(cancellationToken).AsTask(); - bool isError = false; + bool isError; while (true) { diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs index 7bb9a185865af5..708114bd4cd7f1 100644 --- a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs +++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs @@ -306,8 +306,8 @@ public async Task ReadAllLinesAsync_ProcessOutputLineProperties() allLines.Add(line); } - Assert.Contains(allLines, line => line.Content == "stdout_line" && !line.StandardError); - Assert.Contains(allLines, line => line.Content == "stderr_line" && line.StandardError); + Assert.Single(allLines, line => line.Content == "stdout_line" && !line.StandardError); + Assert.Single(allLines, line => line.Content == "stderr_line" && line.StandardError); Assert.True(process.WaitForExit(WaitInMS)); } From a79f65305005a099411341637ee02102d023e541 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:41:59 +0000 Subject: [PATCH 4/7] Add null validation, linked CTS for early stop, stderr priority docs, and early-break test Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/4ba7b698-1f25-4f03-9bfc-46024ae09633 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../Diagnostics/Process.Multiplexing.cs | 109 +++++++++++------- .../System/Diagnostics/ProcessOutputLine.cs | 2 +- .../tests/ProcessStreamingTests.cs | 33 ++++++ 3 files changed, 100 insertions(+), 44 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 4e88dfb38f2939..843880a7e2d61d 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -268,6 +268,11 @@ private static async Task> ReadPipeToBufferAsync(Stream strea /// An async enumerable of instances representing the lines /// read from standard output and standard error. /// + /// + /// When both standard output and standard error have data available at the same time, + /// standard error lines are yielded first. This is by design to ensure that error output + /// is not delayed behind standard output. + /// /// /// Standard output or standard error has not been redirected. /// -or- @@ -286,73 +291,91 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa StreamReader outputReader = _standardOutput!; StreamReader errorReader = _standardError!; - Task readOutput = outputReader.ReadLineAsync(cancellationToken).AsTask(); - Task readError = errorReader.ReadLineAsync(cancellationToken).AsTask(); + using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + CancellationToken linkedToken = linkedCts.Token; + + Task readOutput = outputReader.ReadLineAsync(linkedToken).AsTask(); + Task readError = errorReader.ReadLineAsync(linkedToken).AsTask(); bool isError; - while (true) + try { - Task completedTask = await Task.WhenAny(readOutput, readError).ConfigureAwait(false); - - // When there is data available in both, handle error first. - isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); + while (true) + { + Task completedTask = await Task.WhenAny(readOutput, readError).ConfigureAwait(false); - string? line = isError - ? await readError.ConfigureAwait(false) - : await readOutput.ConfigureAwait(false); + // When there is data available in both, handle error first. + isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); - if (line is not null) - { - yield return new ProcessOutputLine(line, isError); + string? line = isError + ? await readError.ConfigureAwait(false) + : await readOutput.ConfigureAwait(false); - // Continue reading from the same stream while data is immediately available. - StreamReader activeReader = isError ? errorReader : outputReader; - while (true) + if (line is not null) { - ValueTask nextRead = activeReader.ReadLineAsync(cancellationToken); + yield return new ProcessOutputLine(line, isError); - if (nextRead.IsCompleted) + // Continue reading from the same stream while data is immediately available. + StreamReader activeReader = isError ? errorReader : outputReader; + while (true) { - line = await nextRead.ConfigureAwait(false); - if (line is null) - { - break; - } + ValueTask nextRead = activeReader.ReadLineAsync(linkedToken); - yield return new ProcessOutputLine(line, isError); - } - else - { - if (isError) + if (nextRead.IsCompleted) { - readError = nextRead.AsTask(); + line = await nextRead.ConfigureAwait(false); + if (line is null) + { + break; + } + + yield return new ProcessOutputLine(line, isError); } else { - readOutput = nextRead.AsTask(); - } + if (isError) + { + readError = nextRead.AsTask(); + } + else + { + readOutput = nextRead.AsTask(); + } - break; + break; + } } } + + if (line is null) + { + break; + } } - if (line is null) + // One stream ended. Drain the remaining data from the other stream. + // isError tells us which stream returned null, so we drain the opposite stream. + string? moreData = await (isError ? readOutput : readError).ConfigureAwait(false); + StreamReader remainingReader = isError ? outputReader : errorReader; + bool remainingIsError = !isError; + + while (moreData is not null) { - break; + yield return new ProcessOutputLine(moreData, remainingIsError); + moreData = await remainingReader.ReadLineAsync(linkedToken).ConfigureAwait(false); } } + finally + { + // Cancel any in-flight reads when the consumer stops enumerating early + // (e.g., breaks out of await foreach without cancellation). + await linkedCts.CancelAsync().ConfigureAwait(false); - // One stream ended. Drain the remaining data from the other stream. - // isError tells us which stream returned null, so we drain the opposite stream. - string? moreData = await (isError ? readOutput : readError).ConfigureAwait(false); - StreamReader remainingReader = isError ? outputReader : errorReader; - bool remainingIsError = !isError; + try { await readOutput.ConfigureAwait(false); } + catch (OperationCanceledException) { } - while (moreData is not null) - { - yield return new ProcessOutputLine(moreData, remainingIsError); - moreData = await remainingReader.ReadLineAsync(cancellationToken).ConfigureAwait(false); + try { await readError.ConfigureAwait(false); } + catch (OperationCanceledException) { } } } diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs index 1ba47cf8307b0f..c8d9d9ad39bc2f 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/ProcessOutputLine.cs @@ -18,7 +18,7 @@ public readonly struct ProcessOutputLine /// public ProcessOutputLine(string content, bool standardError) { - Content = content; + Content = content ?? throw new ArgumentNullException(nameof(content)); StandardError = standardError; } diff --git a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs index 708114bd4cd7f1..4b2967e1085611 100644 --- a/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs +++ b/src/libraries/System.Diagnostics.Process/tests/ProcessStreamingTests.cs @@ -312,6 +312,39 @@ public async Task ReadAllLinesAsync_ProcessOutputLineProperties() Assert.True(process.WaitForExit(WaitInMS)); } + [ConditionalFact(typeof(RemoteExecutor), nameof(RemoteExecutor.IsSupported))] + public async Task ReadAllLinesAsync_StopsCleanlyWhenConsumerBreaksEarly() + { + using Process process = CreateProcess(() => + { + Console.Out.WriteLine("first"); + Console.Out.Flush(); + Console.Out.WriteLine("second"); + Console.Out.Flush(); + Console.Error.WriteLine("error1"); + Console.Error.Flush(); + + return RemoteExecutor.SuccessExitCode; + }); + + process.StartInfo.RedirectStandardOutput = true; + process.StartInfo.RedirectStandardError = true; + process.Start(); + + ProcessOutputLine? firstLine = null; + + await foreach (ProcessOutputLine line in process.ReadAllLinesAsync()) + { + firstLine = line; + break; // stop after first line + } + + Assert.NotNull(firstLine); + Assert.NotNull(firstLine.Value.Content); + + Assert.True(process.WaitForExit(WaitInMS)); + } + private Process StartLinePrintingProcess(string stdOutText, string stdErrText) { Process process = CreateProcess((stdOut, stdErr) => From f9199555e54a092dd9d95d95a8ecb4878fa7fc12 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:43:30 +0000 Subject: [PATCH 5/7] Add clarifying comment for observed task exception handling in finally block Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/4ba7b698-1f25-4f03-9bfc-46024ae09633 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../src/System/Diagnostics/Process.Multiplexing.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 843880a7e2d61d..4809b3100cac15 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -371,6 +371,8 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa // (e.g., breaks out of await foreach without cancellation). await linkedCts.CancelAsync().ConfigureAwait(false); + // Observe the pending tasks to prevent unobserved task exceptions. + // OperationCanceledException is expected from the cancellation above. try { await readOutput.ConfigureAwait(false); } catch (OperationCanceledException) { } From c3056846792b3bacf4144347e4a4ae8acaf1eebe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:21:35 +0000 Subject: [PATCH 6/7] Rewrite ReadAllLinesAsync using Channel-based approach and fix ref struct dummy fields Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/ad403c98-041e-4789-a64e-27b071d06213 Co-authored-by: adamsitnik <6011991+adamsitnik@users.noreply.github.com> --- .../ref/System.Diagnostics.Process.cs | 2 + .../src/System.Diagnostics.Process.csproj | 1 + .../Diagnostics/Process.Multiplexing.cs | 112 ++++++------------ 3 files changed, 39 insertions(+), 76 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs index 8f4b1e534ccb3d..6dcc244f569082 100644 --- a/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs +++ b/src/libraries/System.Diagnostics.Process/ref/System.Diagnostics.Process.cs @@ -228,6 +228,8 @@ public sealed partial class ProcessExitStatus } public readonly partial struct ProcessOutputLine { + private readonly object _dummy; + private readonly int _dummyPrimitive; public ProcessOutputLine(string content, bool standardError) { throw null; } public string Content { get { throw null; } } public bool StandardError { get { throw null; } } diff --git a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj index d315d0a36f6064..ed9d62efe66324 100644 --- a/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj +++ b/src/libraries/System.Diagnostics.Process/src/System.Diagnostics.Process.csproj @@ -424,6 +424,7 @@ + diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index 4809b3100cac15..c060b0181ff458 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -7,6 +7,7 @@ using System.Runtime.CompilerServices; using System.Text; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Win32.SafeHandles; @@ -269,9 +270,9 @@ private static async Task> ReadPipeToBufferAsync(Stream strea /// read from standard output and standard error. /// /// - /// When both standard output and standard error have data available at the same time, - /// standard error lines are yielded first. This is by design to ensure that error output - /// is not delayed behind standard output. + /// Lines from standard output and standard error are yielded as they become available. + /// When the consumer stops enumerating early (for example, by breaking out of + /// ), any pending read operations are canceled. /// /// /// Standard output or standard error has not been redirected. @@ -291,93 +292,52 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa StreamReader outputReader = _standardOutput!; StreamReader errorReader = _standardError!; - using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - CancellationToken linkedToken = linkedCts.Token; + Channel channel = Channel.CreateBounded(0); + int completedCount = 0; - Task readOutput = outputReader.ReadLineAsync(linkedToken).AsTask(); - Task readError = errorReader.ReadLineAsync(linkedToken).AsTask(); - bool isError; + CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + Task outputTask = ReadToChannelAsync(outputReader, standardError: false, linkedCts.Token); + Task errorTask = ReadToChannelAsync(errorReader, standardError: true, linkedCts.Token); try { - while (true) + await foreach (ProcessOutputLine line in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) { - Task completedTask = await Task.WhenAny(readOutput, readError).ConfigureAwait(false); - - // When there is data available in both, handle error first. - isError = completedTask == readError || (readOutput.IsCompleted && readError.IsCompleted); + yield return line; + } + } + finally + { + await linkedCts.CancelAsync().ConfigureAwait(false); - string? line = isError - ? await readError.ConfigureAwait(false) - : await readOutput.ConfigureAwait(false); + // Ensure both tasks complete before disposing the CancellationTokenSource. + // The tasks handle all exceptions internally, so they always run to completion. + await outputTask.ConfigureAwait(false); + await errorTask.ConfigureAwait(false); - if (line is not null) - { - yield return new ProcessOutputLine(line, isError); - - // Continue reading from the same stream while data is immediately available. - StreamReader activeReader = isError ? errorReader : outputReader; - while (true) - { - ValueTask nextRead = activeReader.ReadLineAsync(linkedToken); - - if (nextRead.IsCompleted) - { - line = await nextRead.ConfigureAwait(false); - if (line is null) - { - break; - } - - yield return new ProcessOutputLine(line, isError); - } - else - { - if (isError) - { - readError = nextRead.AsTask(); - } - else - { - readOutput = nextRead.AsTask(); - } - - break; - } - } - } + linkedCts.Dispose(); + } - if (line is null) + async Task ReadToChannelAsync(StreamReader reader, bool standardError, CancellationToken ct) + { + try + { + while (await reader.ReadLineAsync(ct).ConfigureAwait(false) is string line) { - break; + await channel.Writer.WriteAsync(new ProcessOutputLine(line, standardError), ct).ConfigureAwait(false); } } - - // One stream ended. Drain the remaining data from the other stream. - // isError tells us which stream returned null, so we drain the opposite stream. - string? moreData = await (isError ? readOutput : readError).ConfigureAwait(false); - StreamReader remainingReader = isError ? outputReader : errorReader; - bool remainingIsError = !isError; - - while (moreData is not null) + catch (Exception ex) { - yield return new ProcessOutputLine(moreData, remainingIsError); - moreData = await remainingReader.ReadLineAsync(linkedToken).ConfigureAwait(false); + channel.Writer.TryComplete(ex); + return; } - } - finally - { - // Cancel any in-flight reads when the consumer stops enumerating early - // (e.g., breaks out of await foreach without cancellation). - await linkedCts.CancelAsync().ConfigureAwait(false); - - // Observe the pending tasks to prevent unobserved task exceptions. - // OperationCanceledException is expected from the cancellation above. - try { await readOutput.ConfigureAwait(false); } - catch (OperationCanceledException) { } - try { await readError.ConfigureAwait(false); } - catch (OperationCanceledException) { } + if (Interlocked.Exchange(ref completedCount, 1) != 0) + { + channel.Writer.TryComplete(); + } } } From 7bb5258e9aca3816e28593908d09243fa5197a71 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 16 Apr 2026 16:30:11 +0000 Subject: [PATCH 7/7] Address MihaZupan's review: bool firstCompleted, Cancel(), WaitToReadAsync+TryRead loop Agent-Logs-Url: https://github.com/dotnet/runtime/sessions/921772ae-f4c2-44a3-9f62-2631c619eb62 Co-authored-by: MihaZupan <25307628+MihaZupan@users.noreply.github.com> --- .../src/System/Diagnostics/Process.Multiplexing.cs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs index c060b0181ff458..8ff5443f4e2e16 100644 --- a/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs +++ b/src/libraries/System.Diagnostics.Process/src/System/Diagnostics/Process.Multiplexing.cs @@ -293,7 +293,7 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa StreamReader errorReader = _standardError!; Channel channel = Channel.CreateBounded(0); - int completedCount = 0; + bool firstCompleted = false; CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); @@ -302,14 +302,17 @@ public async IAsyncEnumerable ReadAllLinesAsync([EnumeratorCa try { - await foreach (ProcessOutputLine line in channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + while (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - yield return line; + while (channel.Reader.TryRead(out ProcessOutputLine line)) + { + yield return line; + } } } finally { - await linkedCts.CancelAsync().ConfigureAwait(false); + linkedCts.Cancel(); // Ensure both tasks complete before disposing the CancellationTokenSource. // The tasks handle all exceptions internally, so they always run to completion. @@ -334,7 +337,7 @@ async Task ReadToChannelAsync(StreamReader reader, bool standardError, Cancellat return; } - if (Interlocked.Exchange(ref completedCount, 1) != 0) + if (Interlocked.Exchange(ref firstCompleted, true)) { channel.Writer.TryComplete(); }